refactor(session): effectify SessionPrompt service (#19483)

pull/20125/head
Kit Langton 2026-03-30 16:06:51 -04:00 committed by GitHub
parent fa95a61c4e
commit c5442d418d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 4439 additions and 2142 deletions

View File

@ -0,0 +1,216 @@
import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect"
/**
 * A single-slot work runner: at most one "run" (background work) or one
 * interactive "shell" session is active at any time.
 *
 * @typeParam A - success value produced by the work.
 * @typeParam E - error channel of the work (defaults to `never`).
 */
export interface Runner<A, E = never> {
// Current state of the internal state machine (see Runner.State).
readonly state: Runner.State<A, E>
// True whenever the runner is not Idle.
readonly busy: boolean
// Join the in-flight run if one exists, queue behind an active shell,
// or start `work` immediately when idle.
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
// Start an abortable shell session; only permitted from the Idle state.
readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E>
// Interrupt whatever is active and return the runner to Idle.
readonly cancel: Effect.Effect<void>
}
export namespace Runner {
/** Error used to settle a run's deferred when the run is interrupted rather than completed. */
export class Cancelled extends Schema.TaggedErrorClass<Cancelled>()("RunnerCancelled", {}) {}
/** An in-flight run: its fiber plus the deferred that observers await. */
interface RunHandle<A, E> {
// Monotonic id; used to ignore completions from stale runs.
id: number
done: Deferred.Deferred<A, E | Cancelled>
fiber: Fiber.Fiber<A, E>
}
/** An active shell session: its fiber plus the AbortController used to stop it. */
interface ShellHandle<A, E> {
id: number
fiber: Fiber.Fiber<A, E>
abort: AbortController
}
/** A run queued to start as soon as the current shell finishes. */
interface PendingHandle<A, E> {
id: number
done: Deferred.Deferred<A, E | Cancelled>
// The not-yet-started work, held until the shell ends.
work: Effect.Effect<A, E>
}
/**
 * State machine for the runner:
 * - Idle:         nothing active.
 * - Running:      a run is in flight.
 * - Shell:        a shell session is in flight.
 * - ShellThenRun: a shell is in flight with a run queued behind it.
 */
export type State<A, E> =
| { readonly _tag: "Idle" }
| { readonly _tag: "Running"; readonly run: RunHandle<A, E> }
| { readonly _tag: "Shell"; readonly shell: ShellHandle<A, E> }
| { readonly _tag: "ShellThenRun"; readonly shell: ShellHandle<A, E>; readonly run: PendingHandle<A, E> }
/**
 * Create a Runner.
 *
 * @param scope - scope that run fibers are forked into (see startRun).
 * @param opts.onIdle - effect fired each time the runner transitions back to Idle.
 * @param opts.onBusy - effect fired when a shell session starts.
 * @param opts.onInterrupt - substitute effect evaluated when a run is cancelled;
 *   without it a cancellation surfaces as a defect (Effect.die).
 * @param opts.busy - callback invoked when startShell is attempted while not
 *   Idle; typed `() => never`, i.e. it is expected to throw.
 */
export const make = <A, E = never>(
scope: Scope.Scope,
opts?: {
onIdle?: Effect.Effect<void>
onBusy?: Effect.Effect<void>
onInterrupt?: Effect.Effect<A, E>
busy?: () => never
},
): Runner<A, E> => {
// Single source of truth for the state machine. SynchronizedRef serializes
// effectful transitions so concurrent modify calls cannot interleave.
const ref = SynchronizedRef.makeUnsafe<State<A, E>>({ _tag: "Idle" })
const idle = opts?.onIdle ?? Effect.void
const busy = opts?.onBusy ?? Effect.void
const onInterrupt = opts?.onInterrupt
// Monotonic id source shared by runs and shells.
let ids = 0
const state = () => SynchronizedRef.getUnsafe(ref)
const next = () => {
ids += 1
return ids
}
// Settle a run's deferred from an exit. An exit whose cause contains only
// interruptions is reported to observers as a Cancelled error instead.
const complete = (done: Deferred.Deferred<A, E | Cancelled>, exit: Exit.Exit<A, E>) =>
Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)
? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid)
: Deferred.done(done, exit).pipe(Effect.asVoid)
// Fire the onIdle hook, but only if the state is (still) Idle at this moment.
const idleIfCurrent = () =>
SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten)
// Run-completion handler: if this run (matched by id) is still the current one,
// transition Running -> Idle and fire onIdle; in all cases settle the deferred.
// The id guard makes completions from superseded runs no-ops on the state.
const finishRun = (id: number, done: Deferred.Deferred<A, E | Cancelled>, exit: Exit.Exit<A, E>) =>
SynchronizedRef.modify(
ref,
(st) =>
[
Effect.gen(function* () {
if (st._tag === "Running" && st.run.id === id) yield* idle
yield* complete(done, exit)
}),
st._tag === "Running" && st.run.id === id ? ({ _tag: "Idle" } as const) : st,
] as const,
).pipe(Effect.flatten)
// Fork `work` into the runner's scope; on exit it settles `done` and releases
// the slot via finishRun.
const startRun = (work: Effect.Effect<A, E>, done: Deferred.Deferred<A, E | Cancelled>) =>
Effect.gen(function* () {
const id = next()
const fiber = yield* work.pipe(
Effect.onExit((exit) => finishRun(id, done, exit)),
Effect.forkIn(scope),
)
return { id, done, fiber } satisfies RunHandle<A, E>
})
// Shell-completion handler: when the shell identified by `id` ends, either go
// Idle (firing onIdle) or promote the queued run into the Running state.
// Stale ids leave the state untouched.
const finishShell = (id: number) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const
if (st._tag === "ShellThenRun" && st.shell.id === id) {
const run = yield* startRun(st.run.work, st.run.done)
return [Effect.void, { _tag: "Running", run }] as const
}
return [Effect.void, st] as const
}),
).pipe(Effect.flatten)
// Stop a shell: signal abort, give the fiber 100ms to wind down on its own,
// then interrupt it, and finally wait for it to fully terminate (the trailing
// await swallows the exit so stopShell itself never fails).
const stopShell = (shell: ShellHandle<A, E>) =>
Effect.gen(function* () {
shell.abort.abort()
const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
})
// Ensure some run is active and await its result:
// - Running / ShellThenRun: join the existing (or already-queued) run; the
//   provided `work` is NOT started in this case.
// - Shell: queue `work` to start when the shell finishes.
// - Idle: start `work` immediately.
// A Cancelled result is replaced by onInterrupt when provided, otherwise it
// escalates to a defect.
const ensureRunning = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
switch (st._tag) {
case "Running":
case "ShellThenRun":
return [Deferred.await(st.run.done), st] as const
case "Shell": {
const run = {
id: next(),
done: yield* Deferred.make<A, E | Cancelled>(),
work,
} satisfies PendingHandle<A, E>
return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const
}
case "Idle": {
const done = yield* Deferred.make<A, E | Cancelled>()
const run = yield* startRun(work, done)
return [Deferred.await(done), { _tag: "Running", run }] as const
}
}
}),
).pipe(
Effect.flatten,
// The deferred's error channel is E | Cancelled; strip Cancelled here so the
// public error channel stays E. The `as E` cast is what re-narrows it.
Effect.catch((e): Effect.Effect<A, E> =>
e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E),
),
)
// Start a shell session. Only legal from Idle: otherwise the busy callback is
// invoked (expected to throw) and, failing that, a plain Error is thrown from
// inside Effect.sync — which surfaces to the caller as a defect, not a typed E.
const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
if (st._tag !== "Idle") {
return [
Effect.sync(() => {
if (opts?.busy) opts.busy()
throw new Error("Runner is busy")
}),
st,
] as const
}
yield* busy
const id = next()
const abort = new AbortController()
// NOTE(review): shells fork with forkChild while runs use forkIn(scope) —
// confirm the differing fiber parentage/lifetime is intentional.
const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
return [
// Returned to the caller: await the shell fiber; interrupt-only exits are
// replaced by onInterrupt when provided, otherwise the cause is re-raised.
Effect.gen(function* () {
const exit = yield* Fiber.await(fiber)
if (Exit.isSuccess(exit)) return exit.value
if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt
return yield* Effect.failCause(exit.cause)
}),
{ _tag: "Shell", shell },
] as const
}),
).pipe(Effect.flatten)
// Cancel whatever is active. The state is swapped to Idle synchronously inside
// the modify; the returned effect then performs the actual teardown.
const cancel = SynchronizedRef.modify(ref, (st) => {
switch (st._tag) {
case "Idle":
return [Effect.void, st] as const
case "Running":
// Interrupt the run, wait for its deferred to settle (exit ignored),
// then fire onIdle if we are still Idle.
return [
Effect.gen(function* () {
yield* Fiber.interrupt(st.run.fiber)
yield* Deferred.await(st.run.done).pipe(Effect.exit, Effect.asVoid)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
] as const
case "Shell":
return [
Effect.gen(function* () {
yield* stopShell(st.shell)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
] as const
case "ShellThenRun":
// Fail the queued (never-started) run first so its observers are
// released, then tear down the shell.
return [
Effect.gen(function* () {
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
yield* stopShell(st.shell)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
] as const
}
}).pipe(Effect.flatten)
return {
// Live getters: reads go straight to the ref, so callers always see the
// current state rather than a snapshot taken at construction.
get state() {
return state()
},
get busy() {
return state()._tag !== "Idle"
},
ensureRunning,
startShell,
cancel,
}
}
}

View File

@ -381,7 +381,7 @@ export const SessionRoutes = lazy(() =>
}), }),
), ),
async (c) => { async (c) => {
SessionPrompt.cancel(c.req.valid("param").sessionID) await SessionPrompt.cancel(c.req.valid("param").sessionID)
return c.json(true) return c.json(true)
}, },
) )
@ -699,7 +699,7 @@ export const SessionRoutes = lazy(() =>
), ),
async (c) => { async (c) => {
const params = c.req.valid("param") const params = c.req.valid("param")
SessionPrompt.assertNotBusy(params.sessionID) await SessionPrompt.assertNotBusy(params.sessionID)
await Session.removeMessage({ await Session.removeMessage({
sessionID: params.sessionID, sessionID: params.sessionID,
messageID: params.messageID, messageID: params.messageID,

View File

@ -15,7 +15,7 @@ import { Plugin } from "@/plugin"
import { Config } from "@/config/config" import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db" import { NotFoundError } from "@/storage/db"
import { ModelID, ProviderID } from "@/provider/schema" import { ModelID, ProviderID } from "@/provider/schema"
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect" import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service" import { makeRuntime } from "@/effect/run-service"
import { isOverflow as overflow } from "./overflow" import { isOverflow as overflow } from "./overflow"
@ -45,7 +45,6 @@ export namespace SessionCompaction {
parentID: MessageID parentID: MessageID
messages: MessageV2.WithParts[] messages: MessageV2.WithParts[]
sessionID: SessionID sessionID: SessionID
abort: AbortSignal
auto: boolean auto: boolean
overflow?: boolean overflow?: boolean
}) => Effect.Effect<"continue" | "stop"> }) => Effect.Effect<"continue" | "stop">
@ -135,20 +134,28 @@ export namespace SessionCompaction {
parentID: MessageID parentID: MessageID
messages: MessageV2.WithParts[] messages: MessageV2.WithParts[]
sessionID: SessionID sessionID: SessionID
abort: AbortSignal
auto: boolean auto: boolean
overflow?: boolean overflow?: boolean
}) { }) {
const userMessage = input.messages.findLast((m) => m.info.id === input.parentID)!.info as MessageV2.User const parent = input.messages.findLast((m) => m.info.id === input.parentID)
if (!parent || parent.info.role !== "user") {
throw new Error(`Compaction parent must be a user message: ${input.parentID}`)
}
const userMessage = parent.info
let messages = input.messages let messages = input.messages
let replay: MessageV2.WithParts | undefined let replay:
| {
info: MessageV2.User
parts: MessageV2.Part[]
}
| undefined
if (input.overflow) { if (input.overflow) {
const idx = input.messages.findIndex((m) => m.info.id === input.parentID) const idx = input.messages.findIndex((m) => m.info.id === input.parentID)
for (let i = idx - 1; i >= 0; i--) { for (let i = idx - 1; i >= 0; i--) {
const msg = input.messages[i] const msg = input.messages[i]
if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) { if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) {
replay = msg replay = { info: msg.info, parts: msg.parts }
messages = input.messages.slice(0, i) messages = input.messages.slice(0, i)
break break
} }
@ -206,7 +213,7 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages) const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const msg = (yield* session.updateMessage({ const msg: MessageV2.Assistant = {
id: MessageID.ascending(), id: MessageID.ascending(),
role: "assistant", role: "assistant",
parentID: input.parentID, parentID: input.parentID,
@ -231,25 +238,17 @@ When constructing the summary, try to stick to this template:
time: { time: {
created: Date.now(), created: Date.now(),
}, },
})) as MessageV2.Assistant }
yield* session.updateMessage(msg)
const processor = yield* processors.create({ const processor = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: input.sessionID, sessionID: input.sessionID,
model, model,
abort: input.abort,
})
const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
if (!input.abort.aborted || msg.time.completed) return
msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject()
msg.finish = msg.finish ?? "error"
msg.time.completed = Date.now()
yield* session.updateMessage(msg)
}) })
const result = yield* processor const result = yield* processor
.process({ .process({
user: userMessage, user: userMessage,
agent, agent,
abort: input.abort,
sessionID: input.sessionID, sessionID: input.sessionID,
tools: {}, tools: {},
system: [], system: [],
@ -262,7 +261,7 @@ When constructing the summary, try to stick to this template:
], ],
model, model,
}) })
.pipe(Effect.ensuring(cancel())) .pipe(Effect.onInterrupt(() => processor.abort()))
if (result === "compact") { if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({ processor.message.error = new MessageV2.ContextOverflowError({
@ -277,7 +276,7 @@ When constructing the summary, try to stick to this template:
if (result === "continue" && input.auto) { if (result === "continue" && input.auto) {
if (replay) { if (replay) {
const original = replay.info as MessageV2.User const original = replay.info
const replayMsg = yield* session.updateMessage({ const replayMsg = yield* session.updateMessage({
id: MessageID.ascending(), id: MessageID.ascending(),
role: "user", role: "user",
@ -386,7 +385,7 @@ When constructing the summary, try to stick to this template:
), ),
) )
const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer) const { runPromise } = makeRuntime(Service, defaultLayer)
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) { export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input)) return runPromise((svc) => svc.isOverflow(input))
@ -396,21 +395,16 @@ When constructing the summary, try to stick to this template:
return runPromise((svc) => svc.prune(input)) return runPromise((svc) => svc.prune(input))
} }
export async function process(input: { export const process = fn(
parentID: MessageID z.object({
messages: MessageV2.WithParts[] parentID: MessageID.zod,
sessionID: SessionID messages: z.custom<MessageV2.WithParts[]>(),
abort: AbortSignal sessionID: SessionID.zod,
auto: boolean auto: z.boolean(),
overflow?: boolean overflow: z.boolean().optional(),
}) { }),
const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort }) (input) => runPromise((svc) => svc.process(input)),
if (Exit.isFailure(exit)) { )
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
throw Cause.squash(exit.cause)
}
return exit.value
}
export const create = fn( export const create = fn(
z.object({ z.object({

View File

@ -334,14 +334,14 @@ export namespace Session {
readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[]> readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[]>
readonly children: (parentID: SessionID) => Effect.Effect<Info[]> readonly children: (parentID: SessionID) => Effect.Effect<Info[]>
readonly remove: (sessionID: SessionID) => Effect.Effect<void> readonly remove: (sessionID: SessionID) => Effect.Effect<void>
readonly updateMessage: (msg: MessageV2.Info) => Effect.Effect<MessageV2.Info> readonly updateMessage: <T extends MessageV2.Info>(msg: T) => Effect.Effect<T>
readonly removeMessage: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<MessageID> readonly removeMessage: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<MessageID>
readonly removePart: (input: { readonly removePart: (input: {
sessionID: SessionID sessionID: SessionID
messageID: MessageID messageID: MessageID
partID: PartID partID: PartID
}) => Effect.Effect<PartID> }) => Effect.Effect<PartID>
readonly updatePart: (part: MessageV2.Part) => Effect.Effect<MessageV2.Part> readonly updatePart: <T extends MessageV2.Part>(part: T) => Effect.Effect<T>
readonly updatePartDelta: (input: { readonly updatePartDelta: (input: {
sessionID: SessionID sessionID: SessionID
messageID: MessageID messageID: MessageID
@ -469,17 +469,14 @@ export namespace Session {
} }
}) })
const updateMessage = Effect.fn("Session.updateMessage")(function* (msg: MessageV2.Info) { const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
yield* Effect.sync(() => Effect.gen(function* () {
SyncEvent.run(MessageV2.Event.Updated, { yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
sessionID: msg.sessionID,
info: msg,
}),
)
return msg return msg
}) }).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = Effect.fn("Session.updatePart")(function* (part: MessageV2.Part) { const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() => yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, { SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID, sessionID: part.sessionID,
@ -488,7 +485,7 @@ export namespace Session {
}), }),
) )
return part return part
}) }).pipe(Effect.withSpan("Session.updatePart"))
const create = Effect.fn("Session.create")(function* (input?: { const create = Effect.fn("Session.create")(function* (input?: {
parentID?: SessionID parentID?: SessionID
@ -851,7 +848,9 @@ export namespace Session {
export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id))) export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id)))
export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id))) export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id)))
export const updateMessage = fn(MessageV2.Info, (msg) => runPromise((svc) => svc.updateMessage(msg))) export async function updateMessage<T extends MessageV2.Info>(msg: T): Promise<T> {
return runPromise((svc) => svc.updateMessage(MessageV2.Info.parse(msg) as T))
}
export const removeMessage = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod }), (input) => export const removeMessage = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod }), (input) =>
runPromise((svc) => svc.removeMessage(input)), runPromise((svc) => svc.removeMessage(input)),
@ -862,7 +861,9 @@ export namespace Session {
(input) => runPromise((svc) => svc.removePart(input)), (input) => runPromise((svc) => svc.removePart(input)),
) )
export const updatePart = fn(MessageV2.Part, (part) => runPromise((svc) => svc.updatePart(part))) export async function updatePart<T extends MessageV2.Part>(part: T): Promise<T> {
return runPromise((svc) => svc.updatePart(MessageV2.Part.parse(part) as T))
}
export const updatePartDelta = fn( export const updatePartDelta = fn(
z.object({ z.object({

View File

@ -1,6 +1,7 @@
import { Provider } from "@/provider/provider" import { Provider } from "@/provider/provider"
import { Log } from "@/util/log" import { Log } from "@/util/log"
import { Effect, Layer, ServiceMap } from "effect" import { Cause, Effect, Layer, Record, ServiceMap } from "effect"
import * as Queue from "effect/Queue"
import * as Stream from "effect/Stream" import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda" import { mergeDeep, pipe } from "remeda"
@ -28,7 +29,6 @@ export namespace LLM {
agent: Agent.Info agent: Agent.Info
permission?: Permission.Ruleset permission?: Permission.Ruleset
system: string[] system: string[]
abort: AbortSignal
messages: ModelMessage[] messages: ModelMessage[]
small?: boolean small?: boolean
tools: Record<string, Tool> tools: Record<string, Tool>
@ -36,6 +36,10 @@ export namespace LLM {
toolChoice?: "auto" | "required" | "none" toolChoice?: "auto" | "required" | "none"
} }
export type StreamRequest = StreamInput & {
abort: AbortSignal
}
export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
export interface Interface { export interface Interface {
@ -49,15 +53,32 @@ export namespace LLM {
Effect.gen(function* () { Effect.gen(function* () {
return Service.of({ return Service.of({
stream(input) { stream(input) {
return Stream.unwrap( const stream: Stream.Stream<Event, unknown> = Stream.scoped(
Effect.promise(() => LLM.stream(input)).pipe( Stream.unwrap(
Effect.map((result) => Effect.gen(function* () {
Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe( const ctrl = yield* Effect.acquireRelease(
Stream.mapEffect((event) => Effect.succeed(event)), Effect.sync(() => new AbortController()),
), (ctrl) => Effect.sync(() => ctrl.abort()),
), )
const queue = yield* Queue.unbounded<Event, unknown | Cause.Done>()
yield* Effect.promise(async () => {
const result = await LLM.stream({ ...input, abort: ctrl.signal })
for await (const event of result.fullStream) {
if (!Queue.offerUnsafe(queue, event)) break
}
Queue.endUnsafe(queue)
}).pipe(
Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))),
Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())),
Effect.forkScoped,
)
return Stream.fromQueue(queue)
}),
), ),
) )
return stream
}, },
}) })
}), }),
@ -65,7 +86,7 @@ export namespace LLM {
export const defaultLayer = layer export const defaultLayer = layer
export async function stream(input: StreamInput) { export async function stream(input: StreamRequest) {
const l = log const l = log
.clone() .clone()
.tag("providerID", input.model.providerID) .tag("providerID", input.model.providerID)
@ -322,17 +343,12 @@ export namespace LLM {
}) })
} }
async function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) { function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
const disabled = Permission.disabled( const disabled = Permission.disabled(
Object.keys(input.tools), Object.keys(input.tools),
Permission.merge(input.agent.permission, input.permission ?? []), Permission.merge(input.agent.permission, input.permission ?? []),
) )
for (const tool of Object.keys(input.tools)) { return Record.filter(input.tools, (_, k) => input.user.tools?.[k] !== false && !disabled.has(k))
if (input.user.tools?.[tool] === false || disabled.has(tool)) {
delete input.tools[tool]
}
}
return input.tools
} }
// Check if messages contain any tool-call content // Check if messages contain any tool-call content

