refactor: propagate Instance context through Effect fibers via InstanceRef

Add a ServiceMap.Reference that carries InstanceContext through the
Effect service graph so child fibers retain instance context even when
resumed by external I/O events outside the ALS boundary.

- Add InstanceRef to instance-state.ts; InstanceState.get/has/invalidate
  try the Reference first, fall back to ALS
- makeRuntime automatically captures ALS into InstanceRef at the boundary
- provideInstance (test fixture) sets InstanceRef for Effect.runPromiseWith
- Remove all redundant provideInstance(dir) wrappers from prompt tests
- Fix test/lib/effect.ts type params (drop unnecessary S/T generics)
pull/20304/head
Kit Langton 2026-03-31 13:47:17 -04:00
parent f2fa1a681d
commit bb039496d5
6 changed files with 391 additions and 488 deletions

View File

@ -1,9 +1,23 @@
import { Effect, ScopedCache, Scope } from "effect"
import { Effect, ScopedCache, Scope, ServiceMap } from "effect"
import { Instance, type InstanceContext } from "@/project/instance"
import { registerDisposer } from "./instance-registry"
const TypeId = "~opencode/InstanceState"
// ServiceMap.Reference that carries the active InstanceContext through the
// Effect service graph, so child fibers keep their instance identity even when
// resumed by external I/O outside the ALS boundary. The undefined default
// means "no explicit override — fall back to the ALS-backed Instance state".
export const InstanceRef = ServiceMap.Reference<InstanceContext | undefined>("~opencode/InstanceRef", {
defaultValue: () => undefined,
})
// Resolve the active InstanceContext: an explicitly provided InstanceRef wins,
// otherwise fall back to the ALS-backed Instance.current (which throws when no
// Instance.provide scope is active).
const context = Effect.gen(function* () {
  const override = yield* InstanceRef
  if (override) return override
  return Instance.current
})
// Resolve the active instance directory with the same precedence as `context`:
// InstanceRef override first, ALS-backed Instance.directory second.
const directory = Effect.gen(function* () {
  const override = yield* InstanceRef
  if (override) return override.directory
  return Instance.directory
})
export interface InstanceState<A, E = never, R = never> {
readonly [TypeId]: typeof TypeId
readonly cache: ScopedCache.ScopedCache<string, A, E, R>
@ -16,7 +30,7 @@ export namespace InstanceState {
Effect.gen(function* () {
const cache = yield* ScopedCache.make<string, A, E, R>({
capacity: Number.POSITIVE_INFINITY,
lookup: () => init(Instance.current),
lookup: () => Effect.gen(function* () { return yield* init(yield* context) }),
})
const off = registerDisposer((directory) => Effect.runPromise(ScopedCache.invalidate(cache, directory)))
@ -29,7 +43,7 @@ export namespace InstanceState {
})
export const get = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.suspend(() => ScopedCache.get(self.cache, Instance.directory))
Effect.gen(function* () { return yield* ScopedCache.get(self.cache, yield* directory) })
export const use = <A, E, R, B>(self: InstanceState<A, E, R>, select: (value: A) => B) =>
Effect.map(get(self), select)
@ -40,8 +54,8 @@ export namespace InstanceState {
) => Effect.flatMap(get(self), select)
export const has = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.suspend(() => ScopedCache.has(self.cache, Instance.directory))
Effect.gen(function* () { return yield* ScopedCache.has(self.cache, yield* directory) })
export const invalidate = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.suspend(() => ScopedCache.invalidate(self.cache, Instance.directory))
Effect.gen(function* () { return yield* ScopedCache.invalidate(self.cache, yield* directory) })
}

View File

@ -1,19 +1,31 @@
import { Effect, Layer, ManagedRuntime } from "effect"
import * as ServiceMap from "effect/ServiceMap"
import { Instance } from "@/project/instance"
import { InstanceRef } from "./instance-state"
export const memoMap = Layer.makeMemoMapUnsafe()
/**
 * Capture the current ALS instance context (if any) into InstanceRef so the
 * effect keeps its instance identity even when later resumed outside the ALS
 * boundary (e.g. by an external I/O callback).
 *
 * Instance.current throws when no Instance.provide scope is active; in that
 * case the effect is returned unchanged and InstanceRef stays at its default.
 */
function provide<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
  try {
    return Effect.provideService(effect, InstanceRef, Instance.current)
  } catch {
    return effect
  }
}
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(provide(service.use(fn))),
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromiseExit(service.use(fn), options),
getRuntime().runPromiseExit(provide(service.use(fn)), options),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
getRuntime().runPromise(provide(service.use(fn)), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(provide(service.use(fn))),
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) =>
getRuntime().runCallback(provide(service.use(fn))),
}
}

