refactor(session): effectify session processor (#19485)

pull/10292/head^2
Kit Langton 2026-03-28 12:09:47 -04:00 committed by GitHub
parent 2b86b36c8c
commit 860531c275
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 2159 additions and 593 deletions

View File

@ -9,6 +9,8 @@ export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: L
return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromiseExit(service.use(fn), options),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),

View File

@ -14,10 +14,10 @@ import { Agent } from "@/agent/agent"
import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
import { ProviderTransform } from "@/provider/transform"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, ServiceMap } from "effect"
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { isOverflow as overflow } from "./overflow"
export namespace SessionCompaction {
const log = Log.create({ service: "session.compaction" })
@ -31,7 +31,6 @@ export namespace SessionCompaction {
),
}
const COMPACTION_BUFFER = 20_000
export const PRUNE_MINIMUM = 20_000
export const PRUNE_PROTECT = 40_000
const PRUNE_PROTECTED_TOOLS = ["skill"]
@ -64,7 +63,7 @@ export namespace SessionCompaction {
export const layer: Layer.Layer<
Service,
never,
Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service
Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service | SessionProcessor.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
@ -73,26 +72,13 @@ export namespace SessionCompaction {
const session = yield* Session.Service
const agents = yield* Agent.Service
const plugin = yield* Plugin.Service
const processors = yield* SessionProcessor.Service
const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: {
tokens: MessageV2.Assistant["tokens"]
model: Provider.Model
}) {
const cfg = yield* config.get()
if (cfg.compaction?.auto === false) return false
const context = input.model.limit.context
if (context === 0) return false
const count =
input.tokens.total ||
input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
const reserved =
cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
const usable = input.model.limit.input
? input.model.limit.input - reserved
: context - ProviderTransform.maxOutputTokens(input.model)
return count >= usable
return overflow({ cfg: yield* config.get(), tokens: input.tokens, model: input.model })
})
// goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool
@ -181,38 +167,6 @@ export namespace SessionCompaction {
? Provider.getModel(agent.model.providerID, agent.model.modelID)
: Provider.getModel(userMessage.model.providerID, userMessage.model.modelID),
)
const msg = (yield* session.updateMessage({
id: MessageID.ascending(),
role: "assistant",
parentID: input.parentID,
sessionID: input.sessionID,
mode: "compaction",
agent: "compaction",
variant: userMessage.variant,
summary: true,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: model.id,
providerID: model.providerID,
time: {
created: Date.now(),
},
})) as MessageV2.Assistant
const processor = SessionProcessor.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
// Allow plugins to inject context or replace compaction prompt.
const compacting = yield* plugin.trigger(
"experimental.session.compacting",
@ -251,8 +205,47 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const result = yield* Effect.promise(() =>
processor.process({
const msg = (yield* session.updateMessage({
id: MessageID.ascending(),
role: "assistant",
parentID: input.parentID,
sessionID: input.sessionID,
mode: "compaction",
agent: "compaction",
variant: userMessage.variant,
summary: true,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: model.id,
providerID: model.providerID,
time: {
created: Date.now(),
},
})) as MessageV2.Assistant
const processor = yield* processors.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
if (!input.abort.aborted || msg.time.completed) return
msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject()
msg.finish = msg.finish ?? "error"
msg.time.completed = Date.now()
yield* session.updateMessage(msg)
})
const result = yield* processor
.process({
user: userMessage,
agent,
abort: input.abort,
@ -267,8 +260,8 @@ When constructing the summary, try to stick to this template:
},
],
model,
}),
)
})
.pipe(Effect.ensuring(cancel()))
if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({
@ -383,6 +376,7 @@ When constructing the summary, try to stick to this template:
Effect.sync(() =>
layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provide(SessionProcessor.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(Plugin.defaultLayer),
Layer.provide(Bus.layer),
@ -391,7 +385,7 @@ When constructing the summary, try to stick to this template:
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer)
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input))
@ -409,7 +403,12 @@ When constructing the summary, try to stick to this template:
auto: boolean
overflow?: boolean
}) {
return runPromise((svc) => svc.process(input))
const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort })
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
throw Cause.squash(exit.cause)
}
return exit.value
}
export const create = fn(

View File

@ -1,5 +1,7 @@
import { Provider } from "@/provider/provider"
import { Log } from "@/util/log"
import { Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda"
import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
@ -34,6 +36,35 @@ export namespace LLM {
toolChoice?: "auto" | "required" | "none"
}
export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
export interface Interface {
readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/LLM") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
return Service.of({
stream(input) {
return Stream.unwrap(
Effect.promise(() => LLM.stream(input)).pipe(
Effect.map((result) =>
Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe(
Stream.mapEffect((event) => Effect.succeed(event)),
),
),
),
)
},
})
}),
)
export const defaultLayer = layer
export async function stream(input: StreamInput) {
const l = log
.clone()

View File

@ -0,0 +1,22 @@
import type { Config } from "@/config/config"
import type { Provider } from "@/provider/provider"
import { ProviderTransform } from "@/provider/transform"
import type { MessageV2 } from "./message-v2"
const COMPACTION_BUFFER = 20_000
export function isOverflow(input: { cfg: Config.Info; tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
if (input.cfg.compaction?.auto === false) return false
const context = input.model.limit.context
if (context === 0) return false
const count =
input.tokens.total || input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
const reserved =
input.cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
const usable = input.model.limit.input
? input.model.limit.input - reserved
: context - ProviderTransform.maxOutputTokens(input.model)
return count >= usable
}

View File

@ -1,430 +1,554 @@
import { MessageV2 } from "./message-v2"
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
import { makeRuntime } from "@/effect/run-service"
import { Config } from "@/config/config"
import { Permission } from "@/permission"
import { Plugin } from "@/plugin"
import { Snapshot } from "@/snapshot"
import { Log } from "@/util/log"
import { Session } from "."
import { Agent } from "@/agent/agent"
import { Snapshot } from "@/snapshot"
import { SessionSummary } from "./summary"
import { Bus } from "@/bus"
import { LLM } from "./llm"
import { MessageV2 } from "./message-v2"
import { isOverflow } from "./overflow"
import { PartID } from "./schema"
import type { SessionID } from "./schema"
import { SessionRetry } from "./retry"
import { SessionStatus } from "./status"
import { Plugin } from "@/plugin"
import { SessionSummary } from "./summary"
import type { Provider } from "@/provider/provider"
import { LLM } from "./llm"
import { Config } from "@/config/config"
import { SessionCompaction } from "./compaction"
import { Permission } from "@/permission"
import { Question } from "@/question"
import { PartID } from "./schema"
import type { SessionID, MessageID } from "./schema"
export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
export type Info = Awaited<ReturnType<typeof create>>
export type Result = Awaited<ReturnType<Info["process"]>>
export type Result = "compact" | "stop" | "continue"
export function create(input: {
export type Event = LLM.Event
export interface Handle {
readonly message: MessageV2.Assistant
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
readonly abort: () => Effect.Effect<void>
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}
export interface Info {
readonly message: MessageV2.Assistant
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
readonly process: (streamInput: LLM.StreamInput) => Promise<Result>
}
type Input = {
assistantMessage: MessageV2.Assistant
sessionID: SessionID
model: Provider.Model
abort: AbortSignal
}) {
const toolcalls: Record<string, MessageV2.ToolPart> = {}
let snapshot: string | undefined
let blocked = false
let attempt = 0
let needsCompaction = false
}
const result = {
get message() {
return input.assistantMessage
},
partFromToolCall(toolCallID: string) {
return toolcalls[toolCallID]
},
async process(streamInput: LLM.StreamInput) {
log.info("process")
needsCompaction = false
const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
while (true) {
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const stream = await LLM.stream(streamInput)
export interface Interface {
readonly create: (input: Input) => Effect.Effect<Handle>
}
for await (const value of stream.fullStream) {
input.abort.throwIfAborted()
switch (value.type) {
case "start":
await SessionStatus.set(input.sessionID, { type: "busy" })
break
interface ProcessorContext extends Input {
toolcalls: Record<string, MessageV2.ToolPart>
shouldBreak: boolean
snapshot: string | undefined
blocked: boolean
needsCompaction: boolean
currentText: MessageV2.TextPart | undefined
reasoningMap: Record<string, MessageV2.ReasoningPart>
}
case "reasoning-start":
if (value.id in reasoningMap) {
continue
}
const reasoningPart = {
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "reasoning" as const,
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
reasoningMap[value.id] = reasoningPart
await Session.updatePart(reasoningPart)
break
type StreamEvent = Event
case "reasoning-delta":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text += value.text
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePartDelta({
sessionID: part.sessionID,
messageID: part.messageID,
partID: part.id,
field: "text",
delta: value.text,
})
}
break
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionProcessor") {}
case "reasoning-end":
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text = part.text.trimEnd()
export const layer: Layer.Layer<
Service,
never,
| Session.Service
| Config.Service
| Bus.Service
| Snapshot.Service
| Agent.Service
| LLM.Service
| Permission.Service
| Plugin.Service
| SessionStatus.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
const session = yield* Session.Service
const config = yield* Config.Service
const bus = yield* Bus.Service
const snapshot = yield* Snapshot.Service
const agents = yield* Agent.Service
const llm = yield* LLM.Service
const permission = yield* Permission.Service
const plugin = yield* Plugin.Service
const status = yield* SessionStatus.Service
part.time = {
...part.time,
end: Date.now(),
}
if (value.providerMetadata) part.metadata = value.providerMetadata
await Session.updatePart(part)
delete reasoningMap[value.id]
}
break
const create = Effect.fn("SessionProcessor.create")(function* (input: Input) {
const ctx: ProcessorContext = {
assistantMessage: input.assistantMessage,
sessionID: input.sessionID,
model: input.model,
abort: input.abort,
toolcalls: {},
shouldBreak: false,
snapshot: undefined,
blocked: false,
needsCompaction: false,
currentText: undefined,
reasoningMap: {},
}
case "tool-input-start":
const part = await Session.updatePart({
id: toolcalls[value.id]?.id ?? PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "tool",
tool: value.toolName,
callID: value.id,
state: {
status: "pending",
input: {},
raw: "",
},
})
toolcalls[value.id] = part as MessageV2.ToolPart
break
const parse = (e: unknown) =>
MessageV2.fromError(e, {
providerID: input.model.providerID,
aborted: input.abort.aborted,
})
case "tool-input-delta":
break
const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
switch (value.type) {
case "start":
yield* status.set(ctx.sessionID, { type: "busy" })
return
case "tool-input-end":
break
case "tool-call": {
const match = toolcalls[value.toolCallId]
if (match) {
const part = await Session.updatePart({
...match,
tool: value.toolName,
state: {
status: "running",
input: value.input,
time: {
start: Date.now(),
},
},
metadata: value.providerMetadata,
})
toolcalls[value.toolCallId] = part as MessageV2.ToolPart
const parts = await MessageV2.parts(input.assistantMessage.id)
const lastThree = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
lastThree.length === DOOM_LOOP_THRESHOLD &&
lastThree.every(
(p) =>
p.type === "tool" &&
p.tool === value.toolName &&
p.state.status !== "pending" &&
JSON.stringify(p.state.input) === JSON.stringify(value.input),
)
) {
const agent = await Agent.get(input.assistantMessage.agent)
await Permission.ask({
permission: "doom_loop",
patterns: [value.toolName],
sessionID: input.assistantMessage.sessionID,
metadata: {
tool: value.toolName,
input: value.input,
},
always: [value.toolName],
ruleset: agent.permission,
})
}
}
break
}
case "tool-result": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
...match,
state: {
status: "completed",
input: value.input ?? match.state.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: {
start: match.state.time.start,
end: Date.now(),
},
attachments: value.output.attachments,
},
})
delete toolcalls[value.toolCallId]
}
break
}
case "tool-error": {
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
...match,
state: {
status: "error",
input: value.input ?? match.state.input,
error: value.error instanceof Error ? value.error.message : String(value.error),
time: {
start: match.state.time.start,
end: Date.now(),
},
},
})
if (
value.error instanceof Permission.RejectedError ||
value.error instanceof Question.RejectedError
) {
blocked = shouldBreak
}
delete toolcalls[value.toolCallId]
}
break
}
case "error":
throw value.error
case "start-step":
snapshot = await Snapshot.track()
await Session.updatePart({
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
snapshot,
type: "step-start",
})
break
case "finish-step":
const usage = Session.getUsage({
model: input.model,
usage: value.usage,
metadata: value.providerMetadata,
})
input.assistantMessage.finish = value.finishReason
input.assistantMessage.cost += usage.cost
input.assistantMessage.tokens = usage.tokens
await Session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
snapshot: await Snapshot.track(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
await Session.updateMessage(input.assistantMessage)
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
await Session.updatePart({
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
snapshot = undefined
}
SessionSummary.summarize({
sessionID: input.sessionID,
messageID: input.assistantMessage.parentID,
})
if (
!input.assistantMessage.summary &&
(await SessionCompaction.isOverflow({ tokens: usage.tokens, model: input.model }))
) {
needsCompaction = true
}
break
case "text-start":
currentText = {
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "text",
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
await Session.updatePart(currentText)
break
case "text-delta":
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePartDelta({
sessionID: currentText.sessionID,
messageID: currentText.messageID,
partID: currentText.id,
field: "text",
delta: value.text,
})
}
break
case "text-end":
if (currentText) {
currentText.text = currentText.text.trimEnd()
const textOutput = await Plugin.trigger(
"experimental.text.complete",
{
sessionID: input.sessionID,
messageID: input.assistantMessage.id,
partID: currentText.id,
},
{ text: currentText.text },
)
currentText.text = textOutput.text
currentText.time = {
start: Date.now(),
end: Date.now(),
}
if (value.providerMetadata) currentText.metadata = value.providerMetadata
await Session.updatePart(currentText)
}
currentText = undefined
break
case "finish":
break
default:
log.info("unhandled", {
...value,
})
continue
}
if (needsCompaction) break
}
} catch (e: any) {
log.error("process", {
error: e,
stack: JSON.stringify(e.stack),
})
const error = MessageV2.fromError(e, { providerID: input.model.providerID, aborted: input.abort.aborted })
if (MessageV2.ContextOverflowError.isInstance(error)) {
needsCompaction = true
Bus.publish(Session.Event.Error, {
sessionID: input.sessionID,
error,
})
} else {
const retry = SessionRetry.retryable(error)
if (retry !== undefined) {
attempt++
const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined)
await SessionStatus.set(input.sessionID, {
type: "retry",
attempt,
message: retry,
next: Date.now() + delay,
})
await SessionRetry.sleep(delay, input.abort).catch(() => {})
continue
}
input.assistantMessage.error = error
Bus.publish(Session.Event.Error, {
sessionID: input.assistantMessage.sessionID,
error: input.assistantMessage.error,
})
await SessionStatus.set(input.sessionID, { type: "idle" })
}
}
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
if (patch.files.length) {
await Session.updatePart({
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: input.assistantMessage.id,
sessionID: input.sessionID,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "reasoning",
text: "",
time: { start: Date.now() },
metadata: value.providerMetadata,
}
yield* session.updatePart(ctx.reasoningMap[value.id])
return
case "reasoning-delta":
if (!(value.id in ctx.reasoningMap)) return
ctx.reasoningMap[value.id].text += value.text
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePartDelta({
sessionID: ctx.reasoningMap[value.id].sessionID,
messageID: ctx.reasoningMap[value.id].messageID,
partID: ctx.reasoningMap[value.id].id,
field: "text",
delta: value.text,
})
return
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text.trimEnd()
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePart(ctx.reasoningMap[value.id])
delete ctx.reasoningMap[value.id]
return
case "tool-input-start":
ctx.toolcalls[value.id] = (yield* session.updatePart({
id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "tool",
tool: value.toolName,
callID: value.id,
state: { status: "pending", input: {}, raw: "" },
})) as MessageV2.ToolPart
return
case "tool-input-delta":
return
case "tool-input-end":
return
case "tool-call": {
const match = ctx.toolcalls[value.toolCallId]
if (!match) return
ctx.toolcalls[value.toolCallId] = (yield* session.updatePart({
...match,
tool: value.toolName,
state: { status: "running", input: value.input, time: { start: Date.now() } },
metadata: value.providerMetadata,
})) as MessageV2.ToolPart
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
recentParts.length !== DOOM_LOOP_THRESHOLD ||
!recentParts.every(
(part) =>
part.type === "tool" &&
part.tool === value.toolName &&
part.state.status !== "pending" &&
JSON.stringify(part.state.input) === JSON.stringify(value.input),
)
) {
return
}
const agent = yield* agents.get(ctx.assistantMessage.agent)
yield* permission.ask({
permission: "doom_loop",
patterns: [value.toolName],
sessionID: ctx.assistantMessage.sessionID,
metadata: { tool: value.toolName, input: value.input },
always: [value.toolName],
ruleset: agent.permission,
})
return
}
case "tool-result": {
const match = ctx.toolcalls[value.toolCallId]
if (!match || match.state.status !== "running") return
yield* session.updatePart({
...match,
state: {
status: "completed",
input: value.input ?? match.state.input,
output: value.output.output,
metadata: value.output.metadata,
title: value.output.title,
time: { start: match.state.time.start, end: Date.now() },
attachments: value.output.attachments,
},
})
delete ctx.toolcalls[value.toolCallId]
return
}
case "tool-error": {
const match = ctx.toolcalls[value.toolCallId]
if (!match || match.state.status !== "running") return
yield* session.updatePart({
...match,
state: {
status: "error",
input: value.input ?? match.state.input,
error: value.error instanceof Error ? value.error.message : String(value.error),
time: { start: match.state.time.start, end: Date.now() },
},
})
if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) {
ctx.blocked = ctx.shouldBreak
}
delete ctx.toolcalls[value.toolCallId]
return
}
case "error":
throw value.error
case "start-step":
ctx.snapshot = yield* snapshot.track()
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
snapshot: ctx.snapshot,
type: "step-start",
})
return
case "finish-step": {
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage,
metadata: value.providerMetadata,
})
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
snapshot: yield* snapshot.track(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
yield* session.updateMessage(ctx.assistantMessage)
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
ctx.snapshot = undefined
}
yield* Effect.promise(() =>
SessionSummary.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
}),
).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach)
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
) {
ctx.needsCompaction = true
}
return
}
case "text-start":
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "text",
text: "",
time: { start: Date.now() },
metadata: value.providerMetadata,
}
yield* session.updatePart(ctx.currentText)
return
case "text-delta":
if (!ctx.currentText) return
ctx.currentText.text += value.text
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePartDelta({
sessionID: ctx.currentText.sessionID,
messageID: ctx.currentText.messageID,
partID: ctx.currentText.id,
field: "text",
delta: value.text,
})
return
case "text-end":
if (!ctx.currentText) return
ctx.currentText.text = ctx.currentText.text.trimEnd()
ctx.currentText.text = (yield* plugin.trigger(
"experimental.text.complete",
{
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.id,
partID: ctx.currentText.id,
},
{ text: ctx.currentText.text },
)).text
ctx.currentText.time = { start: Date.now(), end: Date.now() }
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePart(ctx.currentText)
ctx.currentText = undefined
return
case "finish":
return
default:
log.info("unhandled", { ...value })
return
}
})
const cleanup = Effect.fn("SessionProcessor.cleanup")(function* () {
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
snapshot = undefined
ctx.snapshot = undefined
}
const p = await MessageV2.parts(input.assistantMessage.id)
for (const part of p) {
if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
await Session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: {
start: Date.now(),
end: Date.now(),
},
},
})
}
if (ctx.currentText) {
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
yield* session.updatePart(ctx.currentText)
ctx.currentText = undefined
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
if (needsCompaction) return "compact"
if (blocked) return "stop"
if (input.assistantMessage.error) return "stop"
for (const part of Object.values(ctx.reasoningMap)) {
const end = Date.now()
yield* session.updatePart({
...part,
time: { start: part.time.start ?? end, end },
})
}
ctx.reasoningMap = {}
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
for (const part of parts) {
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
yield* session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: { start: Date.now(), end: Date.now() },
},
})
}
ctx.assistantMessage.time.completed = Date.now()
yield* session.updateMessage(ctx.assistantMessage)
})
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) })
const error = parse(e)
if (MessageV2.ContextOverflowError.isInstance(error)) {
ctx.needsCompaction = true
yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
return
}
ctx.assistantMessage.error = error
yield* bus.publish(Session.Event.Error, {
sessionID: ctx.assistantMessage.sessionID,
error: ctx.assistantMessage.error,
})
yield* status.set(ctx.sessionID, { type: "idle" })
})
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
log.info("process")
ctx.needsCompaction = false
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
yield* Effect.gen(function* () {
ctx.currentText = undefined
ctx.reasoningMap = {}
const stream = llm.stream(streamInput)
yield* stream.pipe(
Stream.tap((event) =>
Effect.gen(function* () {
input.abort.throwIfAborted()
yield* handleEvent(event)
}),
),
Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain,
)
}).pipe(
Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)),
),
Effect.retry(
SessionRetry.policy({
parse,
set: (info) =>
status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
}),
}),
),
Effect.catchCause((cause) =>
Cause.hasInterruptsOnly(cause)
? halt(new DOMException("Aborted", "AbortError"))
: halt(Cause.squash(cause)),
),
Effect.ensuring(cleanup()),
)
if (input.abort.aborted && !ctx.assistantMessage.error) {
yield* abort()
}
if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop"
return "continue"
})
const abort = Effect.fn("SessionProcessor.abort")(() =>
Effect.gen(function* () {
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))
}
if (!ctx.assistantMessage.time.completed) {
yield* cleanup()
return
}
yield* session.updateMessage(ctx.assistantMessage)
}),
)
return {
get message() {
return ctx.assistantMessage
},
partFromToolCall(toolCallID: string) {
return ctx.toolcalls[toolCallID]
},
abort,
process,
} satisfies Handle
})
return Service.of({ create })
}),
)
export const defaultLayer = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provide(Snapshot.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(LLM.defaultLayer),
Layer.provide(Permission.layer),
Layer.provide(Plugin.defaultLayer),
Layer.provide(SessionStatus.layer.pipe(Layer.provide(Bus.layer))),
Layer.provide(Bus.layer),
Layer.provide(Config.defaultLayer),
),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
/**
 * Promise-based facade over the Effect-native processor service.
 *
 * Creates a handle via the default runtime and adapts it to the plain async
 * `Info` interface used by non-Effect callers.
 *
 * Fix: removed the unreachable trailing `return result` — it followed the
 * object `return` above and referenced an identifier that was never declared.
 */
