From bb039496d52acba530bf02eea347da71c4c93a80 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 31 Mar 2026 13:47:17 -0400 Subject: [PATCH] refactor: propagate Instance context through Effect fibers via InstanceRef Add a ServiceMap.Reference that carries InstanceContext through the Effect service graph so child fibers retain instance context even when resumed by external I/O events outside the ALS boundary. - Add InstanceRef to instance-state.ts; InstanceState.get/has/invalidate try the Reference first, fall back to ALS - makeRuntime automatically captures ALS into InstanceRef at the boundary - provideInstance (test fixture) sets InstanceRef for Effect.runPromiseWith - Remove all redundant provideInstance(dir) wrappers from prompt tests - Fix test/lib/effect.ts type params (drop unnecessary S/T generics) --- .../opencode/src/effect/instance-state.ts | 24 +- packages/opencode/src/effect/run-service.ts | 22 +- .../test/effect/instance-state.test.ts | 101 ++- packages/opencode/test/fixture/fixture.ts | 4 +- packages/opencode/test/lib/effect.ts | 6 +- .../test/session/prompt-effect.test.ts | 722 ++++++------------ 6 files changed, 391 insertions(+), 488 deletions(-) diff --git a/packages/opencode/src/effect/instance-state.ts b/packages/opencode/src/effect/instance-state.ts index 6873ec255c..ebf49d9986 100644 --- a/packages/opencode/src/effect/instance-state.ts +++ b/packages/opencode/src/effect/instance-state.ts @@ -1,9 +1,23 @@ -import { Effect, ScopedCache, Scope } from "effect" +import { Effect, ScopedCache, Scope, ServiceMap } from "effect" import { Instance, type InstanceContext } from "@/project/instance" import { registerDisposer } from "./instance-registry" const TypeId = "~opencode/InstanceState" +export const InstanceRef = ServiceMap.Reference("~opencode/InstanceRef", { + defaultValue: () => undefined, +}) + +const context = Effect.gen(function* () { + const ref = yield* InstanceRef + return ref ?? Instance.current +}) + +const directory = Effect.gen(function* () { + const ref = yield* InstanceRef + return ref ? ref.directory : Instance.directory +}) + export interface InstanceState { readonly [TypeId]: typeof TypeId readonly cache: ScopedCache.ScopedCache @@ -16,7 +30,7 @@ export namespace InstanceState { Effect.gen(function* () { const cache = yield* ScopedCache.make({ capacity: Number.POSITIVE_INFINITY, - lookup: () => init(Instance.current), + lookup: () => Effect.gen(function* () { return yield* init(yield* context) }), }) const off = registerDisposer((directory) => Effect.runPromise(ScopedCache.invalidate(cache, directory))) @@ -29,7 +43,7 @@ export namespace InstanceState { }) export const get = (self: InstanceState) => - Effect.suspend(() => ScopedCache.get(self.cache, Instance.directory)) + Effect.gen(function* () { return yield* ScopedCache.get(self.cache, yield* directory) }) export const use = (self: InstanceState, select: (value: A) => B) => Effect.map(get(self), select) @@ -40,8 +54,8 @@ export namespace InstanceState { ) => Effect.flatMap(get(self), select) export const has = (self: InstanceState) => - Effect.suspend(() => ScopedCache.has(self.cache, Instance.directory)) + Effect.gen(function* () { return yield* ScopedCache.has(self.cache, yield* directory) }) export const invalidate = (self: InstanceState) => - Effect.suspend(() => ScopedCache.invalidate(self.cache, Instance.directory)) + Effect.gen(function* () { return yield* ScopedCache.invalidate(self.cache, yield* directory) }) } diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 2daa29fde8..164f9d05c7 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -1,19 +1,31 @@ import { Effect, Layer, ManagedRuntime } from "effect" import * as ServiceMap from "effect/ServiceMap" +import { Instance } from "@/project/instance" +import { InstanceRef } from "./instance-state" export const memoMap = Layer.makeMemoMapUnsafe() +function provide(effect: Effect.Effect): Effect.Effect { + try { + const ctx = Instance.current + return Effect.provideService(effect, InstanceRef, ctx) + } catch { + return effect + } +} + export function makeRuntime(service: ServiceMap.Service, layer: Layer.Layer) { let rt: ManagedRuntime.ManagedRuntime | undefined const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap })) return { - runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(service.use(fn)), + runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(provide(service.use(fn))), runPromiseExit: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => - getRuntime().runPromiseExit(service.use(fn), options), + getRuntime().runPromiseExit(provide(service.use(fn)), options), runPromise: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => - getRuntime().runPromise(service.use(fn), options), - runFork: (fn: (svc: S) => Effect.Effect) => getRuntime().runFork(service.use(fn)), - runCallback: (fn: (svc: S) => Effect.Effect) => getRuntime().runCallback(service.use(fn)), + getRuntime().runPromise(provide(service.use(fn)), options), + runFork: (fn: (svc: S) => Effect.Effect) => getRuntime().runFork(provide(service.use(fn))), + runCallback: (fn: (svc: S) => Effect.Effect) => + getRuntime().runCallback(provide(service.use(fn))), } } diff --git a/packages/opencode/test/effect/instance-state.test.ts b/packages/opencode/test/effect/instance-state.test.ts index 2d527482ba..b6b590c6af 100644 --- a/packages/opencode/test/effect/instance-state.test.ts +++ b/packages/opencode/test/effect/instance-state.test.ts @@ -1,6 +1,6 @@ import { afterEach, expect, test } from "bun:test" -import { Duration, Effect, Layer, ManagedRuntime, ServiceMap } from "effect" -import { InstanceState } from "../../src/effect/instance-state" +import { Cause, Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, ServiceMap } from "effect" +import { InstanceRef, InstanceState } from "../../src/effect/instance-state" import { Instance } from "../../src/project/instance" import { tmpdir } from "../fixture/fixture" @@ -382,3 +382,100 @@ test("InstanceState dedupes concurrent lookups", async () => { ), ) }) + +test("InstanceState survives deferred resume from the same instance context", async () => { + await using tmp = await tmpdir({ git: true }) + + interface Api { + readonly get: (gate: Deferred.Deferred) => Effect.Effect + } + + class Test extends ServiceMap.Service()("@test/DeferredResume") { + static readonly layer = Layer.effect( + Test, + Effect.gen(function* () { + const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory)) + + return Test.of({ + get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred) { + yield* Deferred.await(gate) + return yield* InstanceState.get(state) + }), + }) + }), + ) + } + + const rt = ManagedRuntime.make(Test.layer) + + try { + const gate = await Effect.runPromise(Deferred.make()) + const fiber = await Instance.provide({ + directory: tmp.path, + fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))), + }) + + await Instance.provide({ + directory: tmp.path, + fn: () => Effect.runPromise(Deferred.succeed(gate, void 0)), + }) + const exit = await Effect.runPromise(Fiber.await(fiber)) + + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value).toBe(tmp.path) + } + } finally { + await rt.dispose() + } +}) + +test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => { + await using tmp = await tmpdir({ git: true }) + + interface Api { + readonly get: (gate: Deferred.Deferred) => Effect.Effect + } + + class Test extends ServiceMap.Service()("@test/DeferredResumeOutside") { + static readonly layer = Layer.effect( + Test, + Effect.gen(function* () { + const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory)) + + return Test.of({ + get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred) { + yield* Deferred.await(gate) + return yield* InstanceState.get(state) + }), + }) + }), + ) + } + + const rt = ManagedRuntime.make(Test.layer) + + try { + const gate = await Effect.runPromise(Deferred.make()) + // Provide InstanceRef so the fiber carries the context even when + // the deferred is resolved from outside Instance.provide ALS. + const fiber = await Instance.provide({ + directory: tmp.path, + fn: () => + Promise.resolve( + rt.runFork(Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))), + ), + }) + + // Resume from outside any Instance.provide — ALS is NOT set here + await Effect.runPromise(Deferred.succeed(gate, void 0)) + const exit = await Effect.runPromise(Fiber.await(fiber)) + + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value).toBe(tmp.path) + } + } finally { + await rt.dispose() + } +}) diff --git a/packages/opencode/test/fixture/fixture.ts b/packages/opencode/test/fixture/fixture.ts index 96f5e3f084..99b3852715 100644 --- a/packages/opencode/test/fixture/fixture.ts +++ b/packages/opencode/test/fixture/fixture.ts @@ -7,6 +7,7 @@ import type * as PlatformError from "effect/PlatformError" import type * as Scope from "effect/Scope" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import type { Config } from "../../src/config/config" +import { InstanceRef } from "../../src/effect/instance-state" import { Instance } from "../../src/project/instance" import { TestLLMServer } from "../lib/llm-server" @@ -114,7 +115,8 @@ export const provideInstance = Effect.promise(async () => Instance.provide({ directory, - fn: () => Effect.runPromiseWith(services)(self), + fn: () => + Effect.runPromiseWith(services)(self.pipe(Effect.provideService(InstanceRef, Instance.current))), }), ), ) diff --git a/packages/opencode/test/lib/effect.ts b/packages/opencode/test/lib/effect.ts index 20a5d69cc8..131ec5cc6b 100644 --- a/packages/opencode/test/lib/effect.ts +++ b/packages/opencode/test/lib/effect.ts @@ -8,7 +8,7 @@ type Body = Effect.Effect | (() => Effect.Effect) const body = (value: Body) => Effect.suspend(() => (typeof value === "function" ? value() : value)) -const run = (value: Body, layer: Layer.Layer) => +const run = (value: Body, layer: Layer.Layer) => Effect.gen(function* () { const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit) if (Exit.isFailure(exit)) { @@ -19,7 +19,7 @@ const run = (value: Body, layer: Layer.La return yield* exit }).pipe(Effect.runPromise) -const make = (testLayer: Layer.Layer, liveLayer: Layer.Layer) => { +const make = (testLayer: Layer.Layer, liveLayer: Layer.Layer) => { const effect = (name: string, value: Body, opts?: number | TestOptions) => test(name, () => run(value, testLayer), opts) @@ -49,5 +49,5 @@ const liveEnv = TestConsole.layer export const it = make(testEnv, liveEnv) -export const testEffect = (layer: Layer.Layer) => +export const testEffect = (layer: Layer.Layer) => make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv)) diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 567bf26e1b..3dd1754d43 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -1,7 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect, spyOn } from "bun:test" -import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect" -import * as Stream from "effect/Stream" +import { Cause, Effect, Exit, Fiber, Layer } from "effect" import z from "zod" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -42,105 +41,6 @@ const ref = { modelID: ModelID.make("test-model"), } -type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) - -class TestLLM extends ServiceMap.Service< - TestLLM, - { - readonly push: (stream: Script) => Effect.Effect - readonly reply: (...items: LLM.Event[]) => Effect.Effect - readonly calls: Effect.Effect - readonly inputs: Effect.Effect - } ->()("@test/PromptLLM") {} - -function stream(...items: LLM.Event[]) { - return Stream.make(...items) -} - -function usage(input = 1, output = 1, total = input + output) { - return { - inputTokens: input, - outputTokens: output, - totalTokens: total, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - } -} - -function start(): LLM.Event { - return { type: "start" } -} - -function textStart(id = "t"): LLM.Event { - return { type: "text-start", id } -} - -function textDelta(id: string, text: string): LLM.Event { - return { type: "text-delta", id, text } -} - -function textEnd(id = "t"): LLM.Event { - return { type: "text-end", id } -} - -function finishStep(): LLM.Event { - return { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - } -} - -function finish(): LLM.Event { - return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() } -} - -function finishToolCallsStep(): LLM.Event { - return { - type: "finish-step", - finishReason: "tool-calls", - rawFinishReason: "tool_calls", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - } -} - -function finishToolCalls(): LLM.Event { - return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() } -} - -function replyStop(text: string, id = "t") { - return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const -} - -function replyToolCalls(text: string, id = "t") { - return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const -} - -function toolInputStart(id: string, toolName: string): LLM.Event { - return { type: "tool-input-start", id, toolName } -} - -function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event { - return { type: "tool-call", toolCallId, toolName, input } -} - -function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) { - return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never))) -} - function defer() { let resolve!: (value: T | PromiseLike) => void const promise = new Promise((done) => { @@ -149,10 +49,6 @@ function defer() { return { promise, resolve } } -function waitMs(ms: number) { - return Effect.promise(() => new Promise((done) => setTimeout(done, ms))) -} - function withSh(fx: () => Effect.Effect) { return Effect.acquireUseRelease( Effect.sync(() => { @@ -190,43 +86,6 @@ function errorTool(parts: MessageV2.Part[]) { return part?.state.status === "error" ? (part as ErrorToolPart) : undefined } -const llm = Layer.unwrap( - Effect.gen(function* () { - const queue: Script[] = [] - const inputs: LLM.StreamInput[] = [] - let calls = 0 - - const push = Effect.fn("TestLLM.push")((item: Script) => { - queue.push(item) - return Effect.void - }) - - const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items))) - return Layer.mergeAll( - Layer.succeed( - LLM.Service, - LLM.Service.of({ - stream: (input) => { - calls += 1 - inputs.push(input) - const item = queue.shift() ?? Stream.empty - return typeof item === "function" ? item(input) : item - }, - }), - ), - Layer.succeed( - TestLLM, - TestLLM.of({ - push, - reply, - calls: Effect.sync(() => calls), - inputs: Effect.sync(() => [...inputs]), - }), - ), - ) - }), -) - const mcp = Layer.succeed( MCP.Service, MCP.Service.of({ @@ -282,33 +141,6 @@ const filetime = Layer.succeed( const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) -const deps = Layer.mergeAll( - Session.defaultLayer, - Snapshot.defaultLayer, - AgentSvc.defaultLayer, - Command.defaultLayer, - Permission.layer, - Plugin.defaultLayer, - Config.defaultLayer, - filetime, - lsp, - mcp, - AppFileSystem.defaultLayer, - status, - llm, -).pipe(Layer.provideMerge(infra)) -const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps)) -const trunc = Truncate.layer.pipe(Layer.provideMerge(deps)) -const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) -const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps)) -const env = SessionPrompt.layer.pipe( - Layer.provideMerge(compact), - Layer.provideMerge(proc), - Layer.provideMerge(registry), - Layer.provideMerge(trunc), - Layer.provideMerge(deps), -) - function makeHttp() { const deps = Layer.mergeAll( Session.defaultLayer, @@ -341,9 +173,8 @@ function makeHttp() { ) } -const it = testEffect(env) -const http = testEffect(makeHttp()) -const unix = process.platform !== "win32" ? it.effect : it.effect.skip +const it = testEffect(makeHttp()) +const unix = process.platform !== "win32" ? it.live : it.live.skip // Config that registers a custom "test" provider with a "test-model" model // so Provider.getModel("test", "test-model") succeeds inside the loop. @@ -457,32 +288,32 @@ const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) => }) const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) { - const test = yield* TestLLM const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create(input ?? { title: "Pinned" }) - return { test, prompt, sessions, chat } + return { prompt, sessions, chat } }) // Loop semantics it.live("loop exits immediately when last assistant has stop finish", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* seed(chat.id, { finish: "stop" }) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* seed(chat.id, { finish: "stop" }) - const result = yield* prompt.loop({ sessionID: chat.id }) - expect(result.info.role).toBe("assistant") - if (result.info.role === "assistant") expect(result.info.finish).toBe("stop") - expect(yield* test.calls).toBe(0) - }), - { git: true }, + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") expect(result.info.finish).toBe("stop") + expect(yield* llm.calls).toBe(0) + }), + { git: true, config: providerCfg }, ), ) -http.live("loop calls LLM and returns assistant message", () => +it.live("loop calls LLM and returns assistant message", () => provideTmpdirServer( Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service @@ -509,7 +340,7 @@ http.live("loop calls LLM and returns assistant message", () => ), ) -http.live("loop continues when finish is tool-calls", () => +it.live("loop continues when finish is tool-calls", () => provideTmpdirServer( Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service @@ -540,97 +371,78 @@ http.live("loop continues when finish is tool-calls", () => ) it.live("failed subtask preserves metadata on error tool state", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot({ title: "Pinned" }) - yield* test.reply( - start(), - toolInputStart("task-1", "task"), - toolCall("task-1", "task", { - description: "inspect bug", - prompt: "look into the cache key path", - subagent_type: "general", - }), - { - type: "finish-step", - finishReason: "tool-calls", - rawFinishReason: "tool_calls", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - }, - { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }, - ) - yield* test.reply(...replyStop("done")) - const msg = yield* user(chat.id, "hello") - yield* addSubtask(chat.id, msg.id) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.tool("task", { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + }) + yield* llm.text("done") + const msg = yield* user(chat.id, "hello") + yield* addSubtask(chat.id, msg.id) - const result = yield* prompt.loop({ sessionID: chat.id }) - expect(result.info.role).toBe("assistant") - expect(yield* test.calls).toBe(2) + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + expect(yield* llm.calls).toBe(2) - const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) - const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") - expect(taskMsg?.info.role).toBe("assistant") - if (!taskMsg || taskMsg.info.role !== "assistant") return + const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") + expect(taskMsg?.info.role).toBe("assistant") + if (!taskMsg || taskMsg.info.role !== "assistant") return - const tool = errorTool(taskMsg.parts) - if (!tool) return + const tool = errorTool(taskMsg.parts) + if (!tool) return - expect(tool.state.error).toContain("Tool execution failed") - expect(tool.state.metadata).toBeDefined() - expect(tool.state.metadata?.sessionId).toBeDefined() - expect(tool.state.metadata?.model).toEqual({ - providerID: ProviderID.make("test"), - modelID: ModelID.make("missing-model"), - }) - }), + expect(tool.state.error).toContain("Tool execution failed") + expect(tool.state.metadata).toBeDefined() + expect(tool.state.metadata?.sessionId).toBeDefined() + expect(tool.state.metadata?.model).toEqual({ + providerID: ProviderID.make("test"), + modelID: ModelID.make("missing-model"), + }) + }), { git: true, - config: { - ...cfg, + config: (url) => ({ + ...providerCfg(url), agent: { general: { model: "test/missing-model", }, }, - }, + }), }, ), ) -it.live("loop sets status to busy then idle", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const test = yield* TestLLM +it.live( + "loop sets status to busy then idle", + () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service - const bus = yield* Bus.Service + const status = yield* SessionStatus.Service - yield* test.reply(start(), textStart(), textDelta("t", "ok"), textEnd(), finishStep(), finish()) + yield* llm.hang const chat = yield* sessions.create({}) yield* user(chat.id, "hi") - const types: string[] = [] - const idle = defer() - const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { - if (evt.properties.sessionID !== chat.id) return - types.push(evt.properties.status.type) - if (evt.properties.status.type === "idle") idle.resolve() - }) - - yield* prompt.loop({ sessionID: chat.id }) - yield* Effect.promise(() => idle.promise) - off() - - expect(types).toContain("busy") - expect(types[types.length - 1]).toBe("idle") + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + expect((yield* status.get(chat.id)).type).toBe("busy") + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + expect((yield* status.get(chat.id)).type).toBe("idle") }), - { git: true, config: cfg }, - ), + { git: true, config: providerCfg }, + ), + 3_000, ) // Cancel semantics @@ -638,66 +450,57 @@ it.live("loop sets status to busy then idle", () => it.live( "cancel interrupts loop and resolves with an assistant message", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* seed(chat.id) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* seed(chat.id) - // Make LLM hang so the loop blocks - yield* test.push((input) => hang(input, start())) + yield* llm.hang - // Seed a new user message so the loop enters the LLM path - yield* user(chat.id, "more") + yield* user(chat.id, "more") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - // Give the loop time to start - yield* waitMs(200) - yield* prompt.cancel(chat.id) - - const exit = yield* Fiber.await(fiber) - expect(Exit.isSuccess(exit)).toBe(true) - if (Exit.isSuccess(exit)) { - expect(exit.value.info.role).toBe("assistant") - } - }), - { git: true, config: cfg }, + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + yield* prompt.cancel(chat.id) + const exit = yield* Fiber.await(fiber) + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value.info.role).toBe("assistant") + } + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) it.live( "cancel records MessageAbortedError on interrupted process", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const { test, prompt, chat } = yield* boot() + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + yield* user(chat.id, "hello") - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) - yield* user(chat.id, "hello") - - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) - yield* prompt.cancel(chat.id) - - const exit = yield* Fiber.await(fiber) - expect(Exit.isSuccess(exit)).toBe(true) - if (Exit.isSuccess(exit)) { - const info = exit.value.info - if (info.role === "assistant") { - expect(info.error?.name).toBe("MessageAbortedError") - } + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + yield* prompt.cancel(chat.id) + const exit = yield* Fiber.await(fiber) + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + const info = exit.value.info + if (info.role === "assistant") { + expect(info.error?.name).toBe("MessageAbortedError") } - }), - { git: true, config: cfg }, + } + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) it.live( @@ -766,37 +569,30 @@ it.live( it.live( "cancel with queued callers resolves all cleanly", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const { test, prompt, chat } = yield* boot() + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + yield* user(chat.id, "hello") - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) - yield* user(chat.id, "hello") + const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) - const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) - // Queue a second caller - const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) - - yield* prompt.cancel(chat.id) - - const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) - expect(Exit.isSuccess(exitA)).toBe(true) - expect(Exit.isSuccess(exitB)).toBe(true) - if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) { - expect(exitA.value.info.id).toBe(exitB.value.info.id) - } - }), - { git: true, config: cfg }, + yield* prompt.cancel(chat.id) + const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) + expect(Exit.isSuccess(exitA)).toBe(true) + expect(Exit.isSuccess(exitB)).toBe(true) + if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) { + expect(exitA.value.info.id).toBe(exitB.value.info.id) + } + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) // Queue semantics @@ -820,13 +616,16 @@ it.live("concurrent loop callers get same result", () => ), ) -it.live("concurrent loop callers all receive same error result", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() +it.live( + "concurrent loop callers all receive same error result", + () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) - yield* test.push(Stream.fail(new Error("boom"))) + yield* llm.fail("boom") yield* user(chat.id, "hello") const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], { @@ -842,125 +641,107 @@ it.live("concurrent loop callers all receive same error result", () => expect(b.info.error).toBeDefined() } }), - { git: true, config: cfg }, - ), + { git: true, config: providerCfg }, + ), + 3_000, ) it.live( "prompt submitted during an active run is included in the next LLM input", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const gate = defer() - const { test, prompt, sessions, chat } = yield* boot() + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const gate = defer() + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) - yield* test.push((_input) => - stream(start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - Stream.concat( - Stream.fromEffect(Effect.promise(() => gate.promise)).pipe( - Stream.flatMap(() => - stream(textStart("a"), textDelta("a", "first"), textEnd("a"), finishStep(), finish()), - ), - ), - ), - ), - ) + yield* llm.hold("first", gate.promise) + yield* llm.text("second") - const a = yield* prompt - .prompt({ - sessionID: chat.id, - agent: "build", - model: ref, - parts: [{ type: "text", text: "first" }], - }) - .pipe(Effect.forkChild) - - yield* Effect.promise(() => ready.promise) - - const id = MessageID.ascending() - const b = yield* prompt - .prompt({ - sessionID: chat.id, - messageID: id, - agent: "build", - model: ref, - parts: [{ type: "text", text: "second" }], - }) - .pipe(Effect.forkChild) - - yield* Effect.promise(async () => { - const end = Date.now() + 5000 - while (Date.now() < end) { - const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id })) - if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return - await new Promise((done) => setTimeout(done, 20)) - } - throw new Error("timed out waiting for second prompt to save") + const a = yield* prompt + .prompt({ + sessionID: chat.id, + agent: "build", + model: ref, + parts: [{ type: "text", text: "first" }], }) + .pipe(Effect.forkChild) - yield* test.reply(...replyStop("second")) - gate.resolve() + yield* llm.wait(1) - const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) - expect(Exit.isSuccess(ea)).toBe(true) - expect(Exit.isSuccess(eb)).toBe(true) - expect(yield* test.calls).toBe(2) + const id = MessageID.ascending() + const b = yield* prompt + .prompt({ + sessionID: chat.id, + messageID: id, + agent: "build", + model: ref, + parts: [{ type: "text", text: "second" }], + }) + .pipe(Effect.forkChild) - const msgs = yield* sessions.messages({ sessionID: chat.id }) - const assistants = msgs.filter((msg) => msg.info.role === "assistant") - expect(assistants).toHaveLength(2) - const last = assistants.at(-1) - if (!last || last.info.role !== "assistant") throw new Error("expected second assistant") - expect(last.info.parentID).toBe(id) - expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) + yield* Effect.promise(async () => { + const end = Date.now() + 5000 + while (Date.now() < end) { + const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id })) + if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return + await new Promise((done) => setTimeout(done, 20)) + } + throw new Error("timed out waiting for second prompt to save") + }) - const inputs = yield* test.inputs - expect(inputs).toHaveLength(2) - expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second") - }), - { git: true, config: cfg }, + gate.resolve() + + const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) + expect(Exit.isSuccess(ea)).toBe(true) + expect(Exit.isSuccess(eb)).toBe(true) + expect(yield* llm.calls).toBe(2) + + const msgs = yield* sessions.messages({ sessionID: chat.id }) + const assistants = msgs.filter((msg) => msg.info.role === "assistant") + expect(assistants).toHaveLength(2) + const last = assistants.at(-1) + if (!last || last.info.role !== "assistant") throw new Error("expected second assistant") + expect(last.info.parentID).toBe(id) + expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) + + const inputs = yield* llm.inputs + expect(inputs).toHaveLength(2) + expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second") + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) it.live( "assertNotBusy throws BusyError when loop running", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + yield* llm.hang - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + const chat = yield* sessions.create({}) + yield* user(chat.id, "hi") - const chat = yield* sessions.create({}) - yield* user(chat.id, "hi") + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } - const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit) - expect(Exit.isFailure(exit)).toBe(true) - if (Exit.isFailure(exit)) { - expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) - } - - yield* prompt.cancel(chat.id) - yield* Fiber.await(fiber) - }), - { git: true, config: cfg }, + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) it.live("assertNotBusy succeeds when idle", () => @@ -983,34 +764,31 @@ it.live("assertNotBusy succeeds when idle", () => it.live( "shell rejects with BusyError when loop running", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const { test, prompt, chat } = yield* boot() + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + yield* user(chat.id, "hi") - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) - yield* user(chat.id, "hi") + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + const exit = yield* prompt + .shell({ sessionID: chat.id, agent: "build", command: "echo hi" }) + .pipe(Effect.exit) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } - const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit) - expect(Exit.isFailure(exit)).toBe(true) - if (Exit.isFailure(exit)) { - expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) - } - - yield* prompt.cancel(chat.id) - yield* Fiber.await(fiber) - }), - { git: true, config: cfg }, + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) unix("shell captures stdout and stderr in completed tool output", () => @@ -1072,7 +850,7 @@ unix( 30_000, ) -http.live( +it.live( "loop waits while shell runs and starts after shell exits", () => provideTmpdirServer( @@ -1088,15 +866,15 @@ http.live( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) - const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) + const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) expect(yield* llm.calls).toBe(0) yield* Fiber.await(sh) - const exit = yield* Fiber.await(run) + const exit = yield* Fiber.await(loop) expect(Exit.isSuccess(exit)).toBe(true) if (Exit.isSuccess(exit)) { @@ -1110,7 +888,7 @@ http.live( 3_000, ) -http.live( +it.live( "shell completion resumes queued loop callers", () => provideTmpdirServer( @@ -1126,11 +904,11 @@ http.live( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) expect(yield* llm.calls).toBe(0) @@ -1162,7 +940,7 @@ unix( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) yield* prompt.cancel(chat.id) @@ -1199,7 +977,7 @@ unix( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "trap '' TERM; sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) yield* prompt.cancel(chat.id) @@ -1230,14 +1008,14 @@ unix( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) - const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) + const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) yield* prompt.cancel(chat.id) - const exit = yield* Fiber.await(run) + const exit = yield* Fiber.await(loop) expect(Exit.isSuccess(exit)).toBe(true) yield* Fiber.await(sh) @@ -1259,7 +1037,7 @@ unix( const a = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) const exit = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "echo hi" })