pull/19223/head
Aiden Cline 2026-03-24 23:01:08 -05:00
parent 0dcdf5f529
commit 05346fdc50
6 changed files with 229 additions and 8 deletions

View File

@ -383,6 +383,7 @@
"vscode-jsonrpc": "8.2.1",
"web-tree-sitter": "0.25.10",
"which": "6.0.1",
"ws": "8.18.0",
"xdg-basedir": "5.1.0",
"yargs": "18.0.0",
"zod": "catalog:",
@ -410,6 +411,7 @@
"@types/semver": "^7.5.8",
"@types/turndown": "5.0.5",
"@types/which": "3.0.4",
"@types/ws": "^8.18.1",
"@types/yargs": "17.0.33",
"@typescript/native-preview": "catalog:",
"drizzle-kit": "catalog:",

View File

@ -55,6 +55,7 @@
"@types/semver": "^7.5.8",
"@types/turndown": "5.0.5",
"@types/which": "3.0.4",
"@types/ws": "^8.18.1",
"@types/yargs": "17.0.33",
"@typescript/native-preview": "catalog:",
"drizzle-kit": "catalog:",
@ -133,9 +134,9 @@
"minimatch": "10.0.3",
"open": "10.1.2",
"opencode-gitlab-auth": "2.0.0",
"opencode-poe-auth": "0.0.1",
"opentui-spinner": "0.0.6",
"partial-json": "0.1.7",
"opencode-poe-auth": "0.0.1",
"remeda": "catalog:",
"semver": "^7.6.3",
"solid-js": "catalog:",
@ -145,6 +146,7 @@
"ulid": "catalog:",
"vscode-jsonrpc": "8.2.1",
"web-tree-sitter": "0.25.10",
"which": "6.0.1",
"ws": "8.18.0",
"xdg-basedir": "5.1.0",
"yargs": "18.0.0",

View File

@ -6,7 +6,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk"
import { Server } from "../server/server"
import { BunProc } from "../bun"
import { Flag } from "../flag/flag"
import { CodexAuthPlugin } from "./codex"
import { CodexAuthPlugin } from "./openai/codex"
import { Session } from "../session"
import { NamedError } from "@opencode-ai/util/error"
import { CopilotAuthPlugin } from "./copilot"

View File

