fix(bus): use Fiber.interrupt for clean disposal of subscribeAll

Use forkInstance + Fiber.interrupt (which awaits) instead of
runCallbackInstance + interruptUnsafe (fire-and-forget) for
subscribeAll. This ensures the fiber completes before layer
invalidation, allowing the RcMap refCount to drop to 0.

subscribeAll now delivers InstanceDisposed as the last callback
message via Effect.ensuring when the fiber is interrupted during
disposal, but not on manual unsubscribe.

Add priority support to registerDisposer so Bus can interrupt
subscription fibers (priority -1) before layer invalidation
(priority 0).

Add forkInstance helper to effect/runtime that returns a Fiber
instead of an interrupt function.
kit/effect-bus
Kit Langton 2026-03-19 08:49:06 -04:00
parent 009d77c9d8
commit 0c2b5b2c39
3 changed files with 60 additions and 20 deletions

View File

@ -1,10 +1,11 @@
import z from "zod"
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Effect, Fiber, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
import { registerDisposer } from "../effect/instance-registry"
import { forkInstance, runCallbackInstance, runPromiseInstance } from "../effect/runtime"
export namespace Bus {
const log = Log.create({ service: "bus" })
@ -51,10 +52,7 @@ export namespace Bus {
return ps
})
function publish<D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) {
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return Effect.gen(function* () {
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
@ -97,9 +95,7 @@ export namespace Bus {
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
return runCallbackInstance(
Service.use((svc) =>
stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))),
),
Service.use((svc) => stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg))))),
)
}
@ -107,14 +103,39 @@ export namespace Bus {
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
}
export function subscribe<D extends BusEvent.Definition>(
def: D,
callback: (event: Payload<D>) => void,
) {
export function subscribe<D extends BusEvent.Definition>(def: D, callback: (event: Payload<D>) => void) {
return runStream((svc) => svc.subscribe(def), callback)
}
export function subscribeAll(callback: (event: any) => void) {
return runStream((svc) => svc.subscribeAll(), callback)
const directory = Instance.directory
let manualUnsub = false
const fiber = forkInstance(
Service.use((svc) =>
svc.subscribeAll().pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))),
).pipe(
Effect.ensuring(
Effect.sync(() => {
if (!manualUnsub) {
callback({ type: InstanceDisposed.type, properties: { directory } })
}
}),
),
),
)
// Interrupt the fiber before the layer is invalidated, awaiting
// completion so the refCount drops and the scope can close.
const unregister = registerDisposer(
(dir) => (dir !== directory ? Promise.resolve() : Effect.runPromise(Fiber.interrupt(fiber))),
-1,
)
return () => {
manualUnsub = true
unregister()
fiber.interruptUnsafe()
}
}
}

View File

@ -1,12 +1,25 @@
const disposers = new Set<(directory: string) => Promise<void>>()
const disposers = new Set<{
fn: (directory: string) => Promise<void>
priority: number
}>()
export function registerDisposer(disposer: (directory: string) => Promise<void>) {
disposers.add(disposer)
export function registerDisposer(disposer: (directory: string) => Promise<void>, priority = 0) {
const item = {
fn: disposer,
priority,
}
disposers.add(item)
return () => {
disposers.delete(disposer)
disposers.delete(item)
}
}
export async function disposeInstance(directory: string) {
await Promise.allSettled([...disposers].map((disposer) => disposer(directory)))
const list = [...disposers].sort((a, b) => a.priority - b.priority)
const seen = new Set<number>()
for (const item of list) {
if (seen.has(item.priority)) continue
seen.add(item.priority)
await Promise.allSettled(list.filter((x) => x.priority === item.priority).map((x) => x.fn(directory)))
}
}

View File

@ -1,4 +1,4 @@
import { Effect, Layer, ManagedRuntime } from "effect"
import { Effect, Fiber, Layer, ManagedRuntime } from "effect"
import { AccountEffect } from "@/account/effect"
import { AuthEffect } from "@/auth/effect"
import { Instances } from "@/effect/instances"
@ -18,6 +18,12 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function forkInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): Fiber.Fiber<A, E> {
return runtime.runFork(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runCallbackInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): (interruptor?: number) => void {