From 567a91191aabe14c82eebd541ad8fffe20f8bc8a Mon Sep 17 00:00:00 2001 From: Dax Date: Tue, 31 Mar 2026 15:27:51 -0400 Subject: [PATCH] refactor(session): simplify LLM stream by replacing queue with fromAsyncIterable (#20324) --- packages/opencode/src/session/llm.ts | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index f5717da55e..dc89db409e 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -53,32 +53,22 @@ export namespace LLM { Effect.gen(function* () { return Service.of({ stream(input) { - const stream: Stream.Stream = Stream.scoped( + return Stream.scoped( Stream.unwrap( Effect.gen(function* () { const ctrl = yield* Effect.acquireRelease( Effect.sync(() => new AbortController()), (ctrl) => Effect.sync(() => ctrl.abort()), ) - const queue = yield* Queue.unbounded() - yield* Effect.promise(async () => { - const result = await LLM.stream({ ...input, abort: ctrl.signal }) - for await (const event of result.fullStream) { - if (!Queue.offerUnsafe(queue, event)) break - } - Queue.endUnsafe(queue) - }).pipe( - Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))), - Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())), - Effect.forkScoped, + const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal })) + + return Stream.fromAsyncIterable(result.fullStream, (e) => + e instanceof Error ? e : new Error(String(e)), ) - - return Stream.fromQueue(queue) }), ), ) - return stream }, }) }),