export async function create(input: Input): Promise<Info> {
  const hit = await runPromise((svc) => svc.create(input))
  return {
    // Live view of the assistant message owned by the underlying handle.
    get message() {
      return hit.message
    },
    partFromToolCall(toolCallID: string) {
      return hit.partFromToolCall(toolCallID)
    },
    async process(streamInput: LLM.StreamInput) {
      // Run the effect tied to the caller's abort signal. An interrupt caused
      // by that signal is reported as a clean "stop" after abort cleanup;
      // any other failure is rethrown as a plain error.
      const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort })
      if (Exit.isFailure(exit)) {
        if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) {
          await Effect.runPromise(hit.abort())
          return "stop"
        }
        throw Cause.squash(exit.cause)
      }
      return exit.value
    },
  }
}
}

View File

@ -594,7 +594,7 @@ export namespace SessionPrompt {
session,
})
const processor = SessionProcessor.create({
const processor = await SessionProcessor.create({
assistantMessage: (await Session.updateMessage({
id: MessageID.ascending(),
parentID: lastUser.id,

View File

@ -1,28 +1,18 @@
import type { NamedError } from "@opencode-ai/util/error"
import { Cause, Clock, Duration, Effect, Schedule } from "effect"
import { MessageV2 } from "./message-v2"
import { iife } from "@/util/iife"
export namespace SessionRetry {
export type Err = ReturnType<NamedError["toObject"]>
export const RETRY_INITIAL_DELAY = 2000
export const RETRY_BACKOFF_FACTOR = 2
export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds
export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout
export async function sleep(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const abortHandler = () => {
clearTimeout(timeout)
reject(new DOMException("Aborted", "AbortError"))
}
const timeout = setTimeout(
() => {
signal.removeEventListener("abort", abortHandler)
resolve()
},
Math.min(ms, RETRY_MAX_DELAY),
)
signal.addEventListener("abort", abortHandler, { once: true })
})
function cap(ms: number) {
return Math.min(ms, RETRY_MAX_DELAY)
}
export function delay(attempt: number, error?: MessageV2.APIError) {
@ -33,7 +23,7 @@ export namespace SessionRetry {
if (retryAfterMs) {
const parsedMs = Number.parseFloat(retryAfterMs)
if (!Number.isNaN(parsedMs)) {
return parsedMs
return cap(parsedMs)
}
}
@ -42,23 +32,23 @@ export namespace SessionRetry {
const parsedSeconds = Number.parseFloat(retryAfter)
if (!Number.isNaN(parsedSeconds)) {
// convert seconds to milliseconds
return Math.ceil(parsedSeconds * 1000)
return cap(Math.ceil(parsedSeconds * 1000))
}
// Try parsing as HTTP date format
const parsed = Date.parse(retryAfter) - Date.now()
if (!Number.isNaN(parsed) && parsed > 0) {
return Math.ceil(parsed)
return cap(Math.ceil(parsed))
}
}
return RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1)
return cap(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1))
}
}
return Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS)
return cap(Math.min(RETRY_INITIAL_DELAY * Math.pow(RETRY_BACKOFF_FACTOR, attempt - 1), RETRY_MAX_DELAY_NO_HEADERS))
}
export function retryable(error: ReturnType<NamedError["toObject"]>) {
export function retryable(error: Err) {
// context overflow errors should not be retried
if (MessageV2.ContextOverflowError.isInstance(error)) return undefined
if (MessageV2.APIError.isInstance(error)) {
@ -80,22 +70,37 @@ export namespace SessionRetry {
return undefined
}
})
try {
if (!json || typeof json !== "object") return undefined
const code = typeof json.code === "string" ? json.code : ""
if (!json || typeof json !== "object") return undefined
const code = typeof json.code === "string" ? json.code : ""
if (json.type === "error" && json.error?.type === "too_many_requests") {
return "Too Many Requests"
}
if (code.includes("exhausted") || code.includes("unavailable")) {
return "Provider is overloaded"
}
if (json.type === "error" && json.error?.code?.includes("rate_limit")) {
return "Rate Limited"
}
return JSON.stringify(json)
} catch {
return undefined
if (json.type === "error" && json.error?.type === "too_many_requests") {
return "Too Many Requests"
}
if (code.includes("exhausted") || code.includes("unavailable")) {
return "Provider is overloaded"
}
if (json.type === "error" && typeof json.error?.code === "string" && json.error.code.includes("rate_limit")) {
return "Rate Limited"
}
return undefined
}
/**
 * Builds the retry Schedule used around an LLM stream.
 *
 * Each failure is parsed into a structured error; `retryable` decides whether
 * to retry (returning a user-facing message) or give up. On retry, the caller
 * is notified via `opts.set` with the attempt number and the wall-clock time
 * of the next attempt, then the schedule waits `delay(...)` milliseconds.
 */
export function policy(opts: {
  parse: (error: unknown) => Err
  set: (input: { attempt: number; message: string; next: number }) => Effect.Effect<void>
}) {
  return Schedule.fromStepWithMetadata(
    Effect.succeed((meta: Schedule.InputMetadata<unknown>) => {
      const error = opts.parse(meta.input)
      const message = retryable(error)
      // No retryable message → terminate the schedule (propagate the failure).
      if (!message) return Cause.done(meta.attempt)
      return Effect.gen(function* () {
        // Honor provider retry headers only for APIError instances.
        const wait = delay(meta.attempt, MessageV2.APIError.isInstance(error) ? error : undefined)
        const now = yield* Clock.currentTimeMillis
        yield* opts.set({ attempt: meta.attempt, message, next: now + wait })
        return [meta.attempt, Duration.millis(wait)] as [number, Duration.Duration]
      })
    }),
  )
}
}

View File

@ -1,18 +1,28 @@
import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
import { APICallError } from "ai"
import { Cause, Effect, Exit, Layer, ManagedRuntime } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import { Bus } from "../../src/bus"
import { Config } from "../../src/config/config"
import { Agent } from "../../src/agent/agent"
import { LLM } from "../../src/session/llm"
import { SessionCompaction } from "../../src/session/compaction"
import { Token } from "../../src/util/token"
import { Instance } from "../../src/project/instance"
import { Log } from "../../src/util/log"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
import { tmpdir } from "../fixture/fixture"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { ModelID, ProviderID } from "../../src/provider/schema"
import type { Provider } from "../../src/provider/provider"
import * as ProviderModule from "../../src/provider/provider"
import * as SessionProcessorModule from "../../src/session/processor"
import { Snapshot } from "../../src/snapshot"
Log.init({ print: false })
@ -121,12 +131,13 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou
function fake(
input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0],
result: "continue" | "compact",
): ReturnType<(typeof SessionProcessorModule.SessionProcessor)["create"]> {
) {
const msg = input.assistantMessage
return {
get message() {
return msg
},
abort: Effect.fn("TestSessionProcessor.abort")(() => Effect.void),
partFromToolCall() {
return {
id: PartID.ascending(),
@ -138,10 +149,74 @@ function fake(
state: { status: "pending", input: {}, raw: "" },
}
},
process: async () => result,
process: Effect.fn("TestSessionProcessor.process")(() => Effect.succeed(result)),
} satisfies SessionProcessorModule.SessionProcessor.Handle
}
// Test layer that stubs the SessionProcessor service: every created handle is
// a `fake` whose process() immediately resolves to the given result.
function layer(result: "continue" | "compact") {
  return Layer.succeed(
    SessionProcessorModule.SessionProcessor.Service,
    SessionProcessorModule.SessionProcessor.Service.of({
      create: Effect.fn("TestSessionProcessor.create")((input) => Effect.succeed(fake(input, result))),
    }),
  )
}
// Builds a ManagedRuntime for compaction tests with a stubbed processor.
// `plugin` can be overridden to inject hook behavior (see `plugin(ready)`).
function runtime(result: "continue" | "compact", plugin = Plugin.defaultLayer) {
  // Share one Bus instance between the merged output and the provided deps.
  const bus = Bus.layer
  return ManagedRuntime.make(
    Layer.mergeAll(SessionCompaction.layer, bus).pipe(
      Layer.provide(Session.defaultLayer),
      Layer.provide(layer(result)),
      Layer.provide(Agent.defaultLayer),
      Layer.provide(plugin),
      Layer.provide(bus),
      Layer.provide(Config.defaultLayer),
    ),
  )
}
function llm() {
const queue: Array<
Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
> = []
return {
push(stream: Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)) {
queue.push(stream)
},
layer: Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: (input) => {
const item = queue.shift() ?? Stream.empty
const stream = typeof item === "function" ? item(input) : item
return stream.pipe(Stream.mapEffect((event) => Effect.succeed(event)))
},
}),
),
}
}
// Builds a ManagedRuntime using the REAL SessionProcessor (not the stub),
// with the LLM service supplied by the caller — used for retry/abort tests.
function liveRuntime(layer: Layer.Layer<LLM.Service>) {
  const bus = Bus.layer
  const status = SessionStatus.layer.pipe(Layer.provide(bus))
  const processor = SessionProcessorModule.SessionProcessor.layer
  return ManagedRuntime.make(
    // Compaction depends on the processor; both are merged so tests can reach either.
    Layer.mergeAll(SessionCompaction.layer.pipe(Layer.provide(processor)), processor, bus, status).pipe(
      Layer.provide(Session.defaultLayer),
      Layer.provide(Snapshot.defaultLayer),
      Layer.provide(layer),
      Layer.provide(Permission.layer),
      Layer.provide(Agent.defaultLayer),
      Layer.provide(Plugin.defaultLayer),
      Layer.provide(status),
      Layer.provide(bus),
      Layer.provide(Config.defaultLayer),
    ),
  )
}
/** Resolve after `ms` milliseconds (default 50) — test pacing helper. */
function wait(ms = 50) {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), ms)
  })
}
@ -154,6 +229,17 @@ function defer() {
return { promise, resolve }
}
// Plugin mock that blocks forever inside the "experimental.session.compacting"
// hook, resolving `ready` when the hook is entered — lets tests abort a
// compaction while it is stuck in plugin setup.
function plugin(ready: ReturnType<typeof defer>) {
  return Layer.mock(Plugin.Service)({
    trigger: <Name extends string, Input, Output>(name: Name, _input: Input, output: Output) => {
      // All other hooks pass through untouched.
      if (name !== "experimental.session.compacting") return Effect.succeed(output)
      // Signal entry, then never complete (Effect.never keeps the fiber parked).
      return Effect.sync(() => ready.resolve()).pipe(Effect.andThen(Effect.never), Effect.as(output))
    },
    list: () => Effect.succeed([]),
    init: () => Effect.void,
  })
}
describe("session.compaction.isOverflow", () => {
test("returns true when token count exceeds usable context", async () => {
await using tmp = await tmpdir()
@ -429,37 +515,49 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
const msg = await user(session.id, "hello")
const msgs = await Session.messages({ sessionID: session.id })
const done = defer()
let seen = false
const unsub = Bus.subscribe(SessionCompaction.Event.Compacted, (evt) => {
if (evt.properties.sessionID !== session.id) return
seen = true
done.resolve()
})
const rt = runtime("continue")
let unsub: (() => void) | undefined
try {
unsub = await rt.runPromise(
Bus.Service.use((svc) =>
svc.subscribeCallback(SessionCompaction.Event.Compacted, (evt) => {
if (evt.properties.sessionID !== session.id) return
seen = true
done.resolve()
}),
),
)
const result = await SessionCompaction.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: new AbortController().signal,
auto: false,
})
const result = await rt.runPromise(
SessionCompaction.Service.use((svc) =>
svc.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: new AbortController().signal,
auto: false,
}),
),
)
await Promise.race([
done.promise,
wait(500).then(() => {
throw new Error("timed out waiting for compacted event")
}),
])
unsub()
expect(result).toBe("continue")
expect(seen).toBe(true)
await Promise.race([
done.promise,
wait(500).then(() => {
throw new Error("timed out waiting for compacted event")
}),
])
expect(result).toBe("continue")
expect(seen).toBe(true)
} finally {
unsub?.()
await rt.dispose()
}
},
})
})
@ -470,27 +568,36 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "compact"))
const session = await Session.create({})
const msg = await user(session.id, "hello")
const result = await SessionCompaction.process({
parentID: msg.id,
messages: await Session.messages({ sessionID: session.id }),
sessionID: session.id,
abort: new AbortController().signal,
auto: false,
})
const rt = runtime("compact")
try {
const msgs = await Session.messages({ sessionID: session.id })
const result = await rt.runPromise(
SessionCompaction.Service.use((svc) =>
svc.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: new AbortController().signal,
auto: false,
}),
),
)
const summary = (await Session.messages({ sessionID: session.id })).find(
(msg) => msg.info.role === "assistant" && msg.info.summary,
)
const summary = (await Session.messages({ sessionID: session.id })).find(
(msg) => msg.info.role === "assistant" && msg.info.summary,
)
expect(result).toBe("stop")
expect(summary?.info.role).toBe("assistant")
if (summary?.info.role === "assistant") {
expect(summary.info.finish).toBe("error")
expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact")
expect(result).toBe("stop")
expect(summary?.info.role).toBe("assistant")
if (summary?.info.role === "assistant") {
expect(summary.info.finish).toBe("error")
expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact")
}
} finally {
await rt.dispose()
}
},
})
@ -502,30 +609,38 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
const msg = await user(session.id, "hello")
const rt = runtime("continue")
try {
const msgs = await Session.messages({ sessionID: session.id })
const result = await rt.runPromise(
SessionCompaction.Service.use((svc) =>
svc.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: new AbortController().signal,
auto: true,
}),
),
)
const result = await SessionCompaction.process({
parentID: msg.id,
messages: await Session.messages({ sessionID: session.id }),
sessionID: session.id,
abort: new AbortController().signal,
auto: true,
})
const all = await Session.messages({ sessionID: session.id })
const last = all.at(-1)
const msgs = await Session.messages({ sessionID: session.id })
const last = msgs.at(-1)
expect(result).toBe("continue")
expect(last?.info.role).toBe("user")
expect(last?.parts[0]).toMatchObject({
type: "text",
synthetic: true,
})
if (last?.parts[0]?.type === "text") {
expect(last.parts[0].text).toContain("Continue if you have next steps")
expect(result).toBe("continue")
expect(last?.info.role).toBe("user")
expect(last?.parts[0]).toMatchObject({
type: "text",
synthetic: true,
})
if (last?.parts[0]?.type === "text") {
expect(last.parts[0].text).toContain("Continue if you have next steps")
}
} finally {
await rt.dispose()
}
},
})
@ -537,7 +652,6 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
await user(session.id, "root")
@ -552,24 +666,33 @@ describe("session.compaction.process", () => {
url: "https://example.com/cat.png",
})
const msg = await user(session.id, "current")
const rt = runtime("continue")
try {
const msgs = await Session.messages({ sessionID: session.id })
const result = await rt.runPromise(
SessionCompaction.Service.use((svc) =>
svc.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: new AbortController().signal,
auto: true,
overflow: true,
}),
),
)
const result = await SessionCompaction.process({
parentID: msg.id,
messages: await Session.messages({ sessionID: session.id }),
sessionID: session.id,
abort: new AbortController().signal,
auto: true,
overflow: true,
})
const last = (await Session.messages({ sessionID: session.id })).at(-1)
const last = (await Session.messages({ sessionID: session.id })).at(-1)
expect(result).toBe("continue")
expect(last?.info.role).toBe("user")
expect(last?.parts.some((part) => part.type === "file")).toBe(false)
expect(
last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")),
).toBe(true)
expect(result).toBe("continue")
expect(last?.info.role).toBe("user")
expect(last?.parts.some((part) => part.type === "file")).toBe(false)
expect(
last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")),
).toBe(true)
} finally {
await rt.dispose()
}
},
})
})
@ -580,27 +703,191 @@ describe("session.compaction.process", () => {
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
const session = await Session.create({})
await user(session.id, "earlier")
const msg = await user(session.id, "current")
const result = await SessionCompaction.process({
parentID: msg.id,
messages: await Session.messages({ sessionID: session.id }),
sessionID: session.id,
abort: new AbortController().signal,
auto: true,
overflow: true,
})
const rt = runtime("continue")
try {
const msgs = await Session.messages({ sessionID: session.id })
const result = await rt.runPromise(
SessionCompaction.Service.use((svc) =>
svc.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: new AbortController().signal,
auto: true,
overflow: true,
}),
),
)
const last = (await Session.messages({ sessionID: session.id })).at(-1)
const last = (await Session.messages({ sessionID: session.id })).at(-1)
expect(result).toBe("continue")
expect(last?.info.role).toBe("user")
if (last?.parts[0]?.type === "text") {
expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit")
expect(result).toBe("continue")
expect(last?.info.role).toBe("user")
if (last?.parts[0]?.type === "text") {
expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit")
}
} finally {
await rt.dispose()
}
},
})
})
// Regression test: an abort fired while the processor is sleeping in retry
// backoff (retry-after-ms: 10s) must interrupt promptly instead of waiting
// out the full backoff. Asserts the run settles to "stop" well under 250ms.
test("stops quickly when aborted during retry backoff", async () => {
  const stub = llm()
  const ready = defer()
  // Script one stream that emits "start" then throws a retryable 503 with a
  // long retry-after, forcing the processor into its backoff sleep.
  stub.push(
    Stream.fromAsyncIterable(
      {
        async *[Symbol.asyncIterator]() {
          yield { type: "start" } as LLM.Event
          throw new APICallError({
            message: "boom",
            url: "https://example.com/v1/chat/completions",
            requestBodyValues: {},
            statusCode: 503,
            responseHeaders: { "retry-after-ms": "10000" },
            responseBody: '{"error":"boom"}',
            isRetryable: true,
          })
        },
      },
      (err) => err,
    ),
  )
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
      const session = await Session.create({})
      const msg = await user(session.id, "hello")
      const msgs = await Session.messages({ sessionID: session.id })
      const abort = new AbortController()
      const rt = liveRuntime(stub.layer)
      let off: (() => void) | undefined
      let run: Promise<"continue" | "stop"> | undefined
      try {
        // Resolve `ready` once the session reports a "retry" status — i.e.
        // the processor has entered the backoff window.
        off = await rt.runPromise(
          Bus.Service.use((svc) =>
            svc.subscribeCallback(SessionStatus.Event.Status, (evt) => {
              if (evt.properties.sessionID !== session.id) return
              if (evt.properties.status.type !== "retry") return
              ready.resolve()
            }),
          ),
        )
        run = rt
          .runPromiseExit(
            SessionCompaction.Service.use((svc) =>
              svc.process({
                parentID: msg.id,
                messages: msgs,
                sessionID: session.id,
                abort: abort.signal,
                auto: false,
              }),
            ),
            { signal: abort.signal },
          )
          .then((exit) => {
            if (Exit.isFailure(exit)) {
              // Interrupt caused by our own abort maps to a clean "stop".
              if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
              throw Cause.squash(exit.cause)
            }
            return exit.value
          })
        await Promise.race([
          ready.promise,
          wait(1000).then(() => {
            throw new Error("timed out waiting for retry status")
          }),
        ])
        // Abort mid-backoff and require completion within 250ms.
        const start = Date.now()
        abort.abort()
        const result = await Promise.race([
          run.then((value) => ({ kind: "done" as const, value, ms: Date.now() - start })),
          wait(250).then(() => ({ kind: "timeout" as const })),
        ])
        expect(result.kind).toBe("done")
        if (result.kind === "done") {
          expect(result.value).toBe("stop")
          expect(result.ms).toBeLessThan(250)
        }
      } finally {
        off?.()
        abort.abort()
        await rt.dispose()
        await run?.catch(() => undefined)
      }
    },
  })
})
test("does not leave a summary assistant when aborted before processor setup", async () => {
const ready = defer()
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
const session = await Session.create({})
const msg = await user(session.id, "hello")
const msgs = await Session.messages({ sessionID: session.id })
const abort = new AbortController()
const rt = runtime("continue", plugin(ready))
let run: Promise<"continue" | "stop"> | undefined
try {
run = rt
.runPromiseExit(
SessionCompaction.Service.use((svc) =>
svc.process({
parentID: msg.id,
messages: msgs,
sessionID: session.id,
abort: abort.signal,
auto: false,
}),
),
{ signal: abort.signal },
)
.then((exit) => {
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
throw Cause.squash(exit.cause)
}
return exit.value
})
await Promise.race([
ready.promise,
wait(1000).then(() => {
throw new Error("timed out waiting for compaction hook")
}),
])
abort.abort()
expect(await run).toBe("stop")
const all = await Session.messages({ sessionID: session.id })
expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false)
} finally {
abort.abort()
await rt.dispose()
await run?.catch(() => undefined)
}
},
})

