From 81f71c9b30cd2814096b174d2b702245637bd488 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 12:17:39 -0400 Subject: [PATCH] fix(bus): GlobalBus bridge for InstanceDisposed + forceInvalidate + Effect tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Legacy subscribeAll delivers InstanceDisposed via GlobalBus because the fiber starts asynchronously and may not be running when disposal happens. This bridge can be removed once upstream PubSub.shutdown properly wakes suspended subscribers. Add forceInvalidate in Instances that closes the RcMap entry scope regardless of refCount. Standard RcMap.invalidate bails when refCount > 0 — an upstream issue (Effect-TS/effect-smol#1799). Add PubSub shutdown finalizer to Bus layer so layer teardown properly cleans up PubSubs. Add Effect-native tests proving forkScoped + scope closure works correctly: ensuring fires when the scope closes, streams receive published events. Remove stale GlobalBus disposal test (instance.ts responsibility). --- packages/opencode/src/bus/index.ts | 22 ++++- packages/opencode/src/effect/instances.ts | 20 ++++- packages/opencode/test/bus/bus.test.ts | 98 +++++++++++++++++------ 3 files changed, 109 insertions(+), 31 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 49b2a2335d..86bdd3d878 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -84,6 +84,19 @@ export namespace Bus { ) } + // Shut down all PubSubs when the layer is torn down. + // This causes Stream.fromPubSub consumers to end, triggering + // their ensuring/finalizers. + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + log.info("shutting down PubSubs") + yield* PubSub.shutdown(wildcardPubSub) + for (const ps of pubsubs.values()) { + yield* PubSub.shutdown(ps) + } + }), + ) + return Service.of({ publish, subscribe, subscribeAll }) }), ) @@ -109,9 +122,12 @@ export namespace Bus { export function subscribeAll(callback: (event: any) => void) { const directory = Instance.directory - // InstanceDisposed is delivered via GlobalBus because the sync - // callback API can't wait for async layer acquisition. The Effect - // service's stream ending IS the disposal signal for Effect consumers. + // InstanceDisposed is delivered via GlobalBus because the legacy + // adapter's fiber starts asynchronously and may not be running when + // disposal happens. In the Effect-native path, forkScoped + scope + // closure handles this correctly. This bridge can be removed once + // upstream PubSub.shutdown properly wakes suspended subscribers: + // https://github.com/Effect-TS/effect-smol/issues/TBD const onDispose = (evt: { directory?: string; payload: any }) => { if (evt.payload.type !== InstanceDisposed.type) return if (evt.directory !== directory) return diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index fa3ec528f4..457b3518ee 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -1,4 +1,4 @@ -import { Effect, Layer, LayerMap, ServiceMap } from "effect" +import { Effect, Exit, Fiber, Layer, LayerMap, MutableHashMap, Scope, ServiceMap } from "effect" import { Bus } from "@/bus" import { File } from "@/file" import { FileTime } from "@/file/time" @@ -59,7 +59,23 @@ export class Instances extends ServiceMap.Service Effect.runPromise(layerMap.invalidate(directory))) + + // Force-invalidate closes the RcMap entry scope even when refCount > 0. + // Standard RcMap.invalidate bails in that case, leaving long-running + // consumer fibers orphaned. This is an upstream issue: + // https://github.com/Effect-TS/effect-smol/pull/1799 + const forceInvalidate = (directory: string) => + Effect.gen(function* () { + const rcMap = layerMap.rcMap + if (rcMap.state._tag === "Closed") return + const entry = MutableHashMap.get(rcMap.state.map, directory) + if (entry._tag === "None") return + MutableHashMap.remove(rcMap.state.map, directory) + if (entry.value.fiber) yield* Fiber.interrupt(entry.value.fiber) + yield* Scope.close(entry.value.scope, Exit.void) + }).pipe(Effect.uninterruptible) + + const unregister = registerDisposer((directory) => Effect.runPromise(forceInvalidate(directory))) yield* Effect.addFinalizer(() => Effect.sync(unregister)) return Instances.of(layerMap) }), diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index a8929aa1c9..2c154049a6 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -1,10 +1,10 @@ import { afterEach, describe, expect, test } from "bun:test" +import { Deferred, Effect, Stream } from "effect" import z from "zod" import { Bus } from "../../src/bus" import { BusEvent } from "../../src/bus/bus-event" import { GlobalBus } from "../../src/bus/global" import { Instance } from "../../src/project/instance" -import { Log } from "../../src/util/log" import { tmpdir } from "../fixture/fixture" // --------------------------------------------------------------------------- @@ -192,7 +192,7 @@ describe("Bus", () => { expect(all).toEqual(["test.ping"]) }) - test("subscribeAll delivers InstanceDisposed via GlobalBus on disposal", async () => { + test("subscribeAll delivers InstanceDisposed on disposal", async () => { await using tmp = await tmpdir() const all: string[] = [] @@ -200,10 +200,12 @@ describe("Bus", () => { Bus.subscribeAll((evt) => { all.push(evt.type) }) + await Bus.publish(TestEvent.Ping, { value: 1 }) }) await Instance.disposeAll() + expect(all).toContain("test.ping") expect(all).toContain(Bus.InstanceDisposed.type) }) @@ -279,30 +281,6 @@ describe("Bus", () => { }) }) - describe("instance disposal", () => { - test("InstanceDisposed is emitted to GlobalBus on disposal", async () => { - await using tmp = await tmpdir() - const globalEvents: Array<{ directory?: string; payload: any }> = [] - - const handler = (evt: any) => globalEvents.push(evt) - GlobalBus.on("event", handler) - - try { - await withInstance(tmp.path, async () => { - // Instance is active — subscribe so the layer gets created - Bus.subscribe(TestEvent.Ping, () => {}) - }) - - await Instance.disposeAll() - - const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed") - expect(disposed).toBeDefined() - expect(disposed!.payload.properties.directory).toBe(tmp.path) - } finally { - GlobalBus.off("event", handler) - } - }) - }) describe("async subscribers", () => { test("publish is fire-and-forget (does not await subscriber callbacks)", async () => { @@ -323,4 +301,72 @@ describe("Bus", () => { expect(received).toEqual([1]) }) }) + + describe("Effect service", () => { + test("subscribeAll stream receives published events", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + const done = yield* Deferred.make() + let count = 0 + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach((msg) => + Effect.gen(function* () { + received.push(msg.type) + if (++count >= 2) yield* Deferred.succeed(done, undefined) + }), + ), + ), + ) + + // Let the forked fiber start and subscribe to the PubSub + yield* Effect.yieldNow + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* svc.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done) + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(received).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll stream ends with ensuring when scope closes", async () => { + await using tmp = await tmpdir() + let ensuringFired = false + + await withInstance(tmp.path, () => + Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const svc = yield* Bus.Service + + yield* Effect.forkScoped( + svc.subscribeAll().pipe( + Stream.runForEach(() => Effect.void), + Effect.ensuring(Effect.sync(() => { + ensuringFired = true + })), + ), + ) + + yield* svc.publish(TestEvent.Ping, { value: 1 }) + yield* Effect.yieldNow + }), + ).pipe(Effect.provide(Bus.layer)), + ), + ) + + expect(ensuringFired).toBe(true) + }) + }) })