diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 5a1c106f4c..b10c299665 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,10 +1,11 @@ import z from "zod" -import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" +import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" -import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" +import { registerDisposer } from "../effect/instance-registry" +import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) @@ -51,10 +52,7 @@ export namespace Bus { return ps }) - function publish( - def: D, - properties: z.output, - ) { + function publish(def: D, properties: z.output) { return Effect.gen(function* () { const payload: Payload = { type: def.type, properties } log.info("publishing", { type: def.type }) @@ -97,9 +95,7 @@ export namespace Bus { function runStream(stream: (svc: Interface) => Stream.Stream, callback: (event: any) => void) { return runCallbackInstance( - Service.use((svc) => - stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), - ), + Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))), ) } @@ -107,14 +103,39 @@ export namespace Bus { return runPromiseInstance(Service.use((svc) => svc.publish(def, properties))) } - export function subscribe( - def: D, - callback: (event: Payload) => void, - ) { + export function subscribe(def: D, callback: (event: Payload) => void) { return runStream((svc) => svc.subscribe(def), callback) } export function subscribeAll(callback: (event: any) => void) { - return runStream((svc) => svc.subscribeAll(), callback) + const directory = Instance.directory + let manualUnsub = false + + const fiber = forkInstance( + Service.use((svc) => + svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), + ).pipe( + Effect.ensuring( + Effect.sync(() => { + if (!manualUnsub) { + callback({ type: InstanceDisposed.type, properties: { directory } }) + } + }), + ), + ), + ) + + // Interrupt the fiber before the layer is invalidated, awaiting + // completion so the refCount drops and the scope can close. + const unregister = registerDisposer( + (dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))), + -1, + ) + + return () => { + manualUnsub = true + unregister() + fiber.interruptUnsafe() + } } } diff --git a/packages/opencode/src/effect/instance-registry.ts b/packages/opencode/src/effect/instance-registry.ts index 59c556e044..49b4abe137 100644 --- a/packages/opencode/src/effect/instance-registry.ts +++ b/packages/opencode/src/effect/instance-registry.ts @@ -1,12 +1,25 @@ -const disposers = new Set<(directory: string) => Promise>() +const disposers = new Set<{ + fn: (directory: string) => Promise + priority: number +}>() -export function registerDisposer(disposer: (directory: string) => Promise) { - disposers.add(disposer) +export function registerDisposer(disposer: (directory: string) => Promise, priority = 0) { + const item = { + fn: disposer, + priority, + } + disposers.add(item) return () => { - disposers.delete(disposer) + disposers.delete(item) } } export async function disposeInstance(directory: string) { - await Promise.allSettled([...disposers].map((disposer) => disposer(directory))) + const list = [...disposers].sort((a, b) => a.priority - b.priority) + const seen = new Set() + for (const item of list) { + if (seen.has(item.priority)) continue + seen.add(item.priority) + await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory))) + } } diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index 4465b106f4..794e5978e9 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -1,4 +1,4 @@ -import { Effect, Layer, ManagedRuntime } from "effect" +import { Effect, Fiber, Layer, ManagedRuntime } from "effect" import { AccountEffect } from "@/account/effect" import { AuthEffect } from "@/auth/effect" import { Instances } from "@/effect/instances" @@ -18,6 +18,12 @@ export function runPromiseInstance(effect: Effect.Effect( + effect: Effect.Effect, +): Fiber.Fiber { + return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function runCallbackInstance( effect: Effect.Effect, ): (interruptor?: number) => void {