View File

@ -0,0 +1,838 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { APICallError } from "ai"
import { Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
import { Bus } from "../../src/bus"
import { Config } from "../../src/config/config"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
import { Instance } from "../../src/project/instance"
import type { Provider } from "../../src/provider/provider"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { Session } from "../../src/session"
import { LLM } from "../../src/session/llm"
import { MessageV2 } from "../../src/session/message-v2"
import { SessionProcessor } from "../../src/session/processor"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { Snapshot } from "../../src/snapshot"
import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { provideTmpdirInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
Log.init({ print: false })
// Fixed provider/model reference used by every message in these tests.
const ref = {
  providerID: ProviderID.make("test"),
  modelID: ModelID.make("test-model"),
}
// A scripted LLM response: either a ready stream or a factory keyed on the input.
type Script = Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
// Service tag exposing the test-side controls of the scripted LLM layer:
// push/reply enqueue responses; calls/inputs observe what the processor sent.
class TestLLM extends ServiceMap.Service<
  TestLLM,
  {
    readonly push: (stream: Script) => Effect.Effect<void>
    readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
    readonly calls: Effect.Effect<number>
    readonly inputs: Effect.Effect<LLM.StreamInput[]>
  }
>()("@test/SessionProcessorLLM") {}
/** Build a finite stream that emits exactly the given events, in order. */
function stream(...items: LLM.Event[]) {
  return Stream.fromIterable(items)
}
/** Token-usage fixture; detail buckets are deliberately left undefined. */
function usage(input = 1, output = 1, total = input + output) {
  const none = undefined
  return {
    inputTokens: input,
    outputTokens: output,
    totalTokens: total,
    inputTokenDetails: {
      noCacheTokens: none,
      cacheReadTokens: none,
      cacheWriteTokens: none,
    },
    outputTokenDetails: {
      textTokens: none,
      reasoningTokens: none,
    },
  }
}
/** Event fixture: stream start marker. */
function start(): LLM.Event {
  const event: LLM.Event = { type: "start" }
  return event
}
/** Event fixture: opens a text part (default id "t"). */
function textStart(id = "t"): LLM.Event {
  return { id, type: "text-start" }
}
/** Event fixture: appends `text` to the text part `id`. */
function textDelta(id: string, text: string): LLM.Event {
  return { id, text, type: "text-delta" }
}
/** Event fixture: closes a text part (default id "t"). */
function textEnd(id = "t"): LLM.Event {
  return { id, type: "text-end" }
}
/** Event fixture: opens a reasoning part. */
function reasoningStart(id: string): LLM.Event {
  return { id, type: "reasoning-start" }
}
/** Event fixture: appends `text` to the reasoning part `id`. */
function reasoningDelta(id: string, text: string): LLM.Event {
  return { id, text, type: "reasoning-delta" }
}
/** Event fixture: closes a reasoning part. */
function reasoningEnd(id: string): LLM.Event {
  return { id, type: "reasoning-end" }
}
/** Event fixture: one completed step with default usage and a fresh timestamp. */
function finishStep(): LLM.Event {
  const response = { id: "res", modelId: "test-model", timestamp: new Date() }
  return {
    type: "finish-step",
    finishReason: "stop",
    rawFinishReason: "stop",
    response,
    providerMetadata: undefined,
    usage: usage(),
  }
}
/** Event fixture: terminal finish event with default total usage. */
function finish(): LLM.Event {
  const totalUsage = usage()
  return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage }
}
/** Event fixture: tool input streaming begins for `toolName`. */
function toolInputStart(id: string, toolName: string): LLM.Event {
  return { id, toolName, type: "tool-input-start" }
}
/** Event fixture: a completed tool call with its parsed input. */
function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
  return { toolCallId, toolName, input, type: "tool-call" }
}
/** Stream fixture: emit the given events, then fail with `err`. */
function fail<E>(err: E, ...items: LLM.Event[]) {
  const prefix = stream(...items)
  return prefix.pipe(Stream.concat(Stream.fail(err)))
}
/** Effect that resolves once the given AbortSignal fires. */
function wait(abort: AbortSignal) {
  const listen = () =>
    new Promise<void>((resolve) => {
      abort.addEventListener("abort", () => resolve(), { once: true })
    })
  return Effect.promise(listen)
}
// Stream fixture: emit the given events, then block until the input's abort
// signal fires, at which point fail with an AbortError — simulates an LLM
// stream that never finishes on its own.
function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
  return stream(...items).pipe(
    Stream.concat(
      Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
    ),
  )
}
/** Model fixture with a configurable context window (output capped at 10). */
function model(context: number): Provider.Model {
  const limit = { context, output: 10 }
  const cost = { input: 0, output: 0, cache: { read: 0, write: 0 } }
  const capabilities = {
    toolcall: true,
    attachment: false,
    reasoning: false,
    temperature: true,
    input: { text: true, image: false, audio: false, video: false },
    output: { text: true, image: false, audio: false, video: false },
  }
  return {
    id: "test-model",
    providerID: "test",
    name: "Test",
    limit,
    cost,
    capabilities,
    api: { npm: "@ai-sdk/anthropic" },
    options: {},
  } as Provider.Model
}
/** Agent fixture: a primary "build" agent that allows every permission. */
function agent(): Agent.Info {
  const allowAll = { permission: "*", pattern: "*", action: "allow" }
  return {
    name: "build",
    mode: "primary",
    options: {},
    permission: [allowAll],
  }
}
/** Externally-resolvable promise (classic deferred pattern). */
function defer<T>() {
  let resolve!: (value: T | PromiseLike<T>) => void
  const promise = new Promise<T>((settle) => {
    resolve = settle
  })
  return { resolve, promise }
}
// Persists a user message with one text part in the given session and
// returns the message record. Uses the fixed `ref` model reference.
const user = Effect.fn("TestSession.user")(function* (sessionID: SessionID, text: string) {
  const session = yield* Session.Service
  const msg = yield* session.updateMessage({
    id: MessageID.ascending(),
    role: "user",
    sessionID,
    agent: "build",
    model: ref,
    time: { created: Date.now() },
  })
  yield* session.updatePart({
    id: PartID.ascending(),
    messageID: msg.id,
    sessionID,
    type: "text",
    text,
  })
  return msg
})
// Persists a blank assistant message (zero cost/tokens, finish "end_turn")
// parented to `parentID`, rooted at `root` — the message the processor under
// test will stream content into.
const assistant = Effect.fn("TestSession.assistant")(function* (
  sessionID: SessionID,
  parentID: MessageID,
  root: string,
) {
  const session = yield* Session.Service
  const msg: MessageV2.Assistant = {
    id: MessageID.ascending(),
    role: "assistant",
    sessionID,
    mode: "build",
    agent: "build",
    path: { cwd: root, root },
    cost: 0,
    tokens: {
      total: 0,
      input: 0,
      output: 0,
      reasoning: 0,
      cache: { read: 0, write: 0 },
    },
    modelID: ref.modelID,
    providerID: ref.providerID,
    parentID,
    time: { created: Date.now() },
    finish: "end_turn",
  }
  yield* session.updateMessage(msg)
  return msg
})
// Scripted LLM layer: provides LLM.Service backed by a FIFO queue of Scripts,
// plus the TestLLM control service for pushing responses and inspecting the
// calls/inputs the processor made. State lives in the Layer.unwrap closure,
// so each layer build gets a fresh queue.
const llm = Layer.unwrap(
  Effect.gen(function* () {
    const queue: Script[] = []
    const inputs: LLM.StreamInput[] = []
    let calls = 0
    const push = Effect.fn("TestLLM.push")((item: Script) => {
      queue.push(item)
      return Effect.void
    })
    const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
    return Layer.mergeAll(
      Layer.succeed(
        LLM.Service,
        LLM.Service.of({
          stream: (input) => {
            // Record every call for later assertions; missing scripts become
            // an empty stream rather than an error.
            calls += 1
            inputs.push(input)
            const item = queue.shift() ?? Stream.empty
            return typeof item === "function" ? item(input) : item
          },
        }),
      ),
      Layer.succeed(
        TestLLM,
        TestLLM.of({
          push,
          reply,
          calls: Effect.sync(() => calls),
          inputs: Effect.sync(() => [...inputs]),
        }),
      ),
    )
  }),
)
// Full test environment: the real SessionProcessor layer over session,
// snapshot, agent, permission, plugin, config, status/bus, the scripted llm,
// and platform infra (filesystem + process spawner).
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
const deps = Layer.mergeAll(
  Session.defaultLayer,
  Snapshot.defaultLayer,
  AgentSvc.defaultLayer,
  Permission.layer,
  Plugin.defaultLayer,
  Config.defaultLayer,
  status,
  llm,
).pipe(Layer.provideMerge(infra))
const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
// Effect-aware test harness bound to the environment above.
const it = testEffect(env)
// Happy path: one scripted reply ("hello"); asserts process() returns
// "continue", the LLM was called exactly once with the messages we passed,
// and the text landed as a part on the assistant message.
it.effect("session.processor effect tests capture llm input cleanly", () => {
  return provideTmpdirInstance(
    (dir) =>
      Effect.gen(function* () {
        const test = yield* TestLLM
        const processors = yield* SessionProcessor.Service
        const session = yield* Session.Service
        yield* test.reply(start(), textStart(), textDelta("t", "hello"), textEnd(), finishStep(), finish())
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "hi")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const abort = new AbortController()
        const mdl = model(100)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
          abort: abort.signal,
        })
        const input = {
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          abort: abort.signal,
          messages: [{ role: "user", content: "hi" }],
          tools: {},
        } satisfies LLM.StreamInput
        const value = yield* handle.process(input)
        const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
        const calls = yield* test.calls
        const inputs = yield* test.inputs
        expect(value).toBe("continue")
        expect(calls).toBe(1)
        expect(inputs).toHaveLength(1)
        expect(inputs[0].messages).toStrictEqual([{ role: "user", content: "hi" }])
        expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true)
      }),
    { git: true },
  )
})
// Overflow path: a finish-step reporting 100 input tokens against a model
// with a 20-token context must make process() return "compact", dropping the
// text that arrived after the overflow but keeping the step-finish part.
it.effect("session.processor effect tests stop after token overflow requests compaction", () => {
  return provideTmpdirInstance(
    (dir) =>
      Effect.gen(function* () {
        const test = yield* TestLLM
        const processors = yield* SessionProcessor.Service
        const session = yield* Session.Service
        yield* test.reply(
          start(),
          {
            type: "finish-step",
            finishReason: "stop",
            rawFinishReason: "stop",
            response: { id: "res", modelId: "test-model", timestamp: new Date() },
            providerMetadata: undefined,
            usage: usage(100, 0, 100),
          },
          textStart(),
          textDelta("t", "after"),
          textEnd(),
        )
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "compact")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const abort = new AbortController()
        const mdl = model(20)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
          abort: abort.signal,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          abort: abort.signal,
          messages: [{ role: "user", content: "compact" }],
          tools: {},
        })
        const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
        expect(value).toBe("compact")
        expect(parts.some((part) => part.type === "text")).toBe(false)
        expect(parts.some((part) => part.type === "step-finish")).toBe(true)
      }),
    { git: true },
  )
})
// Retry path: first attempt streams partial reasoning ("one") then fails with
// a retryable 503; the retry streams "two". Asserts exactly two LLM calls and
// that the first attempt's partial reasoning was discarded (no "onetwo").
it.effect("session.processor effect tests reset reasoning state across retries", () => {
  return provideTmpdirInstance(
    (dir) =>
      Effect.gen(function* () {
        const test = yield* TestLLM
        const processors = yield* SessionProcessor.Service
        const session = yield* Session.Service
        yield* test.push(
          fail(
            new APICallError({
              message: "boom",
              url: "https://example.com/v1/chat/completions",
              requestBodyValues: {},
              statusCode: 503,
              responseHeaders: { "retry-after-ms": "0" },
              responseBody: '{"error":"boom"}',
              isRetryable: true,
            }),
            start(),
            reasoningStart("r"),
            reasoningDelta("r", "one"),
          ),
        )
        yield* test.reply(
          start(),
          reasoningStart("r"),
          reasoningDelta("r", "two"),
          reasoningEnd("r"),
          finishStep(),
          finish(),
        )
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "reason")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const abort = new AbortController()
        const mdl = model(100)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
          abort: abort.signal,
        })
        const value = yield* handle.process({
          user: {
            id: parent.id,
            sessionID: chat.id,
            role: "user",
            time: parent.time,
            agent: parent.agent,
            model: { providerID: ref.providerID, modelID: ref.modelID },
          } satisfies MessageV2.User,
          sessionID: chat.id,
          model: mdl,
          agent: agent(),
          system: [],
          abort: abort.signal,
          messages: [{ role: "user", content: "reason" }],
          tools: {},
        })
        const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
        const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
        expect(value).toBe("continue")
        expect(yield* test.calls).toBe(2)
        expect(reasoning.some((part) => part.text === "two")).toBe(true)
        expect(reasoning.some((part) => part.text === "onetwo")).toBe(false)
      }),
    { git: true },
  )
})
it.effect("session.processor effect tests do not retry unknown json errors", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
yield* test.push(fail({ error: { message: "no_kv_space" } }, start()))
const chat = yield* session.create({})
const parent = yield* user(chat.id, "json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "json" }],
tools: {},
})
expect(value).toBe("stop")
expect(yield* test.calls).toBe(1)
expect(yield* test.inputs).toHaveLength(1)
expect(handle.message.error?.name).toBe("UnknownError")
}),
{ git: true },
)
})
it.effect("session.processor effect tests retry recognized structured json errors", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
yield* test.push(fail({ type: "error", error: { type: "too_many_requests" } }, start()))
yield* test.reply(start(), textStart(), textDelta("t", "after"), textEnd(), finishStep(), finish())
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "retry json" }],
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
expect(value).toBe("continue")
expect(yield* test.calls).toBe(2)
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
expect(handle.message.error).toBeUndefined()
}),
{ git: true },
)
})
it.effect("session.processor effect tests publish retry status updates", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
const bus = yield* Bus.Service
yield* test.push(
fail(
new APICallError({
message: "boom",
url: "https://example.com/v1/chat/completions",
requestBodyValues: {},
statusCode: 503,
responseHeaders: { "retry-after-ms": "0" },
responseBody: '{"error":"boom"}',
isRetryable: true,
}),
start(),
),
)
yield* test.reply(start(), finishStep(), finish())
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const states: number[] = []
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
if (evt.properties.sessionID !== chat.id) return
if (evt.properties.status.type === "retry") states.push(evt.properties.status.attempt)
})
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "retry" }],
tools: {},
})
off()
expect(value).toBe("continue")
expect(yield* test.calls).toBe(2)
expect(states).toStrictEqual([1])
}),
{ git: true },
)
})
it.effect("session.processor effect tests compact on structured context overflow", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
yield* test.push(fail({ type: "error", error: { code: "context_length_exceeded" } }, start()))
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "compact json" }],
tools: {},
})
expect(value).toBe("compact")
expect(yield* test.calls).toBe(1)
expect(handle.message.error).toBeUndefined()
}),
{ git: true },
)
})
it.effect("session.processor effect tests mark pending tools as aborted on cleanup", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const seen = defer<void>()
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
yield* test.push((input) =>
hang(input, start(), toolInputStart("tool-1", "bash"), toolCall("tool-1", "bash", { cmd: "pwd" })).pipe(
Stream.tap((event) => (event.type === "tool-call" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
const chat = yield* session.create({})
const parent = yield* user(chat.id, "tool abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const run = Effect.runPromise(
handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "tool abort" }],
tools: {},
}),
)
yield* Effect.promise(() => ready.promise)
abort.abort()
const value = yield* Effect.promise(() => run)
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
expect(value).toBe("stop")
expect(yield* test.calls).toBe(1)
expect(tool?.state.status).toBe("error")
if (tool?.state.status === "error") {
expect(tool.state.error).toBe("Tool execution aborted")
expect(tool.state.time.end).toBeDefined()
}
}),
{ git: true },
)
})
it.effect("session.processor effect tests record aborted errors and idle state", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const seen = defer<void>()
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
const bus = yield* Bus.Service
const status = yield* SessionStatus.Service
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
const chat = yield* session.create({})
const parent = yield* user(chat.id, "abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100)
const errs: string[] = []
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
if (evt.properties.sessionID !== chat.id) return
if (!evt.properties.error) return
errs.push(evt.properties.error.name)
seen.resolve()
})
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
abort: abort.signal,
})
const run = Effect.runPromise(
handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
abort: abort.signal,
messages: [{ role: "user", content: "abort" }],
tools: {},
}),
)
yield* Effect.promise(() => ready.promise)
abort.abort()
const value = yield* Effect.promise(() => run)
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const state = yield* status.get(chat.id)
off()
expect(value).toBe("stop")
expect(handle.message.error?.name).toBe("MessageAbortedError")
expect(stored.info.role).toBe("assistant")
if (stored.info.role === "assistant") {
expect(stored.info.error?.name).toBe("MessageAbortedError")
}
expect(state).toMatchObject({ type: "idle" })
expect(errs).toContain("MessageAbortedError")
}),
{ git: true },
)
})