View File

@ -1,8 +1,7 @@
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect" import { Cause, Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream" import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent" import { Agent } from "@/agent/agent"
import { Bus } from "@/bus" import { Bus } from "@/bus"
import { makeRuntime } from "@/effect/run-service"
import { Config } from "@/config/config" import { Config } from "@/config/config"
import { Permission } from "@/permission" import { Permission } from "@/permission"
import { Plugin } from "@/plugin" import { Plugin } from "@/plugin"
@ -35,17 +34,10 @@ export namespace SessionProcessor {
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result> readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
} }
export interface Info {
readonly message: MessageV2.Assistant
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
readonly process: (streamInput: LLM.StreamInput) => Promise<Result>
}
type Input = { type Input = {
assistantMessage: MessageV2.Assistant assistantMessage: MessageV2.Assistant
sessionID: SessionID sessionID: SessionID
model: Provider.Model model: Provider.Model
abort: AbortSignal
} }
export interface Interface { export interface Interface {
@ -96,7 +88,6 @@ export namespace SessionProcessor {
assistantMessage: input.assistantMessage, assistantMessage: input.assistantMessage,
sessionID: input.sessionID, sessionID: input.sessionID,
model: input.model, model: input.model,
abort: input.abort,
toolcalls: {}, toolcalls: {},
shouldBreak: false, shouldBreak: false,
snapshot: undefined, snapshot: undefined,
@ -105,11 +96,12 @@ export namespace SessionProcessor {
currentText: undefined, currentText: undefined,
reasoningMap: {}, reasoningMap: {},
} }
let aborted = false
const parse = (e: unknown) => const parse = (e: unknown) =>
MessageV2.fromError(e, { MessageV2.fromError(e, {
providerID: input.model.providerID, providerID: input.model.providerID,
aborted: input.abort.aborted, aborted,
}) })
const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) { const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
@ -155,7 +147,10 @@ export namespace SessionProcessor {
return return
case "tool-input-start": case "tool-input-start":
ctx.toolcalls[value.id] = (yield* session.updatePart({ if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
ctx.toolcalls[value.id] = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(), id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
messageID: ctx.assistantMessage.id, messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID, sessionID: ctx.assistantMessage.sessionID,
@ -163,7 +158,7 @@ export namespace SessionProcessor {
tool: value.toolName, tool: value.toolName,
callID: value.id, callID: value.id,
state: { status: "pending", input: {}, raw: "" }, state: { status: "pending", input: {}, raw: "" },
})) as MessageV2.ToolPart } satisfies MessageV2.ToolPart)
return return
case "tool-input-delta": case "tool-input-delta":
@ -173,14 +168,17 @@ export namespace SessionProcessor {
return return
case "tool-call": { case "tool-call": {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
const match = ctx.toolcalls[value.toolCallId] const match = ctx.toolcalls[value.toolCallId]
if (!match) return if (!match) return
ctx.toolcalls[value.toolCallId] = (yield* session.updatePart({ ctx.toolcalls[value.toolCallId] = yield* session.updatePart({
...match, ...match,
tool: value.toolName, tool: value.toolName,
state: { status: "running", input: value.input, time: { start: Date.now() } }, state: { status: "running", input: value.input, time: { start: Date.now() } },
metadata: value.providerMetadata, metadata: value.providerMetadata,
})) as MessageV2.ToolPart } satisfies MessageV2.ToolPart)
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
@ -414,7 +412,7 @@ export namespace SessionProcessor {
}) })
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) }) log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined })
const error = parse(e) const error = parse(e)
if (MessageV2.ContextOverflowError.isInstance(error)) { if (MessageV2.ContextOverflowError.isInstance(error)) {
ctx.needsCompaction = true ctx.needsCompaction = true
@ -429,27 +427,37 @@ export namespace SessionProcessor {
yield* status.set(ctx.sessionID, { type: "idle" }) yield* status.set(ctx.sessionID, { type: "idle" })
}) })
const abort = Effect.fn("SessionProcessor.abort")(() =>
Effect.gen(function* () {
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))
}
if (!ctx.assistantMessage.time.completed) {
yield* cleanup()
return
}
yield* session.updateMessage(ctx.assistantMessage)
}),
)
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) { const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
log.info("process") log.info("process")
ctx.needsCompaction = false ctx.needsCompaction = false
ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
return yield* Effect.gen(function* () {
yield* Effect.gen(function* () { yield* Effect.gen(function* () {
ctx.currentText = undefined ctx.currentText = undefined
ctx.reasoningMap = {} ctx.reasoningMap = {}
const stream = llm.stream(streamInput) const stream = llm.stream(streamInput)
yield* stream.pipe( yield* stream.pipe(
Stream.tap((event) => Stream.tap((event) => handleEvent(event)),
Effect.gen(function* () {
input.abort.throwIfAborted()
yield* handleEvent(event)
}),
),
Stream.takeUntil(() => ctx.needsCompaction), Stream.takeUntil(() => ctx.needsCompaction),
Stream.runDrain, Stream.runDrain,
) )
}).pipe( }).pipe(
Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
Effect.catchCauseIf( Effect.catchCauseIf(
(cause) => !Cause.hasInterruptsOnly(cause), (cause) => !Cause.hasInterruptsOnly(cause),
(cause) => Effect.fail(Cause.squash(cause)), (cause) => Effect.fail(Cause.squash(cause)),
@ -466,35 +474,19 @@ export namespace SessionProcessor {
}), }),
}), }),
), ),
Effect.catchCause((cause) => Effect.catch(halt),
Cause.hasInterruptsOnly(cause)
? halt(new DOMException("Aborted", "AbortError"))
: halt(Cause.squash(cause)),
),
Effect.ensuring(cleanup()), Effect.ensuring(cleanup()),
) )
if (input.abort.aborted && !ctx.assistantMessage.error) { if (aborted && !ctx.assistantMessage.error) {
yield* abort() yield* abort()
} }
if (ctx.needsCompaction) return "compact" if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop" if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
return "continue" return "continue"
}).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid)))
}) })
const abort = Effect.fn("SessionProcessor.abort")(() =>
Effect.gen(function* () {
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))
}
if (!ctx.assistantMessage.time.completed) {
yield* cleanup()
return
}
yield* session.updateMessage(ctx.assistantMessage)
}),
)
return { return {
get message() { get message() {
return ctx.assistantMessage return ctx.assistantMessage
@ -526,29 +518,4 @@ export namespace SessionProcessor {
), ),
), ),
) )
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function create(input: Input): Promise<Info> {
const hit = await runPromise((svc) => svc.create(input))
return {
get message() {
return hit.message
},
partFromToolCall(toolCallID: string) {
return hit.partFromToolCall(toolCallID)
},
async process(streamInput: LLM.StreamInput) {
const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort })
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) {
await Effect.runPromise(hit.abort())
return "stop"
}
throw Cause.squash(exit.cause)
}
return exit.value
},
}
}
} }

