refactor(session): simplify LLM stream by replacing queue with fromAsyncIterable (#20324)

pull/20330/head
Dax 2026-03-31 15:27:51 -04:00 committed by GitHub
parent 434d82bbe2
commit 567a91191a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 5 additions and 15 deletions

View File

@ -53,32 +53,22 @@ export namespace LLM {
Effect.gen(function* () { Effect.gen(function* () {
return Service.of({ return Service.of({
stream(input) { stream(input) {
const stream: Stream.Stream<Event, unknown> = Stream.scoped( return Stream.scoped(
Stream.unwrap( Stream.unwrap(
Effect.gen(function* () { Effect.gen(function* () {
const ctrl = yield* Effect.acquireRelease( const ctrl = yield* Effect.acquireRelease(
Effect.sync(() => new AbortController()), Effect.sync(() => new AbortController()),
(ctrl) => Effect.sync(() => ctrl.abort()), (ctrl) => Effect.sync(() => ctrl.abort()),
) )
const queue = yield* Queue.unbounded<Event, unknown | Cause.Done>()
yield* Effect.promise(async () => { const result = yield* Effect.promise(() => LLM.stream({ ...input, abort: ctrl.signal }))
const result = await LLM.stream({ ...input, abort: ctrl.signal })
for await (const event of result.fullStream) { return Stream.fromAsyncIterable(result.fullStream, (e) =>
if (!Queue.offerUnsafe(queue, event)) break e instanceof Error ? e : new Error(String(e)),
}
Queue.endUnsafe(queue)
}).pipe(
Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))),
Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())),
Effect.forkScoped,
) )
return Stream.fromQueue(queue)
}), }),
), ),
) )
return stream
}, },
}) })
}), }),