View File

@ -12,6 +12,83 @@ import { tmpdir } from "../fixture/fixture"
Log.init({ print: false })
/** Builds a minimal "deferred": a promise paired with its externally exposed resolver. */
function defer<T>() {
let settle!: (value: T | PromiseLike<T>) => void
// The executor runs synchronously, so `settle` is assigned before we return.
const promise = new Promise<T>((fulfil) => {
settle = fulfil
})
return { promise, resolve: settle }
}
/**
 * Builds a one-shot SSE body that mimics an OpenAI-style chat completion
 * stream: a role chunk, a single content chunk carrying `text`, a stop chunk,
 * and the terminating [DONE] sentinel — all enqueued at once, then closed.
 */
function chat(text: string) {
// Wrap a single choice object into an SSE "data:" line with the shared envelope.
const event = (choice: object) =>
`data: ${JSON.stringify({
id: "chatcmpl-1",
object: "chat.completion.chunk",
choices: [choice],
})}`
const body =
[
event({ delta: { role: "assistant" } }),
event({ delta: { content: text } }),
event({ delta: {}, finish_reason: "stop" }),
"data: [DONE]",
].join("\n\n") + "\n\n"
const bytes = new TextEncoder().encode(body)
return new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(bytes)
controller.close()
},
})
}
/**
 * Builds an SSE stream that emits the assistant role chunk immediately (then
 * invokes `ready`) but withholds the content/stop chunks for 10 seconds —
 * used to simulate a provider response that stalls mid-stream. Cancelling the
 * stream clears the pending timer so nothing leaks.
 */
