feat(bus): migrate Bus to Effect service with PubSub internals

Add Bus.Service as a ServiceMap.Service backed by Effect PubSub:
- publish() pushes to per-type + wildcard PubSubs and GlobalBus
- subscribe() returns a typed Stream via Stream.fromPubSub
- subscribeAll() returns a wildcard Stream

Legacy adapters wrap the Effect service:
- publish → runPromiseInstance
- subscribe/subscribeAll → runCallbackInstance with Stream.runForEach

Other changes:
- Register Bus.Service in Instances LayerMap
- Add runCallbackInstance helper to effect/runtime
- Remove unused Bus.once (zero callers)
- Skip PubSub creation on publish when no subscribers exist
- Move subscribe/unsubscribe logging into the Effect service layer
kit/effect-bus
Kit Langton 2026-03-18 21:36:04 -04:00
parent 645c15351b
commit f3cf519d98
4 changed files with 121 additions and 124 deletions

View File

@ -1,12 +1,13 @@
import z from "zod"
import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
import { runCallbackInstance, runPromiseInstance } from "../effect/runtime"
export namespace Bus {
const log = Log.create({ service: "bus" })
type Subscription = (event: any) => void
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
@ -15,91 +16,105 @@ export namespace Bus {
}),
)
const state = Instance.state(
() => {
const subscriptions = new Map<any, Subscription[]>()
// ---------------------------------------------------------------------------
// Service definition
// ---------------------------------------------------------------------------
return {
subscriptions,
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
type: D["type"]
properties: z.infer<D["properties"]>
}
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const pubsubs = new Map<string, PubSub.PubSub<Payload>>()
const wildcardPubSub = yield* PubSub.unbounded<Payload>()
const getOrCreate = Effect.fnUntraced(function* (type: string) {
let ps = pubsubs.get(type)
if (!ps) {
ps = yield* PubSub.unbounded<Payload>()
pubsubs.set(type, ps)
}
return ps
})
function publish<D extends BusEvent.Definition>(
def: D,
properties: z.output<D["properties"]>,
) {
return Effect.gen(function* () {
const payload: Payload = { type: def.type, properties }
log.info("publishing", { type: def.type })
const ps = pubsubs.get(def.type)
if (ps) yield* PubSub.publish(ps, payload)
yield* PubSub.publish(wildcardPubSub, payload)
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
})
}
},
async (entry) => {
const wildcard = entry.subscriptions.get("*")
if (!wildcard) return
const event = {
type: InstanceDisposed.type,
properties: {
directory: Instance.directory,
},
function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
log.info("subscribing", { type: def.type })
return Stream.unwrap(
Effect.gen(function* () {
const ps = yield* getOrCreate(def.type)
return Stream.fromPubSub(ps) as Stream.Stream<Payload<D>>
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
}
for (const sub of [...wildcard]) {
sub(event)
function subscribeAll(): Stream.Stream<Payload> {
log.info("subscribing", { type: "*" })
return Stream.fromPubSub(wildcardPubSub).pipe(
Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))),
)
}
},
return Service.of({ publish, subscribe, subscribeAll })
}),
)
export async function publish<Definition extends BusEvent.Definition>(
def: Definition,
properties: z.output<Definition["properties"]>,
) {
const payload = {
type: def.type,
properties,
}
log.info("publishing", {
type: def.type,
})
const pending = []
for (const key of [def.type, "*"]) {
const match = state().subscriptions.get(key)
for (const sub of match ?? []) {
pending.push(sub(payload))
}
}
GlobalBus.emit("event", {
directory: Instance.directory,
payload,
})
return Promise.all(pending)
// ---------------------------------------------------------------------------
// Legacy adapters — plain function API wrapping the Effect service
// ---------------------------------------------------------------------------
function runStream(stream: (svc: Interface) => Stream.Stream<Payload>, callback: (event: any) => void) {
return runCallbackInstance(
Service.use((svc) =>
stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))),
),
)
}
export function subscribe<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
) {
return raw(def.type, callback)
export function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
return runPromiseInstance(Service.use((svc) => svc.publish(def, properties)))
}
export function once<Definition extends BusEvent.Definition>(
def: Definition,
callback: (event: {
type: Definition["type"]
properties: z.infer<Definition["properties"]>
}) => "done" | undefined,
export function subscribe<D extends BusEvent.Definition>(
def: D,
callback: (event: Payload<D>) => void,
) {
const unsub = subscribe(def, (event) => {
if (callback(event)) unsub()
})
return runStream((svc) => svc.subscribe(def), callback)
}
export function subscribeAll(callback: (event: any) => void) {
return raw("*", callback)
}
function raw(type: string, callback: (event: any) => void) {
log.info("subscribing", { type })
const subscriptions = state().subscriptions
let match = subscriptions.get(type) ?? []
match.push(callback)
subscriptions.set(type, match)
return () => {
log.info("unsubscribing", { type })
const match = subscriptions.get(type)
if (!match) return
const index = match.indexOf(callback)
if (index === -1) return
match.splice(index, 1)
}
return runStream((svc) => svc.subscribeAll(), callback)
}
}

