refactor(core): support multiple event streams in worker and remove workspaces from plugin api (#21348)

pull/19054/merge
James Long 2026-04-07 13:22:34 -04:00 committed by GitHub
parent ec8b9810b4
commit 5d48e7bd44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 51 additions and 90 deletions

View File

@ -289,9 +289,6 @@ function App(props: { onSnapshot?: () => Promise<string[]> }) {
toast, toast,
renderer, renderer,
}) })
onCleanup(() => {
api.dispose()
})
const [ready, setReady] = createSignal(false) const [ready, setReady] = createSignal(false)
TuiPluginRuntime.init(api) TuiPluginRuntime.init(api)
.catch((error) => { .catch((error) => {

View File

@ -4,8 +4,7 @@ import { createGlobalEmitter } from "@solid-primitives/event-bus"
import { batch, onCleanup, onMount } from "solid-js" import { batch, onCleanup, onMount } from "solid-js"
export type EventSource = { export type EventSource = {
on: (handler: (event: Event) => void) => () => void subscribe: (directory: string | undefined, handler: (event: Event) => void) => Promise<() => void>
setWorkspace?: (workspaceID?: string) => void
} }
export const { use: useSDK, provider: SDKProvider } = createSimpleContext({ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
@ -18,7 +17,6 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
events?: EventSource events?: EventSource
}) => { }) => {
const abort = new AbortController() const abort = new AbortController()
let workspaceID: string | undefined
let sse: AbortController | undefined let sse: AbortController | undefined
function createSDK() { function createSDK() {
@ -28,7 +26,6 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
directory: props.directory, directory: props.directory,
fetch: props.fetch, fetch: props.fetch,
headers: props.headers, headers: props.headers,
experimental_workspaceID: workspaceID,
}) })
} }
@ -90,9 +87,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
})().catch(() => {}) })().catch(() => {})
} }
onMount(() => { onMount(async () => {
if (props.events) { if (props.events) {
const unsub = props.events.on(handleEvent) const unsub = await props.events.subscribe(props.directory, handleEvent)
onCleanup(unsub) onCleanup(unsub)
} else { } else {
startSSE() startSSE()
@ -109,19 +106,9 @@ export const { use: useSDK, provider: SDKProvider } = createSimpleContext({
get client() { get client() {
return sdk return sdk
}, },
get workspaceID() {
return workspaceID
},
directory: props.directory, directory: props.directory,
event: emitter, event: emitter,
fetch: props.fetch ?? fetch, fetch: props.fetch ?? fetch,
setWorkspace(next?: string) {
if (workspaceID === next) return
workspaceID = next
sdk = createSDK()
props.events?.setWorkspace?.(next)
if (!props.events) startSSE()
},
url: props.url, url: props.url,
} }
}, },

View File

@ -18,7 +18,7 @@ import { Prompt } from "../component/prompt"
import { Slot as HostSlot } from "./slots" import { Slot as HostSlot } from "./slots"
import type { useToast } from "../ui/toast" import type { useToast } from "../ui/toast"
import { Installation } from "@/installation" import { Installation } from "@/installation"
import { createOpencodeClient, type OpencodeClient } from "@opencode-ai/sdk/v2" import { type OpencodeClient } from "@opencode-ai/sdk/v2"
type RouteEntry = { type RouteEntry = {
key: symbol key: symbol
@ -43,11 +43,6 @@ type Input = {
renderer: TuiPluginApi["renderer"] renderer: TuiPluginApi["renderer"]
} }
type TuiHostPluginApi = TuiPluginApi & {
map: Map<string | undefined, OpencodeClient>
dispose: () => void
}
function routeRegister(routes: RouteMap, list: TuiRouteDefinition[], bump: () => void) { function routeRegister(routes: RouteMap, list: TuiRouteDefinition[], bump: () => void) {
const key = Symbol() const key = Symbol()
for (const item of list) { for (const item of list) {
@ -206,29 +201,7 @@ function appApi(): TuiPluginApi["app"] {
} }
} }
export function createTuiApi(input: Input): TuiHostPluginApi { export function createTuiApi(input: Input): TuiPluginApi {
const map = new Map<string | undefined, OpencodeClient>()
const scoped: TuiPluginApi["scopedClient"] = (workspaceID) => {
const hit = map.get(workspaceID)
if (hit) return hit
const next = createOpencodeClient({
baseUrl: input.sdk.url,
fetch: input.sdk.fetch,
directory: input.sync.data.path.directory || input.sdk.directory,
experimental_workspaceID: workspaceID,
})
map.set(workspaceID, next)
return next
}
const workspace: TuiPluginApi["workspace"] = {
current() {
return input.sdk.workspaceID
},
set(workspaceID) {
input.sdk.setWorkspace(workspaceID)
},
}
const lifecycle: TuiPluginApi["lifecycle"] = { const lifecycle: TuiPluginApi["lifecycle"] = {
signal: new AbortController().signal, signal: new AbortController().signal,
onDispose() { onDispose() {
@ -369,8 +342,6 @@ export function createTuiApi(input: Input): TuiHostPluginApi {
get client() { get client() {
return input.sdk.client return input.sdk.client
}, },
scopedClient: scoped,
workspace,
event: input.sdk.event, event: input.sdk.event,
renderer: input.renderer, renderer: input.renderer,
slots: { slots: {
@ -422,9 +393,5 @@ export function createTuiApi(input: Input): TuiHostPluginApi {
return input.theme.ready return input.theme.ready
}, },
}, },
map,
dispose() {
map.clear()
},
} }
} }

View File

@ -543,8 +543,6 @@ function pluginApi(runtime: RuntimeState, plugin: PluginEntry, scope: PluginScop
get client() { get client() {
return api.client return api.client
}, },
scopedClient: api.scopedClient,
workspace: api.workspace,
event, event,
renderer: api.renderer, renderer: api.renderer,
slots, slots,

View File

@ -167,12 +167,6 @@ export function Session() {
const scrollAcceleration = createMemo(() => getScrollAcceleration(tuiConfig)) const scrollAcceleration = createMemo(() => getScrollAcceleration(tuiConfig))
createEffect(() => {
if (session()?.workspaceID) {
sdk.setWorkspace(session()?.workspaceID)
}
})
createEffect(async () => { createEffect(async () => {
await sync.session await sync.session
.sync(route.sessionID) .sync(route.sessionID)

View File

@ -43,9 +43,18 @@ function createWorkerFetch(client: RpcClient): typeof fetch {
function createEventSource(client: RpcClient): EventSource { function createEventSource(client: RpcClient): EventSource {
return { return {
on: (handler) => client.on<Event>("event", handler), subscribe: async (directory, handler) => {
setWorkspace: (workspaceID) => { const id = await client.call("subscribe", { directory })
void client.call("setWorkspace", { workspaceID }) const unsub = client.on<{ id: string; event: Event }>("event", (e) => {
if (e.id === id) {
handler(e.event)
}
})
return () => {
unsub()
client.call("unsubscribe", { id })
}
}, },
} }
} }

View File

@ -45,20 +45,20 @@ GlobalBus.on("event", (event) => {
let server: Awaited<ReturnType<typeof Server.listen>> | undefined let server: Awaited<ReturnType<typeof Server.listen>> | undefined
const eventStream = { const eventStreams = new Map<string, AbortController>()
abort: undefined as AbortController | undefined,
} function startEventStream(directory: string) {
const id = crypto.randomUUID()
const startEventStream = (input: { directory: string; workspaceID?: string }) => {
if (eventStream.abort) eventStream.abort.abort()
const abort = new AbortController() const abort = new AbortController()
eventStream.abort = abort
const signal = abort.signal const signal = abort.signal
;(async () => { eventStreams.set(id, abort)
async function run() {
while (!signal.aborted) { while (!signal.aborted) {
const shouldReconnect = await Instance.provide({ const shouldReconnect = await Instance.provide({
directory: input.directory, directory,
init: InstanceBootstrap, init: InstanceBootstrap,
fn: () => fn: () =>
new Promise<boolean>((resolve) => { new Promise<boolean>((resolve) => {
@ -77,7 +77,10 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) =>
} }
const unsub = Bus.subscribeAll((event) => { const unsub = Bus.subscribeAll((event) => {
Rpc.emit("event", event as Event) Rpc.emit("event", {
id,
event: event as Event,
})
if (event.type === Bus.InstanceDisposed.type) { if (event.type === Bus.InstanceDisposed.type) {
settle(true) settle(true)
} }
@ -104,14 +107,24 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) =>
await sleep(250) await sleep(250)
} }
} }
})().catch((error) => { }
run().catch((error) => {
Log.Default.error("event stream error", { Log.Default.error("event stream error", {
error: error instanceof Error ? error.message : error, error: error instanceof Error ? error.message : error,
}) })
}) })
return id
} }
startEventStream({ directory: process.cwd() }) function stopEventStream(id: string) {
const abortController = eventStreams.get(id)
if (!abortController) return
abortController.abort()
eventStreams.delete(id)
}
export const rpc = { export const rpc = {
async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) { async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
@ -154,12 +167,19 @@ export const rpc = {
async reload() { async reload() {
await Config.invalidate(true) await Config.invalidate(true)
}, },
async setWorkspace(input: { workspaceID?: string }) { async subscribe(input: { directory: string | undefined }) {
startEventStream({ directory: process.cwd(), workspaceID: input.workspaceID }) return startEventStream(input.directory || process.cwd())
},
async unsubscribe(input: { id: string }) {
stopEventStream(input.id)
}, },
async shutdown() { async shutdown() {
Log.Default.info("worker shutting down") Log.Default.info("worker shutting down")
if (eventStream.abort) eventStream.abort.abort()
for (const id of [...eventStreams.keys()]) {
stopEventStream(id)
}
await Instance.disposeAll() await Instance.disposeAll()
if (server) await server.stop(true) if (server) await server.stop(true)
}, },

View File

@ -82,8 +82,6 @@ function themeCurrent(): HostPluginApi["theme"]["current"] {
type Opts = { type Opts = {
client?: HostPluginApi["client"] | (() => HostPluginApi["client"]) client?: HostPluginApi["client"] | (() => HostPluginApi["client"])
scopedClient?: HostPluginApi["scopedClient"]
workspace?: Partial<HostPluginApi["workspace"]>
renderer?: HostPluginApi["renderer"] renderer?: HostPluginApi["renderer"]
count?: Count count?: Count
keybind?: Partial<HostPluginApi["keybind"]> keybind?: Partial<HostPluginApi["keybind"]>
@ -127,11 +125,6 @@ export function createTuiPluginApi(opts: Opts = {}): HostPluginApi {
? () => opts.client as HostPluginApi["client"] ? () => opts.client as HostPluginApi["client"]
: fallback : fallback
const client = () => read() const client = () => read()
const scopedClient = opts.scopedClient ?? ((_workspaceID?: string) => client())
const workspace: HostPluginApi["workspace"] = {
current: opts.workspace?.current ?? (() => undefined),
set: opts.workspace?.set ?? (() => {}),
}
let depth = 0 let depth = 0
let size: "medium" | "large" | "xlarge" = "medium" let size: "medium" | "large" | "xlarge" = "medium"
const has = opts.theme?.has ?? (() => false) const has = opts.theme?.has ?? (() => false)
@ -171,8 +164,6 @@ export function createTuiPluginApi(opts: Opts = {}): HostPluginApi {
get client() { get client() {
return client() return client()
}, },
scopedClient,
workspace,
event: { event: {
on: () => { on: () => {
if (count) count.event_add += 1 if (count) count.event_add += 1

View File

@ -484,8 +484,6 @@ export type TuiPluginApi = {
state: TuiState state: TuiState
theme: TuiTheme theme: TuiTheme
client: OpencodeClient client: OpencodeClient
scopedClient: (workspaceID?: string) => OpencodeClient
workspace: TuiWorkspace
event: TuiEventBus event: TuiEventBus
renderer: CliRenderer renderer: CliRenderer
slots: TuiSlots slots: TuiSlots