From a2622254c9c20629c9f0c4387ec6fd5c20e7d347 Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Wed, 25 Mar 2026 19:36:42 -0400 Subject: [PATCH] tui: bypass local SSE event streaming in worker --- packages/opencode/src/cli/cmd/tui/worker.ts | 70 ++++++++++++--------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index e63f10ba80..de84f6c079 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -6,8 +6,9 @@ import { InstanceBootstrap } from "@/project/bootstrap" import { Rpc } from "@/util/rpc" import { upgrade } from "@/cli/upgrade" import { Config } from "@/config/config" +import { Bus } from "@/bus" import { GlobalBus } from "@/bus/global" -import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2" +import type { Event } from "@opencode-ai/sdk/v2" import type { BunWebSocketData } from "hono/bun" import { Flag } from "@/flag/flag" @@ -49,38 +50,49 @@ const startEventStream = (directory: string) => { eventStream.abort = abort const signal = abort.signal - const fetchFn = (async (input: RequestInfo | URL, init?: RequestInit) => { - const request = new Request(input, init) - const auth = getAuthorizationHeader() - if (auth) request.headers.set("Authorization", auth) - return Server.App().fetch(request) - }) as typeof globalThis.fetch - - const sdk = createOpencodeClient({ - baseUrl: "http://opencode.internal", - directory, - fetch: fetchFn, - signal, - }) - ;(async () => { while (!signal.aborted) { - const events = await Promise.resolve( - sdk.event.subscribe( - {}, - { - signal, - }, - ), - ).catch(() => undefined) + const shouldReconnect = await Instance.provide({ + directory, + init: InstanceBootstrap, + fn: () => + new Promise((resolve) => { + Rpc.emit("event", { + type: "server.connected", + properties: {}, + } satisfies Event) - if (!events) { - await Bun.sleep(250) - continue - } + let settled = false + const settle = (value: boolean) => { + if (settled) return + settled = true + signal.removeEventListener("abort", onAbort) + unsub() + resolve(value) + } - for await (const event of events.stream) { - Rpc.emit("event", event as Event) + const unsub = Bus.subscribeAll((event) => { + Rpc.emit("event", event as Event) + if (event.type === Bus.InstanceDisposed.type) { + settle(true) + } + }) + + const onAbort = () => { + settle(false) + } + + signal.addEventListener("abort", onAbort, { once: true }) + }), + }).catch((error) => { + Log.Default.error("event stream subscribe error", { + error: error instanceof Error ? error.message : error, + }) + return false + }) + + if (!shouldReconnect || signal.aborted) { + break } if (!signal.aborted) {