@ -1,11 +1,12 @@
import type { Hooks, PluginInput } from "@opencode-ai/plugin"
import { Log } from "../util/log"
import { Installation } from "../installation"
import { Auth, OAUTH_DUMMY_KEY } from "../auth"
import { Log } from "../../util/log"
import { Installation } from "../../installation"
import { OAUTH_DUMMY_KEY } from "../../auth"
import os from "os"
import { ProviderTransform } from "@/provider/transform"
import { ModelID, ProviderID } from "@/provider/schema"
import { setTimeout as sleep } from "node:timers/promises"
import { createWebSocketFetch } from "./websocket"
const log = Log.create({ service: "plugin.codex" })
@ -351,12 +352,18 @@ function waitForOAuthCallback(pkce: PkceCodes, state: string): Promise<TokenResp
}
export async function CodexAuthPlugin(input: PluginInput): Promise<Hooks> {
const ws = createWebSocketFetch()
return {
auth: {
provider: "openai",
async loader(getAuth, provider) {
const auth = await getAuth()
if (auth.type !== "oauth") return {}
if (auth.type !== "oauth")
return {
async fetch(requestInput: RequestInfo | URL, init?: RequestInit) {
return ws(requestInput, init)
},
}
// Filter models to only allowed Codex models for OAuth
const allowedModels = new Set([
@ -491,7 +498,7 @@ export async function CodexAuthPlugin(input: PluginInput): Promise<Hooks> {
? new URL(CODEX_API_ENDPOINT)
: parsed
return fetch(url, {
return ws(url, {
...init,
headers,
})

View File

@ -0,0 +1,210 @@
import WebSocket from "ws"
/** Options accepted by {@link createWebSocketFetch}. */
export interface CreateWebSocketFetchOptions {
  /**
   * WebSocket endpoint URL.
   * @default 'wss://api.openai.com/v1/responses'
   */
  url?: string
}
/**
* Creates a `fetch` function that routes OpenAI Responses API streaming
* requests through a persistent WebSocket connection instead of HTTP.
*
* Non-streaming requests and requests to other endpoints are passed
* through to the standard `fetch`.
*
* The connection is created lazily on the first streaming request and
* reused for subsequent ones, which is the main source of latency
* savings in multi-step tool-calling workflows.
*
* @example
* ```ts
* import { createOpenAI } from '@ai-sdk/openai';
* import { createWebSocketFetch } from 'ai-sdk-openai-websocket-fetch';
*
* const wsFetch = createWebSocketFetch();
* const openai = createOpenAI({ fetch: wsFetch });
*
* const result = streamText({
* model: openai('gpt-4.1-mini'),
* prompt: 'Hello!',
* onFinish: () => wsFetch.close(),
* });
* ```
*/
export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
  // Endpoint to dial; defaults to the OpenAI Responses API WebSocket.
  const wsUrl = options?.url ?? "wss://api.openai.com/v1/responses"
  // Connection state shared across requests:
  //   ws         — the cached open socket (if any)
  //   connecting — an in-flight dial attempt, reused by concurrent callers
  //   busy       — true while a streaming request owns the socket
  let ws: WebSocket | null = null
  let connecting: Promise<WebSocket> | null = null
  let busy = false

  // Returns an open socket, reusing the cached one when it is idle.
  // NOTE(review): when `busy` is true this always dials a NEW socket and,
  // once it opens, repoints `ws` at it. The previous socket is never closed
  // after its request finishes (its "close" handler no-ops because
  // `ws !== socket`), so overlapping streaming requests can leak
  // connections — confirm whether concurrent streams are expected here.
  function getConnection(authorization: string): Promise<WebSocket> {
    if (ws?.readyState === WebSocket.OPEN && !busy) {
      return Promise.resolve(ws)
    }
    if (connecting && !busy) return connecting
    connecting = new Promise<WebSocket>((resolve, reject) => {
      const socket = new WebSocket(wsUrl, {
        headers: {
          Authorization: authorization,
          // Opt-in header for the Responses-over-WebSocket beta protocol.
          "OpenAI-Beta": "responses_websockets=2026-02-06",
        },
      })
      socket.on("open", () => {
        // Promote this socket to the cached connection.
        ws = socket
        connecting = null
        resolve(socket)
      })
      socket.on("error", (err) => {
        // Only reject the pending dial; errors after open are handled by the
        // per-request "error" listener attached in websocketFetch below.
        if (connecting) {
          connecting = null
          reject(err)
        }
      })
      socket.on("close", () => {
        // Drop the cache only if this socket is still the current one.
        if (ws === socket) ws = null
      })
    })
    return connecting
  }

  /**
   * fetch-compatible wrapper: streaming POSTs to a `.../responses` endpoint
   * are tunneled over the persistent WebSocket and re-emitted as an SSE
   * response body; every other request falls through to `globalThis.fetch`.
   */
  async function websocketFetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response> {
    const url = input instanceof URL ? input.toString() : typeof input === "string" ? input : input.url
    // Pass through anything that is not a POST to .../responses.
    if (init?.method !== "POST" || !url.endsWith("/responses")) {
      return globalThis.fetch(input, init)
    }
    let body: Record<string, unknown>
    try {
      // Non-string bodies (FormData, streams, ...) throw here and fall through.
      body = JSON.parse(typeof init.body === "string" ? init.body : "")
    } catch {
      return globalThis.fetch(input, init)
    }
    // Only streaming requests benefit from the persistent socket.
    if (!body.stream) {
      return globalThis.fetch(input, init)
    }
    const headers = normalizeHeaders(init.headers)
    const authorization = headers["authorization"] ?? ""
    const connection = await getConnection(authorization)
    // Mark the socket as owned by this request until cleanup() runs.
    busy = true
    // `stream` is implied by the WebSocket transport; strip it from the payload.
    const { stream: _, ...requestBody } = body
    const encoder = new TextEncoder()
    // Re-encode incoming WebSocket frames as SSE (`data: ...\n\n`) so callers
    // consume the response exactly as if it came over streaming HTTP.
    const responseStream = new ReadableStream<Uint8Array>({
      start(controller) {
        // Detach all per-request listeners and release the socket.
        function cleanup() {
          connection.off("message", onMessage)
          connection.off("error", onError)
          connection.off("close", onClose)
          busy = false
        }
        function onMessage(data: WebSocket.RawData) {
          const text = data.toString()
          controller.enqueue(encoder.encode(`data: ${text}\n\n`))
          try {
            const event = JSON.parse(text)
            // Terminal events: emit the SSE sentinel and end the stream.
            if (event.type === "response.completed" || event.type === "error") {
              controller.enqueue(encoder.encode("data: [DONE]\n\n"))
              cleanup()
              controller.close()
            }
          } catch {
            // non-JSON frame, continue
          }
        }
        function onError(err: Error) {
          cleanup()
          controller.error(err)
        }
        function onClose() {
          cleanup()
          try {
            controller.close()
          } catch {
            // already closed
          }
        }
        connection.on("message", onMessage)
        connection.on("error", onError)
        connection.on("close", onClose)
        if (init?.signal) {
          // Already aborted before we sent anything: fail immediately.
          if (init.signal.aborted) {
            cleanup()
            controller.error(init.signal.reason ?? new DOMException("Aborted", "AbortError"))
            return
          }
          // NOTE(review): aborting only detaches listeners and errors the
          // stream; the server keeps streaming into the shared socket until
          // it finishes. Confirm leftover frames cannot bleed into the next
          // request that reuses this connection.
          init.signal.addEventListener(
            "abort",
            () => {
              cleanup()
              try {
                controller.error(init!.signal!.reason ?? new DOMException("Aborted", "AbortError"))
              } catch {
                // already closed
              }
            },
            { once: true },
          )
        }
        // Kick off the request; responses arrive via onMessage above.
        connection.send(JSON.stringify({ type: "response.create", ...requestBody }))
      },
    })
    return new Response(responseStream, {
      status: 200,
      headers: { "content-type": "text/event-stream" },
    })
  }
  return Object.assign(websocketFetch, {
    /** Close the underlying WebSocket connection. */
    close() {
      // NOTE(review): does not reset `connecting` or `busy`; a close() during
      // an in-flight dial leaves a dangling attempt — verify intended.
      if (ws) {
        ws.close()
        ws = null
      }
    },
  })
}
/**
 * Flattens any `HeadersInit` shape (Headers instance, entry array, or plain
 * record) into a plain object with lower-cased keys, so headers can be looked
 * up case-insensitively. Returns an empty object for `undefined`.
 */
function normalizeHeaders(headers: HeadersInit | undefined): Record<string, string> {
  if (!headers) return {}
  const out: Record<string, string> = {}
  if (headers instanceof Headers) {
    // Headers is iterable as [name, value] pairs (names already lower-cased,
    // but lower-case again for uniformity with the other branches).
    for (const [name, value] of headers) {
      out[name.toLowerCase()] = value
    }
  } else if (Array.isArray(headers)) {
    for (const [name, value] of headers) {
      out[name.toLowerCase()] = value
    }
  } else {
    // Plain record: skip nullish values.
    for (const name of Object.keys(headers)) {
      const value = (headers as Record<string, string | undefined>)[name]
      if (value != null) out[name.toLowerCase()] = value
    }
  }
  return out
}

View File

@ -4,7 +4,7 @@ import {
extractAccountIdFromClaims,
extractAccountId,
type IdTokenClaims,
} from "../../src/plugin/codex"
} from "../../src/plugin/openai/codex"
function createTestJwt(payload: object): string {
const header = Buffer.from(JSON.stringify({ alg: "none" })).toString("base64url")