diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 49b2a2335d..86bdd3d878 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -84,6 +84,19 @@ export namespace Bus { ) } + // Shut down all PubSubs when the layer is torn down. + // This causes Stream.fromPubSub consumers to end, triggering + // their ensuring/finalizers. + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + log.info("shutting down PubSubs") + yield* PubSub.shutdown(wildcardPubSub) + for (const ps of pubsubs.values()) { + yield* PubSub.shutdown(ps) + } + }), + ) + return Service.of({ publish, subscribe, subscribeAll }) }), ) @@ -109,9 +122,12 @@ export namespace Bus { export function subscribeAll(callback: (event: any) => void) { const directory = Instance.directory - // InstanceDisposed is delivered via GlobalBus because the sync - // callback API can't wait for async layer acquisition. The Effect - // service's stream ending IS the disposal signal for Effect consumers. + // InstanceDisposed is delivered via GlobalBus because the legacy + // adapter's fiber starts asynchronously and may not be running when + // disposal happens. In the Effect-native path, forkScoped + scope + // closure handles this correctly. This bridge can be removed once + // upstream PubSub.shutdown properly wakes suspended subscribers: + // https://github.com/Effect-TS/effect-smol/issues/TBD const onDispose = (evt: { directory?: string; payload: any }) => { if (evt.payload.type !== InstanceDisposed.type) return if (evt.directory !== directory) return diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index fa3ec528f4..457b3518ee 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -1,4 +1,4 @@ -import { Effect, Layer, LayerMap, ServiceMap } from "effect" +import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect" import { Bus } from "@/bus" import { File } from "@/file" import { FileTime } from "@/file/time" @@ -59,7 +59,23 @@ export class Instances extends ServiceMap.Service Effect.runPromise(layerMap.invalidate(directory))) + + // Force-invalidate closes the RcMap entry scope even when refCount > 0. + // Standard RcMap.invalidate bails in that case, leaving long-running + // consumer fibers orphaned. This is an upstream issue: + // https://github.com/Effect-TS/effect-smol/pull/1799 + const forceInvalidate = (directory: string) => + Effect.gen(function* () { + const rcMap = layerMap.rcMap + if (rcMap.state._tag === "Closed") return + const entry = MutableHashMap.get(rcMap.state.map, directory) + if (entry._tag === "None") return + MutableHashMap.remove(rcMap.state.map, directory) + if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber) + yield* Scope.close(entry.value.scope, Exit.void) + }).pipe(Effect.uninterruptible) + + const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory))) yield* Effect.addFinalizer(() => Effect.sync(unregister)) return Instances.of(layerMap) }), diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index a8929aa1c9..2c154049a6 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -1,10 +1,10 @@ import { afterEach, describe, expect, test } from "bun:test" +import { Deferred, Effect, Stream } from "effect" import z from "zod" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" import { GlobalBus } from "../../src/bus/global" import { Instance } from "../../src/project/instance" -import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" // --------------------------------------------------------------------------- @@ -192,7 +192,7 @@ describe("Bus", () => { expect(all).toEqual(["test.ping"]) }) - test("subscribeAll delivers InstanceDisposed via GlobalBus on disposal", async () => { + test("subscribeAll delivers InstanceDisposed on disposal", async () => { await using tmp = await tmpdir() const all: string[] = [] @@ -200,10 +200,12 @@ describe("Bus", () => { Bus.subscribeAll((evt) => { all.push(evt.type) }) + await Bus.publish(TestEvent.Ping, { value: 1 }) }) await Instance.disposeAll() + expect(all).toContain("test.ping") expect(all).toContain(Bus.InstanceDisposed.type) }) @@ -279,30 +281,6 @@ describe("Bus", () => { }) }) - describe("instance disposal", () => { - test("InstanceDisposed is emitted to GlobalBus on disposal", async () => { - await using tmp = await tmpdir() - const globalEvents: Array<{ directory?: string; payload: any }> = [] - - const handler = (evt: any) => globalEvents.push(evt) - GlobalBus.on("event", handler) - - try { - await withInstance(tmp.path, async () => { - // Instance is active — subscribe so the layer gets created - Bus.subscribe(TestEvent.Ping, () => {}) - }) - - await Instance.disposeAll() - - const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed") - expect(disposed).toBeDefined() - expect(disposed!.payload.properties.directory).toBe(tmp.path) - } finally { - GlobalBus.off("event", handler) - } - }) - }) describe("async subscribers", () => { test("publish is fire-and-forget (does not await subscriber callbacks)", async () => { @@ -323,4 +301,72 @@ describe("Bus", () => { expect(received).toEqual([1]) }) }) + + describe("Effect service", () => { + test("subscribeAll stream receives published events", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + const done = yield* Deferred.make() + let count = 0 + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach((msg) => + Effect.gen(function* () { + received.push(msg.type) + if (++count >= 2) yield* Deferred.succeed(done, undefined) + }), + ), + ), + ) + + // Let the forked fiber start and subscribe to the PubSub + yield* Effect.yieldNow + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* svc.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done) + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(received).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll stream ends with ensuring when scope closes", async () => { + await using tmp = await tmpdir() + let ensuringFired = false + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach(() => Effect.void), + Effect.ensuring(Effect.sync(() => { + ensuringFired = true + })), + ), + ) + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* Effect.yieldNow + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(ensuringFired).toBe(true) + }) + }) })