From 0c2b5b2c397b6bc8cb06fe719af5674b2373560b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 08:49:06 -0400 Subject: [PATCH] fix(bus): use Fiber.interrupt for clean disposal of subscribeAll Use forkInstance + Fiber.interrupt (which awaits) instead of runCallbackInstance + interruptUnsafe (fire-and-forget) for subscribeAll. This ensures the fiber completes before layer invalidation, allowing the RcMap refCount to drop to 0. subscribeAll now delivers InstanceDisposed as the last callback message via Effect.ensuring when the fiber is interrupted during disposal, but not on manual unsubscribe. Add priority support to registerDisposer so Bus can interrupt subscription fibers (priority -1) before layer invalidation (priority 0). Add forkInstance helper to effect/runtime that returns a Fiber instead of an interrupt function. --- packages/opencode/src/bus/index.ts | 49 +++++++++++++------ .../opencode/src/effect/instance-registry.ts | 23 +++++++-- packages/opencode/src/effect/runtime.ts | 8 ++- 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 5a1c106f4c..b10c299665 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,10 +1,11 @@ import z from "zod" -import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" +import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" -import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" +import { registerDisposer } from "../effect/instance-registry" +import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) @@ -51,10 +52,7 @@ export namespace Bus { return ps }) - function publish( - def: D, - properties: z.output, - ) { + function publish(def: D, properties: z.output) { return Effect.gen(function* () { const payload: Payload = { type: def.type, properties } log.info("publishing", { type: def.type }) @@ -97,9 +95,7 @@ export namespace Bus { function runStream(stream: (svc: Interface) => Stream.Stream, callback: (event: any) => void) { return runCallbackInstance( - Service.use((svc) => - stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), - ), + Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))), ) } @@ -107,14 +103,39 @@ export namespace Bus { return runPromiseInstance(Service.use((svc) => svc.publish(def, properties))) } - export function subscribe( - def: D, - callback: (event: Payload) => void, - ) { + export function subscribe(def: D, callback: (event: Payload) => void) { return runStream((svc) => svc.subscribe(def), callback) } export function subscribeAll(callback: (event: any) => void) { - return runStream((svc) => svc.subscribeAll(), callback) + const directory = Instance.directory + let manualUnsub = false + + const fiber = forkInstance( + Service.use((svc) => + svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), + ).pipe( + Effect.ensuring( + Effect.sync(() => { + if (!manualUnsub) { + callback({ type: InstanceDisposed.type, properties: { directory } }) + } + }), + ), + ), + ) + + // Interrupt the fiber before the layer is invalidated, awaiting + // completion so the refCount drops and the scope can close. + const unregister = registerDisposer( + (dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))), + -1, + ) + + return () => { + manualUnsub = true + unregister() + fiber.interruptUnsafe() + } } } diff --git a/packages/opencode/src/effect/instance-registry.ts b/packages/opencode/src/effect/instance-registry.ts index 59c556e044..49b4abe137 100644 --- a/packages/opencode/src/effect/instance-registry.ts +++ b/packages/opencode/src/effect/instance-registry.ts @@ -1,12 +1,25 @@ -const disposers = new Set<(directory: string) => Promise>() +const disposers = new Set<{ + fn: (directory: string) => Promise + priority: number +}>() -export function registerDisposer(disposer: (directory: string) => Promise) { - disposers.add(disposer) +export function registerDisposer(disposer: (directory: string) => Promise, priority = 0) { + const item = { + fn: disposer, + priority, + } + disposers.add(item) return () => { - disposers.delete(disposer) + disposers.delete(item) } } export async function disposeInstance(directory: string) { - await Promise.allSettled([...disposers].map((disposer) => disposer(directory))) + const list = [...disposers].sort((a, b) => a.priority - b.priority) + const seen = new Set() + for (const item of list) { + if (seen.has(item.priority)) continue + seen.add(item.priority) + await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory))) + } } diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index 4465b106f4..794e5978e9 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -1,4 +1,4 @@ -import { Effect, Layer, ManagedRuntime } from "effect" +import { Effect, Fiber, Layer, ManagedRuntime } from "effect" import { AccountEffect } from "@/account/effect" import { AuthEffect } from "@/auth/effect" import { Instances } from "@/effect/instances" @@ -18,6 +18,12 @@ export function runPromiseInstance(effect: Effect.Effect( + effect: Effect.Effect, +): Fiber.Fiber { + return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function runCallbackInstance( effect: Effect.Effect, ): (interruptor?: number) => void {