View File

@ -1,6 +1,6 @@
import { afterEach, expect, test } from "bun:test"
import { Duration, Effect, Layer, ManagedRuntime, ServiceMap } from "effect"
import { InstanceState } from "../../src/effect/instance-state"
import { Cause, Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, ServiceMap } from "effect"
import { InstanceRef, InstanceState } from "../../src/effect/instance-state"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
@ -382,3 +382,100 @@ test("InstanceState dedupes concurrent lookups", async () => {
),
)
})
// Verifies that a fiber forked inside Instance.provide keeps its instance
// context when the deferred it awaits is resolved from a *separate*
// Instance.provide scope for the same directory.
test("InstanceState survives deferred resume from the same instance context", async () => {
await using tmp = await tmpdir({ git: true })
interface Api {
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
}
class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResume") {
static readonly layer = Layer.effect(
Test,
Effect.gen(function* () {
// State lookup resolves to the instance directory, so the assertion below
// reveals which instance context the resumed fiber observed.
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
return Test.of({
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
// Suspend on the gate; InstanceState.get runs only after resume.
yield* Deferred.await(gate)
return yield* InstanceState.get(state)
}),
})
}),
)
}
const rt = ManagedRuntime.make(Test.layer)
try {
const gate = await Effect.runPromise(Deferred.make<void>())
// Fork the fiber inside the instance's ALS scope.
const fiber = await Instance.provide({
directory: tmp.path,
fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))),
})
// Resume from another Instance.provide scope with the same directory.
await Instance.provide({
directory: tmp.path,
fn: () => Effect.runPromise(Deferred.succeed(gate, void 0)),
})
const exit = await Effect.runPromise(Fiber.await(fiber))
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
// The state must have been keyed by the original instance directory.
expect(exit.value).toBe(tmp.path)
}
} finally {
await rt.dispose()
}
})
// Verifies the InstanceRef escape hatch: when the reference is provided
// explicitly at fork time, the fiber keeps its instance context even though
// the deferred is resolved from plain code with no Instance.provide ALS scope.
test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => {
await using tmp = await tmpdir({ git: true })
interface Api {
readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
}
class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResumeOutside") {
static readonly layer = Layer.effect(
Test,
Effect.gen(function* () {
// Lookup returns the instance directory so the final assertion can
// check which context the resumed fiber saw.
const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
return Test.of({
get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
yield* Deferred.await(gate)
return yield* InstanceState.get(state)
}),
})
}),
)
}
const rt = ManagedRuntime.make(Test.layer)
try {
const gate = await Effect.runPromise(Deferred.make<void>())
// Provide InstanceRef so the fiber carries the context even when
// the deferred is resolved from outside Instance.provide ALS.
const fiber = await Instance.provide({
directory: tmp.path,
fn: () =>
Promise.resolve(
rt.runFork(Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))),
),
})
// Resume from outside any Instance.provide — ALS is NOT set here
await Effect.runPromise(Deferred.succeed(gate, void 0))
const exit = await Effect.runPromise(Fiber.await(fiber))
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
// Success proves the explicit reference, not ALS, supplied the context.
expect(exit.value).toBe(tmp.path)
}
} finally {
await rt.dispose()
}
})

View File

