From 05346fdc50db72d296b69eea1455362946f6a830 Mon Sep 17 00:00:00 2001 From: Aiden Cline Date: Tue, 24 Mar 2026 23:01:08 -0500 Subject: [PATCH] wip --- bun.lock | 2 + packages/opencode/package.json | 4 +- packages/opencode/src/plugin/index.ts | 2 +- .../opencode/src/plugin/{ => openai}/codex.ts | 17 +- .../opencode/src/plugin/openai/websocket.ts | 210 ++++++++++++++++++ packages/opencode/test/plugin/codex.test.ts | 2 +- 6 files changed, 229 insertions(+), 8 deletions(-) rename packages/opencode/src/plugin/{ => openai}/codex.ts (97%) create mode 100644 packages/opencode/src/plugin/openai/websocket.ts diff --git a/bun.lock b/bun.lock index a4746ad5cb..2317928fd6 100644 --- a/bun.lock +++ b/bun.lock @@ -383,6 +383,7 @@ "vscode-jsonrpc": "8.2.1", "web-tree-sitter": "0.25.10", "which": "6.0.1", + "ws": "8.18.0", "xdg-basedir": "5.1.0", "yargs": "18.0.0", "zod": "catalog:", @@ -410,6 +411,7 @@ "@types/semver": "^7.5.8", "@types/turndown": "5.0.5", "@types/which": "3.0.4", + "@types/ws": "^8.18.1", "@types/yargs": "17.0.33", "@typescript/native-preview": "catalog:", "drizzle-kit": "catalog:", diff --git a/packages/opencode/package.json b/packages/opencode/package.json index 05fb389b24..1c949516c3 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -55,6 +55,7 @@ "@types/semver": "^7.5.8", "@types/turndown": "5.0.5", "@types/which": "3.0.4", + "@types/ws": "^8.18.1", "@types/yargs": "17.0.33", "@typescript/native-preview": "catalog:", "drizzle-kit": "catalog:", @@ -133,9 +134,9 @@ "minimatch": "10.0.3", "open": "10.1.2", "opencode-gitlab-auth": "2.0.0", + "opencode-poe-auth": "0.0.1", "opentui-spinner": "0.0.6", "partial-json": "0.1.7", - "opencode-poe-auth": "0.0.1", "remeda": "catalog:", "semver": "^7.6.3", "solid-js": "catalog:", @@ -145,6 +146,7 @@ "ulid": "catalog:", "vscode-jsonrpc": "8.2.1", "web-tree-sitter": "0.25.10", + "ws": "8.18.0", "which": "6.0.1", "xdg-basedir": "5.1.0", "yargs": "18.0.0", diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 5cd3790784..770b03c45d 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -6,7 +6,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk" import { Server } from "../server/server" import { BunProc } from "../bun" import { Flag } from "../flag/flag" -import { CodexAuthPlugin } from "./codex" +import { CodexAuthPlugin } from "./openai/codex" import { Session } from "../session" import { NamedError } from "@opencode-ai/util/error" import { CopilotAuthPlugin } from "./copilot" diff --git a/packages/opencode/src/plugin/codex.ts b/packages/opencode/src/plugin/openai/codex.ts similarity index 97% rename from packages/opencode/src/plugin/codex.ts rename to packages/opencode/src/plugin/openai/codex.ts index 943295e64c..009463b5da 100644 --- a/packages/opencode/src/plugin/codex.ts +++ b/packages/opencode/src/plugin/openai/codex.ts @@ -1,11 +1,12 @@ import type { Hooks, PluginInput } from "@opencode-ai/plugin" -import { Log } from "../util/log" -import { Installation } from "../installation" -import { Auth, OAUTH_DUMMY_KEY } from "../auth" +import { Log } from "../../util/log" +import { Installation } from "../../installation" +import { OAUTH_DUMMY_KEY } from "../../auth" import os from "os" import { ProviderTransform } from "@/provider/transform" import { ModelID, ProviderID } from "@/provider/schema" import { setTimeout as sleep } from "node:timers/promises" +import { createWebSocketFetch } from "./websocket" const log = Log.create({ service: "plugin.codex" }) @@ -351,12 +352,18 @@ function waitForOAuthCallback(pkce: PkceCodes, state: string): Promise { + const ws = createWebSocketFetch() return { auth: { provider: "openai", async loader(getAuth, provider) { const auth = await getAuth() - if (auth.type !== "oauth") return {} + if (auth.type !== "oauth") + return { + async fetch(requestInput: RequestInfo | URL, init?: RequestInit) { + return ws(requestInput, init) + }, + } // Filter models to only allowed Codex models for OAuth const allowedModels = new Set([ @@ -491,7 +498,7 @@ export async function CodexAuthPlugin(input: PluginInput): Promise { ? new URL(CODEX_API_ENDPOINT) : parsed - return fetch(url, { + return ws(url, { ...init, headers, }) diff --git a/packages/opencode/src/plugin/openai/websocket.ts b/packages/opencode/src/plugin/openai/websocket.ts new file mode 100644 index 0000000000..328c109a54 --- /dev/null +++ b/packages/opencode/src/plugin/openai/websocket.ts @@ -0,0 +1,210 @@ +import WebSocket from "ws" + +export interface CreateWebSocketFetchOptions { + /** + * WebSocket endpoint URL. + * @default 'wss://api.openai.com/v1/responses' + */ + url?: string +} + +/** + * Creates a `fetch` function that routes OpenAI Responses API streaming + * requests through a persistent WebSocket connection instead of HTTP. + * + * Non-streaming requests and requests to other endpoints are passed + * through to the standard `fetch`. + * + * The connection is created lazily on the first streaming request and + * reused for subsequent ones, which is the main source of latency + * savings in multi-step tool-calling workflows. + * + * @example + * ```ts + * import { createOpenAI } from '@ai-sdk/openai'; + * import { createWebSocketFetch } from 'ai-sdk-openai-websocket-fetch'; + * + * const wsFetch = createWebSocketFetch(); + * const openai = createOpenAI({ fetch: wsFetch }); + * + * const result = streamText({ + * model: openai('gpt-4.1-mini'), + * prompt: 'Hello!', + * onFinish: () => wsFetch.close(), + * }); + * ``` + */ +export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) { + const wsUrl = options?.url ?? "wss://api.openai.com/v1/responses" + + let ws: WebSocket | null = null + let connecting: Promise | null = null + let busy = false + + function getConnection(authorization: string): Promise { + if (ws?.readyState === WebSocket.OPEN && !busy) { + return Promise.resolve(ws) + } + + if (connecting && !busy) return connecting + + connecting = new Promise((resolve, reject) => { + const socket = new WebSocket(wsUrl, { + headers: { + Authorization: authorization, + "OpenAI-Beta": "responses_websockets=2026-02-06", + }, + }) + + socket.on("open", () => { + ws = socket + connecting = null + resolve(socket) + }) + + socket.on("error", (err) => { + if (connecting) { + connecting = null + reject(err) + } + }) + + socket.on("close", () => { + if (ws === socket) ws = null + }) + }) + + return connecting + } + + async function websocketFetch(input: RequestInfo | URL, init?: RequestInit): Promise { + const url = input instanceof URL ? input.toString() : typeof input === "string" ? input : input.url + + if (init?.method !== "POST" || !url.endsWith("/responses")) { + return globalThis.fetch(input, init) + } + + let body: Record + try { + body = JSON.parse(typeof init.body === "string" ? init.body : "") + } catch { + return globalThis.fetch(input, init) + } + + if (!body.stream) { + return globalThis.fetch(input, init) + } + + const headers = normalizeHeaders(init.headers) + const authorization = headers["authorization"] ?? "" + + const connection = await getConnection(authorization) + busy = true + + const { stream: _, ...requestBody } = body + const encoder = new TextEncoder() + + const responseStream = new ReadableStream({ + start(controller) { + function cleanup() { + connection.off("message", onMessage) + connection.off("error", onError) + connection.off("close", onClose) + busy = false + } + + function onMessage(data: WebSocket.RawData) { + const text = data.toString() + controller.enqueue(encoder.encode(`data: ${text}\n\n`)) + + try { + const event = JSON.parse(text) + if (event.type === "response.completed" || event.type === "error") { + controller.enqueue(encoder.encode("data: [DONE]\n\n")) + cleanup() + controller.close() + } + } catch { + // non-JSON frame, continue + } + } + + function onError(err: Error) { + cleanup() + controller.error(err) + } + + function onClose() { + cleanup() + try { + controller.close() + } catch { + // already closed + } + } + + connection.on("message", onMessage) + connection.on("error", onError) + connection.on("close", onClose) + + if (init?.signal) { + if (init.signal.aborted) { + cleanup() + controller.error(init.signal.reason ?? new DOMException("Aborted", "AbortError")) + return + } + init.signal.addEventListener( + "abort", + () => { + cleanup() + try { + controller.error(init!.signal!.reason ?? new DOMException("Aborted", "AbortError")) + } catch { + // already closed + } + }, + { once: true }, + ) + } + + connection.send(JSON.stringify({ type: "response.create", ...requestBody })) + }, + }) + + return new Response(responseStream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }) + } + + return Object.assign(websocketFetch, { + /** Close the underlying WebSocket connection. */ + close() { + if (ws) { + ws.close() + ws = null + } + }, + }) +} + +function normalizeHeaders(headers: HeadersInit | undefined): Record { + const result: Record = {} + if (!headers) return result + + if (headers instanceof Headers) { + headers.forEach((v, k) => { + result[k.toLowerCase()] = v + }) + } else if (Array.isArray(headers)) { + for (const [k, v] of headers) { + result[k.toLowerCase()] = v + } + } else { + for (const [k, v] of Object.entries(headers)) { + if (v != null) result[k.toLowerCase()] = v + } + } + + return result +} diff --git a/packages/opencode/test/plugin/codex.test.ts b/packages/opencode/test/plugin/codex.test.ts index 74d28ac9dc..d7b154b88c 100644 --- a/packages/opencode/test/plugin/codex.test.ts +++ b/packages/opencode/test/plugin/codex.test.ts @@ -4,7 +4,7 @@ import { extractAccountIdFromClaims, extractAccountId, type IdTokenClaims, -} from "../../src/plugin/codex" +} from "../../src/plugin/openai/codex" function createTestJwt(payload: object): string { const header = Buffer.from(JSON.stringify({ alg: "none" })).toString("base64url")