From 7532d99e5b596ac8a4736154efdda649dcdcd4cb Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 31 Mar 2026 20:45:42 -0400 Subject: [PATCH] test: finish HTTP mock processor coverage (#20372) --- packages/opencode/test/lib/llm-server.ts | 474 +++++++++----- .../test/session/processor-effect.test.ts | 579 +++++++----------- 2 files changed, 534 insertions(+), 519 deletions(-) diff --git a/packages/opencode/test/lib/llm-server.ts b/packages/opencode/test/lib/llm-server.ts index b0a54424ef..8e7365d97f 100644 --- a/packages/opencode/test/lib/llm-server.ts +++ b/packages/opencode/test/lib/llm-server.ts @@ -1,31 +1,12 @@ -import { NodeHttpServer } from "@effect/platform-node" +import { NodeHttpServer, NodeHttpServerRequest } from "@effect/platform-node" import * as Http from "node:http" import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect" import * as HttpServer from "effect/unstable/http/HttpServer" import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" -type Step = - | { - type: "text" - text: string - } - | { - type: "tool" - tool: string - input: unknown - } - | { - type: "fail" - message: string - } - | { - type: "hang" - } - | { - type: "hold" - text: string - wait: PromiseLike - } +export type Usage = { input: number; output: number } + +type Line = Record type Hit = { url: URL @@ -37,147 +18,293 @@ type Wait = { ready: Deferred.Deferred } -function sse(lines: unknown[]) { - return HttpServerResponse.stream( - Stream.fromIterable([ - [...lines.map((line) => `data: ${JSON.stringify(line)}`), "data: [DONE]"].join("\n\n") + "\n\n", - ]).pipe(Stream.encodeText), - { contentType: "text/event-stream" }, - ) +type Sse = { + type: "sse" + head: unknown[] + tail: unknown[] + wait?: PromiseLike + hang?: boolean + error?: unknown + reset?: boolean } -function text(step: Extract) { - return sse([ - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { role: "assistant" } }], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { content: step.text } }], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "stop" }], - }, - ]) +type HttpError = { + type: "http-error" + status: number + body: unknown } -function tool(step: Extract, seq: number) { - const id = `call_${seq}` - const args = JSON.stringify(step.input) - return sse([ - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { role: "assistant" } }], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [ +export type Item = Sse | HttpError + +const done = Symbol("done") + +function line(input: unknown) { + if (input === done) return "data: [DONE]\n\n" + return `data: ${JSON.stringify(input)}\n\n` +} + +function tokens(input?: Usage) { + if (!input) return + return { + prompt_tokens: input.input, + completion_tokens: input.output, + total_tokens: input.input + input.output, + } +} + +function chunk(input: { delta?: Record; finish?: string; usage?: Usage }) { + return { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [ + { + delta: input.delta ?? {}, + ...(input.finish ? { finish_reason: input.finish } : {}), + }, + ], + ...(input.usage ? { usage: tokens(input.usage) } : {}), + } satisfies Line +} + +function role() { + return chunk({ delta: { role: "assistant" } }) +} + +function textLine(value: string) { + return chunk({ delta: { content: value } }) +} + +function reasonLine(value: string) { + return chunk({ delta: { reasoning_content: value } }) +} + +function finishLine(reason: string, usage?: Usage) { + return chunk({ finish: reason, usage }) +} + +function toolStartLine(id: string, name: string) { + return chunk({ + delta: { + tool_calls: [ { - delta: { - tool_calls: [ - { - index: 0, - id, - type: "function", - function: { - name: step.tool, - arguments: "", - }, - }, - ], + index: 0, + id, + type: "function", + function: { + name, + arguments: "", }, }, ], }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [ + }) +} + +function toolArgsLine(value: string) { + return chunk({ + delta: { + tool_calls: [ { - delta: { - tool_calls: [ - { - index: 0, - function: { - arguments: args, - }, - }, - ], + index: 0, + function: { + arguments: value, }, }, ], }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "tool_calls" }], - }, - ]) + }) } -function fail(step: Extract) { - return HttpServerResponse.stream( - Stream.fromIterable([ - 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', - ]).pipe(Stream.encodeText, Stream.concat(Stream.fail(new Error(step.message)))), - { contentType: "text/event-stream" }, - ) +function bytes(input: Iterable) { + return Stream.fromIterable([...input].map(line)).pipe(Stream.encodeText) } -function hang() { - return HttpServerResponse.stream( - Stream.fromIterable([ - 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', - ]).pipe(Stream.encodeText, Stream.concat(Stream.never)), - { contentType: "text/event-stream" }, - ) +function send(item: Sse) { + const head = bytes(item.head) + const tail = bytes([...item.tail, ...(item.hang || item.error ? [] : [done])]) + const empty = Stream.fromIterable([]) + const wait = item.wait + const body: Stream.Stream = wait + ? Stream.concat(head, Stream.fromEffect(Effect.promise(() => wait)).pipe(Stream.flatMap(() => tail))) + : Stream.concat(head, tail) + let end: Stream.Stream = empty + if (item.error) end = Stream.concat(empty, Stream.fail(item.error)) + else if (item.hang) end = Stream.concat(empty, Stream.never) + + return HttpServerResponse.stream(Stream.concat(body, end), { contentType: "text/event-stream" }) } -function hold(step: Extract) { - return HttpServerResponse.stream( - Stream.fromIterable([ - 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', - ]).pipe( - Stream.encodeText, - Stream.concat( - Stream.fromEffect(Effect.promise(() => step.wait)).pipe( - Stream.flatMap(() => - Stream.fromIterable([ - `data: ${JSON.stringify({ - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { content: step.text } }], - })}\n\n`, - `data: ${JSON.stringify({ - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "stop" }], - })}\n\n`, - "data: [DONE]\n\n", - ]).pipe(Stream.encodeText), - ), - ), - ), - ), - { contentType: "text/event-stream" }, - ) +const reset = Effect.fn("TestLLMServer.reset")(function* (item: Sse) { + const req = yield* HttpServerRequest.HttpServerRequest + const res = NodeHttpServerRequest.toServerResponse(req) + yield* Effect.sync(() => { + res.writeHead(200, { "content-type": "text/event-stream" }) + for (const part of item.head) res.write(line(part)) + for (const part of item.tail) res.write(line(part)) + res.destroy(new Error("connection reset")) + }) + yield* Effect.never +}) + +function fail(item: HttpError) { + return HttpServerResponse.text(JSON.stringify(item.body), { + status: item.status, + contentType: "application/json", + }) +} + +export class Reply { + #head: unknown[] = [role()] + #tail: unknown[] = [] + #usage: Usage | undefined + #finish: string | undefined + #wait: PromiseLike | undefined + #hang = false + #error: unknown + #reset = false + #seq = 0 + + #id() { + this.#seq += 1 + return `call_${this.#seq}` + } + + text(value: string) { + this.#tail = [...this.#tail, textLine(value)] + return this + } + + reason(value: string) { + this.#tail = [...this.#tail, reasonLine(value)] + return this + } + + usage(value: Usage) { + this.#usage = value + return this + } + + wait(value: PromiseLike) { + this.#wait = value + return this + } + + stop() { + this.#finish = "stop" + this.#hang = false + this.#error = undefined + this.#reset = false + return this + } + + toolCalls() { + this.#finish = "tool_calls" + this.#hang = false + this.#error = undefined + this.#reset = false + return this + } + + tool(name: string, input: unknown) { + const id = this.#id() + const args = JSON.stringify(input) + this.#tail = [...this.#tail, toolStartLine(id, name), toolArgsLine(args)] + return this.toolCalls() + } + + pendingTool(name: string, input: unknown) { + const id = this.#id() + const args = JSON.stringify(input) + const size = Math.max(1, Math.floor(args.length / 2)) + this.#tail = [...this.#tail, toolStartLine(id, name), toolArgsLine(args.slice(0, size))] + return this + } + + hang() { + this.#finish = undefined + this.#hang = true + this.#error = undefined + this.#reset = false + return this + } + + streamError(error: unknown = "boom") { + this.#finish = undefined + this.#hang = false + this.#error = error + this.#reset = false + return this + } + + reset() { + this.#finish = undefined + this.#hang = false + this.#error = undefined + this.#reset = true + return this + } + + item(): Item { + return { + type: "sse", + head: this.#head, + tail: this.#finish ? [...this.#tail, finishLine(this.#finish, this.#usage)] : this.#tail, + wait: this.#wait, + hang: this.#hang, + error: this.#error, + reset: this.#reset, + } + } +} + +export function reply() { + return new Reply() +} + +export function httpError(status: number, body: unknown): Item { + return { + type: "http-error", + status, + body, + } +} + +export function raw(input: { + chunks?: unknown[] + head?: unknown[] + tail?: unknown[] + wait?: PromiseLike + hang?: boolean + error?: unknown + reset?: boolean +}): Item { + return { + type: "sse", + head: input.head ?? input.chunks ?? [], + tail: input.tail ?? [], + wait: input.wait, + hang: input.hang, + error: input.error, + reset: input.reset, + } +} + +function item(input: Item | Reply) { + return input instanceof Reply ? input.item() : input } namespace TestLLMServer { export interface Service { readonly url: string - readonly text: (value: string) => Effect.Effect - readonly tool: (tool: string, input: unknown) => Effect.Effect - readonly fail: (message?: string) => Effect.Effect + readonly push: (...input: (Item | Reply)[]) => Effect.Effect + readonly text: (value: string, opts?: { usage?: Usage }) => Effect.Effect + readonly tool: (name: string, input: unknown) => Effect.Effect + readonly toolHang: (name: string, input: unknown) => Effect.Effect + readonly reason: (value: string, opts?: { text?: string; usage?: Usage }) => Effect.Effect + readonly fail: (message?: unknown) => Effect.Effect + readonly error: (status: number, body: unknown) => Effect.Effect readonly hang: Effect.Effect - readonly hold: (text: string, wait: PromiseLike) => Effect.Effect + readonly hold: (value: string, wait: PromiseLike) => Effect.Effect readonly hits: Effect.Effect readonly calls: Effect.Effect readonly wait: (count: number) => Effect.Effect @@ -194,12 +321,11 @@ export class TestLLMServer extends ServiceMap.Service { - list = [...list, step] + const queue = (...input: (Item | Reply)[]) => { + list = [...list, ...input.map(item)] } const notify = Effect.fnUntraced(function* () { @@ -210,11 +336,10 @@ export class TestLLMServer extends ServiceMap.Service { - const step = list[0] - if (!step) return { step: undefined, seq } - seq += 1 + const first = list[0] + if (!first) return list = list.slice(1) - return { step, seq } + return first } yield* router.add( @@ -223,21 +348,22 @@ export class TestLLMServer extends ServiceMap.Service ({}))) + if (!next) return HttpServerResponse.text("unexpected request", { status: 500 }) + const body = yield* req.json.pipe(Effect.orElseSucceed(() => ({}))) hits = [ ...hits, { url: new URL(req.originalUrl, "http://localhost"), - body: json && typeof json === "object" ? (json as Record) : {}, + body: body && typeof body === "object" ? (body as Record) : {}, }, ] yield* notify() - if (next.step.type === "text") return text(next.step) - if (next.step.type === "tool") return tool(next.step, next.seq) - if (next.step.type === "fail") return fail(next.step) - if (next.step.type === "hang") return hang() - return hold(next.step) + if (next.type === "sse" && next.reset) { + yield* reset(next) + return HttpServerResponse.empty() + } + if (next.type === "sse") return send(next) + return fail(next) }), ) @@ -248,20 +374,37 @@ export class TestLLMServer extends ServiceMap.Service) { - push({ type: "hold", text, wait }) + hold: Effect.fn("TestLLMServer.hold")(function* (value: string, wait: PromiseLike) { + queue(reply().wait(wait).text(value).stop().item()) }), hits: Effect.sync(() => [...hits]), calls: Effect.sync(() => hits.length), @@ -275,8 +418,5 @@ export class TestLLMServer extends ServiceMap.Service list.length), }) }), - ).pipe( - Layer.provide(HttpRouter.layer), // - Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 })), - ) + ).pipe(Layer.provide(HttpRouter.layer), Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 }))) } diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 23c6911a2c..1dd8b7edc9 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,8 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" -import { APICallError } from "ai" -import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect" -import * as Stream from "effect/Stream" +import { Cause, Effect, Exit, Fiber, Layer } from "effect" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -10,7 +8,7 @@ import { Bus } from "../../src/bus" import { Config } from "../../src/config/config" import { Permission } from "../../src/permission" import { Plugin } from "../../src/plugin" -import type { Provider } from "../../src/provider/provider" +import { Provider } from "../../src/provider/provider" import { ModelID, ProviderID } from "../../src/provider/schema" import { Session } from "../../src/session" import { LLM } from "../../src/session/llm" @@ -21,8 +19,9 @@ import { SessionStatus } from "../../src/session/status" import { Snapshot } from "../../src/snapshot" import { Log } from "../../src/util/log" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { provideTmpdirInstance } from "../fixture/fixture" +import { provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" +import { reply, TestLLMServer } from "../lib/llm-server" Log.init({ print: false }) @@ -31,118 +30,51 @@ const ref = { modelID: ModelID.make("test-model"), } -type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) - -class TestLLM extends ServiceMap.Service< - TestLLM, - { - readonly push: (stream: Script) => Effect.Effect - readonly reply: (...items: LLM.Event[]) => Effect.Effect - readonly calls: Effect.Effect - readonly inputs: Effect.Effect - } ->()("@test/SessionProcessorLLM") {} - -function stream(...items: LLM.Event[]) { - return Stream.make(...items) +const cfg = { + provider: { + test: { + name: "Test", + id: "test", + env: [], + npm: "@ai-sdk/openai-compatible", + models: { + "test-model": { + id: "test-model", + name: "Test Model", + attachment: false, + reasoning: false, + temperature: false, + tool_call: true, + release_date: "2025-01-01", + limit: { context: 100000, output: 10000 }, + cost: { input: 0, output: 0 }, + options: {}, + }, + }, + options: { + apiKey: "test-key", + baseURL: "http://localhost:1/v1", + }, + }, + }, } -function usage(input = 1, output = 1, total = input + output) { +function providerCfg(url: string) { return { - inputTokens: input, - outputTokens: output, - totalTokens: total, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, + ...cfg, + provider: { + ...cfg.provider, + test: { + ...cfg.provider.test, + options: { + ...cfg.provider.test.options, + baseURL: url, + }, + }, }, } } -function start(): LLM.Event { - return { type: "start" } -} - -function textStart(id = "t"): LLM.Event { - return { type: "text-start", id } -} - -function textDelta(id: string, text: string): LLM.Event { - return { type: "text-delta", id, text } -} - -function textEnd(id = "t"): LLM.Event { - return { type: "text-end", id } -} - -function reasoningStart(id: string): LLM.Event { - return { type: "reasoning-start", id } -} - -function reasoningDelta(id: string, text: string): LLM.Event { - return { type: "reasoning-delta", id, text } -} - -function reasoningEnd(id: string): LLM.Event { - return { type: "reasoning-end", id } -} - -function finishStep(): LLM.Event { - return { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - } -} - -function finish(): LLM.Event { - return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() } -} - -function toolInputStart(id: string, toolName: string): LLM.Event { - return { type: "tool-input-start", id, toolName } -} - -function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event { - return { type: "tool-call", toolCallId, toolName, input } -} - -function fail(err: E, ...items: LLM.Event[]) { - return stream(...items).pipe(Stream.concat(Stream.fail(err))) -} - -function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) { - return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never))) -} - -function model(context: number): Provider.Model { - return { - id: "test-model", - providerID: "test", - name: "Test", - limit: { context, output: 10 }, - cost: { input: 0, output: 0, cache: { read: 0, write: 0 } }, - capabilities: { - toolcall: true, - attachment: false, - reasoning: false, - temperature: true, - input: { text: true, image: false, audio: false, video: false }, - output: { text: true, image: false, audio: false, video: false }, - }, - api: { npm: "@ai-sdk/anthropic" }, - options: {}, - } as Provider.Model -} - function agent(): Agent.Info { return { name: "build", @@ -211,43 +143,6 @@ const assistant = Effect.fn("TestSession.assistant")(function* ( return msg }) -const llm = Layer.unwrap( - Effect.gen(function* () { - const queue: Script[] = [] - const inputs: LLM.StreamInput[] = [] - let calls = 0 - - const push = Effect.fn("TestLLM.push")((item: Script) => { - queue.push(item) - return Effect.void - }) - - const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items))) - return Layer.mergeAll( - Layer.succeed( - LLM.Service, - LLM.Service.of({ - stream: (input) => { - calls += 1 - inputs.push(input) - const item = queue.shift() ?? Stream.empty - return typeof item === "function" ? item(input) : item - }, - }), - ), - Layer.succeed( - TestLLM, - TestLLM.of({ - push, - reply, - calls: Effect.sync(() => calls), - inputs: Effect.sync(() => [...inputs]), - }), - ), - ) - }), -) - const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) const deps = Layer.mergeAll( @@ -257,27 +152,37 @@ const deps = Layer.mergeAll( Permission.layer, Plugin.defaultLayer, Config.defaultLayer, + LLM.defaultLayer, + Provider.defaultLayer, status, - llm, ).pipe(Layer.provideMerge(infra)) -const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) +const env = Layer.mergeAll(TestLLMServer.layer, SessionProcessor.layer.pipe(Layer.provideMerge(deps))) const it = testEffect(env) -it.live("session.processor effect tests capture llm input cleanly", () => { - return provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service +const boot = Effect.fn("test.boot")(function* () { + const processors = yield* SessionProcessor.Service + const session = yield* Session.Service + const provider = yield* Provider.Service + return { processors, session, provider } +}) - yield* test.reply(start(), textStart(), textDelta("t", "hello"), textEnd(), finishStep(), finish()) +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +it.live("session.processor effect tests capture llm input cleanly", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.text("hello") const chat = yield* session.create({}) const parent = yield* user(chat.id, "hi") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -303,46 +208,29 @@ it.live("session.processor effect tests capture llm input cleanly", () => { const value = yield* handle.process(input) const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) - const calls = yield* test.calls - const inputs = yield* test.inputs + const calls = yield* llm.calls expect(value).toBe("continue") expect(calls).toBe(1) - expect(inputs).toHaveLength(1) - expect(inputs[0].messages).toStrictEqual([{ role: "user", content: "hi" }]) expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests stop after token overflow requests compaction", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests stop after token overflow requests compaction", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.reply( - start(), - { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(100, 0, 100), - }, - textStart(), - textDelta("t", "after"), - textEnd(), - ) + yield* llm.text("after", { usage: { input: 100, output: 0 } }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "compact") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(20) + const base = yield* provider.getModel(ref.providerID, ref.modelID) + const mdl = { ...base, limit: { context: 20, output: 10 } } const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -369,51 +257,73 @@ it.live("session.processor effect tests stop after token overflow requests compa const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) expect(value).toBe("compact") - expect(parts.some((part) => part.type === "text")).toBe(false) + expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) expect(parts.some((part) => part.type === "step-finish")).toBe(true) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests reset reasoning state across retries", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests capture reasoning from http mock", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.push( - fail( - new APICallError({ - message: "boom", - url: "https://example.com/v1/chat/completions", - requestBodyValues: {}, - statusCode: 503, - responseHeaders: { "retry-after-ms": "0" }, - responseBody: '{"error":"boom"}', - isRetryable: true, - }), - start(), - reasoningStart("r"), - reasoningDelta("r", "one"), - ), - ) - - yield* test.reply( - start(), - reasoningStart("r"), - reasoningDelta("r", "two"), - reasoningEnd("r"), - finishStep(), - finish(), - ) + yield* llm.push(reply().reason("think").text("done").stop()) const chat = yield* session.create({}) const parent = yield* user(chat.id, "reason") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "reason" }], + tools: {}, + }) + + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning") + const text = parts.find((part): part is MessageV2.TextPart => part.type === "text") + + expect(value).toBe("continue") + expect(yield* llm.calls).toBe(1) + expect(reasoning?.text).toBe("think") + expect(text?.text).toBe("done") + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + +it.live("session.processor effect tests reset reasoning state across retries", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.push(reply().reason("one").reset(), reply().reason("two").stop()) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "reason") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -441,28 +351,26 @@ it.live("session.processor effect tests reset reasoning state across retries", ( const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning") expect(value).toBe("continue") - expect(yield* test.calls).toBe(2) + expect(yield* llm.calls).toBe(2) expect(reasoning.some((part) => part.text === "two")).toBe(true) expect(reasoning.some((part) => part.text === "onetwo")).toBe(false) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests do not retry unknown json errors", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests do not retry unknown json errors", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.push(fail({ error: { message: "no_kv_space" } }, start())) + yield* llm.error(400, { error: { message: "no_kv_space" } }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -487,29 +395,26 @@ it.live("session.processor effect tests do not retry unknown json errors", () => }) expect(value).toBe("stop") - expect(yield* test.calls).toBe(1) - expect(yield* test.inputs).toHaveLength(1) - expect(handle.message.error?.name).toBe("UnknownError") + expect(yield* llm.calls).toBe(1) + expect(handle.message.error?.name).toBe("APIError") }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests retry recognized structured json errors", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests retry recognized structured json errors", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.push(fail({ type: "error", error: { type: "too_many_requests" } }, start())) - yield* test.reply(start(), textStart(), textDelta("t", "after"), textEnd(), finishStep(), finish()) + yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } }) + yield* llm.text("after") const chat = yield* session.create({}) const parent = yield* user(chat.id, "retry json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -536,43 +441,28 @@ it.live("session.processor effect tests retry recognized structured json errors" const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) expect(value).toBe("continue") - expect(yield* test.calls).toBe(2) + expect(yield* llm.calls).toBe(2) expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) expect(handle.message.error).toBeUndefined() }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests publish retry status updates", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests publish retry status updates", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() const bus = yield* Bus.Service - yield* test.push( - fail( - new APICallError({ - message: "boom", - url: "https://example.com/v1/chat/completions", - requestBodyValues: {}, - statusCode: 503, - responseHeaders: { "retry-after-ms": "0" }, - responseBody: '{"error":"boom"}', - isRetryable: true, - }), - start(), - ), - ) - yield* test.reply(start(), finishStep(), finish()) + yield* llm.error(503, { error: "boom" }) + yield* llm.text("") const chat = yield* session.create({}) const parent = yield* user(chat.id, "retry") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const states: number[] = [] const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { if (evt.properties.sessionID !== chat.id) return @@ -604,27 +494,25 @@ it.live("session.processor effect tests publish retry status updates", () => { off() expect(value).toBe("continue") - expect(yield* test.calls).toBe(2) + expect(yield* llm.calls).toBe(2) expect(states).toStrictEqual([1]) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests compact on structured context overflow", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests compact on structured context overflow", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.push(fail({ type: "error", error: { code: "context_length_exceeded" } }, start())) + yield* llm.error(400, { type: "error", error: { code: "context_length_exceeded" } }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "compact json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -649,32 +537,25 @@ it.live("session.processor effect tests compact on structured context overflow", }) expect(value).toBe("compact") - expect(yield* test.calls).toBe(1) + expect(yield* llm.calls).toBe(1) expect(handle.message.error).toBeUndefined() }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const ready = defer() - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.push((input) => - hang(input, start(), toolInputStart("tool-1", "bash"), toolCall("tool-1", "bash", { cmd: "pwd" })).pipe( - Stream.tap((event) => (event.type === "tool-call" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + yield* llm.toolHang("bash", { cmd: "pwd" }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "tool abort") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -700,7 +581,15 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup }) .pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + yield* llm.wait(1) + yield* Effect.promise(async () => { + const end = Date.now() + 500 + while (Date.now() < end) { + const parts = await MessageV2.parts(msg.id) + if (parts.some((part) => part.type === "tool")) return + await Bun.sleep(10) + } + }) yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) @@ -708,45 +597,38 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup yield* handle.abort() } const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) - const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") + const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) { expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true) } - expect(yield* test.calls).toBe(1) - expect(tool?.state.status).toBe("error") - if (tool?.state.status === "error") { - expect(tool.state.error).toBe("Tool execution aborted") - expect(tool.state.time.end).toBeDefined() + expect(yield* llm.calls).toBe(1) + expect(call?.state.status).toBe("error") + if (call?.state.status === "error") { + expect(call.state.error).toBe("Tool execution aborted") + expect(call.state.time.end).toBeDefined() } }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests record aborted errors and idle state", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests record aborted errors and idle state", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const ready = defer() const seen = defer() - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() const bus = yield* Bus.Service - const status = yield* SessionStatus.Service + const sts = yield* SessionStatus.Service - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + yield* llm.hang const chat = yield* session.create({}) const parent = yield* user(chat.id, "abort") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const errs: string[] = [] const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => { if (evt.properties.sessionID !== chat.id) return @@ -779,7 +661,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( }) .pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + yield* llm.wait(1) yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) @@ -788,7 +670,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( } yield* Effect.promise(() => seen.promise) const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) - const state = yield* status.get(chat.id) + const state = yield* sts.get(chat.id) off() expect(Exit.isFailure(exit)).toBe(true) @@ -803,30 +685,23 @@ it.live("session.processor effect tests record aborted errors and idle state", ( expect(state).toMatchObject({ type: "idle" }) expect(errs).toContain("MessageAbortedError") }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests mark interruptions aborted without manual abort", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests mark interruptions aborted without manual abort", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const ready = defer() - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service - const status = yield* SessionStatus.Service - const test = yield* TestLLM + const { processors, session, provider } = yield* boot() + const sts = yield* SessionStatus.Service - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + yield* llm.hang const chat = yield* session.create({}) const parent = yield* user(chat.id, "interrupt") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -852,12 +727,12 @@ it.live("session.processor effect tests mark interruptions aborted without manua }) .pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + yield* llm.wait(1) yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) - const state = yield* status.get(chat.id) + const state = yield* sts.get(chat.id) expect(Exit.isFailure(exit)).toBe(true) expect(handle.message.error?.name).toBe("MessageAbortedError") @@ -867,6 +742,6 @@ it.live("session.processor effect tests mark interruptions aborted without manua } expect(state).toMatchObject({ type: "idle" }) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +)