fix: restore instance context in deferred database callbacks

pull/20304/head
Kit Langton 2026-03-31 15:46:43 -04:00
parent 191a747405
commit 825f51c39f
4 changed files with 31 additions and 27 deletions

View File

@ -1,4 +1,4 @@
import { Effect, ScopedCache, Scope, ServiceMap } from "effect"
import { Effect, Fiber, ScopedCache, Scope, ServiceMap } from "effect"
import { Instance, type InstanceContext } from "@/project/instance"
import { registerDisposer } from "./instance-registry"
@ -14,6 +14,16 @@ export interface InstanceState<A, E = never, R = never> {
}
export namespace InstanceState {
export const bind = <F extends (...args: any[]) => any>(fn: F): F => {
try {
return Instance.bind(fn)
} catch {}
const fiber = Fiber.getCurrent()
const ctx = fiber ? ServiceMap.getReferenceUnsafe(fiber.services, InstanceRef) : undefined
if (!ctx) return fn
return ((...args: any[]) => Instance.restore(ctx, () => fn(...args))) as F
}
export const context = Effect.gen(function* () {
const ref = yield* InstanceRef
return ref ?? Instance.current
@ -30,7 +40,10 @@ export namespace InstanceState {
Effect.gen(function* () {
const cache = yield* ScopedCache.make<string, A, E, R>({
capacity: Number.POSITIVE_INFINITY,
lookup: () => Effect.gen(function* () { return yield* init(yield* context) }),
lookup: () =>
Effect.gen(function* () {
return yield* init(yield* context)
}),
})
const off = registerDisposer((directory) => Effect.runPromise(ScopedCache.invalidate(cache, directory)))
@ -43,7 +56,9 @@ export namespace InstanceState {
})
export const get = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.gen(function* () { return yield* ScopedCache.get(self.cache, yield* directory) })
Effect.gen(function* () {
return yield* ScopedCache.get(self.cache, yield* directory)
})
export const use = <A, E, R, B>(self: InstanceState<A, E, R>, select: (value: A) => B) =>
Effect.map(get(self), select)
@ -54,10 +69,14 @@ export namespace InstanceState {
) => Effect.flatMap(get(self), select)
export const has = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.gen(function* () { return yield* ScopedCache.has(self.cache, yield* directory) })
Effect.gen(function* () {
return yield* ScopedCache.has(self.cache, yield* directory)
})
export const invalidate = <A, E, R>(self: InstanceState<A, E, R>) =>
Effect.gen(function* () { return yield* ScopedCache.invalidate(self.cache, yield* directory) })
Effect.gen(function* () {
return yield* ScopedCache.invalidate(self.cache, yield* directory)
})
/** Run a sync function with Instance ALS restored from the InstanceRef. */
export const withALS = <T>(fn: () => T) => Effect.map(context, (ctx) => Instance.restore(ctx, fn))

View File

@ -6,16 +6,10 @@ import { InstanceRef } from "./instance-state"
export const memoMap = Layer.makeMemoMapUnsafe()
function provide<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
// Try ALS first
try {
const ctx = Instance.current
return Effect.provideService(effect, InstanceRef, ctx)
} catch {}
// Try current Effect fiber's InstanceRef (for calls from inside Effect code
// that escapes to static functions, like sync callbacks calling Bus.publish)
const fiber = (globalThis as any)["~effect/Fiber/currentFiber"]
const ref = fiber?.services?.mapUnsafe?.get("~opencode/InstanceRef")
if (ref) return Effect.provideService(effect, InstanceRef, ref)
return effect
}

View File

@ -10,9 +10,9 @@ import { NamedError } from "@opencode-ai/util/error"
import z from "zod"
import path from "path"
import { readFileSync, readdirSync, existsSync } from "fs"
import { Instance } from "../project/instance"
import { Installation } from "../installation"
import { Flag } from "../flag/flag"
import { InstanceState } from "@/effect/instance-state"
import { iife } from "@/util/iife"
import { init } from "#db"
@ -144,7 +144,7 @@ export namespace Database {
export function effect(fn: () => any | Promise<any>) {
try {
ctx.use().effects.push(fn)
ctx.use().effects.push(InstanceState.bind(fn))
} catch {
fn()
}
@ -163,10 +163,7 @@ export namespace Database {
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = []
let txCallback = (tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx))
try {
txCallback = Instance.bind(txCallback)
} catch {}
const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
const result = Client().transaction(txCallback, { behavior: options?.behavior })
for (const effect of effects) effect()
return result as NotPromise<T>

View File

@ -465,9 +465,6 @@ it.live(
yield* llm.wait(1)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(fiber)
if (Exit.isFailure(exit)) {
for (const err of Cause.prettyErrors(exit.cause)) console.error("DEBUG CANCEL FAIL:", err)
}
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
expect(exit.value.info.role).toBe("assistant")
@ -631,10 +628,9 @@ it.live(
yield* llm.fail("boom")
yield* user(chat.id, "hello")
const [a, b] = yield* Effect.all(
[prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })],
{ concurrency: "unbounded" },
)
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
concurrency: "unbounded",
})
expect(a.info.id).toBe(b.info.id)
expect(a.info.role).toBe("assistant")
}),
@ -772,9 +768,7 @@ it.live(
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* llm.wait(1)
const exit = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "echo hi" })
.pipe(Effect.exit)
const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)