fix: propagate InstanceRef across static function boundaries

- makeRuntime.provide reads InstanceRef from current Effect fiber when
  ALS is unavailable, bridging static function calls (like Bus.publish)
  that create new fibers from inside Effect code
- Database.transaction preserves Instance ALS via Instance.bind on the
  bun:sqlite transaction callback (native fn loses ALS)
- Instance.restore helper for bridging Effect→sync code with ALS
- InstanceState.withALS bridges InstanceRef back to ALS for sync callers
- prompt.ts: InstructionPrompt.clear wrapped with withALS
- Remove ALL provideInstance(dir) wrappers from prompt-effect tests
pull/20304/head
Kit Langton 2026-03-31 14:51:22 -04:00
parent cc412f3014
commit 191a747405
6 changed files with 47 additions and 31 deletions

View File

@ -58,4 +58,7 @@ export namespace InstanceState {
export const invalidate = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.gen(function* () { return yield* ScopedCache.invalidate(self.cache, yield* directory) })
/** Run a sync function with Instance ALS restored from the InstanceRef. */
export const withALS = <T>(fn: () => T) => Effect.map(context, (ctx) => Instance.restore(ctx, fn))
}

View File

@ -6,12 +6,17 @@ import { InstanceRef } from "./instance-state"
export const memoMap = Layer.makeMemoMapUnsafe()
function provide<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
// Try ALS first
try {
const ctx = Instance.current
return Effect.provideService(effect, InstanceRef, ctx)
} catch {
return effect
}
} catch {}
// Try current Effect fiber's InstanceRef (for calls from inside Effect code
// that escapes to static functions, like sync callbacks calling Bus.publish)
const fiber = (globalThis as any)["~effect/Fiber/currentFiber"]
const ref = fiber?.services?.mapUnsafe?.get("~opencode/InstanceRef")
if (ref) return Effect.provideService(effect, InstanceRef, ref)
return effect
}
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {

View File

@ -114,6 +114,14 @@ export const Instance = {
const ctx = context.use()
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
},
/**
* Run a synchronous function within the given instance context ALS.
* Use this to bridge from Effect (where InstanceRef carries context)
* back to sync code that reads Instance.directory from ALS.
*/
restore<R>(ctx: InstanceContext, fn: () => R): R {
return context.provide(ctx, fn)
},
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
return State.create(() => Instance.directory, init, dispose)
},

View File

@ -979,7 +979,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
variant,
}
yield* Effect.addFinalizer(() => Effect.sync(() => InstructionPrompt.clear(info.id)))
yield* Effect.addFinalizer(() => InstanceState.withALS(() => InstructionPrompt.clear(info.id)))
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
@ -1542,7 +1542,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}),
Effect.fnUntraced(function* (exit) {
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
InstructionPrompt.clear(handle.message.id)
yield* InstanceState.withALS(() => InstructionPrompt.clear(handle.message.id))
}),
)
if (outcome === "break") break

View File

@ -143,11 +143,10 @@ export namespace Database {
}
export function effect(fn: () => any | Promise<any>) {
const bound = Instance.bind(fn)
try {
ctx.use().effects.push(bound)
ctx.use().effects.push(fn)
} catch {
bound()
fn()
}
}
@ -164,12 +163,11 @@ export namespace Database {
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const result = Client().transaction(
(tx: TxOrDb) => {
return ctx.provide({ tx, effects }, () => callback(tx))
},
{ behavior: options?.behavior },
)
let txCallback = (tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx))
try {
txCallback = Instance.bind(txCallback)
} catch {}
const result = Client().transaction(txCallback, { behavior: options?.behavior })
for (const effect of effects) effect()
return result as NotPromise<T>
}

View File

@ -30,7 +30,7 @@ import { ToolRegistry } from "../../src/tool/registry"
import { Truncate } from "../../src/tool/truncate"
import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { provideInstance, provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { TestLLMServer } from "../lib/llm-server"
@ -451,7 +451,7 @@ it.live(
"cancel interrupts loop and resolves with an assistant message",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ dir, llm }) {
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
@ -461,10 +461,13 @@ it.live(
yield* user(chat.id, "more")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
yield* prompt.cancel(chat.id).pipe(provideInstance(dir))
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
if (Exit.isFailure(exit)) {
for (const err of Cause.prettyErrors(exit.cause)) console.error("DEBUG CANCEL FAIL:", err)
}
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
expect(exit.value.info.role).toBe("assistant")
@ -479,16 +482,16 @@ it.live(
"cancel records MessageAbortedError on interrupted process",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ dir, llm }) {
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.hang
yield* user(chat.id, "hello")
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
yield* prompt.cancel(chat.id).pipe(provideInstance(dir))
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
@ -570,19 +573,19 @@ it.live(
"cancel with queued callers resolves all cleanly",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ dir, llm }) {
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
yield* llm.hang
yield* user(chat.id, "hello")
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.sleep(50)
yield* prompt.cancel(chat.id).pipe(provideInstance(dir))
yield* prompt.cancel(chat.id)
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
expect(Exit.isSuccess(exitA)).toBe(true)
expect(Exit.isSuccess(exitB)).toBe(true)
@ -620,7 +623,7 @@ it.live(
"concurrent loop callers all receive same error result",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ dir, llm }) {
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create({ title: "Pinned" })
@ -631,8 +634,7 @@ it.live(
const [a, b] = yield* Effect.all(
[prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })],
{ concurrency: "unbounded" },
).pipe(provideInstance(dir))
)
expect(a.info.id).toBe(b.info.id)
expect(a.info.role).toBe("assistant")
}),
@ -645,7 +647,7 @@ it.live(
"prompt submitted during an active run is included in the next LLM input",
() =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ dir, llm }) {
Effect.fnUntraced(function* ({ llm }) {
const gate = defer<void>()
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
@ -661,7 +663,7 @@ it.live(
model: ref,
parts: [{ type: "text", text: "first" }],
})
.pipe(provideInstance(dir), Effect.forkChild)
.pipe(Effect.forkChild)
yield* llm.wait(1)
@ -674,7 +676,7 @@ it.live(
model: ref,
parts: [{ type: "text", text: "second" }],
})
.pipe(provideInstance(dir), Effect.forkChild)
.pipe(Effect.forkChild)
yield* Effect.promise(async () => {
const end = Date.now() + 5000