From 8d2385ad4904eba4e141429f14f8ee051f48c196 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 31 Mar 2026 20:36:18 -0400 Subject: [PATCH] test: finish HTTP mock processor coverage --- packages/opencode/test/lib/llm-server.ts | 502 +++++++++++------- .../test/session/processor-effect.test.ts | 264 +++------ 2 files changed, 371 insertions(+), 395 deletions(-) diff --git a/packages/opencode/test/lib/llm-server.ts b/packages/opencode/test/lib/llm-server.ts index 032a2fbebe..8e7365d97f 100644 --- a/packages/opencode/test/lib/llm-server.ts +++ b/packages/opencode/test/lib/llm-server.ts @@ -1,39 +1,12 @@ -import { NodeHttpServer } from "@effect/platform-node" +import { NodeHttpServer, NodeHttpServerRequest } from "@effect/platform-node" import * as Http from "node:http" import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect" import * as HttpServer from "effect/unstable/http/HttpServer" import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" -type Usage = { input: number; output: number } +export type Usage = { input: number; output: number } -type Step = - | { - type: "text" - text: string - usage?: Usage - } - | { - type: "tool" - tool: string - input: unknown - } - | { - type: "fail" - message: string - } - | { - type: "httpError" - status: number - body: unknown - } - | { - type: "hang" - } - | { - type: "hold" - text: string - wait: PromiseLike - } +type Line = Record type Hit = { url: URL @@ -45,163 +18,293 @@ type Wait = { ready: Deferred.Deferred } -function sse(lines: unknown[]) { - return HttpServerResponse.stream( - Stream.fromIterable([ - [...lines.map((line) => `data: ${JSON.stringify(line)}`), "data: [DONE]"].join("\n\n") + "\n\n", - ]).pipe(Stream.encodeText), - { contentType: "text/event-stream" }, - ) +type Sse = { + type: "sse" + head: unknown[] + tail: unknown[] + wait?: PromiseLike + hang?: boolean + error?: unknown + reset?: boolean } -function text(step: Extract) { - const finish: Record = { +type HttpError = { + type: "http-error" + status: number + body: unknown +} + +export type Item = Sse | HttpError + +const done = Symbol("done") + +function line(input: unknown) { + if (input === done) return "data: [DONE]\n\n" + return `data: ${JSON.stringify(input)}\n\n` +} + +function tokens(input?: Usage) { + if (!input) return + return { + prompt_tokens: input.input, + completion_tokens: input.output, + total_tokens: input.input + input.output, + } +} + +function chunk(input: { delta?: Record; finish?: string; usage?: Usage }) { + return { id: "chatcmpl-test", object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "stop" }], - } - if (step.usage) { - finish.usage = { - prompt_tokens: step.usage.input, - completion_tokens: step.usage.output, - total_tokens: step.usage.input + step.usage.output, - } - } - return sse([ - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { role: "assistant" } }], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { content: step.text } }], - }, - finish, - ]) + choices: [ + { + delta: input.delta ?? {}, + ...(input.finish ? { finish_reason: input.finish } : {}), + }, + ], + ...(input.usage ? { usage: tokens(input.usage) } : {}), + } satisfies Line } -function httpError(step: Extract) { - return HttpServerResponse.text(JSON.stringify(step.body), { - status: step.status, +function role() { + return chunk({ delta: { role: "assistant" } }) +} + +function textLine(value: string) { + return chunk({ delta: { content: value } }) +} + +function reasonLine(value: string) { + return chunk({ delta: { reasoning_content: value } }) +} + +function finishLine(reason: string, usage?: Usage) { + return chunk({ finish: reason, usage }) +} + +function toolStartLine(id: string, name: string) { + return chunk({ + delta: { + tool_calls: [ + { + index: 0, + id, + type: "function", + function: { + name, + arguments: "", + }, + }, + ], + }, + }) +} + +function toolArgsLine(value: string) { + return chunk({ + delta: { + tool_calls: [ + { + index: 0, + function: { + arguments: value, + }, + }, + ], + }, + }) +} + +function bytes(input: Iterable) { + return Stream.fromIterable([...input].map(line)).pipe(Stream.encodeText) +} + +function send(item: Sse) { + const head = bytes(item.head) + const tail = bytes([...item.tail, ...(item.hang || item.error ? [] : [done])]) + const empty = Stream.fromIterable([]) + const wait = item.wait + const body: Stream.Stream = wait + ? Stream.concat(head, Stream.fromEffect(Effect.promise(() => wait)).pipe(Stream.flatMap(() => tail))) + : Stream.concat(head, tail) + let end: Stream.Stream = empty + if (item.error) end = Stream.concat(empty, Stream.fail(item.error)) + else if (item.hang) end = Stream.concat(empty, Stream.never) + + return HttpServerResponse.stream(Stream.concat(body, end), { contentType: "text/event-stream" }) +} + +const reset = Effect.fn("TestLLMServer.reset")(function* (item: Sse) { + const req = yield* HttpServerRequest.HttpServerRequest + const res = NodeHttpServerRequest.toServerResponse(req) + yield* Effect.sync(() => { + res.writeHead(200, { "content-type": "text/event-stream" }) + for (const part of item.head) res.write(line(part)) + for (const part of item.tail) res.write(line(part)) + res.destroy(new Error("connection reset")) + }) + yield* Effect.never +}) + +function fail(item: HttpError) { + return HttpServerResponse.text(JSON.stringify(item.body), { + status: item.status, contentType: "application/json", }) } -function tool(step: Extract, seq: number) { - const id = `call_${seq}` - const args = JSON.stringify(step.input) - return sse([ - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { role: "assistant" } }], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [ - { - delta: { - tool_calls: [ - { - index: 0, - id, - type: "function", - function: { - name: step.tool, - arguments: "", - }, - }, - ], - }, - }, - ], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [ - { - delta: { - tool_calls: [ - { - index: 0, - function: { - arguments: args, - }, - }, - ], - }, - }, - ], - }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "tool_calls" }], - }, - ]) +export class Reply { + #head: unknown[] = [role()] + #tail: unknown[] = [] + #usage: Usage | undefined + #finish: string | undefined + #wait: PromiseLike | undefined + #hang = false + #error: unknown + #reset = false + #seq = 0 + + #id() { + this.#seq += 1 + return `call_${this.#seq}` + } + + text(value: string) { + this.#tail = [...this.#tail, textLine(value)] + return this + } + + reason(value: string) { + this.#tail = [...this.#tail, reasonLine(value)] + return this + } + + usage(value: Usage) { + this.#usage = value + return this + } + + wait(value: PromiseLike) { + this.#wait = value + return this + } + + stop() { + this.#finish = "stop" + this.#hang = false + this.#error = undefined + this.#reset = false + return this + } + + toolCalls() { + this.#finish = "tool_calls" + this.#hang = false + this.#error = undefined + this.#reset = false + return this + } + + tool(name: string, input: unknown) { + const id = this.#id() + const args = JSON.stringify(input) + this.#tail = [...this.#tail, toolStartLine(id, name), toolArgsLine(args)] + return this.toolCalls() + } + + pendingTool(name: string, input: unknown) { + const id = this.#id() + const args = JSON.stringify(input) + const size = Math.max(1, Math.floor(args.length / 2)) + this.#tail = [...this.#tail, toolStartLine(id, name), toolArgsLine(args.slice(0, size))] + return this + } + + hang() { + this.#finish = undefined + this.#hang = true + this.#error = undefined + this.#reset = false + return this + } + + streamError(error: unknown = "boom") { + this.#finish = undefined + this.#hang = false + this.#error = error + this.#reset = false + return this + } + + reset() { + this.#finish = undefined + this.#hang = false + this.#error = undefined + this.#reset = true + return this + } + + item(): Item { + return { + type: "sse", + head: this.#head, + tail: this.#finish ? [...this.#tail, finishLine(this.#finish, this.#usage)] : this.#tail, + wait: this.#wait, + hang: this.#hang, + error: this.#error, + reset: this.#reset, + } + } } -function fail(step: Extract) { - return HttpServerResponse.stream( - Stream.fromIterable([ - 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', - ]).pipe(Stream.encodeText, Stream.concat(Stream.fail(new Error(step.message)))), - { contentType: "text/event-stream" }, - ) +export function reply() { + return new Reply() } -function hang() { - return HttpServerResponse.stream( - Stream.fromIterable([ - 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', - ]).pipe(Stream.encodeText, Stream.concat(Stream.never)), - { contentType: "text/event-stream" }, - ) +export function httpError(status: number, body: unknown): Item { + return { + type: "http-error", + status, + body, + } } -function hold(step: Extract) { - return HttpServerResponse.stream( - Stream.fromIterable([ - 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', - ]).pipe( - Stream.encodeText, - Stream.concat( - Stream.fromEffect(Effect.promise(() => step.wait)).pipe( - Stream.flatMap(() => - Stream.fromIterable([ - `data: ${JSON.stringify({ - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: { content: step.text } }], - })}\n\n`, - `data: ${JSON.stringify({ - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "stop" }], - })}\n\n`, - "data: [DONE]\n\n", - ]).pipe(Stream.encodeText), - ), - ), - ), - ), - { contentType: "text/event-stream" }, - ) +export function raw(input: { + chunks?: unknown[] + head?: unknown[] + tail?: unknown[] + wait?: PromiseLike + hang?: boolean + error?: unknown + reset?: boolean +}): Item { + return { + type: "sse", + head: input.head ?? input.chunks ?? [], + tail: input.tail ?? [], + wait: input.wait, + hang: input.hang, + error: input.error, + reset: input.reset, + } +} + +function item(input: Item | Reply) { + return input instanceof Reply ? input.item() : input } namespace TestLLMServer { export interface Service { readonly url: string + readonly push: (...input: (Item | Reply)[]) => Effect.Effect readonly text: (value: string, opts?: { usage?: Usage }) => Effect.Effect - readonly tool: (tool: string, input: unknown) => Effect.Effect - readonly fail: (message?: string) => Effect.Effect + readonly tool: (name: string, input: unknown) => Effect.Effect + readonly toolHang: (name: string, input: unknown) => Effect.Effect + readonly reason: (value: string, opts?: { text?: string; usage?: Usage }) => Effect.Effect + readonly fail: (message?: unknown) => Effect.Effect readonly error: (status: number, body: unknown) => Effect.Effect readonly hang: Effect.Effect - readonly hold: (text: string, wait: PromiseLike) => Effect.Effect + readonly hold: (value: string, wait: PromiseLike) => Effect.Effect readonly hits: Effect.Effect readonly calls: Effect.Effect readonly wait: (count: number) => Effect.Effect @@ -218,12 +321,11 @@ export class TestLLMServer extends ServiceMap.Service { - list = [...list, step] + const queue = (...input: (Item | Reply)[]) => { + list = [...list, ...input.map(item)] } const notify = Effect.fnUntraced(function* () { @@ -234,11 +336,10 @@ export class TestLLMServer extends ServiceMap.Service { - const step = list[0] - if (!step) return { step: undefined, seq } - seq += 1 + const first = list[0] + if (!first) return list = list.slice(1) - return { step, seq } + return first } yield* router.add( @@ -247,22 +348,22 @@ export class TestLLMServer extends ServiceMap.Service ({}))) + if (!next) return HttpServerResponse.text("unexpected request", { status: 500 }) + const body = yield* req.json.pipe(Effect.orElseSucceed(() => ({}))) hits = [ ...hits, { url: new URL(req.originalUrl, "http://localhost"), - body: json && typeof json === "object" ? (json as Record) : {}, + body: body && typeof body === "object" ? (body as Record) : {}, }, ] yield* notify() - if (next.step.type === "text") return text(next.step) - if (next.step.type === "tool") return tool(next.step, next.seq) - if (next.step.type === "fail") return fail(next.step) - if (next.step.type === "httpError") return httpError(next.step) - if (next.step.type === "hang") return hang() - return hold(next.step) + if (next.type === "sse" && next.reset) { + yield* reset(next) + return HttpServerResponse.empty() + } + if (next.type === "sse") return send(next) + return fail(next) }), ) @@ -273,23 +374,37 @@ export class TestLLMServer extends ServiceMap.Service) { - push({ type: "hold", text, wait }) + hold: Effect.fn("TestLLMServer.hold")(function* (value: string, wait: PromiseLike) { + queue(reply().wait(wait).text(value).stop().item()) }), hits: Effect.sync(() => [...hits]), calls: Effect.sync(() => hits.length), @@ -303,8 +418,5 @@ export class TestLLMServer extends ServiceMap.Service list.length), }) }), - ).pipe( - Layer.provide(HttpRouter.layer), // - Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 })), - ) + ).pipe(Layer.provide(HttpRouter.layer), Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 }))) } diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 64e7e23745..1dd8b7edc9 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,7 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" -import { APICallError, jsonSchema, tool } from "ai" -import { Cause, Effect, Exit, Fiber, Layer, ServiceMap, Stream } from "effect" +import { Cause, Effect, Exit, Fiber, Layer } from "effect" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -20,19 +19,12 @@ import { SessionStatus } from "../../src/session/status" import { Snapshot } from "../../src/snapshot" import { Log } from "../../src/util/log" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" +import { provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" -import { TestLLMServer } from "../lib/llm-server" +import { reply, TestLLMServer } from "../lib/llm-server" Log.init({ print: false }) -const DEBUG = process.env.OPENCODE_TEST_DEBUG === "1" - -function trace(label: string, value: unknown) { - if (!DEBUG) return - console.log(label, JSON.stringify(value, null, 2)) -} - const ref = { providerID: ProviderID.make("test"), modelID: ModelID.make("test-model"), @@ -168,109 +160,6 @@ const env = Layer.mergeAll(TestLLMServer.layer, SessionProcessor.layer.pipe(Laye const it = testEffect(env) -// --------------------------------------------------------------------------- -// TestLLM kept only for the reasoning test -// TODO: reasoning events not available via OpenAI-compatible SSE -// --------------------------------------------------------------------------- -type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) - -class TestLLM extends ServiceMap.Service< - TestLLM, - { - readonly push: (stream: Script) => Effect.Effect - readonly reply: (...items: LLM.Event[]) => Effect.Effect - readonly calls: Effect.Effect - readonly inputs: Effect.Effect - } ->()("@test/SessionProcessorLLM") {} - -function reasoningUsage(input = 1, output = 1, total = input + output) { - return { - inputTokens: input, - outputTokens: output, - totalTokens: total, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - } -} - -const reasoningLlm = Layer.unwrap( - Effect.gen(function* () { - const queue: Script[] = [] - const inputs: LLM.StreamInput[] = [] - let calls = 0 - - const push = Effect.fn("TestLLM.push")((item: Script) => { - queue.push(item) - return Effect.void - }) - - const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(Stream.make(...items))) - return Layer.mergeAll( - Layer.succeed( - LLM.Service, - LLM.Service.of({ - stream: (input) => { - calls += 1 - inputs.push(input) - const item = queue.shift() ?? Stream.empty - return typeof item === "function" ? item(input) : item - }, - }), - ), - Layer.succeed( - TestLLM, - TestLLM.of({ - push, - reply, - calls: Effect.sync(() => calls), - inputs: Effect.sync(() => [...inputs]), - }), - ), - ) - }), -) - -const reasoningDeps = Layer.mergeAll( - Session.defaultLayer, - Snapshot.defaultLayer, - AgentSvc.defaultLayer, - Permission.layer, - Plugin.defaultLayer, - Config.defaultLayer, - status, - reasoningLlm, -).pipe(Layer.provideMerge(infra)) -const reasoningEnv = SessionProcessor.layer.pipe(Layer.provideMerge(reasoningDeps)) -const reasoningIt = testEffect(reasoningEnv) - -function reasoningModel(context: number): Provider.Model { - return { - id: "test-model", - providerID: "test", - name: "Test", - limit: { context, output: 10 }, - cost: { input: 0, output: 0, cache: { read: 0, write: 0 } }, - capabilities: { - toolcall: true, - attachment: false, - reasoning: false, - temperature: true, - input: { text: true, image: false, audio: false, video: false }, - output: { text: true, image: false, audio: false, video: false }, - }, - api: { npm: "@ai-sdk/anthropic" }, - options: {}, - } as Provider.Model -} - const boot = Effect.fn("test.boot")(function* () { const processors = yield* SessionProcessor.Service const session = yield* Session.Service @@ -367,12 +256,6 @@ it.live("session.processor effect tests stop after token overflow requests compa const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) - trace("overflow", { - value, - parts: parts.map((part) => part.type), - inputs: yield* llm.inputs, - }) - expect(value).toBe("compact") expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) expect(parts.some((part) => part.type === "step-finish")).toBe(true) @@ -381,57 +264,66 @@ it.live("session.processor effect tests stop after token overflow requests compa ), ) -// TODO: reasoning events not available via OpenAI-compatible SSE -reasoningIt.live("session.processor effect tests reset reasoning state across retries", () => - provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests capture reasoning from http mock", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM - const processors = yield* SessionProcessor.Service - const session = yield* Session.Service + const { processors, session, provider } = yield* boot() - yield* test.push( - Stream.fromIterable([ - { type: "start" }, - { type: "reasoning-start", id: "r" }, - { type: "reasoning-delta", id: "r", text: "one" }, - ]).pipe( - Stream.concat( - Stream.fail( - new APICallError({ - message: "boom", - url: "https://example.com/v1/chat/completions", - requestBodyValues: {}, - statusCode: 503, - responseHeaders: { "retry-after-ms": "0" }, - responseBody: '{"error":"boom"}', - isRetryable: true, - }), - ), - ), - ), - ) - - yield* test.reply( - { type: "start" }, - { type: "reasoning-start", id: "r" }, - { type: "reasoning-delta", id: "r", text: "two" }, - { type: "reasoning-end", id: "r" }, - { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: reasoningUsage(), - }, - { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: reasoningUsage() }, - ) + yield* llm.push(reply().reason("think").text("done").stop()) const chat = yield* session.create({}) const parent = yield* user(chat.id, "reason") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = reasoningModel(100) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "reason" }], + tools: {}, + }) + + const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning") + const text = parts.find((part): part is MessageV2.TextPart => part.type === "text") + + expect(value).toBe("continue") + expect(yield* llm.calls).toBe(1) + expect(reasoning?.text).toBe("think") + expect(text?.text).toBe("done") + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + +it.live("session.processor effect tests reset reasoning state across retries", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + yield* llm.push(reply().reason("one").reset(), reply().reason("two").stop()) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "reason") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -459,11 +351,11 @@ reasoningIt.live("session.processor effect tests reset reasoning state across re const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning") expect(value).toBe("continue") - expect(yield* test.calls).toBe(2) + expect(yield* llm.calls).toBe(2) expect(reasoning.some((part) => part.text === "two")).toBe(true) expect(reasoning.some((part) => part.text === "onetwo")).toBe(false) }), - { git: true }, + { git: true, config: (url) => providerCfg(url) }, ), ) @@ -502,13 +394,6 @@ it.live("session.processor effect tests do not retry unknown json errors", () => tools: {}, }) - trace("unknown-error", { - value, - calls: yield* llm.calls, - inputs: yield* llm.inputs, - error: handle.message.error, - }) - expect(value).toBe("stop") expect(yield* llm.calls).toBe(1) expect(handle.message.error?.name).toBe("APIError") @@ -664,9 +549,8 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup ({ dir, llm }) => Effect.gen(function* () { const { processors, session, provider } = yield* boot() - const wait = new Promise<{ output: string }>(() => {}) - yield* llm.tool("bash", { cmd: "pwd" }) + yield* llm.toolHang("bash", { cmd: "pwd" }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "tool abort") @@ -693,26 +577,16 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup agent: agent(), system: [], messages: [{ role: "user", content: "tool abort" }], - tools: { - bash: tool({ - description: "Run shell commands", - inputSchema: jsonSchema({ - type: "object", - properties: { - cmd: { type: "string" }, - }, - required: ["cmd"], - }), - execute: async () => wait, - }), - }, + tools: {}, }) .pipe(Effect.forkChild) yield* llm.wait(1) yield* Effect.promise(async () => { const end = Date.now() + 500 - while (!handle.partFromToolCall("call_1") && Date.now() < end) { + while (Date.now() < end) { + const parts = await MessageV2.parts(msg.id) + if (parts.some((part) => part.type === "tool")) return await Bun.sleep(10) } }) @@ -725,16 +599,6 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") - trace("tool-abort", { - calls: yield* llm.calls, - inputs: yield* llm.inputs, - pending: handle.partFromToolCall("call_1"), - parts: parts.map((part) => ({ - type: part.type, - ...(part.type === "tool" ? { state: part.state.status } : {}), - })), - }) - expect(Exit.isFailure(exit)).toBe(true) if (Exit.isFailure(exit)) { expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)