From 645c15351bbecd8ce72bf3f0719c8fd4d1c47631 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 18 Mar 2026 21:05:36 -0400 Subject: [PATCH] test(bus): add comprehensive test suite for Bus service Covers publish/subscribe, multiple subscribers, unsubscribe, subscribeAll, once, GlobalBus forwarding, instance isolation, disposal, and async subscribers. --- packages/opencode/test/bus/bus.test.ts | 320 +++++++++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 packages/opencode/test/bus/bus.test.ts diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts new file mode 100644 index 0000000000..64a21e7992 --- /dev/null +++ b/packages/opencode/test/bus/bus.test.ts @@ -0,0 +1,320 @@ +import { afterEach, describe, expect, test } from "bun:test" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { GlobalBus } from "../../src/bus/global" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +// --------------------------------------------------------------------------- +// Test event definitions +// --------------------------------------------------------------------------- + +const TestEvent = { + Ping: BusEvent.define("test.ping", z.object({ value: z.number() })), + Pong: BusEvent.define("test.pong", z.object({ message: z.string() })), +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function withInstance(directory: string, fn: () => Promise) { + return Instance.provide({ directory, fn }) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("Bus", () => { + afterEach(() => Instance.disposeAll()) + + describe("publish + subscribe", () => { + test("subscriber receives matching events", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 42 }) + await Bus.publish(TestEvent.Ping, { value: 99 }) + }) + + expect(received).toEqual([42, 99]) + }) + + test("subscriber does not receive events of other types", async () => { + await using tmp = await tmpdir() + const pings: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + pings.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Pong, { message: "hello" }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + expect(pings).toEqual([1]) + }) + + test("publish with no subscribers does not throw", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + }) + }) + + describe("multiple subscribers", () => { + test("all subscribers for same event type are called", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value)) + Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value)) + await Bus.publish(TestEvent.Ping, { value: 7 }) + }) + + expect(a).toEqual([7]) + expect(b).toEqual([7]) + }) + + test("subscribers are called in registration order", async () => { + await using tmp = await tmpdir() + const order: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, () => order.push("first")) + Bus.subscribe(TestEvent.Ping, () => order.push("second")) + Bus.subscribe(TestEvent.Ping, () => order.push("third")) + await Bus.publish(TestEvent.Ping, { value: 0 }) + }) + + expect(order).toEqual(["first", "second", "third"]) + }) + }) + + describe("unsubscribe", () => { + test("unsubscribe stops delivery", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsub() + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(received).toEqual([1]) + }) + + test("unsubscribe is idempotent", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, () => {}) + unsub() + unsub() // should not throw + }) + }) + + test("unsubscribing one does not affect others", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + const unsubA = Bus.subscribe(TestEvent.Ping, (evt) => a.push(evt.properties.value)) + Bus.subscribe(TestEvent.Ping, (evt) => b.push(evt.properties.value)) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsubA() + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(a).toEqual([1]) + expect(b).toEqual([1, 2]) + }) + }) + + describe("subscribeAll", () => { + test("receives events of all types", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + all.push(evt.type) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Pong, { message: "hi" }) + }) + + expect(all).toEqual(["test.ping", "test.pong"]) + }) + + test("subscribeAll + typed subscribe both fire", async () => { + await using tmp = await tmpdir() + const typed: number[] = [] + const wild: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => typed.push(evt.properties.value)) + Bus.subscribeAll((evt) => wild.push(evt.type)) + await Bus.publish(TestEvent.Ping, { value: 5 }) + }) + + expect(typed).toEqual([5]) + expect(wild).toEqual(["test.ping"]) + }) + + test("unsubscribe from subscribeAll", async () => { + await using tmp = await tmpdir() + const all: string[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribeAll((evt) => all.push(evt.type)) + await Bus.publish(TestEvent.Ping, { value: 1 }) + unsub() + await Bus.publish(TestEvent.Pong, { message: "missed" }) + }) + + expect(all).toEqual(["test.ping"]) + }) + }) + + describe("once", () => { + test("fires once when callback returns 'done'", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.once(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + return "done" + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(received).toEqual([1]) + }) + + test("keeps listening when callback returns undefined", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.once(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + if (evt.properties.value === 3) return "done" + return undefined + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Ping, { value: 2 }) + await Bus.publish(TestEvent.Ping, { value: 3 }) + await Bus.publish(TestEvent.Ping, { value: 4 }) + }) + + expect(received).toEqual([1, 2, 3]) + }) + }) + + describe("GlobalBus forwarding", () => { + test("publish emits to GlobalBus with directory", async () => { + await using tmp = await tmpdir() + const globalEvents: Array<{ directory?: string; payload: any }> = [] + + const handler = (evt: any) => globalEvents.push(evt) + GlobalBus.on("event", handler) + + try { + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 42 }) + }) + + const ping = globalEvents.find((e) => e.payload.type === "test.ping") + expect(ping).toBeDefined() + expect(ping!.directory).toBe(tmp.path) + expect(ping!.payload).toEqual({ + type: "test.ping", + properties: { value: 42 }, + }) + } finally { + GlobalBus.off("event", handler) + } + }) + }) + + describe("instance isolation", () => { + test("subscribers in one instance do not receive events from another", async () => { + await using tmpA = await tmpdir() + await using tmpB = await tmpdir() + const eventsA: number[] = [] + const eventsB: number[] = [] + + await withInstance(tmpA.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => eventsA.push(evt.properties.value)) + }) + + await withInstance(tmpB.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => eventsB.push(evt.properties.value)) + }) + + await withInstance(tmpA.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + + await withInstance(tmpB.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 2 }) + }) + + expect(eventsA).toEqual([1]) + expect(eventsB).toEqual([2]) + }) + }) + + describe("instance disposal", () => { + test("wildcard subscribers receive InstanceDisposed on disposal", async () => { + await using tmp = await tmpdir() + const events: Array<{ type: string }> = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => events.push({ type: evt.type })) + }) + + await Instance.disposeAll() + + const disposed = events.find((e) => e.type === "server.instance.disposed") + expect(disposed).toBeDefined() + }) + }) + + describe("async subscribers", () => { + test("publish awaits async subscriber promises", async () => { + await using tmp = await tmpdir() + const order: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, async () => { + await new Promise((r) => setTimeout(r, 10)) + order.push("async-done") + }) + + await Bus.publish(TestEvent.Ping, { value: 1 }) + order.push("after-publish") + }) + + expect(order).toEqual(["async-done", "after-publish"]) + }) + }) +})