From 8272d68335c8acae0bd76838c9f2a5caea23578e Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 26 Mar 2026 11:24:56 -0400 Subject: [PATCH] fix(console): harden zen stream relay diagnostics Add stream lifecycle metrics and extract the Zen relay loop so missing bodies, decoder stalls, EOF framing, and postprocess failures are easier to diagnose. Cover the new relay behavior with focused repro tests for the streaming edge cases we suspect in production. --- .../app/src/routes/zen/util/handler.ts | 108 ++++------ .../console/app/src/routes/zen/util/stream.ts | 196 ++++++++++++++++++ packages/console/app/test/zenStream.test.ts | 139 +++++++++++++ 3 files changed, 370 insertions(+), 73 deletions(-) create mode 100644 packages/console/app/src/routes/zen/util/stream.ts create mode 100644 packages/console/app/test/zenStream.test.ts diff --git a/packages/console/app/src/routes/zen/util/handler.ts b/packages/console/app/src/routes/zen/util/handler.ts index ee73b056de..2fe008c1c6 100644 --- a/packages/console/app/src/routes/zen/util/handler.ts +++ b/packages/console/app/src/routes/zen/util/handler.ts @@ -39,6 +39,7 @@ import { createRateLimiter } from "./rateLimiter" import { createDataDumper } from "./dataDumper" import { createTrialLimiter } from "./trialLimiter" import { createStickyTracker } from "./stickyProviderTracker" +import { relay } from "./stream" import { LiteData } from "@opencode-ai/console-core/lite.js" import { Resource } from "@opencode-ai/console-resource" import { i18n, type Key } from "~/i18n" @@ -254,80 +255,41 @@ export async function handler( const streamConverter = createStreamPartConverter(providerInfo.format, opts.format) const usageParser = providerInfo.createUsageParser() const binaryDecoder = providerInfo.createBinaryStreamDecoder() - const stream = new ReadableStream({ - start(c) { - const reader = res.body?.getReader() - const decoder = new TextDecoder() - const encoder = new TextEncoder() - - let buffer = "" - let responseLength = 0 - - 
function pump(): Promise<void> { - return ( - reader?.read().then(async ({ done, value: rawValue }) => { - if (done) { - logger.metric({ - response_length: responseLength, - "timestamp.last_byte": Date.now(), - }) - dataDumper?.flush() - await rateLimiter?.track() - const usage = usageParser.retrieve() - if (usage) { - const usageInfo = providerInfo.normalizeUsage(usage) - const costInfo = calculateCost(modelInfo, usageInfo) - await trialLimiter?.track(usageInfo) - await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo) - await reload(billingSource, authInfo, costInfo) - const cost = calculateOccurredCost(billingSource, costInfo) - c.enqueue(encoder.encode(buildCostChunk(opts.format, cost))) - } - c.close() - return - } - - if (responseLength === 0) { - const now = Date.now() - logger.metric({ - time_to_first_byte: now - startTimestamp, - "timestamp.first_byte": now, - }) - } - - const value = binaryDecoder ? binaryDecoder(rawValue) : rawValue - if (!value) return - - responseLength += value.length - buffer += decoder.decode(value, { stream: true }) - dataDumper?.provideStream(buffer) - - const parts = buffer.split(providerInfo.streamSeparator) - buffer = parts.pop() ?? 
"" - - for (let part of parts) { - logger.debug("PART: " + part) - - part = part.trim() - usageParser.parse(part) - - if (providerInfo.format !== opts.format) { - part = streamConverter(part) - c.enqueue(encoder.encode(part + "\n\n")) - } - } - - if (providerInfo.format === opts.format) { - c.enqueue(value) - } - - return pump() - }) || Promise.resolve() - ) - } - - return pump() + const metric = (values: Record) => + logger.metric({ + request: requestId, + session: sessionId, + client: ocClient, + provider: providerInfo.id, + model: modelInfo.id, + ...values, + }) + const stream = relay({ + body: res.body, + separator: providerInfo.streamSeparator, + signal: input.request.signal, + start: startTimestamp, + same: providerInfo.format === opts.format, + binary: binaryDecoder, + parse: (part) => { + logger.debug("PART: " + part) + usageParser.parse(part) }, + convert: streamConverter, + tail: async () => { + await rateLimiter?.track() + const usage = usageParser.retrieve() + if (!usage) return + const usageInfo = providerInfo.normalizeUsage(usage) + const costInfo = calculateCost(modelInfo, usageInfo) + await trialLimiter?.track(usageInfo) + await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo) + await reload(billingSource, authInfo, costInfo) + const cost = calculateOccurredCost(billingSource, costInfo) + return buildCostChunk(opts.format, cost) + }, + metric, + dump: dataDumper, }) return new Response(stream, { status: resStatus, diff --git a/packages/console/app/src/routes/zen/util/stream.ts b/packages/console/app/src/routes/zen/util/stream.ts new file mode 100644 index 0000000000..7bfc962d71 --- /dev/null +++ b/packages/console/app/src/routes/zen/util/stream.ts @@ -0,0 +1,196 @@ +const done = new Set(["[DONE]", "message_stop", "response.completed"]) + +type Dump = { + provideStream: (chunk: string) => void + flush: () => void +} + +type Opts = { + body: ReadableStream | null | undefined + separator: string + signal: 
AbortSignal + start: number + same: boolean + binary?: (chunk: Uint8Array) => Uint8Array | undefined + parse: (part: string) => void + convert: (part: string) => string + tail: () => Promise<string | undefined> + metric: (values: Record<string, unknown>) => void + dump?: Dump +} + +export const eventName = (part: string) => { + const line = part.split("\n", 1)[0]?.trim() ?? "" + if (line.startsWith("event:")) return line.slice(6).trim() || "message" + if (part.includes("[DONE]")) return "[DONE]" + if (line.startsWith("data:")) return "message" + return "unknown" +} + +const errInfo = (err: unknown) => { + if (err instanceof Error) { + return { + "stream.error_type": err.constructor.name, + "stream.error_message": err.message, + } + } + + return { + "stream.error_type": typeof err, + "stream.error_message": String(err), + } +} + +const stats = (len: number, cnt: number, seen: number, buf: string, end: string | undefined, gap: number) => ({ + "stream.response_length": len, + "stream.chunk_count": cnt, + "stream.event_count": seen, + "stream.pending_length": buf.length, + "stream.last_event": end, + "stream.max_gap_ms": gap || undefined, +}) + +export const relay = (opts: Opts) => + new ReadableStream({ + async start(c) { + let phase = "start" + let len = 0 + let cnt = 0 + let seen = 0 + let end: string | undefined + let gap = 0 + let prev: number | undefined + let completed = false + let aborted = false + let buf = "" + + if (!opts.body) { + opts.metric({ + "stream.event": "missing_body", + "stream.phase": phase, + }) + opts.dump?.flush() + c.close() + return + } + + const reader = opts.body.getReader() + const dec = new TextDecoder() + const enc = new TextEncoder() + const names: Record<string, number> = {} + + const abort = () => { + aborted = true + reader.cancel().catch(() => undefined) + } + + const note = (part: string) => { + const name = eventName(part) + end = name + seen += 1 + names[name] = (names[name] ?? 
0) + 1 + if (done.has(name)) completed = true + } + + opts.signal.addEventListener("abort", abort) + opts.metric({ + "stream.event": "started", + }) + + try { + while (true) { + phase = "read" + const raw = await reader.read() + if (raw.done) break + + if (len === 0) { + const now = Date.now() + opts.metric({ + time_to_first_byte: now - opts.start, + "timestamp.first_byte": now, + }) + } + + const value = opts.binary ? opts.binary(raw.value) : raw.value + if (!value) continue + + cnt += 1 + len += value.length + const now = Date.now() + if (prev !== undefined) gap = Math.max(gap, now - prev) + prev = now + + const text = dec.decode(value, { stream: true }) + buf += text + opts.dump?.provideStream(text) + + const parts = buf.split(opts.separator) + buf = parts.pop() ?? "" + + for (let part of parts) { + part = part.trim() + if (!part) continue + note(part) + phase = "parse" + opts.parse(part) + if (opts.same) continue + phase = "convert" + c.enqueue(enc.encode(opts.convert(part) + "\n\n")) + } + + if (opts.same) c.enqueue(value) + } + + const tail = dec.decode() + if (tail) { + buf += tail + opts.dump?.provideStream(tail) + } + + if (buf.trim()) { + const part = buf.trim() + note(part) + phase = "parse" + opts.parse(part) + if (!opts.same) { + phase = "convert" + c.enqueue(enc.encode(opts.convert(part) + "\n\n")) + } + buf = "" + } + + opts.metric({ + response_length: len, + "timestamp.last_byte": Date.now(), + }) + opts.dump?.flush() + + phase = "tail" + const chunk = await opts.tail() + if (chunk) c.enqueue(enc.encode(chunk)) + + c.close() + opts.metric({ + "stream.event": "finished", + "stream.phase": "done", + "stream.duration_ms": Date.now() - opts.start, + "stream.saw_completed": completed, + "stream.events": JSON.stringify(names), + ...stats(len, cnt, seen, buf, end, gap), + }) + } catch (err) { + opts.metric({ + "stream.event": aborted ? 
"aborted" : "error", + "stream.phase": phase, + "stream.duration_ms": Date.now() - opts.start, + "stream.saw_completed": completed, + "stream.events": JSON.stringify(names), + ...stats(len, cnt, seen, buf, end, gap), + ...errInfo(err), + }) + c.error(err) + } finally { + opts.signal.removeEventListener("abort", abort) + } + }, + }) diff --git a/packages/console/app/test/zenStream.test.ts b/packages/console/app/test/zenStream.test.ts new file mode 100644 index 0000000000..5540429a04 --- /dev/null +++ b/packages/console/app/test/zenStream.test.ts @@ -0,0 +1,139 @@ +import { describe, expect, test } from "bun:test" +import { eventName, relay } from "../src/routes/zen/util/stream" + +const enc = new TextEncoder() + +const read = (stream: ReadableStream) => new Response(stream).text() + +const body = (parts: string[]) => + new ReadableStream({ + async start(c) { + for (const part of parts) c.enqueue(enc.encode(part)) + c.close() + }, + }) + +describe("zen stream", () => { + test("parses known event names", () => { + expect(eventName("event: response.created\ndata: {}")).toBe("response.created") + expect(eventName('data: {"ok":true}')).toBe("message") + expect(eventName("data: [DONE]")).toBe("[DONE]") + }) + + test("relays split OpenAI responses and logs completion", async () => { + const seen: string[] = [] + const logs: Array> = [] + const stream = relay({ + body: body([ + "event: response.created\n", + 'data: {"type":"response.created"}\n\n', + "event: response.completed\n", + 'data: {"response":{"usage":{"input_tokens":1,"output_tokens":2}}}\n\n', + ]), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: (part) => { + seen.push(part) + }, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + const text = await read(stream) + expect(text).toContain("response.created") + expect(text).toContain("response.completed") + expect(seen).toHaveLength(2) + 
expect(logs.at(-1)?.["stream.event"]).toBe("finished") + expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true) + }) + + test("keeps reading when binary decoder needs another chunk", async () => { + let calls = 0 + const logs: Array<Record<string, unknown>> = [] + const stream = relay({ + body: body(["a", "b"]), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + binary: (chunk) => { + calls += 1 + if (calls === 1) return + return chunk + }, + parse: () => undefined, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + const text = await read(stream) + expect(text).toBe("b") + expect(logs.at(-1)?.["stream.event"]).toBe("finished") + }) + + test("flushes a final unterminated event at EOF", async () => { + const seen: string[] = [] + const logs: Array<Record<string, unknown>> = [] + const stream = relay({ + body: body(['event: response.completed\ndata: {"response":{"usage":{"input_tokens":1}}}']), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: (part) => { + seen.push(part) + }, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + const text = await read(stream) + expect(text).toContain("response.completed") + expect(seen).toHaveLength(1) + expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true) + }) + + test("closes cleanly when upstream body is missing", async () => { + const logs: Array<Record<string, unknown>> = [] + const stream = relay({ + body: null, + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: () => undefined, + convert: (part) => part, + tail: async () => undefined, + metric: (values) => logs.push(values), + }) + + expect(await read(stream)).toBe("") + expect(logs.at(-1)?.["stream.event"]).toBe("missing_body") + }) + + test("surfaces postprocess failures with stream metrics", async () => { + const logs: Array<Record<string, unknown>> = [] + const stream = relay({ + body: 
body(["event: response.created\ndata: {}\n\n"]), + separator: "\n\n", + signal: new AbortController().signal, + start: Date.now(), + same: true, + parse: () => undefined, + convert: (part) => part, + tail: async () => { + throw new Error("boom") + }, + metric: (values) => logs.push(values), + }) + + await expect(read(stream)).rejects.toThrow("boom") + expect(logs.at(-1)?.["stream.event"]).toBe("error") + expect(logs.at(-1)?.["stream.phase"]).toBe("tail") + }) +})