From 191a747405e473b1cd306b26d09b101cd57b7bd0 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 31 Mar 2026 14:51:22 -0400 Subject: [PATCH] fix: propagate InstanceRef across static function boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - makeRuntime.provide reads InstanceRef from current Effect fiber when ALS is unavailable, bridging static function calls (like Bus.publish) that create new fibers from inside Effect code - Database.transaction preserves Instance ALS via Instance.bind on the bun:sqlite transaction callback (native fn loses ALS) - Instance.restore helper for bridging Effect→sync code with ALS - InstanceState.withALS bridges InstanceRef back to ALS for sync callers - prompt.ts: InstructionPrompt.clear wrapped with withALS - Remove ALL provideInstance(dir) wrappers from prompt-effect tests --- .../opencode/src/effect/instance-state.ts | 3 ++ packages/opencode/src/effect/run-service.ts | 11 ++++-- packages/opencode/src/project/instance.ts | 8 +++++ packages/opencode/src/session/prompt.ts | 4 +-- packages/opencode/src/storage/db.ts | 16 ++++----- .../test/session/prompt-effect.test.ts | 36 ++++++++++--------- 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/packages/opencode/src/effect/instance-state.ts b/packages/opencode/src/effect/instance-state.ts index a7f421d82e..638af89f43 100644 --- a/packages/opencode/src/effect/instance-state.ts +++ b/packages/opencode/src/effect/instance-state.ts @@ -58,4 +58,7 @@ export namespace InstanceState { export const invalidate = (self: InstanceState) => Effect.gen(function* () { return yield* ScopedCache.invalidate(self.cache, yield* directory) }) + + /** Run a sync function with Instance ALS restored from the InstanceRef. */ + export const withALS = (fn: () => T) => Effect.map(context, (ctx) => Instance.restore(ctx, fn)) } diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 164f9d05c7..db2672c339 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -6,12 +6,17 @@ import { InstanceRef } from "./instance-state" export const memoMap = Layer.makeMemoMapUnsafe() function provide(effect: Effect.Effect): Effect.Effect { + // Try ALS first try { const ctx = Instance.current return Effect.provideService(effect, InstanceRef, ctx) - } catch { - return effect - } + } catch {} + // Try current Effect fiber's InstanceRef (for calls from inside Effect code + // that escapes to static functions, like sync callbacks calling Bus.publish) + const fiber = (globalThis as any)["~effect/Fiber/currentFiber"] + const ref = fiber?.services?.mapUnsafe?.get("~opencode/InstanceRef") + if (ref) return Effect.provideService(effect, InstanceRef, ref) + return effect } export function makeRuntime(service: ServiceMap.Service, layer: Layer.Layer) { diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index 5dddfe627f..a0d6f2414a 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -114,6 +114,14 @@ export const Instance = { const ctx = context.use() return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F }, + /** + * Run a synchronous function within the given instance context ALS. + * Use this to bridge from Effect (where InstanceRef carries context) + * back to sync code that reads Instance.directory from ALS. + */ + restore(ctx: InstanceContext, fn: () => R): R { + return context.provide(ctx, fn) + }, state(init: () => S, dispose?: (state: Awaited) => Promise): () => S { return State.create(() => Instance.directory, init, dispose) }, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 1093be71a6..083c23cc68 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -979,7 +979,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the variant, } - yield* Effect.addFinalizer(() => Effect.sync(() => InstructionPrompt.clear(info.id))) + yield* Effect.addFinalizer(() => InstanceState.withALS(() => InstructionPrompt.clear(info.id))) type Draft = T extends MessageV2.Part ? Omit & { id?: string } : never const assign = (part: Draft): MessageV2.Part => ({ @@ -1542,7 +1542,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }), Effect.fnUntraced(function* (exit) { if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort() - InstructionPrompt.clear(handle.message.id) + yield* InstanceState.withALS(() => InstructionPrompt.clear(handle.message.id)) }), ) if (outcome === "break") break diff --git a/packages/opencode/src/storage/db.ts b/packages/opencode/src/storage/db.ts index b37c666c88..5a6db82785 100644 --- a/packages/opencode/src/storage/db.ts +++ b/packages/opencode/src/storage/db.ts @@ -143,11 +143,10 @@ export namespace Database { } export function effect(fn: () => any | Promise) { - const bound = Instance.bind(fn) try { - ctx.use().effects.push(bound) + ctx.use().effects.push(fn) } catch { - bound() + fn() } } @@ -164,12 +163,11 @@ export namespace Database { } catch (err) { if (err instanceof Context.NotFound) { const effects: (() => void | Promise)[] = [] - const result = Client().transaction( - (tx: TxOrDb) => { - return ctx.provide({ tx, effects }, () => callback(tx)) - }, - { behavior: options?.behavior }, - ) + let txCallback = (tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)) + try { + txCallback = Instance.bind(txCallback) + } catch {} + const result = Client().transaction(txCallback, { behavior: options?.behavior }) for (const effect of effects) effect() return result as NotPromise } diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 3c434bbabc..003af662c2 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -30,7 +30,7 @@ import { ToolRegistry } from "../../src/tool/registry" import { Truncate } from "../../src/tool/truncate" import { Log } from "../../src/util/log" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { provideInstance, provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" +import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { TestLLMServer } from "../lib/llm-server" @@ -451,7 +451,7 @@ it.live( "cancel interrupts loop and resolves with an assistant message", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ dir, llm }) { + Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) @@ -461,10 +461,13 @@ it.live( yield* user(chat.id, "more") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) yield* llm.wait(1) - yield* prompt.cancel(chat.id).pipe(provideInstance(dir)) + yield* prompt.cancel(chat.id) const exit = yield* Fiber.await(fiber) + if (Exit.isFailure(exit)) { + for (const err of Cause.prettyErrors(exit.cause)) console.error("DEBUG CANCEL FAIL:", err) + } expect(Exit.isSuccess(exit)).toBe(true) if (Exit.isSuccess(exit)) { expect(exit.value.info.role).toBe("assistant") @@ -479,16 +482,16 @@ it.live( "cancel records MessageAbortedError on interrupted process", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ dir, llm }) { + Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) yield* llm.hang yield* user(chat.id, "hello") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) yield* llm.wait(1) - yield* prompt.cancel(chat.id).pipe(provideInstance(dir)) + yield* prompt.cancel(chat.id) const exit = yield* Fiber.await(fiber) expect(Exit.isSuccess(exit)).toBe(true) if (Exit.isSuccess(exit)) { @@ -570,19 +573,19 @@ it.live( "cancel with queued callers resolves all cleanly", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ dir, llm }) { + Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) yield* llm.hang yield* user(chat.id, "hello") - const a = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) + const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) yield* llm.wait(1) - const b = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) + const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) yield* Effect.sleep(50) - yield* prompt.cancel(chat.id).pipe(provideInstance(dir)) + yield* prompt.cancel(chat.id) const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) expect(Exit.isSuccess(exitA)).toBe(true) expect(Exit.isSuccess(exitB)).toBe(true) @@ -620,7 +623,7 @@ it.live( "concurrent loop callers all receive same error result", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ dir, llm }) { + Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) @@ -631,8 +634,7 @@ it.live( const [a, b] = yield* Effect.all( [prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], { concurrency: "unbounded" }, - ).pipe(provideInstance(dir)) - + ) expect(a.info.id).toBe(b.info.id) expect(a.info.role).toBe("assistant") }), @@ -645,7 +647,7 @@ it.live( "prompt submitted during an active run is included in the next LLM input", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ dir, llm }) { + Effect.fnUntraced(function* ({ llm }) { const gate = defer() const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service @@ -661,7 +663,7 @@ it.live( model: ref, parts: [{ type: "text", text: "first" }], }) - .pipe(provideInstance(dir), Effect.forkChild) + .pipe(Effect.forkChild) yield* llm.wait(1) @@ -674,7 +676,7 @@ it.live( model: ref, parts: [{ type: "text", text: "second" }], }) - .pipe(provideInstance(dir), Effect.forkChild) + .pipe(Effect.forkChild) yield* Effect.promise(async () => { const end = Date.now() + 5000