fix(bus): use GlobalBus for InstanceDisposed in legacy subscribeAll

The sync callback API can't wait for async layer acquisition, so
delivering InstanceDisposed through the PubSub stream is a race
condition. Instead, the legacy subscribeAll adapter listens on
GlobalBus for InstanceDisposed matching the current directory.

The Effect service's stream ending IS the disposal signal for
Effect consumers — this is only needed for the legacy callback API.

Also reverts forceInvalidate, fiber tracking, priority-based
disposal, and other workaround attempts. Clean simple solution.
kit/effect-bus
Kit Langton 2026-03-19 09:40:39 -04:00
parent 0c2b5b2c39
commit 992f4f794a
4 changed files with 54 additions and 52 deletions

View File

@ -1,11 +1,10 @@
import z from "zod"
import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { registerDisposer } from "../effect/instance-registry"
import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime"
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
export namespace Bus {
const log = Log.create({ service: "bus" })
@ -109,33 +108,22 @@ export namespace Bus {
export function subscribeAll(callback: (event: any) => void) {
const directory = Instance.directory
let manualUnsub = false
const fiber = forkInstance(
Service.use((svc) =>
svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))),
).pipe(
Effect.ensuring(
Effect.sync(() => {
if (!manualUnsub) {
callback({ type: InstanceDisposed.type, properties: { directory } })
}
}),
),
),
)
// Interrupt the fiber before the layer is invalidated, awaiting
// completion so the refCount drops and the scope can close.
const unregister = registerDisposer(
(dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))),
-1,
)
// InstanceDisposed is delivered via GlobalBus because the sync
// callback API can't wait for async layer acquisition. The Effect
// service's stream ending IS the disposal signal for Effect consumers.
const onDispose = (evt: { directory?: string; payload: any }) => {
if (evt.payload.type !== InstanceDisposed.type) return
if (evt.directory !== directory) return
callback(evt.payload)
GlobalBus.off("event", onDispose)
}
GlobalBus.on("event", onDispose)
const interrupt = runStream((svc) => svc.subscribeAll(), callback)
return () => {
manualUnsub = true
unregister()
fiber.interruptUnsafe()
GlobalBus.off("event", onDispose)
interrupt()
}
}
}

View File

@ -1,25 +1,12 @@
const disposers = new Set<{
fn: (directory: string) => Promise<void>
priority: number
}>()
const disposers = new Set<(directory: string) => Promise<void>>()
export function registerDisposer(disposer: (directory: string) => Promise<void>, priority = 0) {
const item = {
fn: disposer,
priority,
}
disposers.add(item)
export function registerDisposer(disposer: (directory: string) => Promise<void>) {
disposers.add(disposer)
return () => {
disposers.delete(item)
disposers.delete(disposer)
}
}
export async function disposeInstance(directory: string) {
const list = [...disposers].sort((a, b) => a.priority - b.priority)
const seen = new Set<number>()
for (const item of list) {
if (seen.has(item.priority)) continue
seen.add(item.priority)
await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory)))
}
await Promise.allSettled([...disposers].map((disposer) => disposer(directory)))
}

View File

@ -1,4 +1,4 @@
import { Effect, Fiber, Layer, ManagedRuntime } from "effect"
import { Effect, Layer, ManagedRuntime } from "effect"
import { AccountEffect } from "@/account/effect"
import { AuthEffect } from "@/auth/effect"
import { Instances } from "@/effect/instances"
@ -18,12 +18,6 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function forkInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): Fiber.Fiber<A, E> {
return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runCallbackInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): (interruptor?: number) => void {

View File

@ -4,6 +4,7 @@ import { Bus } from "../../src/bus"
import { BusEvent } from "../../src/bus/bus-event"
import { GlobalBus } from "../../src/bus/global"
import { Instance } from "../../src/project/instance"
import { Log } from "../../src/util/log"
import { tmpdir } from "../fixture/fixture"
// ---------------------------------------------------------------------------
@ -190,6 +191,38 @@ describe("Bus", () => {
expect(all).toEqual(["test.ping"])
})
test("subscribeAll delivers InstanceDisposed via GlobalBus on disposal", async () => {
await using tmp = await tmpdir()
const all: string[] = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => {
all.push(evt.type)
})
})
await Instance.disposeAll()
expect(all).toContain(Bus.InstanceDisposed.type)
})
test("manual unsubscribe suppresses InstanceDisposed", async () => {
await using tmp = await tmpdir()
const all: string[] = []
let unsub = () => {}
await withInstance(tmp.path, async () => {
unsub = Bus.subscribeAll((evt) => {
all.push(evt.type)
})
})
unsub()
await Instance.disposeAll()
expect(all).not.toContain(Bus.InstanceDisposed.type)
})
})
describe("GlobalBus forwarding", () => {