function hanging(ready: () => void) {
const encode = (s: string) => new TextEncoder().encode(s)
// Wrap a single choice object into an SSE "data:" line with the shared envelope.
const line = (choice: object) =>
`data: ${JSON.stringify({
id: "chatcmpl-1",
object: "chat.completion.chunk",
choices: [choice],
})}`
let pending: ReturnType<typeof setTimeout> | undefined
const head = line({ delta: { role: "assistant" } }) + "\n\n"
const tail =
[line({ delta: { content: "late" } }), line({ delta: {}, finish_reason: "stop" }), "data: [DONE]"].join("\n\n") +
"\n\n"
return new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(encode(head))
ready()
// Deliberately long delay: callers are expected to cancel before it fires.
pending = setTimeout(() => {
controller.enqueue(encode(tail))
controller.close()
}, 10000)
},
cancel() {
if (pending !== undefined) clearTimeout(pending)
},
})
}
describe("session.prompt missing file", () => {
test("does not fail the prompt when a file part is missing", async () => {
await using tmp = await tmpdir({
@ -149,6 +226,159 @@ describe("session.prompt special characters", () => {
})
})
describe("session.prompt regression", () => {
test("does not loop empty assistant turns for a simple reply", async () => {
let calls = 0
const server = Bun.serve({
port: 0,
fetch(req) {
const url = new URL(req.url)
if (!url.pathname.endsWith("/chat/completions")) {
return new Response("not found", { status: 404 })
}
calls++
return new Response(chat("packages/opencode/src/session/processor.ts"), {
status: 200,
headers: { "Content-Type": "text/event-stream" },
})
},
})
try {
await using tmp = await tmpdir({
git: true,
init: async (dir) => {
await Bun.write(
path.join(dir, "opencode.json"),
JSON.stringify({
$schema: "https://opencode.ai/config.json",
enabled_providers: ["alibaba"],
provider: {
alibaba: {
options: {
apiKey: "test-key",
baseURL: `${server.url.origin}/v1`,
},
},
},
agent: {
build: {
model: "alibaba/qwen-plus",
},
},
}),
)
},
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({ title: "Prompt regression" })
const result = await SessionPrompt.prompt({
sessionID: session.id,
agent: "build",
parts: [{ type: "text", text: "Where is SessionProcessor?" }],
})
expect(result.info.role).toBe("assistant")
expect(result.parts.some((part) => part.type === "text" && part.text.includes("processor.ts"))).toBe(true)
const msgs = await Session.messages({ sessionID: session.id })
expect(msgs.filter((msg) => msg.info.role === "assistant")).toHaveLength(1)
expect(calls).toBe(1)
},
})
} finally {
server.stop(true)
}
})
test("records aborted errors when prompt is cancelled mid-stream", async () => {
const ready = defer<void>()
const server = Bun.serve({
port: 0,
fetch(req) {
const url = new URL(req.url)
if (!url.pathname.endsWith("/chat/completions")) {
return new Response("not found", { status: 404 })
}
return new Response(
hanging(() => ready.resolve()),
{
status: 200,
headers: { "Content-Type": "text/event-stream" },
},
)
},
})
try {
await using tmp = await tmpdir({
git: true,
init: async (dir) => {
await Bun.write(
path.join(dir, "opencode.json"),
JSON.stringify({
$schema: "https://opencode.ai/config.json",
enabled_providers: ["alibaba"],
provider: {
alibaba: {
options: {
apiKey: "test-key",
baseURL: `${server.url.origin}/v1`,
},
},
},
agent: {
build: {
model: "alibaba/qwen-plus",
},
},
}),
)
},
})
await Instance.provide({
directory: tmp.path,
fn: async () => {
const session = await Session.create({ title: "Prompt cancel regression" })
const run = SessionPrompt.prompt({
sessionID: session.id,
agent: "build",
parts: [{ type: "text", text: "Cancel me" }],
})
await ready.promise
await SessionPrompt.cancel(session.id)
const result = await Promise.race([
run,
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error("timed out waiting for cancel")), 1000),
),
])
expect(result.info.role).toBe("assistant")
if (result.info.role === "assistant") {
expect(result.info.error?.name).toBe("MessageAbortedError")
}
const msgs = await Session.messages({ sessionID: session.id })
const last = msgs.findLast((msg) => msg.info.role === "assistant")
expect(last?.info.role).toBe("assistant")
if (last?.info.role === "assistant") {
expect(last.info.error?.name).toBe("MessageAbortedError")
}
},
})
} finally {
server.stop(true)
}
})
})
describe("session.prompt agent variant", () => {
test("applies agent variant only when using agent model", async () => {
const prev = process.env.OPENAI_API_KEY

View File

@ -2,9 +2,14 @@ import { describe, expect, test } from "bun:test"
import type { NamedError } from "@opencode-ai/util/error"
import { APICallError } from "ai"
import { setTimeout as sleep } from "node:timers/promises"
import { Effect, Schedule } from "effect"
import { SessionRetry } from "../../src/session/retry"
import { MessageV2 } from "../../src/session/message-v2"
import { ProviderID } from "../../src/provider/schema"
import { SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
const providerID = ProviderID.make("test")
@ -69,24 +74,47 @@ describe("session.retry.delay", () => {
expect(SessionRetry.delay(1, longError)).toBe(700000)
})
test("sleep caps delay to max 32-bit signed integer to avoid TimeoutOverflowWarning", async () => {
const controller = new AbortController()
test("caps oversized header delays to the runtime timer limit", () => {
const error = apiError({ "retry-after-ms": "999999999999" })
expect(SessionRetry.delay(1, error)).toBe(SessionRetry.RETRY_MAX_DELAY)
})
const warnings: string[] = []
const originalWarn = process.emitWarning
process.emitWarning = (warning: string | Error) => {
warnings.push(typeof warning === "string" ? warning : warning.message)
}
test("policy updates retry status and increments attempts", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const sessionID = SessionID.make("session-retry-test")
const error = apiError({ "retry-after-ms": "0" })
const promise = SessionRetry.sleep(2_560_914_000, controller.signal)
controller.abort()
await Effect.runPromise(
Effect.gen(function* () {
const step = yield* Schedule.toStepWithMetadata(
SessionRetry.policy({
parse: (err) => err as MessageV2.APIError,
set: (info) =>
Effect.promise(() =>
SessionStatus.set(sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
}),
),
}),
)
yield* step(error)
yield* step(error)
}),
)
try {
await promise
} catch {}
process.emitWarning = originalWarn
expect(warnings.some((w) => w.includes("TimeoutOverflowWarning"))).toBe(false)
expect(await SessionStatus.get(sessionID)).toMatchObject({
type: "retry",
attempt: 2,
message: "boom",
})
},
})
})
})
@ -101,9 +129,9 @@ describe("session.retry.retryable", () => {
expect(SessionRetry.retryable(error)).toBe("Provider is overloaded")
})
test("handles json messages without code", () => {
test("does not retry unknown json messages", () => {
const error = wrap(JSON.stringify({ error: { message: "no_kv_space" } }))
expect(SessionRetry.retryable(error)).toBe(`{"error":{"message":"no_kv_space"}}`)
expect(SessionRetry.retryable(error)).toBeUndefined()
})
test("does not throw on numeric error codes", () => {