@ -7,6 +7,7 @@ import type * as PlatformError from "effect/PlatformError"
import type * as Scope from "effect/Scope"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import type { Config } from "../../src/config/config"
import { InstanceRef } from "../../src/effect/instance-state"
import { Instance } from "../../src/project/instance"
import { TestLLMServer } from "../lib/llm-server"
@ -114,7 +115,8 @@ export const provideInstance =
Effect.promise<A>(async () =>
Instance.provide({
directory,
fn: () => Effect.runPromiseWith(services)(self),
fn: () =>
Effect.runPromiseWith(services)(self.pipe(Effect.provideService(InstanceRef, Instance.current))),
}),
),
)

View File

@ -8,7 +8,7 @@ type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
const run = <A, E, R, E2, S>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2, S>) =>
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
Effect.gen(function* () {
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
if (Exit.isFailure(exit)) {
@ -19,7 +19,7 @@ const run = <A, E, R, E2, S>(value: Body<A, E, R | Scope.Scope>, layer: Layer.La
return yield* exit
}).pipe(Effect.runPromise)
const make = <R, E, S, T>(testLayer: Layer.Layer<R, E, S>, liveLayer: Layer.Layer<R, E, T>) => {
const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>) => {
const effect = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test(name, () => run(value, testLayer), opts)
@ -49,5 +49,5 @@ const liveEnv = TestConsole.layer
export const it = make(testEnv, liveEnv)
export const testEffect = <R, E, S>(layer: Layer.Layer<R, E, S>) =>
export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))

View File

@ -1,7 +1,6 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect, spyOn } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import z from "zod"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
@ -42,105 +41,6 @@ const ref = {
modelID: ModelID.make("test-model"),
}
type Script = Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
class TestLLM extends ServiceMap.Service<
TestLLM,
{
readonly push: (stream: Script) => Effect.Effect<void>
readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
readonly calls: Effect.Effect<number>
readonly inputs: Effect.Effect<LLM.StreamInput[]>
}
>()("@test/PromptLLM") {}
function stream(...items: LLM.Event[]) {
return Stream.make(...items)
}
/**
 * Build a token-usage record for scripted LLM events.
 * Defaults to 1 input / 1 output token; `total` defaults to their sum but may
 * be overridden independently. Detail fields are present-but-undefined to
 * mirror the shape the real provider emits.
 */