View File

@ -1,4 +1,5 @@
import { Effect, Layer, LayerMap, ServiceMap } from "effect"
import { Bus } from "@/bus"
import { File } from "@/file"
import { FileTime } from "@/file/time"
import { FileWatcher } from "@/file/watcher"
@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry"
export { InstanceContext } from "./instance-context"
export type InstanceServices =
| Bus.Service
| Question.Service
| PermissionNext.Service
| ProviderAuth.Service
@ -36,6 +38,7 @@ export type InstanceServices =
function lookup(_key: string) {
const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current))
return Layer.mergeAll(
Layer.fresh(Bus.layer),
Layer.fresh(Question.layer),
Layer.fresh(PermissionNext.layer),
Layer.fresh(ProviderAuth.defaultLayer),

View File

@ -18,6 +18,12 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runCallbackInstance<A, E>(
effect: Effect.Effect<A, E, InstanceServices>,
): (interruptor?: number) => void {
return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function disposeRuntime() {
return runtime.dispose()
}

View File

@ -192,43 +192,6 @@ describe("Bus", () => {
})
})
describe("once", () => {
test("fires once when callback returns 'done'", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.once(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
return "done"
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Ping, { value: 2 })
})
expect(received).toEqual([1])
})
test("keeps listening when callback returns undefined", async () => {
await using tmp = await tmpdir()
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.once(TestEvent.Ping, (evt) => {
received.push(evt.properties.value)
if (evt.properties.value === 3) return "done"
return undefined
})
await Bus.publish(TestEvent.Ping, { value: 1 })
await Bus.publish(TestEvent.Ping, { value: 2 })
await Bus.publish(TestEvent.Ping, { value: 3 })
await Bus.publish(TestEvent.Ping, { value: 4 })
})
expect(received).toEqual([1, 2, 3])
})
})
describe("GlobalBus forwarding", () => {
test("publish emits to GlobalBus with directory", async () => {
await using tmp = await tmpdir()
@ -284,37 +247,47 @@ describe("Bus", () => {
})
describe("instance disposal", () => {
test("wildcard subscribers receive InstanceDisposed on disposal", async () => {
test("InstanceDisposed is emitted to GlobalBus on disposal", async () => {
await using tmp = await tmpdir()
const events: Array<{ type: string }> = []
const globalEvents: Array<{ directory?: string; payload: any }> = []
await withInstance(tmp.path, async () => {
Bus.subscribeAll((evt) => events.push({ type: evt.type }))
})
const handler = (evt: any) => globalEvents.push(evt)
GlobalBus.on("event", handler)
await Instance.disposeAll()
try {
await withInstance(tmp.path, async () => {
// Instance is active — subscribe so the layer gets created
Bus.subscribe(TestEvent.Ping, () => {})
})
const disposed = events.find((e) => e.type === "server.instance.disposed")
expect(disposed).toBeDefined()
await Instance.disposeAll()
const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed")
expect(disposed).toBeDefined()
expect(disposed!.payload.properties.directory).toBe(tmp.path)
} finally {
GlobalBus.off("event", handler)
}
})
})
describe("async subscribers", () => {
test("publish awaits async subscriber promises", async () => {
test("publish is fire-and-forget (does not await subscriber callbacks)", async () => {
await using tmp = await tmpdir()
const order: string[] = []
const received: number[] = []
await withInstance(tmp.path, async () => {
Bus.subscribe(TestEvent.Ping, async () => {
Bus.subscribe(TestEvent.Ping, async (evt) => {
await new Promise((r) => setTimeout(r, 10))
order.push("async-done")
received.push(evt.properties.value)
})
await Bus.publish(TestEvent.Ping, { value: 1 })
order.push("after-publish")
// Give the async subscriber time to complete
await new Promise((r) => setTimeout(r, 50))
})
expect(order).toEqual(["async-done", "after-publish"])
expect(received).toEqual([1])
})
})
})