fix(console): harden zen stream relay diagnostics
Add stream lifecycle metrics and extract the Zen relay loop so missing bodies, decoder stalls, EOF framing, and postprocess failures are easier to diagnose. Cover the new relay behavior with focused repro tests for the streaming edge cases we suspect in production.
parent
bcf18edde4
commit
8272d68335
|
|
@ -39,6 +39,7 @@ import { createRateLimiter } from "./rateLimiter"
|
|||
import { createDataDumper } from "./dataDumper"
|
||||
import { createTrialLimiter } from "./trialLimiter"
|
||||
import { createStickyTracker } from "./stickyProviderTracker"
|
||||
import { relay } from "./stream"
|
||||
import { LiteData } from "@opencode-ai/console-core/lite.js"
|
||||
import { Resource } from "@opencode-ai/console-resource"
|
||||
import { i18n, type Key } from "~/i18n"
|
||||
|
|
@ -254,80 +255,41 @@ export async function handler(
|
|||
const streamConverter = createStreamPartConverter(providerInfo.format, opts.format)
|
||||
const usageParser = providerInfo.createUsageParser()
|
||||
const binaryDecoder = providerInfo.createBinaryStreamDecoder()
|
||||
const stream = new ReadableStream({
|
||||
start(c) {
|
||||
const reader = res.body?.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
let buffer = ""
|
||||
let responseLength = 0
|
||||
|
||||
function pump(): Promise<void> {
|
||||
return (
|
||||
reader?.read().then(async ({ done, value: rawValue }) => {
|
||||
if (done) {
|
||||
logger.metric({
|
||||
response_length: responseLength,
|
||||
"timestamp.last_byte": Date.now(),
|
||||
})
|
||||
dataDumper?.flush()
|
||||
await rateLimiter?.track()
|
||||
const usage = usageParser.retrieve()
|
||||
if (usage) {
|
||||
const usageInfo = providerInfo.normalizeUsage(usage)
|
||||
const costInfo = calculateCost(modelInfo, usageInfo)
|
||||
await trialLimiter?.track(usageInfo)
|
||||
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
|
||||
await reload(billingSource, authInfo, costInfo)
|
||||
const cost = calculateOccurredCost(billingSource, costInfo)
|
||||
c.enqueue(encoder.encode(buildCostChunk(opts.format, cost)))
|
||||
}
|
||||
c.close()
|
||||
return
|
||||
}
|
||||
|
||||
if (responseLength === 0) {
|
||||
const now = Date.now()
|
||||
logger.metric({
|
||||
time_to_first_byte: now - startTimestamp,
|
||||
"timestamp.first_byte": now,
|
||||
})
|
||||
}
|
||||
|
||||
const value = binaryDecoder ? binaryDecoder(rawValue) : rawValue
|
||||
if (!value) return
|
||||
|
||||
responseLength += value.length
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
dataDumper?.provideStream(buffer)
|
||||
|
||||
const parts = buffer.split(providerInfo.streamSeparator)
|
||||
buffer = parts.pop() ?? ""
|
||||
|
||||
for (let part of parts) {
|
||||
logger.debug("PART: " + part)
|
||||
|
||||
part = part.trim()
|
||||
usageParser.parse(part)
|
||||
|
||||
if (providerInfo.format !== opts.format) {
|
||||
part = streamConverter(part)
|
||||
c.enqueue(encoder.encode(part + "\n\n"))
|
||||
}
|
||||
}
|
||||
|
||||
if (providerInfo.format === opts.format) {
|
||||
c.enqueue(value)
|
||||
}
|
||||
|
||||
return pump()
|
||||
}) || Promise.resolve()
|
||||
)
|
||||
}
|
||||
|
||||
return pump()
|
||||
const metric = (values: Record<string, unknown>) =>
|
||||
logger.metric({
|
||||
request: requestId,
|
||||
session: sessionId,
|
||||
client: ocClient,
|
||||
provider: providerInfo.id,
|
||||
model: modelInfo.id,
|
||||
...values,
|
||||
})
|
||||
const stream = relay({
|
||||
body: res.body,
|
||||
separator: providerInfo.streamSeparator,
|
||||
signal: input.request.signal,
|
||||
start: startTimestamp,
|
||||
same: providerInfo.format === opts.format,
|
||||
binary: binaryDecoder,
|
||||
parse: (part) => {
|
||||
logger.debug("PART: " + part)
|
||||
usageParser.parse(part)
|
||||
},
|
||||
convert: streamConverter,
|
||||
tail: async () => {
|
||||
await rateLimiter?.track()
|
||||
const usage = usageParser.retrieve()
|
||||
if (!usage) return
|
||||
const usageInfo = providerInfo.normalizeUsage(usage)
|
||||
const costInfo = calculateCost(modelInfo, usageInfo)
|
||||
await trialLimiter?.track(usageInfo)
|
||||
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
|
||||
await reload(billingSource, authInfo, costInfo)
|
||||
const cost = calculateOccurredCost(billingSource, costInfo)
|
||||
return buildCostChunk(opts.format, cost)
|
||||
},
|
||||
metric,
|
||||
dump: dataDumper,
|
||||
})
|
||||
return new Response(stream, {
|
||||
status: resStatus,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,196 @@
|
|||
// Terminal SSE event names; observing one of these marks the relayed
// stream as having completed normally (see `completed` in `relay`).
const done = new Set(["[DONE]", "message_stop", "response.completed"])

// Sink that captures the raw decoded stream for debugging dumps.
type Dump = {
  // receives each decoded text chunk as it arrives
  provideStream: (chunk: string) => void
  // called once the stream ends (or is found to have no body)
  flush: () => void
}

// Configuration for `relay`.
type Opts = {
  // upstream response body; null/undefined triggers a "missing_body" metric
  body: ReadableStream<Uint8Array> | null | undefined
  // delimiter between events in the decoded text (e.g. "\n\n" for SSE)
  separator: string
  // client abort signal; fires cancellation of the upstream reader
  signal: AbortSignal
  // request start timestamp (ms) used for time_to_first_byte
  start: number
  // true when upstream and client formats match (raw byte passthrough)
  same: boolean
  // optional binary decoder; may return undefined when it needs more input
  binary?: (chunk: Uint8Array) => Uint8Array | undefined
  // invoked with every trimmed event (e.g. usage parsing)
  parse: (part: string) => void
  // converts one event to the client format; used only when !same
  convert: (part: string) => string
  // post-stream hook; a returned string is appended as a final chunk
  tail: () => Promise<string | undefined>
  // metrics/logging sink
  metric: (values: Record<string, unknown>) => void
  // optional raw-stream capture
  dump?: Dump
}
|
||||
|
||||
export const eventName = (part: string) => {
|
||||
const line = part.split("\n", 1)[0]?.trim() ?? ""
|
||||
if (line.startsWith("event:")) return line.slice(6).trim() || "message"
|
||||
if (part.includes("[DONE]")) return "[DONE]"
|
||||
if (line.startsWith("data:")) return "message"
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
const errInfo = (err: unknown) => {
|
||||
if (err instanceof Error) {
|
||||
return {
|
||||
"stream.error_type": err.constructor.name,
|
||||
"stream.error_message": err.message,
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
"stream.error_type": typeof err,
|
||||
"stream.error_message": String(err),
|
||||
}
|
||||
}
|
||||
|
||||
const stats = (len: number, cnt: number, seen: number, buf: string, end: string | undefined, gap: number) => ({
|
||||
"stream.response_length": len,
|
||||
"stream.chunk_count": cnt,
|
||||
"stream.event_count": seen,
|
||||
"stream.pending_length": buf.length,
|
||||
"stream.last_event": end,
|
||||
"stream.max_gap_ms": gap || undefined,
|
||||
})
|
||||
|
||||
export const relay = (opts: Opts) =>
|
||||
new ReadableStream({
|
||||
async start(c) {
|
||||
let phase = "start"
|
||||
let len = 0
|
||||
let cnt = 0
|
||||
let seen = 0
|
||||
let end: string | undefined
|
||||
let gap = 0
|
||||
let prev: number | undefined
|
||||
let completed = false
|
||||
let aborted = false
|
||||
let buf = ""
|
||||
|
||||
if (!opts.body) {
|
||||
opts.metric({
|
||||
"stream.event": "missing_body",
|
||||
"stream.phase": phase,
|
||||
})
|
||||
opts.dump?.flush()
|
||||
c.close()
|
||||
return
|
||||
}
|
||||
|
||||
const reader = opts.body.getReader()
|
||||
const dec = new TextDecoder()
|
||||
const enc = new TextEncoder()
|
||||
const names: Record<string, number> = {}
|
||||
|
||||
const abort = () => {
|
||||
aborted = true
|
||||
reader.cancel().catch(() => undefined)
|
||||
}
|
||||
|
||||
const note = (part: string) => {
|
||||
const name = eventName(part)
|
||||
end = name
|
||||
seen += 1
|
||||
names[name] = (names[name] ?? 0) + 1
|
||||
if (done.has(name)) completed = true
|
||||
}
|
||||
|
||||
opts.signal.addEventListener("abort", abort)
|
||||
opts.metric({
|
||||
"stream.event": "started",
|
||||
})
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
phase = "read"
|
||||
const raw = await reader.read()
|
||||
if (raw.done) break
|
||||
|
||||
if (len === 0) {
|
||||
const now = Date.now()
|
||||
opts.metric({
|
||||
time_to_first_byte: now - opts.start,
|
||||
"timestamp.first_byte": now,
|
||||
})
|
||||
}
|
||||
|
||||
const value = opts.binary ? opts.binary(raw.value) : raw.value
|
||||
if (!value) continue
|
||||
|
||||
cnt += 1
|
||||
len += value.length
|
||||
const now = Date.now()
|
||||
if (prev !== undefined) gap = Math.max(gap, now - prev)
|
||||
prev = now
|
||||
|
||||
const text = dec.decode(value, { stream: true })
|
||||
buf += text
|
||||
opts.dump?.provideStream(text)
|
||||
|
||||
const parts = buf.split(opts.separator)
|
||||
buf = parts.pop() ?? ""
|
||||
|
||||
for (let part of parts) {
|
||||
part = part.trim()
|
||||
if (!part) continue
|
||||
note(part)
|
||||
phase = "parse"
|
||||
opts.parse(part)
|
||||
if (opts.same) continue
|
||||
phase = "convert"
|
||||
c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
|
||||
}
|
||||
|
||||
if (opts.same) c.enqueue(value)
|
||||
}
|
||||
|
||||
const tail = dec.decode()
|
||||
if (tail) {
|
||||
buf += tail
|
||||
opts.dump?.provideStream(tail)
|
||||
}
|
||||
|
||||
if (buf.trim()) {
|
||||
const part = buf.trim()
|
||||
note(part)
|
||||
phase = "parse"
|
||||
opts.parse(part)
|
||||
if (!opts.same) {
|
||||
phase = "convert"
|
||||
c.enqueue(enc.encode(opts.convert(part) + "\n\n"))
|
||||
}
|
||||
buf = ""
|
||||
}
|
||||
|
||||
opts.metric({
|
||||
response_length: len,
|
||||
"timestamp.last_byte": Date.now(),
|
||||
})
|
||||
opts.dump?.flush()
|
||||
|
||||
phase = "tail"
|
||||
const chunk = await opts.tail()
|
||||
if (chunk) c.enqueue(enc.encode(chunk))
|
||||
|
||||
c.close()
|
||||
opts.metric({
|
||||
"stream.event": "finished",
|
||||
"stream.phase": "done",
|
||||
"stream.duration_ms": Date.now() - opts.start,
|
||||
"stream.saw_completed": completed,
|
||||
"stream.events": JSON.stringify(names),
|
||||
...stats(len, cnt, seen, buf, end, gap),
|
||||
})
|
||||
} catch (err) {
|
||||
opts.metric({
|
||||
"stream.event": aborted ? "aborted" : "error",
|
||||
"stream.phase": phase,
|
||||
"stream.duration_ms": Date.now() - opts.start,
|
||||
"stream.saw_completed": completed,
|
||||
"stream.events": JSON.stringify(names),
|
||||
...stats(len, cnt, seen, buf, end, gap),
|
||||
...errInfo(err),
|
||||
})
|
||||
c.error(err)
|
||||
} finally {
|
||||
opts.signal.removeEventListener("abort", abort)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
import { describe, expect, test } from "bun:test"
|
||||
import { eventName, relay } from "../src/routes/zen/util/stream"
|
||||
|
||||
const enc = new TextEncoder()
|
||||
|
||||
const read = (stream: ReadableStream<Uint8Array>) => new Response(stream).text()
|
||||
|
||||
const body = (parts: string[]) =>
|
||||
new ReadableStream<Uint8Array>({
|
||||
async start(c) {
|
||||
for (const part of parts) c.enqueue(enc.encode(part))
|
||||
c.close()
|
||||
},
|
||||
})
|
||||
|
||||
describe("zen stream", () => {
|
||||
test("parses known event names", () => {
|
||||
expect(eventName("event: response.created\ndata: {}")).toBe("response.created")
|
||||
expect(eventName('data: {"ok":true}')).toBe("message")
|
||||
expect(eventName("data: [DONE]")).toBe("[DONE]")
|
||||
})
|
||||
|
||||
test("relays split OpenAI responses and logs completion", async () => {
|
||||
const seen: string[] = []
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body([
|
||||
"event: response.created\n",
|
||||
'data: {"type":"response.created"}\n\n',
|
||||
"event: response.completed\n",
|
||||
'data: {"response":{"usage":{"input_tokens":1,"output_tokens":2}}}\n\n',
|
||||
]),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: (part) => {
|
||||
seen.push(part)
|
||||
},
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
const text = await read(stream)
|
||||
expect(text).toContain("response.created")
|
||||
expect(text).toContain("response.completed")
|
||||
expect(seen).toHaveLength(2)
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("finished")
|
||||
expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true)
|
||||
})
|
||||
|
||||
test("keeps reading when binary decoder needs another chunk", async () => {
|
||||
let calls = 0
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body(["a", "b"]),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
binary: (chunk) => {
|
||||
calls += 1
|
||||
if (calls === 1) return
|
||||
return chunk
|
||||
},
|
||||
parse: () => undefined,
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
const text = await read(stream)
|
||||
expect(text).toBe("b")
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("finished")
|
||||
})
|
||||
|
||||
test("flushes a final unterminated event at EOF", async () => {
|
||||
const seen: string[] = []
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body(['event: response.completed\ndata: {"response":{"usage":{"input_tokens":1}}}']),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: (part) => {
|
||||
seen.push(part)
|
||||
},
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
const text = await read(stream)
|
||||
expect(text).toContain("response.completed")
|
||||
expect(seen).toHaveLength(1)
|
||||
expect(logs.at(-1)?.["stream.saw_completed"]).toBe(true)
|
||||
})
|
||||
|
||||
test("closes cleanly when upstream body is missing", async () => {
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: null,
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: () => undefined,
|
||||
convert: (part) => part,
|
||||
tail: async () => undefined,
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
expect(await read(stream)).toBe("")
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("missing_body")
|
||||
})
|
||||
|
||||
test("surfaces postprocess failures with stream metrics", async () => {
|
||||
const logs: Array<Record<string, unknown>> = []
|
||||
const stream = relay({
|
||||
body: body(["event: response.created\ndata: {}\n\n"]),
|
||||
separator: "\n\n",
|
||||
signal: new AbortController().signal,
|
||||
start: Date.now(),
|
||||
same: true,
|
||||
parse: () => undefined,
|
||||
convert: (part) => part,
|
||||
tail: async () => {
|
||||
throw new Error("boom")
|
||||
},
|
||||
metric: (values) => logs.push(values),
|
||||
})
|
||||
|
||||
await expect(read(stream)).rejects.toThrow("boom")
|
||||
expect(logs.at(-1)?.["stream.event"]).toBe("error")
|
||||
expect(logs.at(-1)?.["stream.phase"]).toBe("tail")
|
||||
})
|
||||
})
|
||||
Loading…
Reference in New Issue