File diff suppressed because it is too large Load Diff

View File

@ -21,7 +21,7 @@ export namespace SessionRevert {
export type RevertInput = z.infer<typeof RevertInput> export type RevertInput = z.infer<typeof RevertInput>
export async function revert(input: RevertInput) { export async function revert(input: RevertInput) {
SessionPrompt.assertNotBusy(input.sessionID) await SessionPrompt.assertNotBusy(input.sessionID)
const all = await Session.messages({ sessionID: input.sessionID }) const all = await Session.messages({ sessionID: input.sessionID })
let lastUser: MessageV2.User | undefined let lastUser: MessageV2.User | undefined
const session = await Session.get(input.sessionID) const session = await Session.get(input.sessionID)
@ -80,7 +80,7 @@ export namespace SessionRevert {
export async function unrevert(input: { sessionID: SessionID }) { export async function unrevert(input: { sessionID: SessionID }) {
log.info("unreverting", input) log.info("unreverting", input)
SessionPrompt.assertNotBusy(input.sessionID) await SessionPrompt.assertNotBusy(input.sessionID)
const session = await Session.get(input.sessionID) const session = await Session.get(input.sessionID)
if (!session.revert) return session if (!session.revert) return session
if (session.revert.snapshot) await Snapshot.restore(session.revert.snapshot) if (session.revert.snapshot) await Snapshot.restore(session.revert.snapshot)

View File

@ -3,7 +3,6 @@ import z from "zod"
import { Session } from "." import { Session } from "."
import { MessageV2 } from "./message-v2" import { MessageV2 } from "./message-v2"
import { Identifier } from "@/id/id"
import { SessionID, MessageID } from "./schema" import { SessionID, MessageID } from "./schema"
import { Snapshot } from "@/snapshot" import { Snapshot } from "@/snapshot"
@ -110,8 +109,8 @@ export namespace SessionSummary {
(m) => m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID), (m) => m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID),
) )
const msgWithParts = messages.find((m) => m.info.id === input.messageID) const msgWithParts = messages.find((m) => m.info.id === input.messageID)
if (!msgWithParts) return if (!msgWithParts || msgWithParts.info.role !== "user") return
const userMsg = msgWithParts.info as MessageV2.User const userMsg = msgWithParts.info
const diffs = await computeDiff({ messages }) const diffs = await computeDiff({ messages })
userMsg.summary = { userMsg.summary = {
...userMsg.summary, ...userMsg.summary,

View File

@ -46,12 +46,12 @@ export namespace ToolRegistry {
readonly tools: ( readonly tools: (
model: { providerID: ProviderID; modelID: ModelID }, model: { providerID: ProviderID; modelID: ModelID },
agent?: Agent.Info, agent?: Agent.Info,
) => Effect.Effect<(Awaited<ReturnType<Tool.Info["init"]>> & { id: string })[]> ) => Effect.Effect<(Tool.Def & { id: string })[]>
} }
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/ToolRegistry") {} export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/ToolRegistry") {}
export const layer = Layer.effect( export const layer: Layer.Layer<Service, never, Config.Service | Plugin.Service> = Layer.effect(
Service, Service,
Effect.gen(function* () { Effect.gen(function* () {
const config = yield* Config.Service const config = yield* Config.Service
@ -174,7 +174,7 @@ export namespace ToolRegistry {
}) })
return yield* Effect.forEach( return yield* Effect.forEach(
filtered, filtered,
Effect.fnUntraced(function* (tool) { Effect.fnUntraced(function* (tool: Tool.Info) {
using _ = log.time(tool.id) using _ = log.time(tool.id)
const next = yield* Effect.promise(() => tool.init({ agent })) const next = yield* Effect.promise(() => tool.init({ agent }))
const output = { const output = {
@ -184,10 +184,11 @@ export namespace ToolRegistry {
yield* plugin.trigger("tool.definition", { toolID: tool.id }, output) yield* plugin.trigger("tool.definition", { toolID: tool.id }, output)
return { return {
id: tool.id, id: tool.id,
...next,
description: output.description, description: output.description,
parameters: output.parameters, parameters: output.parameters,
} as Awaited<ReturnType<Tool.Info["init"]>> & { id: string } execute: next.execute,
formatValidationError: next.formatValidationError,
}
}), }),
{ concurrency: "unbounded" }, { concurrency: "unbounded" },
) )
@ -217,7 +218,7 @@ export namespace ToolRegistry {
modelID: ModelID modelID: ModelID
}, },
agent?: Agent.Info, agent?: Agent.Info,
): Promise<(Awaited<ReturnType<Tool.Info["init"]>> & { id: string })[]> { ): Promise<(Tool.Def & { id: string })[]> {
return runPromise((svc) => svc.tools(model, agent)) return runPromise((svc) => svc.tools(model, agent))
} }
} }

View File

@ -25,9 +25,7 @@ export namespace Tool {
metadata(input: { title?: string; metadata?: M }): void metadata(input: { title?: string; metadata?: M }): void
ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void> ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void>
} }
export interface Info<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> { export interface Def<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
id: string
init: (ctx?: InitContext) => Promise<{
description: string description: string
parameters: Parameters parameters: Parameters
execute( execute(
@ -40,7 +38,11 @@ export namespace Tool {
attachments?: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] attachments?: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[]
}> }>
formatValidationError?(error: z.ZodError): string formatValidationError?(error: z.ZodError): string
}> }
export interface Info<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
id: string
init: (ctx?: InitContext) => Promise<Def<Parameters, M>>
} }
export type InferParameters<T extends Info> = T extends Info<infer P> ? z.infer<P> : never export type InferParameters<T extends Info> = T extends Info<infer P> ? z.infer<P> : never
@ -48,7 +50,7 @@ export namespace Tool {
export function define<Parameters extends z.ZodType, Result extends Metadata>( export function define<Parameters extends z.ZodType, Result extends Metadata>(
id: string, id: string,
init: Info<Parameters, Result>["init"] | Awaited<ReturnType<Info<Parameters, Result>["init"]>>, init: Info<Parameters, Result>["init"] | Def<Parameters, Result>,
): Info<Parameters, Result> { ): Info<Parameters, Result> {
return { return {
id, id,

View File

@ -0,0 +1,523 @@
import { describe, expect, test } from "bun:test"
import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
import { Runner } from "../../src/effect/runner"
import { it } from "../lib/effect"
// Unit tests for Runner: a small state machine that serialises "run" work
// (shared by concurrent callers), exclusive "shell" work (with an AbortSignal),
// shell→run handoff, cancellation, and the onIdle/onBusy/onInterrupt callbacks.
describe("Runner", () => {
  // --- ensureRunning semantics ---
  it.effect(
    "ensureRunning starts work and returns result",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const result = yield* runner.ensureRunning(Effect.succeed("hello"))
      expect(result).toBe("hello")
      // After the run completes the runner returns to Idle and is not busy.
      expect(runner.state._tag).toBe("Idle")
      expect(runner.busy).toBe(false)
    }),
  )
  it.effect(
    "ensureRunning propagates work failures",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string, string>(s)
      const exit = yield* runner.ensureRunning(Effect.fail("boom")).pipe(Effect.exit)
      expect(Exit.isFailure(exit)).toBe(true)
      // A failed run must still release the runner back to Idle.
      expect(runner.state._tag).toBe("Idle")
    }),
  )
  it.effect(
    "concurrent callers share the same run",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const calls = yield* Ref.make(0)
      const work = Effect.gen(function* () {
        yield* Ref.update(calls, (n) => n + 1)
        yield* Effect.sleep("10 millis")
        return "shared"
      })
      const [a, b] = yield* Effect.all([runner.ensureRunning(work), runner.ensureRunning(work)], {
        concurrency: "unbounded",
      })
      expect(a).toBe("shared")
      expect(b).toBe("shared")
      // The work effect ran exactly once; the second caller attached to it.
      expect(yield* Ref.get(calls)).toBe(1)
    }),
  )
  it.effect(
    "concurrent callers all receive same error",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string, string>(s)
      const work = Effect.gen(function* () {
        yield* Effect.sleep("10 millis")
        return yield* Effect.fail("boom")
      })
      const [a, b] = yield* Effect.all(
        [runner.ensureRunning(work).pipe(Effect.exit), runner.ensureRunning(work).pipe(Effect.exit)],
        { concurrency: "unbounded" },
      )
      // Both callers observe the shared run's failure.
      expect(Exit.isFailure(a)).toBe(true)
      expect(Exit.isFailure(b)).toBe(true)
    }),
  )
  it.effect(
    "ensureRunning can be called again after previous run completes",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      expect(yield* runner.ensureRunning(Effect.succeed("first"))).toBe("first")
      expect(yield* runner.ensureRunning(Effect.succeed("second"))).toBe("second")
    }),
  )
  it.effect(
    "second ensureRunning ignores new work if already running",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const ran = yield* Ref.make<string[]>([])
      const first = Effect.gen(function* () {
        yield* Ref.update(ran, (a) => [...a, "first"])
        yield* Effect.sleep("50 millis")
        return "first-result"
      })
      const second = Effect.gen(function* () {
        yield* Ref.update(ran, (a) => [...a, "second"])
        return "second-result"
      })
      const [a, b] = yield* Effect.all([runner.ensureRunning(first), runner.ensureRunning(second)], {
        concurrency: "unbounded",
      })
      // Both callers get the in-flight run's result; the second work is dropped.
      expect(a).toBe("first-result")
      expect(b).toBe("first-result")
      expect(yield* Ref.get(ran)).toEqual(["first"])
    }),
  )
  // --- cancel semantics ---
  it.effect(
    "cancel interrupts running work",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      expect(runner.busy).toBe(true)
      expect(runner.state._tag).toBe("Running")
      yield* runner.cancel
      expect(runner.busy).toBe(false)
      const exit = yield* Fiber.await(fiber)
      // Without an onInterrupt fallback, interruption surfaces as a failure exit.
      expect(Exit.isFailure(exit)).toBe(true)
    }),
  )
  it.effect(
    "cancel on idle is a no-op",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      yield* runner.cancel
      expect(runner.busy).toBe(false)
    }),
  )
  it.effect(
    "cancel with onInterrupt resolves callers gracefully",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
      const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      yield* runner.cancel
      const exit = yield* Fiber.await(fiber)
      // onInterrupt turns interruption into a successful result for waiters.
      expect(Exit.isSuccess(exit)).toBe(true)
      if (Exit.isSuccess(exit)) expect(exit.value).toBe("fallback")
    }),
  )
  it.effect(
    "cancel with queued callers resolves all",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
      const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      yield* runner.cancel
      const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
      // Every caller attached to the cancelled run receives the fallback value.
      expect(Exit.isSuccess(exitA)).toBe(true)
      expect(Exit.isSuccess(exitB)).toBe(true)
      if (Exit.isSuccess(exitA)) expect(exitA.value).toBe("fallback")
      if (Exit.isSuccess(exitB)) expect(exitB.value).toBe("fallback")
    }),
  )
  it.effect(
    "work can be started after cancel",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      yield* runner.cancel
      yield* Fiber.await(fiber)
      const result = yield* runner.ensureRunning(Effect.succeed("after-cancel"))
      expect(result).toBe("after-cancel")
    }),
  )
  // Plain bun test (not it.effect): mixes promise-land and Effect runtimes on
  // purpose, so `cancel` can still be pending while a replacement run starts.
  test("cancel does not deadlock when replacement work starts before interrupted run exits", async () => {
    // Minimal externally-resolvable promise (a hand-rolled deferred).
    function defer() {
      let resolve!: () => void
      const promise = new Promise<void>((done) => {
        resolve = done
      })
      return { promise, resolve }
    }
    // Rejects after `ms` — used as a timeout arm in Promise.race.
    function fail(ms: number, msg: string) {
      return new Promise<never>((_, reject) => {
        setTimeout(() => reject(new Error(msg)), ms)
      })
    }
    const s = await Effect.runPromise(Scope.make())
    const hit = defer()
    const hold = defer()
    const done = defer()
    try {
      const runner = Runner.make<string>(s)
      // `first` signals on interrupt (hit), then blocks its finalizer on `hold`
      // so the interrupted run's exit is delayed past the replacement's start.
      const first = Effect.never.pipe(
        Effect.onInterrupt(() => Effect.sync(() => hit.resolve())),
        Effect.ensuring(Effect.promise(() => hold.promise)),
        Effect.as("first"),
      )
      const a = Effect.runPromiseExit(runner.ensureRunning(first))
      await Bun.sleep(10)
      const stop = Effect.runPromise(runner.cancel)
      await Promise.race([hit.promise, fail(250, "cancel did not interrupt running work")])
      // Start replacement work while the first run's finalizer is still held.
      const b = Effect.runPromise(runner.ensureRunning(Effect.promise(() => done.promise).pipe(Effect.as("second"))))
      expect(runner.busy).toBe(true)
      hold.resolve()
      // cancel must complete even though a new run is already active.
      await Promise.race([stop, fail(250, "cancel deadlocked while replacement run was active")])
      expect(runner.busy).toBe(true)
      done.resolve()
      expect(await b).toBe("second")
      expect(runner.busy).toBe(false)
      const exit = await a
      expect(Exit.isFailure(exit)).toBe(true)
    } finally {
      // Unblock everything so scope teardown cannot hang the test run.
      hold.resolve()
      done.resolve()
      await Promise.race([Effect.runPromise(Scope.close(s, Exit.void)), fail(1000, "runner scope did not close")])
    }
  })
  // --- shell semantics ---
  it.effect(
    "shell runs exclusively",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done"))
      expect(result).toBe("shell-done")
      expect(runner.busy).toBe(false)
    }),
  )
  it.effect(
    "shell rejects when run is active",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit)
      expect(Exit.isFailure(exit)).toBe(true)
      yield* runner.cancel
      yield* Fiber.await(fiber)
    }),
  )
  it.effect(
    "shell rejects when another shell is running",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const gate = yield* Deferred.make<void>()
      const sh = yield* runner
        .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
      expect(Exit.isFailure(exit)).toBe(true)
      yield* Deferred.succeed(gate, undefined)
      yield* Fiber.await(sh)
    }),
  )
  it.effect(
    "shell rejects via busy callback and cancel still stops the first shell",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s, {
        busy: () => {
          throw new Error("busy")
        },
      })
      // This shell cooperates with cancellation: it resolves when aborted.
      const sh = yield* runner
        .startShell((signal) =>
          Effect.promise(
            () =>
              new Promise<string>((resolve) => {
                signal.addEventListener("abort", () => resolve("aborted"), { once: true })
              }),
          ),
        )
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
      expect(Exit.isFailure(exit)).toBe(true)
      yield* runner.cancel
      const done = yield* Fiber.await(sh)
      // Cooperative abort means the first shell still completes successfully.
      expect(Exit.isSuccess(done)).toBe(true)
    }),
  )
  it.effect(
    "cancel interrupts shell that ignores abort signal",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const gate = yield* Deferred.make<void>()
      const sh = yield* runner
        .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const stop = yield* runner.cancel.pipe(Effect.forkChild)
      // cancel must not hang on an uncooperative shell — bounded by a timeout.
      const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
      expect(Exit.isSuccess(stopExit)).toBe(true)
      expect(runner.busy).toBe(false)
      const shellExit = yield* Fiber.await(sh)
      expect(Exit.isFailure(shellExit)).toBe(true)
      yield* Deferred.succeed(gate, undefined).pipe(Effect.ignore)
    }),
  )
  // --- shell→run handoff ---
  it.effect(
    "ensureRunning queues behind shell then runs after",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const gate = yield* Deferred.make<void>()
      const sh = yield* runner
        .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      expect(runner.state._tag).toBe("Shell")
      const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      // Pending run is recorded alongside the live shell.
      expect(runner.state._tag).toBe("ShellThenRun")
      yield* Deferred.succeed(gate, undefined)
      yield* Fiber.await(sh)
      const exit = yield* Fiber.await(run)
      expect(Exit.isSuccess(exit)).toBe(true)
      if (Exit.isSuccess(exit)) expect(exit.value).toBe("run-result")
      expect(runner.state._tag).toBe("Idle")
    }),
  )
  it.effect(
    "multiple ensureRunning callers share the queued run behind shell",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const calls = yield* Ref.make(0)
      const gate = yield* Deferred.make<void>()
      const sh = yield* runner
        .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const work = Effect.gen(function* () {
        yield* Ref.update(calls, (n) => n + 1)
        return "run"
      })
      const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
      const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      yield* Deferred.succeed(gate, undefined)
      yield* Fiber.await(sh)
      const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
      expect(Exit.isSuccess(exitA)).toBe(true)
      expect(Exit.isSuccess(exitB)).toBe(true)
      // The queued work ran once even though two callers requested it.
      expect(yield* Ref.get(calls)).toBe(1)
    }),
  )
  it.effect(
    "cancel during shell_then_run cancels both",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const gate = yield* Deferred.make<void>()
      const sh = yield* runner
        .startShell((signal) =>
          Effect.promise(
            () =>
              new Promise<string>((resolve) => {
                signal.addEventListener("abort", () => resolve("aborted"), { once: true })
              }),
          ),
        )
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      expect(runner.state._tag).toBe("ShellThenRun")
      yield* runner.cancel
      expect(runner.busy).toBe(false)
      yield* Fiber.await(sh)
      // The pending run is cancelled too; its caller sees a failure exit.
      const exit = yield* Fiber.await(run)
      expect(Exit.isFailure(exit)).toBe(true)
    }),
  )
  // --- lifecycle callbacks ---
  it.effect(
    "onIdle fires when returning to idle from running",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const count = yield* Ref.make(0)
      const runner = Runner.make<string>(s, {
        onIdle: Ref.update(count, (n) => n + 1),
      })
      yield* runner.ensureRunning(Effect.succeed("ok"))
      expect(yield* Ref.get(count)).toBe(1)
    }),
  )
  it.effect(
    "onIdle fires on cancel",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const count = yield* Ref.make(0)
      const runner = Runner.make<string>(s, {
        onIdle: Ref.update(count, (n) => n + 1),
      })
      const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      yield* runner.cancel
      yield* Fiber.await(fiber)
      // At least once: cancellation paths may report idle more than once.
      expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
    }),
  )
  it.effect(
    "onBusy fires when shell starts",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const count = yield* Ref.make(0)
      const runner = Runner.make<string>(s, {
        onBusy: Ref.update(count, (n) => n + 1),
      })
      yield* runner.startShell((_signal) => Effect.succeed("done"))
      expect(yield* Ref.get(count)).toBe(1)
    }),
  )
  // --- busy flag ---
  it.effect(
    "busy is true during run",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const gate = yield* Deferred.make<void>()
      const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      expect(runner.busy).toBe(true)
      yield* Deferred.succeed(gate, undefined)
      yield* Fiber.await(fiber)
      expect(runner.busy).toBe(false)
    }),
  )
  it.effect(
    "busy is true during shell",
    Effect.gen(function* () {
      const s = yield* Scope.Scope
      const runner = Runner.make<string>(s)
      const gate = yield* Deferred.make<void>()
      const fiber = yield* runner
        .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
        .pipe(Effect.forkChild)
      yield* Effect.sleep("10 millis")
      expect(runner.busy).toBe(true)
      yield* Deferred.succeed(gate, undefined)
      yield* Fiber.await(fiber)
      expect(runner.busy).toBe(false)
    }),
  )
})

