diff --git a/packages/console/app/src/routes/zen/util/handler.ts b/packages/console/app/src/routes/zen/util/handler.ts index ee73b056de..2fe008c1c6 100644 --- a/packages/console/app/src/routes/zen/util/handler.ts +++ b/packages/console/app/src/routes/zen/util/handler.ts @@ -39,6 +39,7 @@ import { createRateLimiter } from "./rateLimiter" import { createDataDumper } from "./dataDumper" import { createTrialLimiter } from "./trialLimiter" import { createStickyTracker } from "./stickyProviderTracker" +import { relay } from "./stream" import { LiteData } from "@opencode-ai/console-core/lite.js" import { Resource } from "@opencode-ai/console-resource" import { i18n, type Key } from "~/i18n" @@ -254,80 +255,41 @@ export async function handler( const streamConverter = createStreamPartConverter(providerInfo.format, opts.format) const usageParser = providerInfo.createUsageParser() const binaryDecoder = providerInfo.createBinaryStreamDecoder() - const stream = new ReadableStream({ - start(c) { - const reader = res.body?.getReader() - const decoder = new TextDecoder() - const encoder = new TextEncoder() - - let buffer = "" - let responseLength = 0 - - function pump(): Promise { - return ( - reader?.read().then(async ({ done, value: rawValue }) => { - if (done) { - logger.metric({ - response_length: responseLength, - "timestamp.last_byte": Date.now(), - }) - dataDumper?.flush() - await rateLimiter?.track() - const usage = usageParser.retrieve() - if (usage) { - const usageInfo = providerInfo.normalizeUsage(usage) - const costInfo = calculateCost(modelInfo, usageInfo) - await trialLimiter?.track(usageInfo) - await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo) - await reload(billingSource, authInfo, costInfo) - const cost = calculateOccurredCost(billingSource, costInfo) - c.enqueue(encoder.encode(buildCostChunk(opts.format, cost))) - } - c.close() - return - } - - if (responseLength === 0) { - const now = Date.now() - logger.metric({ 
- time_to_first_byte: now - startTimestamp, - "timestamp.first_byte": now, - }) - } - - const value = binaryDecoder ? binaryDecoder(rawValue) : rawValue - if (!value) return - - responseLength += value.length - buffer += decoder.decode(value, { stream: true }) - dataDumper?.provideStream(buffer) - - const parts = buffer.split(providerInfo.streamSeparator) - buffer = parts.pop() ?? "" - - for (let part of parts) { - logger.debug("PART: " + part) - - part = part.trim() - usageParser.parse(part) - - if (providerInfo.format !== opts.format) { - part = streamConverter(part) - c.enqueue(encoder.encode(part + "\n\n")) - } - } - - if (providerInfo.format === opts.format) { - c.enqueue(value) - } - - return pump() - }) || Promise.resolve() - ) - } - - return pump() + const metric = (values: Record) => + logger.metric({ + request: requestId, + session: sessionId, + client: ocClient, + provider: providerInfo.id, + model: modelInfo.id, + ...values, + }) + const stream = relay({ + body: res.body, + separator: providerInfo.streamSeparator, + signal: input.request.signal, + start: startTimestamp, + same: providerInfo.format === opts.format, + binary: binaryDecoder, + parse: (part) => { + logger.debug("PART: " + part) + usageParser.parse(part) }, + convert: streamConverter, + tail: async () => { + await rateLimiter?.track() + const usage = usageParser.retrieve() + if (!usage) return + const usageInfo = providerInfo.normalizeUsage(usage) + const costInfo = calculateCost(modelInfo, usageInfo) + await trialLimiter?.track(usageInfo) + await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo) + await reload(billingSource, authInfo, costInfo) + const cost = calculateOccurredCost(billingSource, costInfo) + return buildCostChunk(opts.format, cost) + }, + metric, + dump: dataDumper, }) return new Response(stream, { status: resStatus, diff --git a/packages/console/app/src/routes/zen/util/stream.ts b/packages/console/app/src/routes/zen/util/stream.ts 
/** Event names that signal the upstream provider finished its response normally. */
const done = new Set(["[DONE]", "message_stop", "response.completed"])

/** Values accepted by the metric sink; kept narrow so any wider logger signature is assignable. */
type Metrics = Record<string, string | number | boolean | undefined>

/** Sink that receives the decoded upstream text (used for request dumps). */
type Dump = {
  provideStream: (chunk: string) => void
  flush: () => void
}

/** Inputs for {@link relay}. */
type Opts = {
  /** Upstream response body. When missing, the relayed stream is closed immediately. */
  body: ReadableStream<Uint8Array> | null | undefined
  /** Delimiter between events in the decoded upstream text. */
  separator: string
  /** Abort signal of the incoming request; aborting cancels the upstream read. */
  signal: AbortSignal
  /** Epoch milliseconds at which the request started (basis for latency metrics). */
  start: number
  /** True when upstream and downstream formats match, so raw chunks are passed through. */
  same: boolean
  /** Optional decoder for binary-framed bodies; returns `undefined` while it needs more data. */
  binary?: (chunk: Uint8Array) => Uint8Array | undefined
  /** Called once per non-empty, trimmed event. */
  parse: (part: string) => void
  /** Converts one event to the downstream format (only used when `same` is false). */
  convert: (part: string) => string
  /** Runs after the upstream is drained; a returned string is appended to the output. */
  tail: () => Promise<string | undefined>
  /** Metric sink. */
  metric: (values: Metrics) => void
  /** Optional dump of the decoded upstream text. */
  dump?: Dump
}

/**
 * Classifies a single stream event for telemetry.
 *
 * @param part - One trimmed event (the text between two separators).
 * @returns The `event:` name if the first line declares one, `"[DONE]"` for the
 *   terminator sentinel, `"message"` for a plain `data:` event, otherwise `"unknown"`.
 */
export const eventName = (part: string) => {
  const line = part.split("\n", 1)[0]?.trim() ?? ""
  if (line.startsWith("event:")) return line.slice(6).trim() || "message"

  // Only a line whose payload is exactly `[DONE]` is the terminator. A plain substring
  // search would also match model output that merely contains that text.
  const sentinel = part.split("\n").some((row) => {
    const text = row.trim()
    return (text.startsWith("data:") ? text.slice(5).trim() : text) === "[DONE]"
  })
  if (sentinel) return "[DONE]"

  if (line.startsWith("data:")) return "message"
  return "unknown"
}

/** Flattens a thrown value into metric fields. */
const errInfo = (err: unknown) => {
  if (err instanceof Error) {
    return {
      "stream.error_type": err.constructor.name,
      "stream.error_message": err.message,
    }
  }

  return {
    "stream.error_type": typeof err,
    "stream.error_message": String(err),
  }
}

/**
 * Builds the counters shared by the terminal metrics.
 *
 * @param len - Bytes received after binary decoding.
 * @param cnt - Number of decoded chunks.
 * @param seen - Number of events observed.
 * @param buf - Text still waiting for a separator.
 * @param end - Name of the last observed event.
 * @param gap - Largest delay between two decoded chunks, in milliseconds.
 */
const stats = (len: number, cnt: number, seen: number, buf: string, end: string | undefined, gap: number) => ({
  "stream.response_length": len,
  "stream.chunk_count": cnt,
  "stream.event_count": seen,
  "stream.pending_length": buf.length,
  "stream.last_event": end,
  "stream.max_gap_ms": gap || undefined,
})

/**
 * Relays an upstream event stream to the client, parsing each event on the way and
 * emitting lifecycle metrics.
 *
 * Chunks are forwarded untouched when `opts.same` is true; otherwise each event is
 * converted and re-emitted followed by a blank line. Once the upstream is drained,
 * `opts.tail()` runs and its result (if any) is appended before the stream closes.
 * Failures are reported through `opts.metric` and surfaced to the consumer via the
 * stream's error state.
 *
 * @param opts - See {@link Opts}.
 * @returns A readable byte stream suitable for a `Response` body.
 */
export const relay = (opts: Opts) => {
  let reader: ReadableStreamDefaultReader<Uint8Array> | undefined
  let aborted = false

  // Shared by the request signal and by consumer cancellation. Cancelling the reader
  // makes the pending read() resolve with `done: true`, which ends the loop below.
  const abort = () => {
    aborted = true
    reader?.cancel().catch(() => undefined)
  }

  return new ReadableStream<Uint8Array>({
    async start(c) {
      let phase = "start"
      let len = 0
      let cnt = 0
      let seen = 0
      let end: string | undefined
      let gap = 0
      let prev: number | undefined
      let completed = false
      let first = true
      let buf = ""

      if (!opts.body) {
        opts.metric({
          "stream.event": "missing_body",
          "stream.phase": phase,
        })
        opts.dump?.flush()
        c.close()
        return
      }

      const source = opts.body.getReader()
      reader = source
      const dec = new TextDecoder()
      const enc = new TextEncoder()
      const names: Record<string, number> = {}

      const note = (part: string) => {
        const name = eventName(part)
        end = name
        seen += 1
        names[name] = (names[name] ?? 0) + 1
        if (done.has(name)) completed = true
      }

      opts.signal.addEventListener("abort", abort, { once: true })
      // The listener never fires for a signal that was aborted before registration.
      if (opts.signal.aborted) abort()
      opts.metric({
        "stream.event": "started",
      })

      try {
        while (true) {
          phase = "read"
          const raw = await source.read()
          if (raw.done) break

          // Report first byte exactly once, even if the binary decoder is still
          // buffering and `len` therefore stays at zero for several reads.
          if (first) {
            first = false
            const now = Date.now()
            opts.metric({
              time_to_first_byte: now - opts.start,
              "timestamp.first_byte": now,
            })
          }

          const value = opts.binary ? opts.binary(raw.value) : raw.value
          if (!value) continue

          cnt += 1
          len += value.length
          const now = Date.now()
          if (prev !== undefined) gap = Math.max(gap, now - prev)
          prev = now

          const text = dec.decode(value, { stream: true })
          buf += text
          opts.dump?.provideStream(text)

          const parts = buf.split(opts.separator)
          buf = parts.pop() ?? ""

          for (let part of parts) {
            part = part.trim()
            if (!part) continue
            note(part)
            phase = "parse"
            opts.parse(part)
            if (opts.same) continue
            phase = "convert"
            c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
          }

          if (opts.same) c.enqueue(value)
        }

        // Flush bytes the decoder was holding back for an incomplete code point.
        const rest = dec.decode()
        if (rest) {
          buf += rest
          opts.dump?.provideStream(rest)
        }

        // A final event without a trailing separator still has to be parsed.
        if (buf.trim()) {
          const part = buf.trim()
          note(part)
          phase = "parse"
          opts.parse(part)
          if (!opts.same) {
            phase = "convert"
            c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
          }
          buf = ""
        }

        opts.metric({
          response_length: len,
          "timestamp.last_byte": Date.now(),
        })
        opts.dump?.flush()

        phase = "tail"
        const chunk = await opts.tail()
        if (chunk) c.enqueue(enc.encode(chunk))

        phase = "close"
        c.close()
        opts.metric({
          // An abort ends the read loop without throwing, so it must be reported here.
          "stream.event": aborted ? "aborted" : "finished",
          "stream.phase": "done",
          "stream.duration_ms": Date.now() - opts.start,
          "stream.saw_completed": completed,
          "stream.events": JSON.stringify(names),
          ...stats(len, cnt, seen, buf, end, gap),
        })
      } catch (err) {
        // Best effort: do not leave the upstream body open after a failure.
        source.cancel(err).catch(() => undefined)
        opts.metric({
          "stream.event": aborted ? "aborted" : "error",
          "stream.phase": phase,
          "stream.duration_ms": Date.now() - opts.start,
          "stream.saw_completed": completed,
          "stream.events": JSON.stringify(names),
          ...stats(len, cnt, seen, buf, end, gap),
          ...errInfo(err),
        })
        c.error(err)
      } finally {
        opts.signal.removeEventListener("abort", abort)
      }
    },
    // The consumer went away: stop reading upstream as well.
    cancel() {
      abort()
    },
  })
}
expect(logs.at(-1)?.["stream.event"]).toBe("finished") + expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true) + }) + + test("keeps reading when binary decoder needs another chunk", async () => { + let calls = 0 + const logs: Array> = [] + const stream = relay({ + body: body(["a", "b"]), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + binary: (chunk) => { + calls += 1 + if (calls === 1) return + return chunk + }, + parse: () => undefined, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + const text = await read(stream) + expect(text).toBe("b") + expect(logs.at(-1)?.["stream.event"]).toBe("finished") + }) + + test("flushes a final unterminated event at EOF", async () => { + const seen: string[] = [] + const logs: Array> = [] + const stream = relay({ + body: body(['event: response.completed\ndata: {"response":{"usage":{"input_tokens":1}}}']), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: (part) => { + seen.push(part) + }, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + const text = await read(stream) + expect(text).toContain("response.completed") + expect(seen).toHaveLength(1) + expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true) + }) + + test("closes cleanly when upstream body is missing", async () => { + const logs: Array> = [] + const stream = relay({ + body: null, + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: () => undefined, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + expect(await read(stream)).toBe("") + expect(logs.at(-1)?.["stream.event"]).toBe("missing_body") + }) + + test("surfaces postprocess failures with stream metrics", async () => { + const logs: Array> = [] + const stream = relay({ + body: 
body(["event: response.created\ndata: {}\n\n"]), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: () => undefined, + convert: (part) => part, + tail: async () => { + throw new Error("boom") + }, + metric: (values) => logs.push(values), + }) + + await expect(read(stream)).rejects.toThrow("boom") + expect(logs.at(-1)?.["stream.event"]).toBe("error") + expect(logs.at(-1)?.["stream.phase"]).toBe("tail") + }) +})