function usage(input = 1, output = 1, total = input + output) {
  const inputTokenDetails = {
    noCacheTokens: undefined,
    cacheReadTokens: undefined,
    cacheWriteTokens: undefined,
  }
  const outputTokenDetails = {
    textTokens: undefined,
    reasoningTokens: undefined,
  }
  return {
    inputTokens: input,
    outputTokens: output,
    totalTokens: total,
    inputTokenDetails,
    outputTokenDetails,
  }
}
function start(): LLM.Event {
return { type: "start" }
}
function textStart(id = "t"): LLM.Event {
return { type: "text-start", id }
}
function textDelta(id: string, text: string): LLM.Event {
return { type: "text-delta", id, text }
}
function textEnd(id = "t"): LLM.Event {
return { type: "text-end", id }
}
function finishStep(): LLM.Event {
return {
type: "finish-step",
finishReason: "stop",
rawFinishReason: "stop",
response: { id: "res", modelId: "test-model", timestamp: new Date() },
providerMetadata: undefined,
usage: usage(),
}
}
function finish(): LLM.Event {
return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
}
function finishToolCallsStep(): LLM.Event {
return {
type: "finish-step",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
response: { id: "res", modelId: "test-model", timestamp: new Date() },
providerMetadata: undefined,
usage: usage(),
}
}
function finishToolCalls(): LLM.Event {
return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }
}
function replyStop(text: string, id = "t") {
return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const
}
function replyToolCalls(text: string, id = "t") {
return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const
}
function toolInputStart(id: string, toolName: string): LLM.Event {
return { type: "tool-input-start", id, toolName }
}
function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
return { type: "tool-call", toolCallId, toolName, input }
}
function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
}
function defer<T>() {
let resolve!: (value: T | PromiseLike<T>) => void
const promise = new Promise<T>((done) => {
@ -149,10 +49,6 @@ function defer<T>() {
return { promise, resolve }
}
/** Effect that completes after `ms` milliseconds (promise-backed sleep). */
function waitMs(ms: number) {
  return Effect.promise(
    () =>
      new Promise<void>((resolve) => {
        setTimeout(resolve, ms)
      }),
  )
}
function withSh<A, E, R>(fx: () => Effect.Effect<A, E, R>) {
return Effect.acquireUseRelease(
Effect.sync(() => {
@ -190,43 +86,6 @@ function errorTool(parts: MessageV2.Part[]) {
return part?.state.status === "error" ? (part as ErrorToolPart) : undefined
}
// Test LLM layer: serves scripted responses from a FIFO queue and records
// every stream input so tests can assert on call count and payloads.
// Merges the real LLM.Service (queue-driven) with the TestLLM control surface.
const llm = Layer.unwrap(
Effect.gen(function* () {
const queue: Script[] = []
const inputs: LLM.StreamInput[] = []
let calls = 0
// Enqueue a scripted response: either a static stream or a factory keyed
// on the incoming StreamInput.
const push = Effect.fn("TestLLM.push")((item: Script) => {
queue.push(item)
return Effect.void
})
// Convenience: enqueue a static stream built from the given events.
const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
return Layer.mergeAll(
Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: (input) => {
calls += 1
inputs.push(input)
// Fall back to an empty stream when the script queue is exhausted.
const item = queue.shift() ?? Stream.empty
return typeof item === "function" ? item(input) : item
},
}),
),
Layer.succeed(
TestLLM,
TestLLM.of({
push,
reply,
calls: Effect.sync(() => calls),
// Snapshot copy so callers cannot mutate the recorded inputs.
inputs: Effect.sync(() => [...inputs]),
}),
),
)
}),
)
const mcp = Layer.succeed(
MCP.Service,
MCP.Service.of({
@ -282,33 +141,6 @@ const filetime = Layer.succeed(
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
const deps = Layer.mergeAll(
Session.defaultLayer,
Snapshot.defaultLayer,
AgentSvc.defaultLayer,
Command.defaultLayer,
Permission.layer,
Plugin.defaultLayer,
Config.defaultLayer,
filetime,
lsp,
mcp,
AppFileSystem.defaultLayer,
status,
llm,
).pipe(Layer.provideMerge(infra))
const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps))
const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps))
const env = SessionPrompt.layer.pipe(
Layer.provideMerge(compact),
Layer.provideMerge(proc),
Layer.provideMerge(registry),
Layer.provideMerge(trunc),
Layer.provideMerge(deps),
)
function makeHttp() {
const deps = Layer.mergeAll(
Session.defaultLayer,
@ -341,9 +173,8 @@ function makeHttp() {
)
}
const it = testEffect(env)
const http = testEffect(makeHttp())
const unix = process.platform !== "win32" ? it.effect : it.effect.skip
const it = testEffect(makeHttp())
const unix = process.platform !== "win32" ? it.live : it.live.skip
// Config that registers a custom "test" provider with a "test-model" model
// so Provider.getModel("test", "test-model") succeeds inside the loop.
@ -457,32 +288,32 @@ const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
})
const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create(input ?? { title: "Pinned" })
return { test, prompt, sessions, chat }
return { prompt, sessions, chat }
})
// Loop semantics
it.live("loop exits immediately when last assistant has stop finish", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { test, prompt, chat } = yield* boot()
yield* seed(chat.id, { finish: "stop" })
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* seed(chat.id, { finish: "stop" })
const result = yield* prompt.loop({ sessionID: chat.id })
expect(result.info.role).toBe("assistant")
if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
expect(yield* test.calls).toBe(0)
}),
{ git: true },
const result = yield* prompt.loop({ sessionID: chat.id })
expect(result.info.role).toBe("assistant")
if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
expect(yield* llm.calls).toBe(0)
}),
{ git: true, config: providerCfg },
),
)
http.live("loop calls LLM and returns assistant message", () =>
it.live("loop calls LLM and returns assistant message", () =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
@ -509,7 +340,7 @@ http.live("loop calls LLM and returns assistant message", () =>
),
)
http.live("loop continues when finish is tool-calls", () =>
it.live("loop continues when finish is tool-calls", () =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
@ -540,97 +371,78 @@ http.live("loop continues when finish is tool-calls", () =>
)
it.live("failed subtask preserves metadata on error tool state", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { test, prompt, chat } = yield* boot({ title: "Pinned" })
yield* test.reply(
start(),
toolInputStart("task-1", "task"),
toolCall("task-1", "task", {
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
}),
{
type: "finish-step",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
response: { id: "res", modelId: "test-model", timestamp: new Date() },
providerMetadata: undefined,
usage: usage(),
},
{ type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() },
)
yield* test.reply(...replyStop("done"))
const msg = yield* user(chat.id, "hello")
yield* addSubtask(chat.id, msg.id)
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.tool("task", {
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
})
yield* llm.text("done")
const msg = yield* user(chat.id, "hello")
yield* addSubtask(chat.id, msg.id)
const result = yield* prompt.loop({ sessionID: chat.id })
expect(result.info.role).toBe("assistant")
expect(yield* test.calls).toBe(2)
const result = yield* prompt.loop({ sessionID: chat.id })
expect(result.info.role).toBe("assistant")
expect(yield* llm.calls).toBe(2)
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
const tool = errorTool(taskMsg.parts)
if (!tool) return
const tool = errorTool(taskMsg.parts)
if (!tool) return
expect(tool.state.error).toContain("Tool execution failed")
expect(tool.state.metadata).toBeDefined()
expect(tool.state.metadata?.sessionId).toBeDefined()
expect(tool.state.metadata?.model).toEqual({
providerID: ProviderID.make("test"),
modelID: ModelID.make("missing-model"),
})
}),
expect(tool.state.error).toContain("Tool execution failed")
expect(tool.state.metadata).toBeDefined()
expect(tool.state.metadata?.sessionId).toBeDefined()
expect(tool.state.metadata?.model).toEqual({
providerID: ProviderID.make("test"),
modelID: ModelID.make("missing-model"),
})
}),
{
git: true,
config: {
...cfg,
config: (url) => ({
...providerCfg(url),
agent: {
general: {
model: "test/missing-model",
},
},
},
}),
},
),
)
it.live("loop sets status to busy then idle", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const test = yield* TestLLM
it.live(
"loop sets status to busy then idle",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const bus = yield* Bus.Service
const status = yield* SessionStatus.Service
yield* test.reply(start(), textStart(), textDelta("t", "ok"), textEnd(), finishStep(), finish())
yield* llm.hang
const chat = yield* sessions.create({})
yield* user(chat.id, "hi")
const types: string[] = []
const idle = defer<void>()
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
if (evt.properties.sessionID !== chat.id) return
types.push(evt.properties.status.type)
if (evt.properties.status.type === "idle") idle.resolve()
})
yield* prompt.loop({ sessionID: chat.id })
yield* Effect.promise(() => idle.promise)
off()
expect(types).toContain("busy")
expect(types[types.length - 1]).toBe("idle")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
expect((yield* status.get(chat.id)).type).toBe("busy")
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
expect((yield* status.get(chat.id)).type).toBe("idle")
}),
{ git: true, config: cfg },
),
{ git: true, config: providerCfg },
),
3_000,
)
// Cancel semantics
@ -638,66 +450,57 @@ it.live("loop sets status to busy then idle", () =>
it.live(
"cancel interrupts loop and resolves with an assistant message",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { test, prompt, chat } = yield* boot()
yield* seed(chat.id)
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* seed(chat.id)
// Make LLM hang so the loop blocks
yield* test.push((input) => hang(input, start()))
yield* llm.hang
// Seed a new user message so the loop enters the LLM path
yield* user(chat.id, "more")
yield* user(chat.id, "more")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
// Give the loop time to start
yield* waitMs(200)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
expect(exit.value.info.role).toBe("assistant")
}
}),
{ git: true, config: cfg },
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
expect(exit.value.info.role).toBe("assistant")
}
}),
{ git: true, config: providerCfg },
),
30_000,
3_000,
)
it.live(
"cancel records MessageAbortedError on interrupted process",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const { test, prompt, chat } = yield* boot()
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.hang
yield* user(chat.id, "hello")
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
yield* user(chat.id, "hello")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
const info = exit.value.info
if (info.role === "assistant") {
expect(info.error?.name).toBe("MessageAbortedError")
}
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
const info = exit.value.info
if (info.role === "assistant") {
expect(info.error?.name).toBe("MessageAbortedError")
}
}),
{ git: true, config: cfg },
}
}),
{ git: true, config: providerCfg },
),
30_000,
3_000,
)
it.live(
@ -766,37 +569,30 @@ it.live(
it.live(
"cancel with queued callers resolves all cleanly",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const { test, prompt, chat } = yield* boot()
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.hang
yield* user(chat.id, "hello")
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
yield* user(chat.id, "hello")
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.sleep(50)
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
// Queue a second caller
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* waitMs(50)
yield* prompt.cancel(chat.id)
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
expect(Exit.isSuccess(exitA)).toBe(true)
expect(Exit.isSuccess(exitB)).toBe(true)
if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
expect(exitA.value.info.id).toBe(exitB.value.info.id)
}
}),
{ git: true, config: cfg },
yield* prompt.cancel(chat.id)
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
expect(Exit.isSuccess(exitA)).toBe(true)
expect(Exit.isSuccess(exitB)).toBe(true)
if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
expect(exitA.value.info.id).toBe(exitB.value.info.id)
}
}),
{ git: true, config: providerCfg },
),
30_000,
3_000,
)
// Queue semantics
@ -820,13 +616,16 @@ it.live("concurrent loop callers get same result", () =>
),
)
it.live("concurrent loop callers all receive same error result", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { test, prompt, chat } = yield* boot()
it.live(
"concurrent loop callers all receive same error result",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* test.push(Stream.fail(new Error("boom")))
yield* llm.fail("boom")
yield* user(chat.id, "hello")
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
@ -842,125 +641,107 @@ it.live("concurrent loop callers all receive same error result", () =>
expect(b.info.error).toBeDefined()
}
}),
{ git: true, config: cfg },
),
{ git: true, config: providerCfg },
),
3_000,
)
it.live(
"prompt submitted during an active run is included in the next LLM input",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const gate = defer<void>()
const { test, prompt, sessions, chat } = yield* boot()
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const gate = defer<void>()
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* test.push((_input) =>
stream(start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
Stream.concat(
Stream.fromEffect(Effect.promise(() => gate.promise)).pipe(
Stream.flatMap(() =>
stream(textStart("a"), textDelta("a", "first"), textEnd("a"), finishStep(), finish()),
),
),
),
),
)
yield* llm.hold("first", gate.promise)
yield* llm.text("second")
const a = yield* prompt
.prompt({
sessionID: chat.id,
agent: "build",
model: ref,
parts: [{ type: "text", text: "first" }],
})
.pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
const id = MessageID.ascending()
const b = yield* prompt
.prompt({
sessionID: chat.id,
messageID: id,
agent: "build",
model: ref,
parts: [{ type: "text", text: "second" }],
})
.pipe(Effect.forkChild)
yield* Effect.promise(async () => {
const end = Date.now() + 5000
while (Date.now() < end) {
const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id }))
if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return
await new Promise((done) => setTimeout(done, 20))
}
throw new Error("timed out waiting for second prompt to save")
const a = yield* prompt
.prompt({
sessionID: chat.id,
agent: "build",
model: ref,
parts: [{ type: "text", text: "first" }],
})
.pipe(Effect.forkChild)
yield* test.reply(...replyStop("second"))
gate.resolve()
yield* llm.wait(1)
const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
expect(Exit.isSuccess(ea)).toBe(true)
expect(Exit.isSuccess(eb)).toBe(true)
expect(yield* test.calls).toBe(2)
const id = MessageID.ascending()
const b = yield* prompt
.prompt({
sessionID: chat.id,
messageID: id,
agent: "build",
model: ref,
parts: [{ type: "text", text: "second" }],
})
.pipe(Effect.forkChild)
const msgs = yield* sessions.messages({ sessionID: chat.id })
const assistants = msgs.filter((msg) => msg.info.role === "assistant")
expect(assistants).toHaveLength(2)
const last = assistants.at(-1)
if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
expect(last.info.parentID).toBe(id)
expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
yield* Effect.promise(async () => {
const end = Date.now() + 5000
while (Date.now() < end) {
const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id }))
if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return
await new Promise((done) => setTimeout(done, 20))
}
throw new Error("timed out waiting for second prompt to save")
})
const inputs = yield* test.inputs
expect(inputs).toHaveLength(2)
expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
}),
{ git: true, config: cfg },
gate.resolve()
const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
expect(Exit.isSuccess(ea)).toBe(true)
expect(Exit.isSuccess(eb)).toBe(true)
expect(yield* llm.calls).toBe(2)
const msgs = yield* sessions.messages({ sessionID: chat.id })
const assistants = msgs.filter((msg) => msg.info.role === "assistant")
expect(assistants).toHaveLength(2)
const last = assistants.at(-1)
if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
expect(last.info.parentID).toBe(id)
expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
const inputs = yield* llm.inputs
expect(inputs).toHaveLength(2)
expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
}),
{ git: true, config: providerCfg },
),
30_000,
3_000,
)
it.live(
"assertNotBusy throws BusyError when loop running",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
yield* llm.hang
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
const chat = yield* sessions.create({})
yield* user(chat.id, "hi")
const chat = yield* sessions.create({})
yield* user(chat.id, "hi")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
}),
{ git: true, config: cfg },
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
}),
{ git: true, config: providerCfg },
),
30_000,
3_000,
)
it.live("assertNotBusy succeeds when idle", () =>
@ -983,34 +764,31 @@ it.live("assertNotBusy succeeds when idle", () =>
it.live(
"shell rejects with BusyError when loop running",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
const { test, prompt, chat } = yield* boot()
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.hang
yield* user(chat.id, "hi")
yield* test.push((input) =>
hang(input, start()).pipe(
Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
),
)
yield* user(chat.id, "hi")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
const exit = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "echo hi" })
.pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
}),
{ git: true, config: cfg },
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
}),
{ git: true, config: providerCfg },
),
30_000,
3_000,
)
unix("shell captures stdout and stderr in completed tool output", () =>
@ -1072,7 +850,7 @@ unix(
30_000,
)
http.live(
it.live(
"loop waits while shell runs and starts after shell exits",
() =>
provideTmpdirServer(
@ -1088,15 +866,15 @@ http.live(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
.pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* waitMs(50)
const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.sleep(50)
expect(yield* llm.calls).toBe(0)
yield* Fiber.await(sh)
const exit = yield* Fiber.await(run)
const exit = yield* Fiber.await(loop)
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
@ -1110,7 +888,7 @@ http.live(
3_000,
)
http.live(
it.live(
"shell completion resumes queued loop callers",
() =>
provideTmpdirServer(
@ -1126,11 +904,11 @@ http.live(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
.pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
expect(yield* llm.calls).toBe(0)
@ -1162,7 +940,7 @@ unix(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
yield* prompt.cancel(chat.id)
@ -1199,7 +977,7 @@ unix(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "trap '' TERM; sleep 30" })
.pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
yield* prompt.cancel(chat.id)
@ -1230,14 +1008,14 @@ unix(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* waitMs(50)
const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.sleep(50)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(run)
const exit = yield* Fiber.await(loop)
expect(Exit.isSuccess(exit)).toBe(true)
yield* Fiber.await(sh)
@ -1259,7 +1037,7 @@ unix(
const a = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
yield* waitMs(50)
yield* Effect.sleep(50)
const exit = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "echo hi" })