diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f197..5a1c106f4c 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,12 +1,13 @@ import z from "zod" +import { Effect, Layer, PubSub, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" +import { runCallbackInstance, runPromiseInstance } from "../effect/runtime" export namespace Bus { const log = Log.create({ service: "bus" }) - type Subscription = (event: any) => void export const InstanceDisposed = BusEvent.define( "server.instance.disposed", @@ -15,91 +16,105 @@ export namespace Bus { }), ) - const state = Instance.state( - () => { - const subscriptions = new Map() + // --------------------------------------------------------------------------- + // Service definition + // --------------------------------------------------------------------------- - return { - subscriptions, + type Payload = { + type: D["type"] + properties: z.infer + } + + export interface Interface { + readonly publish: ( + def: D, + properties: z.output, + ) => Effect.Effect + readonly subscribe: (def: D) => Stream.Stream> + readonly subscribeAll: () => Stream.Stream + } + + export class Service extends ServiceMap.Service()("@opencode/Bus") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const pubsubs = new Map>() + const wildcardPubSub = yield* PubSub.unbounded() + + const getOrCreate = Effect.fnUntraced(function* (type: string) { + let ps = pubsubs.get(type) + if (!ps) { + ps = yield* PubSub.unbounded() + pubsubs.set(type, ps) + } + return ps + }) + + function publish( + def: D, + properties: z.output, + ) { + return Effect.gen(function* () { + const payload: Payload = { type: def.type, properties } + log.info("publishing", { type: def.type }) + + const ps = pubsubs.get(def.type) + if (ps) yield* PubSub.publish(ps, payload) + yield* PubSub.publish(wildcardPubSub, payload) + + GlobalBus.emit("event", { + directory: Instance.directory, + payload, + }) + }) } - }, - async (entry) => { - const wildcard = entry.subscriptions.get("*") - if (!wildcard) return - const event = { - type: InstanceDisposed.type, - properties: { - directory: Instance.directory, - }, + + function subscribe(def: D): Stream.Stream> { + log.info("subscribing", { type: def.type }) + return Stream.unwrap( + Effect.gen(function* () { + const ps = yield* getOrCreate(def.type) + return Stream.fromPubSub(ps) as Stream.Stream> + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) } - for (const sub of [...wildcard]) { - sub(event) + + function subscribeAll(): Stream.Stream { + log.info("subscribing", { type: "*" }) + return Stream.fromPubSub(wildcardPubSub).pipe( + Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))), + ) } - }, + + return Service.of({ publish, subscribe, subscribeAll }) + }), ) - export async function publish( - def: Definition, - properties: z.output, - ) { - const payload = { - type: def.type, - properties, - } - log.info("publishing", { - type: def.type, - }) - const pending = [] - for (const key of [def.type, "*"]) { - const match = state().subscriptions.get(key) - for (const sub of match ?? []) { - pending.push(sub(payload)) - } - } - GlobalBus.emit("event", { - directory: Instance.directory, - payload, - }) - return Promise.all(pending) + // --------------------------------------------------------------------------- + // Legacy adapters — plain function API wrapping the Effect service + // --------------------------------------------------------------------------- + + function runStream(stream: (svc: Interface) => Stream.Stream, callback: (event: any) => void) { + return runCallbackInstance( + Service.use((svc) => + stream(svc).pipe(Stream.runForEach((msg) => Effect.sync(() => callback(msg)))), + ), + ) } - export function subscribe( - def: Definition, - callback: (event: { type: Definition["type"]; properties: z.infer }) => void, - ) { - return raw(def.type, callback) + export function publish(def: D, properties: z.output) { + return runPromiseInstance(Service.use((svc) => svc.publish(def, properties))) } - export function once( - def: Definition, - callback: (event: { - type: Definition["type"] - properties: z.infer - }) => "done" | undefined, + export function subscribe( + def: D, + callback: (event: Payload) => void, ) { - const unsub = subscribe(def, (event) => { - if (callback(event)) unsub() - }) + return runStream((svc) => svc.subscribe(def), callback) } export function subscribeAll(callback: (event: any) => void) { - return raw("*", callback) - } - - function raw(type: string, callback: (event: any) => void) { - log.info("subscribing", { type }) - const subscriptions = state().subscriptions - let match = subscriptions.get(type) ?? [] - match.push(callback) - subscriptions.set(type, match) - - return () => { - log.info("unsubscribing", { type }) - const match = subscriptions.get(type) - if (!match) return - const index = match.indexOf(callback) - if (index === -1) return - match.splice(index, 1) - } + return runStream((svc) => svc.subscribeAll(), callback) } } diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index c05458d5df..fa3ec528f4 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -1,4 +1,5 @@ import { Effect, Layer, LayerMap, ServiceMap } from "effect" +import { Bus } from "@/bus" import { File } from "@/file" import { FileTime } from "@/file/time" import { FileWatcher } from "@/file/watcher" @@ -16,6 +17,7 @@ import { registerDisposer } from "./instance-registry" export { InstanceContext } from "./instance-context" export type InstanceServices = + | Bus.Service | Question.Service | PermissionNext.Service | ProviderAuth.Service @@ -36,6 +38,7 @@ export type InstanceServices = function lookup(_key: string) { const ctx = Layer.sync(InstanceContext, () => InstanceContext.of(Instance.current)) return Layer.mergeAll( + Layer.fresh(Bus.layer), Layer.fresh(Question.layer), Layer.fresh(PermissionNext.layer), Layer.fresh(ProviderAuth.defaultLayer), diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index f52203b222..4465b106f4 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -18,6 +18,12 @@ export function runPromiseInstance(effect: Effect.Effect( + effect: Effect.Effect, +): (interruptor?: number) => void { + return runtime.runCallback(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function disposeRuntime() { return runtime.dispose() } diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts index 64a21e7992..b3da0fc9ea 100644 --- a/packages/opencode/test/bus/bus.test.ts +++ b/packages/opencode/test/bus/bus.test.ts @@ -192,43 +192,6 @@ describe("Bus", () => { }) }) - describe("once", () => { - test("fires once when callback returns 'done'", async () => { - await using tmp = await tmpdir() - const received: number[] = [] - - await withInstance(tmp.path, async () => { - Bus.once(TestEvent.Ping, (evt) => { - received.push(evt.properties.value) - return "done" - }) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bus.publish(TestEvent.Ping, { value: 2 }) - }) - - expect(received).toEqual([1]) - }) - - test("keeps listening when callback returns undefined", async () => { - await using tmp = await tmpdir() - const received: number[] = [] - - await withInstance(tmp.path, async () => { - Bus.once(TestEvent.Ping, (evt) => { - received.push(evt.properties.value) - if (evt.properties.value === 3) return "done" - return undefined - }) - await Bus.publish(TestEvent.Ping, { value: 1 }) - await Bus.publish(TestEvent.Ping, { value: 2 }) - await Bus.publish(TestEvent.Ping, { value: 3 }) - await Bus.publish(TestEvent.Ping, { value: 4 }) - }) - - expect(received).toEqual([1, 2, 3]) - }) - }) - describe("GlobalBus forwarding", () => { test("publish emits to GlobalBus with directory", async () => { await using tmp = await tmpdir() @@ -284,37 +247,47 @@ describe("Bus", () => { }) describe("instance disposal", () => { - test("wildcard subscribers receive InstanceDisposed on disposal", async () => { + test("InstanceDisposed is emitted to GlobalBus on disposal", async () => { await using tmp = await tmpdir() - const events: Array<{ type: string }> = [] + const globalEvents: Array<{ directory?: string; payload: any }> = [] - await withInstance(tmp.path, async () => { - Bus.subscribeAll((evt) => events.push({ type: evt.type })) - }) + const handler = (evt: any) => globalEvents.push(evt) + GlobalBus.on("event", handler) - await Instance.disposeAll() + try { + await withInstance(tmp.path, async () => { + // Instance is active — subscribe so the layer gets created + Bus.subscribe(TestEvent.Ping, () => {}) + }) - const disposed = events.find((e) => e.type === "server.instance.disposed") - expect(disposed).toBeDefined() + await Instance.disposeAll() + + const disposed = globalEvents.find((e) => e.payload.type === "server.instance.disposed") + expect(disposed).toBeDefined() + expect(disposed!.payload.properties.directory).toBe(tmp.path) + } finally { + GlobalBus.off("event", handler) + } }) }) describe("async subscribers", () => { - test("publish awaits async subscriber promises", async () => { + test("publish is fire-and-forget (does not await subscriber callbacks)", async () => { await using tmp = await tmpdir() - const order: string[] = [] + const received: number[] = [] await withInstance(tmp.path, async () => { - Bus.subscribe(TestEvent.Ping, async () => { + Bus.subscribe(TestEvent.Ping, async (evt) => { await new Promise((r) => setTimeout(r, 10)) - order.push("async-done") + received.push(evt.properties.value) }) await Bus.publish(TestEvent.Ping, { value: 1 }) - order.push("after-publish") + // Give the async subscriber time to complete + await new Promise((r) => setTimeout(r, 50)) }) - expect(order).toEqual(["async-done", "after-publish"]) + expect(received).toEqual([1]) }) }) })