View File

@ -1,26 +1,30 @@
import { describe, expect, test } from "bun:test" import { afterEach, describe, expect, test } from "bun:test"
import path from "path"
import { Instance } from "../../src/project/instance" import { Instance } from "../../src/project/instance"
import { Session } from "../../src/session" import { Session } from "../../src/session"
import { Log } from "../../src/util/log" import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false }) Log.init({ print: false })
afterEach(async () => {
await Instance.disposeAll()
})
describe("Session.list", () => { describe("Session.list", () => {
test("filters by directory", async () => { test("filters by directory", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
const first = await Session.create({}) const first = await Session.create({})
const otherDir = path.join(projectRoot, "..", "__session_list_other") await using other = await tmpdir({ git: true })
const second = await Instance.provide({ const second = await Instance.provide({
directory: otherDir, directory: other.path,
fn: async () => Session.create({}), fn: async () => Session.create({}),
}) })
const sessions = [...Session.list({ directory: projectRoot })] const sessions = [...Session.list({ directory: tmp.path })]
const ids = sessions.map((s) => s.id) const ids = sessions.map((s) => s.id)
expect(ids).toContain(first.id) expect(ids).toContain(first.id)
@ -30,8 +34,9 @@ describe("Session.list", () => {
}) })
test("filters root sessions", async () => { test("filters root sessions", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
const root = await Session.create({ title: "root-session" }) const root = await Session.create({ title: "root-session" })
const child = await Session.create({ title: "child-session", parentID: root.id }) const child = await Session.create({ title: "child-session", parentID: root.id })
@ -46,8 +51,9 @@ describe("Session.list", () => {
}) })
test("filters by start time", async () => { test("filters by start time", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
const session = await Session.create({ title: "new-session" }) const session = await Session.create({ title: "new-session" })
const futureStart = Date.now() + 86400000 const futureStart = Date.now() + 86400000
@ -59,8 +65,9 @@ describe("Session.list", () => {
}) })
test("filters by search term", async () => { test("filters by search term", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
await Session.create({ title: "unique-search-term-abc" }) await Session.create({ title: "unique-search-term-abc" })
await Session.create({ title: "other-session-xyz" }) await Session.create({ title: "other-session-xyz" })
@ -75,8 +82,9 @@ describe("Session.list", () => {
}) })
test("respects limit parameter", async () => { test("respects limit parameter", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
await Session.create({ title: "session-1" }) await Session.create({ title: "session-1" })
await Session.create({ title: "session-2" }) await Session.create({ title: "session-2" })

View File

@ -1,15 +1,18 @@
import { describe, expect, test } from "bun:test" import { afterEach, describe, expect, test } from "bun:test"
import path from "path"
import { Instance } from "../../src/project/instance" import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server" import { Server } from "../../src/server/server"
import { Session } from "../../src/session" import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2" import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, type SessionID } from "../../src/session/schema" import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { Log } from "../../src/util/log" import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
const root = path.join(__dirname, "../..")
Log.init({ print: false }) Log.init({ print: false })
afterEach(async () => {
await Instance.disposeAll()
})
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) { async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
const ids = [] as MessageID[] const ids = [] as MessageID[]
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
@ -38,8 +41,9 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D
describe("session messages endpoint", () => { describe("session messages endpoint", () => {
test("returns cursor headers for older pages", async () => { test("returns cursor headers for older pages", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: root, directory: tmp.path,
fn: async () => { fn: async () => {
const session = await Session.create({}) const session = await Session.create({})
const ids = await fill(session.id, 5) const ids = await fill(session.id, 5)
@ -64,8 +68,9 @@ describe("session messages endpoint", () => {
}) })
test("keeps full-history responses when limit is omitted", async () => { test("keeps full-history responses when limit is omitted", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: root, directory: tmp.path,
fn: async () => { fn: async () => {
const session = await Session.create({}) const session = await Session.create({})
const ids = await fill(session.id, 3) const ids = await fill(session.id, 3)
@ -82,8 +87,9 @@ describe("session messages endpoint", () => {
}) })
test("rejects invalid cursors and missing sessions", async () => { test("rejects invalid cursors and missing sessions", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: root, directory: tmp.path,
fn: async () => { fn: async () => {
const session = await Session.create({}) const session = await Session.create({})
const app = Server.Default() const app = Server.Default()
@ -100,8 +106,9 @@ describe("session messages endpoint", () => {
}) })
test("does not truncate large legacy limit requests", async () => { test("does not truncate large legacy limit requests", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: root, directory: tmp.path,
fn: async () => { fn: async () => {
const session = await Session.create({}) const session = await Session.create({})
await fill(session.id, 520) await fill(session.id, 520)
@ -120,7 +127,7 @@ describe("session messages endpoint", () => {
describe("session.prompt_async error handling", () => { describe("session.prompt_async error handling", () => {
test("prompt_async route has error handler for detached prompt call", async () => { test("prompt_async route has error handler for detached prompt call", async () => {
const src = await Bun.file(path.join(import.meta.dir, "../../src/server/routes/session.ts")).text() const src = await Bun.file(new URL("../../src/server/routes/session.ts", import.meta.url)).text()
const start = src.indexOf('"/:sessionID/prompt_async"') const start = src.indexOf('"/:sessionID/prompt_async"')
const end = src.indexOf('"/:sessionID/command"', start) const end = src.indexOf('"/:sessionID/command"', start)
expect(start).toBeGreaterThan(-1) expect(start).toBeGreaterThan(-1)

View File

@ -1,17 +1,21 @@
import { describe, expect, test } from "bun:test" import { afterEach, describe, expect, test } from "bun:test"
import path from "path"
import { Session } from "../../src/session" import { Session } from "../../src/session"
import { Log } from "../../src/util/log" import { Log } from "../../src/util/log"
import { Instance } from "../../src/project/instance" import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server" import { Server } from "../../src/server/server"
import { tmpdir } from "../fixture/fixture"
const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false }) Log.init({ print: false })
afterEach(async () => {
await Instance.disposeAll()
})
describe("tui.selectSession endpoint", () => { describe("tui.selectSession endpoint", () => {
test("should return 200 when called with valid session", async () => { test("should return 200 when called with valid session", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
// #given // #given
const session = await Session.create({}) const session = await Session.create({})
@ -35,8 +39,9 @@ describe("tui.selectSession endpoint", () => {
}) })
test("should return 404 when session does not exist", async () => { test("should return 404 when session does not exist", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
// #given // #given
const nonExistentSessionID = "ses_nonexistent123" const nonExistentSessionID = "ses_nonexistent123"
@ -56,8 +61,9 @@ describe("tui.selectSession endpoint", () => {
}) })
test("should return 400 when session ID format is invalid", async () => { test("should return 400 when session ID format is invalid", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({ await Instance.provide({
directory: projectRoot, directory: tmp.path,
fn: async () => { fn: async () => {
// #given // #given
const invalidSessionID = "invalid_session_id" const invalidSessionID = "invalid_session_id"

View File

@ -129,7 +129,7 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou
} }
function fake( function fake(
input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0], input: Parameters<SessionProcessorModule.SessionProcessor.Interface["create"]>[0],
result: "continue" | "compact", result: "continue" | "compact",
) { ) {
const msg = input.assistantMessage const msg = input.assistantMessage
@ -540,7 +540,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: new AbortController().signal,
auto: false, auto: false,
}), }),
), ),
@ -580,7 +579,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: new AbortController().signal,
auto: false, auto: false,
}), }),
), ),
@ -621,7 +619,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: new AbortController().signal,
auto: true, auto: true,
}), }),
), ),
@ -675,7 +672,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: new AbortController().signal,
auto: true, auto: true,
overflow: true, overflow: true,
}), }),
@ -717,7 +713,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: new AbortController().signal,
auto: true, auto: true,
overflow: true, overflow: true,
}), }),
@ -792,7 +787,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: abort.signal,
auto: false, auto: false,
}), }),
), ),
@ -858,7 +852,6 @@ describe("session.compaction.process", () => {
parentID: msg.id, parentID: msg.id,
messages: msgs, messages: msgs,
sessionID: session.id, sessionID: session.id,
abort: abort.signal,
auto: false, auto: false,
}), }),
), ),
@ -892,6 +885,91 @@ describe("session.compaction.process", () => {
}, },
}) })
}) })
// Regression test: while the compaction summary is being generated, tool-call
// events emitted by the model must not produce tool executions/parts.
test("does not allow tool calls while generating the summary", async () => {
  const stub = llm()
  // Stub stream simulating a model that attempts a tool call and then finishes
  // with finishReason "tool-calls".
  stub.push(
    Stream.make(
      { type: "start" } satisfies LLM.Event,
      { type: "tool-input-start", id: "call-1", toolName: "_noop" } satisfies LLM.Event,
      { type: "tool-call", toolCallId: "call-1", toolName: "_noop", input: {} } satisfies LLM.Event,
      {
        type: "finish-step",
        finishReason: "tool-calls",
        rawFinishReason: "tool_calls",
        response: { id: "res", modelId: "test-model", timestamp: new Date() },
        providerMetadata: undefined,
        usage: {
          inputTokens: 1,
          outputTokens: 1,
          totalTokens: 2,
          inputTokenDetails: {
            noCacheTokens: undefined,
            cacheReadTokens: undefined,
            cacheWriteTokens: undefined,
          },
          outputTokenDetails: {
            textTokens: undefined,
            reasoningTokens: undefined,
          },
        },
      } satisfies LLM.Event,
      {
        type: "finish",
        finishReason: "tool-calls",
        rawFinishReason: "tool_calls",
        totalUsage: {
          inputTokens: 1,
          outputTokens: 1,
          totalTokens: 2,
          inputTokenDetails: {
            noCacheTokens: undefined,
            cacheReadTokens: undefined,
            cacheWriteTokens: undefined,
          },
          outputTokenDetails: {
            textTokens: undefined,
            reasoningTokens: undefined,
          },
        },
      } satisfies LLM.Event,
    ),
  )
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
      const session = await Session.create({})
      const msg = await user(session.id, "hello")
      const rt = liveRuntime(stub.layer)
      try {
        const msgs = await Session.messages({ sessionID: session.id })
        // Run compaction against the stubbed model stream.
        await rt.runPromise(
          SessionCompaction.Service.use((svc) =>
            svc.process({
              parentID: msg.id,
              messages: msgs,
              sessionID: session.id,
              auto: false,
            }),
          ),
        )
        const summary = (await Session.messages({ sessionID: session.id })).find(
          (item) => item.info.role === "assistant" && item.info.summary,
        )
        expect(summary?.info.role).toBe("assistant")
        // The attempted tool call must not have left a tool part on the summary.
        expect(summary?.parts.some((part) => part.type === "tool")).toBe(false)
      } finally {
        // Always tear down the live runtime, even on assertion failure.
        await rt.dispose()
      }
    },
  })
})
}) })
describe("util.token.estimate", () => { describe("util.token.estimate", () => {

View File

@ -1,7 +1,9 @@
import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test" import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test"
import path from "path" import path from "path"
import { tool, type ModelMessage } from "ai" import { tool, type ModelMessage } from "ai"
import { Cause, Exit, Stream } from "effect"
import z from "zod" import z from "zod"
import { makeRuntime } from "../../src/effect/run-service"
import { LLM } from "../../src/session/llm" import { LLM } from "../../src/session/llm"
import { Instance } from "../../src/project/instance" import { Instance } from "../../src/project/instance"
import { Provider } from "../../src/provider/provider" import { Provider } from "../../src/provider/provider"
@ -109,7 +111,11 @@ type Capture = {
const state = { const state = {
server: null as ReturnType<typeof Bun.serve> | null, server: null as ReturnType<typeof Bun.serve> | null,
queue: [] as Array<{ path: string; response: Response; resolve: (value: Capture) => void }>, queue: [] as Array<{
path: string
response: Response | ((req: Request, capture: Capture) => Response)
resolve: (value: Capture) => void
}>,
} }
function deferred<T>() { function deferred<T>() {
@ -126,6 +132,58 @@ function waitRequest(pathname: string, response: Response) {
return pending.promise return pending.promise
} }
/**
 * Rejecting timer: produces a promise that never resolves and rejects with an
 * Error after `ms` milliseconds. Intended as the losing/deadline arm of a
 * `Promise.race` in tests.
 */
function timeout(ms: number) {
  const arm = (reject: (reason?: unknown) => void) => {
    setTimeout(() => {
      reject(new Error(`timed out after ${ms}ms`))
    }, ms)
  }
  return new Promise<never>((_, reject) => arm(reject))
}
// Queues a handler for the next request to `pathname` that answers with an
// SSE stream emitting a single chunk and then never closing, so the consumer
// must cancel rather than read to completion. Returns three promises that
// settle when: the request arrives, the client aborts the HTTP request, and
// the response body stream is canceled — used to observe prompt cancellation.
function waitStreamingRequest(pathname: string) {
  const request = deferred<Capture>()
  const requestAborted = deferred<void>()
  const responseCanceled = deferred<void>()
  const encoder = new TextEncoder()
  state.queue.push({
    path: pathname,
    resolve: request.resolve,
    response(req: Request) {
      // Fires when the underlying HTTP request is aborted by the client.
      req.signal.addEventListener("abort", () => requestAborted.resolve(), { once: true })
      return new Response(
        new ReadableStream<Uint8Array>({
          start(controller) {
            // Emit exactly one SSE data chunk; the stream is intentionally
            // left open afterwards (no controller.close()).
            controller.enqueue(
              encoder.encode(
                [
                  `data: ${JSON.stringify({
                    id: "chatcmpl-abort",
                    object: "chat.completion.chunk",
                    choices: [{ delta: { role: "assistant" } }],
                  })}`,
                ].join("\n\n") + "\n\n",
              ),
            )
          },
          cancel() {
            // Fires when the response body is torn down by the reader.
            responseCanceled.resolve()
          },
        }),
        {
          status: 200,
          headers: { "Content-Type": "text/event-stream" },
        },
      )
    },
  })
  return {
    request: request.promise,
    requestAborted: requestAborted.promise,
    responseCanceled: responseCanceled.promise,
  }
}
beforeAll(() => { beforeAll(() => {
state.server = Bun.serve({ state.server = Bun.serve({
port: 0, port: 0,
@ -143,7 +201,9 @@ beforeAll(() => {
return new Response("not found", { status: 404 }) return new Response("not found", { status: 404 })
} }
return next.response return typeof next.response === "function"
? next.response(req, { url, headers: req.headers, body })
: next.response
}, },
}) })
}) })
@ -325,6 +385,162 @@ describe("session.llm.stream", () => {
}) })
}) })
// Aborting the AbortSignal passed to LLM.stream must promptly cancel the
// provider's HTTP response body (observed via the hanging SSE stub server).
test("raw stream abort signal cancels provider response body promptly", async () => {
  const server = state.server
  if (!server) throw new Error("Server not initialized")
  const providerID = "alibaba"
  const modelID = "qwen-plus"
  const fixture = await loadFixture(providerID, modelID)
  const model = fixture.model
  // Arm the stub endpoint before the request is made.
  const pending = waitStreamingRequest("/chat/completions")
  await using tmp = await tmpdir({
    init: async (dir) => {
      // Point the provider at the local stub server.
      await Bun.write(
        path.join(dir, "opencode.json"),
        JSON.stringify({
          $schema: "https://opencode.ai/config.json",
          enabled_providers: [providerID],
          provider: {
            [providerID]: {
              options: {
                apiKey: "test-key",
                baseURL: `${server.url.origin}/v1`,
              },
            },
          },
        }),
      )
    },
  })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const resolved = await Provider.getModel(ProviderID.make(providerID), ModelID.make(model.id))
      const sessionID = SessionID.make("session-test-raw-abort")
      const agent = {
        name: "test",
        mode: "primary",
        options: {},
        permission: [{ permission: "*", pattern: "*", action: "allow" }],
      } satisfies Agent.Info
      const user = {
        id: MessageID.make("user-raw-abort"),
        sessionID,
        role: "user",
        time: { created: Date.now() },
        agent: agent.name,
        model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
      } satisfies MessageV2.User
      const ctrl = new AbortController()
      const result = await LLM.stream({
        user,
        sessionID,
        model: resolved,
        agent,
        system: ["You are a helpful assistant."],
        abort: ctrl.signal,
        messages: [{ role: "user", content: "Hello" }],
        tools: {},
      })
      const iter = result.fullStream[Symbol.asyncIterator]()
      // Wait for the request to reach the stub, pull one event, then abort.
      await pending.request
      await iter.next()
      ctrl.abort()
      // The response body must be canceled within the 500ms deadline; the
      // request-level abort is best-effort (hence the swallowed race).
      await Promise.race([pending.responseCanceled, timeout(500)])
      await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
      await iter.return?.()
    },
  })
})
// Cancelling the Effect runtime (via runPromiseExit's signal option) must
// interrupt the service stream and promptly cancel the provider response body.
test("service stream cancellation cancels provider response body promptly", async () => {
  const server = state.server
  if (!server) throw new Error("Server not initialized")
  const providerID = "alibaba"
  const modelID = "qwen-plus"
  const fixture = await loadFixture(providerID, modelID)
  const model = fixture.model
  // Arm the stub endpoint before the request is made.
  const pending = waitStreamingRequest("/chat/completions")
  await using tmp = await tmpdir({
    init: async (dir) => {
      // Point the provider at the local stub server.
      await Bun.write(
        path.join(dir, "opencode.json"),
        JSON.stringify({
          $schema: "https://opencode.ai/config.json",
          enabled_providers: [providerID],
          provider: {
            [providerID]: {
              options: {
                apiKey: "test-key",
                baseURL: `${server.url.origin}/v1`,
              },
            },
          },
        }),
      )
    },
  })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const resolved = await Provider.getModel(ProviderID.make(providerID), ModelID.make(model.id))
      const sessionID = SessionID.make("session-test-service-abort")
      const agent = {
        name: "test",
        mode: "primary",
        options: {},
        permission: [{ permission: "*", pattern: "*", action: "allow" }],
      } satisfies Agent.Info
      const user = {
        id: MessageID.make("user-service-abort"),
        sessionID,
        role: "user",
        time: { created: Date.now() },
        agent: agent.name,
        model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
      } satisfies MessageV2.User
      const ctrl = new AbortController()
      const { runPromiseExit } = makeRuntime(LLM.Service, LLM.defaultLayer)
      // Drain the service stream; the AbortSignal drives Effect interruption.
      const run = runPromiseExit(
        (svc) =>
          svc
            .stream({
              user,
              sessionID,
              model: resolved,
              agent,
              system: ["You are a helpful assistant."],
              messages: [{ role: "user", content: "Hello" }],
              tools: {},
            })
            .pipe(Stream.runDrain),
        { signal: ctrl.signal },
      )
      await pending.request
      ctrl.abort()
      // Provider body must be canceled within the 500ms deadline.
      await Promise.race([pending.responseCanceled, timeout(500)])
      const exit = await run
      // The run must end as an interruption failure, not a success.
      expect(Exit.isFailure(exit)).toBe(true)
      if (Exit.isFailure(exit)) {
        expect(Cause.hasInterrupts(exit.cause)).toBe(true)
      }
      // Request-level abort is best-effort (hence the swallowed race).
      await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
    },
  })
})
test("keeps tools enabled by prompt permissions", async () => { test("keeps tools enabled by prompt permissions", async () => {
const server = state.server const server = state.server
if (!server) { if (!server) {

View File

@ -1,7 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node" import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test" import { expect } from "bun:test"
import { APICallError } from "ai" import { APICallError } from "ai"
import { Effect, Layer, ServiceMap } from "effect" import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream" import * as Stream from "effect/Stream"
import path from "path" import path from "path"
import type { Agent } from "../../src/agent/agent" import type { Agent } from "../../src/agent/agent"
@ -10,7 +10,6 @@ import { Bus } from "../../src/bus"
import { Config } from "../../src/config/config" import { Config } from "../../src/config/config"
import { Permission } from "../../src/permission" import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin" import { Plugin } from "../../src/plugin"
import { Instance } from "../../src/project/instance"
import type { Provider } from "../../src/provider/provider" import type { Provider } from "../../src/provider/provider"
import { ModelID, ProviderID } from "../../src/provider/schema" import { ModelID, ProviderID } from "../../src/provider/schema"
import { Session } from "../../src/session" import { Session } from "../../src/session"
@ -120,21 +119,8 @@ function fail<E>(err: E, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fail(err))) return stream(...items).pipe(Stream.concat(Stream.fail(err)))
} }
function wait(abort: AbortSignal) { function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
return Effect.promise( return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
() =>
new Promise<void>((done) => {
abort.addEventListener("abort", () => done(), { once: true })
}),
)
}
function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
return stream(...items).pipe(
Stream.concat(
Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
),
)
} }
function model(context: number): Provider.Model { function model(context: number): Provider.Model {
@ -291,13 +277,11 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi") const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const input = { const input = {
@ -313,7 +297,6 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "hi" }], messages: [{ role: "user", content: "hi" }],
tools: {}, tools: {},
} satisfies LLM.StreamInput } satisfies LLM.StreamInput
@ -359,13 +342,11 @@ it.effect("session.processor effect tests stop after token overflow requests com
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact") const parent = yield* user(chat.id, "compact")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(20) const mdl = model(20)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const value = yield* handle.process({ const value = yield* handle.process({
@ -381,7 +362,6 @@ it.effect("session.processor effect tests stop after token overflow requests com
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "compact" }], messages: [{ role: "user", content: "compact" }],
tools: {}, tools: {},
}) })
@ -433,13 +413,11 @@ it.effect("session.processor effect tests reset reasoning state across retries",
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "reason") const parent = yield* user(chat.id, "reason")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const value = yield* handle.process({ const value = yield* handle.process({
@ -455,7 +433,6 @@ it.effect("session.processor effect tests reset reasoning state across retries",
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "reason" }], messages: [{ role: "user", content: "reason" }],
tools: {}, tools: {},
}) })
@ -485,13 +462,11 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "json") const parent = yield* user(chat.id, "json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const value = yield* handle.process({ const value = yield* handle.process({
@ -507,7 +482,6 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "json" }], messages: [{ role: "user", content: "json" }],
tools: {}, tools: {},
}) })
@ -535,13 +509,11 @@ it.effect("session.processor effect tests retry recognized structured json error
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry json") const parent = yield* user(chat.id, "retry json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const value = yield* handle.process({ const value = yield* handle.process({
@ -557,7 +529,6 @@ it.effect("session.processor effect tests retry recognized structured json error
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "retry json" }], messages: [{ role: "user", content: "retry json" }],
tools: {}, tools: {},
}) })
@ -601,7 +572,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry") const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const states: number[] = [] const states: number[] = []
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
@ -612,7 +582,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const value = yield* handle.process({ const value = yield* handle.process({
@ -628,7 +597,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "retry" }], messages: [{ role: "user", content: "retry" }],
tools: {}, tools: {},
}) })
@ -656,13 +624,11 @@ it.effect("session.processor effect tests compact on structured context overflow
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact json") const parent = yield* user(chat.id, "compact json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const value = yield* handle.process({ const value = yield* handle.process({
@ -678,7 +644,6 @@ it.effect("session.processor effect tests compact on structured context overflow
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "compact json" }], messages: [{ role: "user", content: "compact json" }],
tools: {}, tools: {},
}) })
@ -696,7 +661,6 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
(dir) => (dir) =>
Effect.gen(function* () { Effect.gen(function* () {
const ready = defer<void>() const ready = defer<void>()
const seen = defer<void>()
const test = yield* TestLLM const test = yield* TestLLM
const processors = yield* SessionProcessor.Service const processors = yield* SessionProcessor.Service
const session = yield* Session.Service const session = yield* Session.Service
@ -710,17 +674,15 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "tool abort") const parent = yield* user(chat.id, "tool abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const handle = yield* processors.create({ const handle = yield* processors.create({
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const run = Effect.runPromise( const run = yield* handle
handle.process({ .process({
user: { user: {
id: parent.id, id: parent.id,
sessionID: chat.id, sessionID: chat.id,
@ -733,20 +695,25 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "tool abort" }], messages: [{ role: "user", content: "tool abort" }],
tools: {}, tools: {},
}), })
) .pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise) yield* Effect.promise(() => ready.promise)
abort.abort() yield* Fiber.interrupt(run)
const value = yield* Effect.promise(() => run) const exit = yield* Fiber.await(run)
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
expect(value).toBe("stop") expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
}
expect(yield* test.calls).toBe(1) expect(yield* test.calls).toBe(1)
expect(tool?.state.status).toBe("error") expect(tool?.state.status).toBe("error")
if (tool?.state.status === "error") { if (tool?.state.status === "error") {
@ -779,7 +746,6 @@ it.effect("session.processor effect tests record aborted errors and idle state",
const chat = yield* session.create({}) const chat = yield* session.create({})
const parent = yield* user(chat.id, "abort") const parent = yield* user(chat.id, "abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const abort = new AbortController()
const mdl = model(100) const mdl = model(100)
const errs: string[] = [] const errs: string[] = []
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => { const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
@ -792,11 +758,10 @@ it.effect("session.processor effect tests record aborted errors and idle state",
assistantMessage: msg, assistantMessage: msg,
sessionID: chat.id, sessionID: chat.id,
model: mdl, model: mdl,
abort: abort.signal,
}) })
const run = Effect.runPromise( const run = yield* handle
handle.process({ .process({
user: { user: {
id: parent.id, id: parent.id,
sessionID: chat.id, sessionID: chat.id,
@ -809,22 +774,27 @@ it.effect("session.processor effect tests record aborted errors and idle state",
model: mdl, model: mdl,
agent: agent(), agent: agent(),
system: [], system: [],
abort: abort.signal,
messages: [{ role: "user", content: "abort" }], messages: [{ role: "user", content: "abort" }],
tools: {}, tools: {},
}), })
) .pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise) yield* Effect.promise(() => ready.promise)
abort.abort() yield* Fiber.interrupt(run)
const value = yield* Effect.promise(() => run) const exit = yield* Fiber.await(run)
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
yield* Effect.promise(() => seen.promise) yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const state = yield* status.get(chat.id) const state = yield* status.get(chat.id)
off() off()
expect(value).toBe("stop") expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
}
expect(handle.message.error?.name).toBe("MessageAbortedError") expect(handle.message.error?.name).toBe("MessageAbortedError")
expect(stored.info.role).toBe("assistant") expect(stored.info.role).toBe("assistant")
if (stored.info.role === "assistant") { if (stored.info.role === "assistant") {
@ -836,3 +806,67 @@ it.effect("session.processor effect tests record aborted errors and idle state",
{ git: true }, { git: true },
) )
}) })
// Interrupting the processing fiber alone (no explicit handle.abort()) must
// still record MessageAbortedError on the assistant message and return the
// session status to idle.
it.effect("session.processor effect tests mark interruptions aborted without manual abort", () => {
  return provideTmpdirInstance(
    (dir) =>
      Effect.gen(function* () {
        const ready = defer<void>()
        const processors = yield* SessionProcessor.Service
        const session = yield* Session.Service
        const status = yield* SessionStatus.Service
        const test = yield* TestLLM
        // Model stub that hangs after emitting "start"; `ready` resolves on
        // that first event so the test knows processing is underway.
        yield* test.push((input) =>
          hang(input, start()).pipe(
            Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
          ),
        )
        const chat = yield* session.create({})
        const parent = yield* user(chat.id, "interrupt")
        const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
        const mdl = model(100)
        const handle = yield* processors.create({
          assistantMessage: msg,
          sessionID: chat.id,
          model: mdl,
        })
        // Fork processing so it can be interrupted mid-stream.
        const run = yield* handle
          .process({
            user: {
              id: parent.id,
              sessionID: chat.id,
              role: "user",
              time: parent.time,
              agent: parent.agent,
              model: { providerID: ref.providerID, modelID: ref.modelID },
            } satisfies MessageV2.User,
            sessionID: chat.id,
            model: mdl,
            agent: agent(),
            system: [],
            messages: [{ role: "user", content: "interrupt" }],
            tools: {},
          })
          .pipe(Effect.forkChild)
        yield* Effect.promise(() => ready.promise)
        yield* Fiber.interrupt(run)
        const exit = yield* Fiber.await(run)
        const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
        const state = yield* status.get(chat.id)
        expect(Exit.isFailure(exit)).toBe(true)
        // Cleanup alone (no manual abort) must have marked the message aborted.
        expect(handle.message.error?.name).toBe("MessageAbortedError")
        expect(stored.info.role).toBe("assistant")
        if (stored.info.role === "assistant") {
          expect(stored.info.error?.name).toBe("MessageAbortedError")
        }
        expect(state).toMatchObject({ type: "idle" })
      }),
    { git: true },
  )
})

View File

@ -0,0 +1,247 @@
import { describe, expect, spyOn, test } from "bun:test"
import { Instance } from "../../src/project/instance"
import { Provider } from "../../src/provider/provider"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { SessionPrompt } from "../../src/session/prompt"
import { SessionStatus } from "../../src/session/status"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
Log.init({ print: false })
/**
 * One-shot latch for test synchronization: a void promise paired with the
 * function that resolves it, so callers can release waiters manually.
 */
function deferred() {
  let release!: () => void
  const promise = new Promise<void>((settle) => {
    release = settle
  })
  return { promise, resolve: release }
}
// Helper: seed a session with a user message + finished assistant message
// so loop() exits immediately without calling any LLM
async function seed(sessionID: SessionID) {
  // User turn with a single text part.
  const userMsg: MessageV2.Info = {
    id: MessageID.ascending(),
    role: "user",
    sessionID,
    time: { created: Date.now() },
    agent: "build",
    model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
  }
  await Session.updateMessage(userMsg)
  await Session.updatePart({
    id: PartID.ascending(),
    messageID: userMsg.id,
    sessionID,
    type: "text",
    text: "hello",
  })
  // Completed assistant reply: time.completed is set and finish is "stop",
  // which is what lets loop() treat the conversation as already finished.
  const assistantMsg: MessageV2.Info = {
    id: MessageID.ascending(),
    role: "assistant",
    parentID: userMsg.id,
    sessionID,
    mode: "build",
    agent: "build",
    cost: 0,
    path: { cwd: "/tmp", root: "/tmp" },
    tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
    modelID: "gpt-5.2" as any,
    providerID: "openai" as any,
    time: { created: Date.now(), completed: Date.now() },
    finish: "stop",
  }
  await Session.updateMessage(assistantMsg)
  await Session.updatePart({
    id: PartID.ascending(),
    messageID: assistantMsg.id,
    sessionID,
    type: "text",
    text: "hi there",
  })
  // Return both messages so tests can assert against their IDs.
  return { userMsg, assistantMsg }
}
describe("session.prompt concurrency", () => {
// Happy path: with a finished assistant message already seeded, loop() exits
// immediately (no LLM call), returns it, and leaves the session idle.
test("loop returns assistant message and sets status to idle", async () => {
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const session = await Session.create({})
      await seed(session.id)
      const result = await SessionPrompt.loop({ sessionID: session.id })
      expect(result.info.role).toBe("assistant")
      if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
      const status = await SessionStatus.get(session.id)
      expect(status.type).toBe("idle")
    },
  })
})
// Two concurrent loop() calls for the same session must coalesce onto one run
// and resolve with the same assistant message (same message id).
test("concurrent loop callers get the same result", async () => {
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const session = await Session.create({})
      await seed(session.id)
      const [a, b] = await Promise.all([
        SessionPrompt.loop({ sessionID: session.id }),
        SessionPrompt.loop({ sessionID: session.id }),
      ])
      expect(a.info.id).toBe(b.info.id)
      expect(a.info.role).toBe("assistant")
    },
  })
})
// While a loop() run is in flight, assertNotBusy must reject with BusyError;
// after the run completes it must succeed again.
test("assertNotBusy throws when loop is running", async () => {
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const session = await Session.create({})
      // Seed only a user message so loop() has work to do and must reach
      // Provider.getModel, where the mock below parks it.
      const userMsg: MessageV2.Info = {
        id: MessageID.ascending(),
        role: "user",
        sessionID: session.id,
        time: { created: Date.now() },
        agent: "build",
        model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
      }
      await Session.updateMessage(userMsg)
      await Session.updatePart({
        id: PartID.ascending(),
        messageID: userMsg.id,
        sessionID: session.id,
        type: "text",
        text: "hello",
      })
      const ready = deferred()
      const gate = deferred()
      // Mock blocks inside getModel until `gate` opens, keeping the loop busy.
      const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
        ready.resolve()
        await gate.promise
        throw new Error("test stop")
      })
      try {
        const loopPromise = SessionPrompt.loop({ sessionID: session.id }).catch(() => undefined)
        await ready.promise
        await expect(SessionPrompt.assertNotBusy(session.id)).rejects.toBeInstanceOf(Session.BusyError)
        gate.resolve()
        await loopPromise
      } finally {
        // Idempotent release + restore even if an assertion above failed.
        gate.resolve()
        getModel.mockRestore()
      }
      // After loop completes, assertNotBusy should succeed
      await SessionPrompt.assertNotBusy(session.id)
    },
  })
})
// Cancelling a running loop must set the session idle and let loop() resolve
// cleanly with the last assistant message instead of surfacing an interrupt.
test("cancel sets status to idle", async () => {
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const session = await Session.create({})
      // Seed only a user message — loop must call getModel to proceed
      const userMsg: MessageV2.Info = {
        id: MessageID.ascending(),
        role: "user",
        sessionID: session.id,
        time: { created: Date.now() },
        agent: "build",
        model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
      }
      await Session.updateMessage(userMsg)
      await Session.updatePart({
        id: PartID.ascending(),
        messageID: userMsg.id,
        sessionID: session.id,
        type: "text",
        text: "hello",
      })
      // Also seed an assistant message so lastAssistant() fallback can find it
      // Note: no time.completed / finish, so loop() still considers it unfinished.
      const assistantMsg: MessageV2.Info = {
        id: MessageID.ascending(),
        role: "assistant",
        parentID: userMsg.id,
        sessionID: session.id,
        mode: "build",
        agent: "build",
        cost: 0,
        path: { cwd: "/tmp", root: "/tmp" },
        tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
        modelID: "gpt-5.2" as any,
        providerID: "openai" as any,
        time: { created: Date.now() },
      }
      await Session.updateMessage(assistantMsg)
      await Session.updatePart({
        id: PartID.ascending(),
        messageID: assistantMsg.id,
        sessionID: session.id,
        type: "text",
        text: "hi there",
      })
      const ready = deferred()
      const gate = deferred()
      // Mock parks the loop inside getModel until `gate` opens.
      const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
        ready.resolve()
        await gate.promise
        throw new Error("test stop")
      })
      try {
        // Start loop — it will block in getModel (assistant has no finish, so loop continues)
        const loopPromise = SessionPrompt.loop({ sessionID: session.id })
        await ready.promise
        await SessionPrompt.cancel(session.id)
        const status = await SessionStatus.get(session.id)
        expect(status.type).toBe("idle")
        // loop should resolve cleanly, not throw "All fibers interrupted"
        const result = await loopPromise
        expect(result.info.role).toBe("assistant")
        expect(result.info.id).toBe(assistantMsg.id)
      } finally {
        gate.resolve()
        getModel.mockRestore()
      }
    },
  })
}, 10000)
// Cancelling a session with no run in flight is a no-op apart from ensuring
// the status is idle (cancel must be safe to call at any time).
test("cancel on idle session just sets idle", async () => {
  await using tmp = await tmpdir({ git: true })
  await Instance.provide({
    directory: tmp.path,
    fn: async () => {
      const session = await Session.create({})
      await SessionPrompt.cancel(session.id)
      const status = await SessionStatus.get(session.id)
      expect(status.type).toBe("idle")
    },
  })
})
})

File diff suppressed because it is too large Load Diff