255 lines
8.0 KiB
TypeScript
255 lines
8.0 KiB
TypeScript
import type { Event } from "@opencode-ai/sdk/v2/client"
|
|
import { createSimpleContext } from "@opencode-ai/ui/context"
|
|
import { createGlobalEmitter } from "@solid-primitives/event-bus"
|
|
import { batch, onCleanup } from "solid-js"
|
|
import z from "zod"
|
|
import { createSdkForServer } from "@/utils/server"
|
|
import { useLanguage } from "./language"
|
|
import { usePlatform } from "./platform"
|
|
import { useServer } from "./server"
|
|
|
|
const abortError = z.object({
|
|
name: z.literal("AbortError"),
|
|
})
|
|
|
|
export const { use: useGlobalSDK, provider: GlobalSDKProvider } = createSimpleContext({
|
|
name: "GlobalSDK",
|
|
init: () => {
|
|
const language = useLanguage()
|
|
const server = useServer()
|
|
const platform = usePlatform()
|
|
const abort = new AbortController()
|
|
|
|
const eventFetch = (() => {
|
|
if (!platform.fetch || !server.current) return
|
|
try {
|
|
const url = new URL(server.current.http.url)
|
|
const loopback = url.hostname === "localhost" || url.hostname === "127.0.0.1" || url.hostname === "::1"
|
|
if (url.protocol === "http:" && !loopback) return platform.fetch
|
|
} catch {
|
|
return
|
|
}
|
|
})()
|
|
|
|
const currentServer = server.current
|
|
if (!currentServer) throw new Error(language.t("error.globalSDK.noServerAvailable"))
|
|
|
|
const eventSdk = createSdkForServer({
|
|
signal: abort.signal,
|
|
fetch: eventFetch,
|
|
server: currentServer.http,
|
|
})
|
|
const emitter = createGlobalEmitter<{
|
|
[key: string]: Event
|
|
}>()
|
|
|
|
type Queued = { directory: string; payload: Event }
|
|
const FLUSH_FRAME_MS = 16
|
|
const STREAM_YIELD_MS = 8
|
|
const RECONNECT_DELAY_MS = 250
|
|
|
|
let queue: Queued[] = []
|
|
let buffer: Queued[] = []
|
|
const coalesced = new Map<string, number>()
|
|
const staleDeltas = new Set<string>()
|
|
let timer: ReturnType<typeof setTimeout> | undefined
|
|
let last = 0
|
|
|
|
const deltaKey = (directory: string, messageID: string, partID: string) => `${directory}:${messageID}:${partID}`
|
|
|
|
const key = (directory: string, payload: Event) => {
|
|
if (payload.type === "session.status") return `session.status:${directory}:${payload.properties.sessionID}`
|
|
if (payload.type === "lsp.updated") return `lsp.updated:${directory}`
|
|
if (payload.type === "message.part.updated") {
|
|
const part = payload.properties.part
|
|
return `message.part.updated:${directory}:${part.messageID}:${part.id}`
|
|
}
|
|
}
|
|
|
|
const flush = () => {
|
|
if (timer) clearTimeout(timer)
|
|
timer = undefined
|
|
|
|
if (queue.length === 0) return
|
|
|
|
const events = queue
|
|
const skip = staleDeltas.size > 0 ? new Set(staleDeltas) : undefined
|
|
queue = buffer
|
|
buffer = events
|
|
queue.length = 0
|
|
coalesced.clear()
|
|
staleDeltas.clear()
|
|
|
|
last = Date.now()
|
|
batch(() => {
|
|
for (const event of events) {
|
|
if (skip && event.payload.type === "message.part.delta") {
|
|
const props = event.payload.properties
|
|
if (skip.has(deltaKey(event.directory, props.messageID, props.partID))) continue
|
|
}
|
|
emitter.emit(event.directory, event.payload)
|
|
}
|
|
})
|
|
|
|
buffer.length = 0
|
|
}
|
|
|
|
const schedule = () => {
|
|
if (timer) return
|
|
const elapsed = Date.now() - last
|
|
timer = setTimeout(flush, Math.max(0, FLUSH_FRAME_MS - elapsed))
|
|
}
|
|
|
|
let streamErrorLogged = false
|
|
const wait = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
|
|
const aborted = (error: unknown) => abortError.safeParse(error).success
|
|
|
|
let attempt: AbortController | undefined
|
|
let run: Promise<void> | undefined
|
|
let started = false
|
|
const HEARTBEAT_TIMEOUT_MS = 15_000
|
|
let lastEventAt = Date.now()
|
|
let heartbeat: ReturnType<typeof setTimeout> | undefined
|
|
const resetHeartbeat = () => {
|
|
lastEventAt = Date.now()
|
|
if (heartbeat) clearTimeout(heartbeat)
|
|
heartbeat = setTimeout(() => {
|
|
attempt?.abort()
|
|
}, HEARTBEAT_TIMEOUT_MS)
|
|
}
|
|
const clearHeartbeat = () => {
|
|
if (!heartbeat) return
|
|
clearTimeout(heartbeat)
|
|
heartbeat = undefined
|
|
}
|
|
|
|
const start = () => {
|
|
if (started) return run
|
|
started = true
|
|
run = (async () => {
|
|
while (!abort.signal.aborted && started) {
|
|
attempt = new AbortController()
|
|
lastEventAt = Date.now()
|
|
const onAbort = () => {
|
|
attempt?.abort()
|
|
}
|
|
abort.signal.addEventListener("abort", onAbort)
|
|
try {
|
|
const events = await eventSdk.global.event({
|
|
signal: attempt.signal,
|
|
onSseError: (error) => {
|
|
if (aborted(error)) return
|
|
if (streamErrorLogged) return
|
|
streamErrorLogged = true
|
|
console.error("[global-sdk] event stream error", {
|
|
url: currentServer.http.url,
|
|
fetch: eventFetch ? "platform" : "webview",
|
|
error,
|
|
})
|
|
},
|
|
})
|
|
let yielded = Date.now()
|
|
resetHeartbeat()
|
|
for await (const event of events.stream) {
|
|
resetHeartbeat()
|
|
streamErrorLogged = false
|
|
const directory = event.directory ?? "global"
|
|
const payload = event.payload
|
|
const k = key(directory, payload)
|
|
if (k) {
|
|
const i = coalesced.get(k)
|
|
if (i !== undefined) {
|
|
queue[i] = { directory, payload }
|
|
if (payload.type === "message.part.updated") {
|
|
const part = payload.properties.part
|
|
staleDeltas.add(deltaKey(directory, part.messageID, part.id))
|
|
}
|
|
continue
|
|
}
|
|
coalesced.set(k, queue.length)
|
|
}
|
|
queue.push({ directory, payload })
|
|
schedule()
|
|
|
|
if (Date.now() - yielded < STREAM_YIELD_MS) continue
|
|
yielded = Date.now()
|
|
await wait(0)
|
|
}
|
|
} catch (error) {
|
|
if (!aborted(error) && !streamErrorLogged) {
|
|
streamErrorLogged = true
|
|
console.error("[global-sdk] event stream failed", {
|
|
url: currentServer.http.url,
|
|
fetch: eventFetch ? "platform" : "webview",
|
|
error,
|
|
})
|
|
}
|
|
} finally {
|
|
abort.signal.removeEventListener("abort", onAbort)
|
|
attempt = undefined
|
|
clearHeartbeat()
|
|
}
|
|
|
|
if (abort.signal.aborted || !started) return
|
|
await wait(RECONNECT_DELAY_MS)
|
|
}
|
|
})().finally(() => {
|
|
run = undefined
|
|
flush()
|
|
})
|
|
return run
|
|
}
|
|
|
|
const stop = () => {
|
|
started = false
|
|
attempt?.abort()
|
|
clearHeartbeat()
|
|
}
|
|
|
|
const onVisibility = () => {
|
|
if (typeof document === "undefined") return
|
|
if (document.visibilityState !== "visible") return
|
|
if (!started) return
|
|
if (Date.now() - lastEventAt < HEARTBEAT_TIMEOUT_MS) return
|
|
attempt?.abort()
|
|
}
|
|
if (typeof document !== "undefined") {
|
|
document.addEventListener("visibilitychange", onVisibility)
|
|
}
|
|
|
|
onCleanup(() => {
|
|
if (typeof document !== "undefined") {
|
|
document.removeEventListener("visibilitychange", onVisibility)
|
|
}
|
|
stop()
|
|
abort.abort()
|
|
flush()
|
|
})
|
|
|
|
const sdk = createSdkForServer({
|
|
server: server.current.http,
|
|
fetch: platform.fetch,
|
|
throwOnError: true,
|
|
})
|
|
|
|
return {
|
|
url: currentServer.http.url,
|
|
client: sdk,
|
|
event: {
|
|
on: emitter.on.bind(emitter),
|
|
listen: emitter.listen.bind(emitter),
|
|
start,
|
|
},
|
|
createClient(opts: Omit<Parameters<typeof createSdkForServer>[0], "server" | "fetch">) {
|
|
const s = server.current
|
|
if (!s) throw new Error(language.t("error.globalSDK.serverNotAvailable"))
|
|
return createSdkForServer({
|
|
server: s.http,
|
|
fetch: platform.fetch,
|
|
...opts,
|
|
})
|
|
},
|
|
}
|
|
},
|
|
})
|