diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index b10c299665..49b2a2335d 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,11 +1,10 @@ import z from "zod" -import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect" +import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" -import { registerDisposer } from "../effect/instance-registry" -import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime" +import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) @@ -109,33 +108,22 @@ export namespace Bus { export function subscribeAll(callback: (event: any) => void) { const directory = Instance.directory - let manualUnsub = false - const fiber = forkInstance( - Service.use((svc) => - svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), - ).pipe( - Effect.ensuring( - Effect.sync(() => { - if (!manualUnsub) { - callback({ type: InstanceDisposed.type, properties: { directory } }) - } - }), - ), - ), - ) - - // Interrupt the fiber before the layer is invalidated, awaiting - // completion so the refCount drops and the scope can close. - const unregister = registerDisposer( - (dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))), - -1, - ) + // InstanceDisposed is delivered via GlobalBus because the sync + // callback API can't wait for async layer acquisition. The Effect + // service's stream ending IS the disposal signal for Effect consumers. + const onDispose = (evt: { directory?: string; payload: any }) => { + if (evt.payload.type !== InstanceDisposed.type) return + if (evt.directory !== directory) return + callback(evt.payload) + GlobalBus.off("event", onDispose) + } + GlobalBus.on("event", onDispose) + const interrupt = runStream((svc) => svc.subscribeAll(), callback) return () => { - manualUnsub = true - unregister() - fiber.interruptUnsafe() + GlobalBus.off("event", onDispose) + interrupt() } } } diff --git a/packages/opencode/src/effect/instance-registry.ts b/packages/opencode/src/effect/instance-registry.ts index 49b4abe137..59c556e044 100644 --- a/packages/opencode/src/effect/instance-registry.ts +++ b/packages/opencode/src/effect/instance-registry.ts @@ -1,25 +1,12 @@ -const disposers = new Set<{ - fn: (directory: string) => Promise - priority: number -}>() +const disposers = new Set<(directory: string) => Promise>() -export function registerDisposer(disposer: (directory: string) => Promise, priority = 0) { - const item = { - fn: disposer, - priority, - } - disposers.add(item) +export function registerDisposer(disposer: (directory: string) => Promise) { + disposers.add(disposer) return () => { - disposers.delete(item) + disposers.delete(disposer) } } export async function disposeInstance(directory: string) { - const list = [...disposers].sort((a, b) => a.priority - b.priority) - const seen = new Set() - for (const item of list) { - if (seen.has(item.priority)) continue - seen.add(item.priority) - await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory))) - } + await Promise.allSettled([...disposers].map((disposer) => disposer(directory))) } diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index 794e5978e9..4465b106f4 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -1,4 +1,4 @@ -import { Effect, Fiber, Layer, ManagedRuntime } from "effect" +import { Effect, Layer, ManagedRuntime } from "effect" import { AccountEffect } from "@/account/effect" import { AuthEffect } from "@/auth/effect" import { Instances } from "@/effect/instances" @@ -18,12 +18,6 @@ export function runPromiseInstance(effect: Effect.Effect( - effect: Effect.Effect, -): Fiber.Fiber { - return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) -} - export function runCallbackInstance( effect: Effect.Effect, ): (interruptor?: number) => void { diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index b3da0fc9ea..a8929aa1c9 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -4,6 +4,7 @@ import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" import { GlobalBus } from "../../src/bus/global" import { Instance } from "../../src/project/instance" +import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" // --------------------------------------------------------------------------- @@ -190,6 +191,38 @@ describe("Bus", () => { expect(all).toEqual(["test.ping"]) }) + + test("subscribeAll delivers InstanceDisposed via GlobalBus on disposal", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + }) + + await Instance.disposeAll() + + expect(all).toContain(Bus.InstanceDisposed.type) + }) + + test("manual unsubscribe suppresses InstanceDisposed", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + let unsub = () => {} + + await withInstance(tmp.path, async () => { + unsub = Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + }) + + unsub() + await Instance.disposeAll() + + expect(all).not.toContain(Bus.InstanceDisposed.type) + }) }) describe("GlobalBus forwarding", () => {