fix(core): a chunk timeout when processing llm stream (#16366)
parent
4c4aed5a87
commit
69ddc91c35
|
|
@ -972,6 +972,14 @@ export namespace Config {
|
||||||
.describe(
|
.describe(
|
||||||
"Timeout in milliseconds for requests to this provider. Default is 300000 (5 minutes). Set to false to disable timeout.",
|
"Timeout in milliseconds for requests to this provider. Default is 300000 (5 minutes). Set to false to disable timeout.",
|
||||||
),
|
),
|
||||||
|
chunkTimeout: z
|
||||||
|
.number()
|
||||||
|
.int()
|
||||||
|
.positive()
|
||||||
|
.optional()
|
||||||
|
.describe(
|
||||||
|
"Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted.",
|
||||||
|
),
|
||||||
})
|
})
|
||||||
.catchall(z.any())
|
.catchall(z.any())
|
||||||
.optional(),
|
.optional(),
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,8 @@ import { GoogleAuth } from "google-auth-library"
|
||||||
import { ProviderTransform } from "./transform"
|
import { ProviderTransform } from "./transform"
|
||||||
import { Installation } from "../installation"
|
import { Installation } from "../installation"
|
||||||
|
|
||||||
|
const DEFAULT_CHUNK_TIMEOUT = 120_000
|
||||||
|
|
||||||
export namespace Provider {
|
export namespace Provider {
|
||||||
const log = Log.create({ service: "provider" })
|
const log = Log.create({ service: "provider" })
|
||||||
|
|
||||||
|
|
@ -85,6 +87,54 @@ export namespace Provider {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function wrapSSE(res: Response, ms: number, ctl: AbortController) {
|
||||||
|
if (typeof ms !== "number" || ms <= 0) return res
|
||||||
|
if (!res.body) return res
|
||||||
|
if (!res.headers.get("content-type")?.includes("text/event-stream")) return res
|
||||||
|
|
||||||
|
const reader = res.body.getReader()
|
||||||
|
const body = new ReadableStream<Uint8Array>({
|
||||||
|
async pull(ctrl) {
|
||||||
|
const part = await new Promise<Awaited<ReturnType<typeof reader.read>>>((resolve, reject) => {
|
||||||
|
const id = setTimeout(() => {
|
||||||
|
const err = new Error("SSE read timed out")
|
||||||
|
ctl.abort(err)
|
||||||
|
void reader.cancel(err)
|
||||||
|
reject(err)
|
||||||
|
}, ms)
|
||||||
|
|
||||||
|
reader.read().then(
|
||||||
|
(part) => {
|
||||||
|
clearTimeout(id)
|
||||||
|
resolve(part)
|
||||||
|
},
|
||||||
|
(err) => {
|
||||||
|
clearTimeout(id)
|
||||||
|
reject(err)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
if (part.done) {
|
||||||
|
ctrl.close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctrl.enqueue(part.value)
|
||||||
|
},
|
||||||
|
async cancel(reason) {
|
||||||
|
ctl.abort(reason)
|
||||||
|
await reader.cancel(reason)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
return new Response(body, {
|
||||||
|
headers: new Headers(res.headers),
|
||||||
|
status: res.status,
|
||||||
|
statusText: res.statusText,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
const BUNDLED_PROVIDERS: Record<string, (options: any) => SDK> = {
|
const BUNDLED_PROVIDERS: Record<string, (options: any) => SDK> = {
|
||||||
"@ai-sdk/amazon-bedrock": createAmazonBedrock,
|
"@ai-sdk/amazon-bedrock": createAmazonBedrock,
|
||||||
"@ai-sdk/anthropic": createAnthropic,
|
"@ai-sdk/anthropic": createAnthropic,
|
||||||
|
|
@ -1092,21 +1142,23 @@ export namespace Provider {
|
||||||
if (existing) return existing
|
if (existing) return existing
|
||||||
|
|
||||||
const customFetch = options["fetch"]
|
const customFetch = options["fetch"]
|
||||||
|
const chunkTimeout = options["chunkTimeout"] || DEFAULT_CHUNK_TIMEOUT
|
||||||
|
delete options["chunkTimeout"]
|
||||||
|
|
||||||
options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
|
options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
|
||||||
// Preserve custom fetch if it exists, wrap it with timeout logic
|
// Preserve custom fetch if it exists, wrap it with timeout logic
|
||||||
const fetchFn = customFetch ?? fetch
|
const fetchFn = customFetch ?? fetch
|
||||||
const opts = init ?? {}
|
const opts = init ?? {}
|
||||||
|
const chunkAbortCtl = typeof chunkTimeout === "number" && chunkTimeout > 0 ? new AbortController() : undefined
|
||||||
|
const signals: AbortSignal[] = []
|
||||||
|
|
||||||
if (options["timeout"] !== undefined && options["timeout"] !== null) {
|
if (opts.signal) signals.push(opts.signal)
|
||||||
const signals: AbortSignal[] = []
|
if (chunkAbortCtl) signals.push(chunkAbortCtl.signal)
|
||||||
if (opts.signal) signals.push(opts.signal)
|
if (options["timeout"] !== undefined && options["timeout"] !== null && options["timeout"] !== false)
|
||||||
if (options["timeout"] !== false) signals.push(AbortSignal.timeout(options["timeout"]))
|
signals.push(AbortSignal.timeout(options["timeout"]))
|
||||||
|
|
||||||
const combined = signals.length > 1 ? AbortSignal.any(signals) : signals[0]
|
const combined = signals.length === 0 ? null : signals.length === 1 ? signals[0] : AbortSignal.any(signals)
|
||||||
|
if (combined) opts.signal = combined
|
||||||
opts.signal = combined
|
|
||||||
}
|
|
||||||
|
|
||||||
// Strip openai itemId metadata following what codex does
|
// Strip openai itemId metadata following what codex does
|
||||||
// Codex uses #[serde(skip_serializing)] on id fields for all item types:
|
// Codex uses #[serde(skip_serializing)] on id fields for all item types:
|
||||||
|
|
@ -1126,11 +1178,14 @@ export namespace Provider {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fetchFn(input, {
|
const res = await fetchFn(input, {
|
||||||
...opts,
|
...opts,
|
||||||
// @ts-ignore see here: https://github.com/oven-sh/bun/issues/16682
|
// @ts-ignore see here: https://github.com/oven-sh/bun/issues/16682
|
||||||
timeout: false,
|
timeout: false,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if (!chunkAbortCtl) return res
|
||||||
|
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
|
||||||
}
|
}
|
||||||
|
|
||||||
const bundledFn = BUNDLED_PROVIDERS[model.api.npm]
|
const bundledFn = BUNDLED_PROVIDERS[model.api.npm]
|
||||||
|
|
|
||||||
|
|
@ -260,6 +260,7 @@ test("env variable takes precedence, config merges options", async () => {
|
||||||
anthropic: {
|
anthropic: {
|
||||||
options: {
|
options: {
|
||||||
timeout: 60000,
|
timeout: 60000,
|
||||||
|
chunkTimeout: 15000,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
@ -277,6 +278,7 @@ test("env variable takes precedence, config merges options", async () => {
|
||||||
expect(providers["anthropic"]).toBeDefined()
|
expect(providers["anthropic"]).toBeDefined()
|
||||||
// Config options should be merged
|
// Config options should be merged
|
||||||
expect(providers["anthropic"].options.timeout).toBe(60000)
|
expect(providers["anthropic"].options.timeout).toBe(60000)
|
||||||
|
expect(providers["anthropic"].options.chunkTimeout).toBe(15000)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -244,7 +244,7 @@ You can configure the providers and models you want to use in your OpenCode conf
|
||||||
|
|
||||||
The `small_model` option configures a separate model for lightweight tasks like title generation. By default, OpenCode tries to use a cheaper model if one is available from your provider, otherwise it falls back to your main model.
|
The `small_model` option configures a separate model for lightweight tasks like title generation. By default, OpenCode tries to use a cheaper model if one is available from your provider, otherwise it falls back to your main model.
|
||||||
|
|
||||||
Provider options can include `timeout` and `setCacheKey`:
|
Provider options can include `timeout`, `chunkTimeout`, and `setCacheKey`:
|
||||||
|
|
||||||
```json title="opencode.json"
|
```json title="opencode.json"
|
||||||
{
|
{
|
||||||
|
|
@ -253,6 +253,7 @@ Provider options can include `timeout` and `setCacheKey`:
|
||||||
"anthropic": {
|
"anthropic": {
|
||||||
"options": {
|
"options": {
|
||||||
"timeout": 600000,
|
"timeout": 600000,
|
||||||
|
"chunkTimeout": 30000,
|
||||||
"setCacheKey": true
|
"setCacheKey": true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -261,6 +262,7 @@ Provider options can include `timeout` and `setCacheKey`:
|
||||||
```
|
```
|
||||||
|
|
||||||
- `timeout` - Request timeout in milliseconds (default: 300000). Set to `false` to disable.
|
- `timeout` - Request timeout in milliseconds (default: 300000). Set to `false` to disable.
|
||||||
|
- `chunkTimeout` - Timeout in milliseconds between streamed response chunks. If no chunk arrives in time, the request is aborted.
|
||||||
- `setCacheKey` - Ensure a cache key is always set for designated provider.
|
- `setCacheKey` - Ensure a cache key is always set for designated provider.
|
||||||
|
|
||||||
You can also configure [local models](/docs/models#local). [Learn more](/docs/models).
|
You can also configure [local models](/docs/models#local). [Learn more](/docs/models).
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue