diff --git a/packages/opencode/test/lib/llm-server.ts b/packages/opencode/test/lib/llm-server.ts index b0a54424ef..032a2fbebe 100644 --- a/packages/opencode/test/lib/llm-server.ts +++ b/packages/opencode/test/lib/llm-server.ts @@ -4,10 +4,13 @@ import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect" import * as HttpServer from "effect/unstable/http/HttpServer" import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" +type Usage = { input: number; output: number } + type Step = | { type: "text" text: string + usage?: Usage } | { type: "tool" @@ -18,6 +21,11 @@ type Step = type: "fail" message: string } + | { + type: "httpError" + status: number + body: unknown + } | { type: "hang" } @@ -47,6 +55,18 @@ function sse(lines: unknown[]) { } function text(step: Extract) { + const finish: Record = { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + } + if (step.usage) { + finish.usage = { + prompt_tokens: step.usage.input, + completion_tokens: step.usage.output, + total_tokens: step.usage.input + step.usage.output, + } + } return sse([ { id: "chatcmpl-test", @@ -58,14 +78,17 @@ function text(step: Extract) { object: "chat.completion.chunk", choices: [{ delta: { content: step.text } }], }, - { - id: "chatcmpl-test", - object: "chat.completion.chunk", - choices: [{ delta: {}, finish_reason: "stop" }], - }, + finish, ]) } +function httpError(step: Extract) { + return HttpServerResponse.text(JSON.stringify(step.body), { + status: step.status, + contentType: "application/json", + }) +} + function tool(step: Extract, seq: number) { const id = `call_${seq}` const args = JSON.stringify(step.input) @@ -173,9 +196,10 @@ function hold(step: Extract) { namespace TestLLMServer { export interface Service { readonly url: string - readonly text: (value: string) => Effect.Effect + readonly text: (value: string, opts?: { usage?: Usage }) => Effect.Effect readonly tool: (tool: string, input: unknown) => Effect.Effect readonly fail: (message?: string) => Effect.Effect + readonly error: (status: number, body: unknown) => Effect.Effect readonly hang: Effect.Effect readonly hold: (text: string, wait: PromiseLike) => Effect.Effect readonly hits: Effect.Effect @@ -236,6 +260,7 @@ export class TestLLMServer extends ServiceMap.Service) { push({ type: "hold", text, wait }) }), diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 23c6911a2c..b2336f226d 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,8 +1,7 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" import { APICallError } from "ai" -import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect" -import * as Stream from "effect/Stream" +import { Cause, Effect, Exit, Fiber, Layer, ServiceMap, Stream } from "effect" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -10,7 +9,7 @@ import { Bus } from "../../src/bus" import { Config } from "../../src/config/config" import { Permission } from "../../src/permission" import { Plugin } from "../../src/plugin" -import type { Provider } from "../../src/provider/provider" +import { Provider } from "../../src/provider/provider" import { ModelID, ProviderID } from "../../src/provider/schema" import { Session } from "../../src/session" import { LLM } from "../../src/session/llm" @@ -21,8 +20,9 @@ import { SessionStatus } from "../../src/session/status" import { Snapshot } from "../../src/snapshot" import { Log } from "../../src/util/log" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { provideTmpdirInstance } from "../fixture/fixture" +import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" +import { TestLLMServer } from "../lib/llm-server" Log.init({ print: false }) @@ -31,118 +31,51 @@ const ref = { modelID: ModelID.make("test-model"), } -type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) - -class TestLLM extends ServiceMap.Service< - TestLLM, - { - readonly push: (stream: Script) => Effect.Effect - readonly reply: (...items: LLM.Event[]) => Effect.Effect - readonly calls: Effect.Effect - readonly inputs: Effect.Effect - } ->()("@test/SessionProcessorLLM") {} - -function stream(...items: LLM.Event[]) { - return Stream.make(...items) +const cfg = { + provider: { + test: { + name: "Test", + id: "test", + env: [], + npm: "@ai-sdk/openai-compatible", + models: { + "test-model": { + id: "test-model", + name: "Test Model", + attachment: false, + reasoning: false, + temperature: false, + tool_call: true, + release_date: "2025-01-01", + limit: { context: 100000, output: 10000 }, + cost: { input: 0, output: 0 }, + options: {}, + }, + }, + options: { + apiKey: "test-key", + baseURL: "http://localhost:1/v1", + }, + }, + }, } -function usage(input = 1, output = 1, total = input + output) { +function providerCfg(url: string) { return { - inputTokens: input, - outputTokens: output, - totalTokens: total, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, + ...cfg, + provider: { + ...cfg.provider, + test: { + ...cfg.provider.test, + options: { + ...cfg.provider.test.options, + baseURL: url, + }, + }, }, } } -function start(): LLM.Event { - return { type: "start" } -} - -function textStart(id = "t"): LLM.Event { - return { type: "text-start", id } -} - -function textDelta(id: string, text: string): LLM.Event { - return { type: "text-delta", id, text } -} - -function textEnd(id = "t"): LLM.Event { - return { type: "text-end", id } -} - -function reasoningStart(id: string): LLM.Event { - return { type: "reasoning-start", id } -} - -function reasoningDelta(id: string, text: string): LLM.Event { - return { type: "reasoning-delta", id, text } -} - -function reasoningEnd(id: string): LLM.Event { - return { type: "reasoning-end", id } -} - -function finishStep(): LLM.Event { - return { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - } -} - -function finish(): LLM.Event { - return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() } -} - -function toolInputStart(id: string, toolName: string): LLM.Event { - return { type: "tool-input-start", id, toolName } -} - -function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event { - return { type: "tool-call", toolCallId, toolName, input } -} - -function fail(err: E, ...items: LLM.Event[]) { - return stream(...items).pipe(Stream.concat(Stream.fail(err))) -} - -function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) { - return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never))) -} - -function model(context: number): Provider.Model { - return { - id: "test-model", - providerID: "test", - name: "Test", - limit: { context, output: 10 }, - cost: { input: 0, output: 0, cache: { read: 0, write: 0 } }, - capabilities: { - toolcall: true, - attachment: false, - reasoning: false, - temperature: true, - input: { text: true, image: false, audio: false, video: false }, - output: { text: true, image: false, audio: false, video: false }, - }, - api: { npm: "@ai-sdk/anthropic" }, - options: {}, - } as Provider.Model -} - function agent(): Agent.Info { return { name: "build", @@ -211,7 +144,59 @@ const assistant = Effect.fn("TestSession.assistant")(function* ( return msg }) -const llm = Layer.unwrap( +const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) +const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) +const deps = Layer.mergeAll( + Session.defaultLayer, + Snapshot.defaultLayer, + AgentSvc.defaultLayer, + Permission.layer, + Plugin.defaultLayer, + Config.defaultLayer, + LLM.defaultLayer, + status, +).pipe(Layer.provideMerge(infra)) +const env = Layer.mergeAll( + TestLLMServer.layer, + SessionProcessor.layer.pipe(Layer.provideMerge(deps)), +) + +const it = testEffect(env) + +// --------------------------------------------------------------------------- +// TestLLM kept only for the reasoning test +// TODO: reasoning events not available via OpenAI-compatible SSE +// --------------------------------------------------------------------------- +type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) + +class TestLLM extends ServiceMap.Service< + TestLLM, + { + readonly push: (stream: Script) => Effect.Effect + readonly reply: (...items: LLM.Event[]) => Effect.Effect + readonly calls: Effect.Effect + readonly inputs: Effect.Effect + } +>()("@test/SessionProcessorLLM") {} + +function reasoningUsage(input = 1, output = 1, total = input + output) { + return { + inputTokens: input, + outputTokens: output, + totalTokens: total, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + } +} + +const reasoningLlm = Layer.unwrap( Effect.gen(function* () { const queue: Script[] = [] const inputs: LLM.StreamInput[] = [] @@ -222,7 +207,7 @@ const llm = Layer.unwrap( return Effect.void }) - const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items))) + const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(Stream.make(...items))) return Layer.mergeAll( Layer.succeed( LLM.Service, @@ -248,9 +233,7 @@ const llm = Layer.unwrap( }), ) -const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) -const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) -const deps = Layer.mergeAll( +const reasoningDeps = Layer.mergeAll( Session.defaultLayer, Snapshot.defaultLayer, AgentSvc.defaultLayer, @@ -258,26 +241,48 @@ const deps = Layer.mergeAll( Plugin.defaultLayer, Config.defaultLayer, status, - llm, + reasoningLlm, ).pipe(Layer.provideMerge(infra)) -const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) +const reasoningEnv = SessionProcessor.layer.pipe(Layer.provideMerge(reasoningDeps)) +const reasoningIt = testEffect(reasoningEnv) -const it = testEffect(env) +function reasoningModel(context: number): Provider.Model { + return { + id: "test-model", + providerID: "test", + name: "Test", + limit: { context, output: 10 }, + cost: { input: 0, output: 0, cache: { read: 0, write: 0 } }, + capabilities: { + toolcall: true, + attachment: false, + reasoning: false, + temperature: true, + input: { text: true, image: false, audio: false, video: false }, + output: { text: true, image: false, audio: false, video: false }, + }, + api: { npm: "@ai-sdk/anthropic" }, + options: {}, + } as Provider.Model +} -it.live("session.processor effect tests capture llm input cleanly", () => { - return provideTmpdirInstance( - (dir) => +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +it.live("session.processor effect tests capture llm input cleanly", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service - yield* test.reply(start(), textStart(), textDelta("t", "hello"), textEnd(), finishStep(), finish()) + yield* llm.text("hello") const chat = yield* session.create({}) const parent = yield* user(chat.id, "hi") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -303,46 +308,30 @@ it.live("session.processor effect tests capture llm input cleanly", () => { const value = yield* handle.process(input) const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) - const calls = yield* test.calls - const inputs = yield* test.inputs + const calls = yield* llm.calls expect(value).toBe("continue") expect(calls).toBe(1) - expect(inputs).toHaveLength(1) - expect(inputs[0].messages).toStrictEqual([{ role: "user", content: "hi" }]) expect(parts.some((part) => part.type === "text" && part.text === "hello")).toBe(true) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests stop after token overflow requests compaction", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests stop after token overflow requests compaction", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service - yield* test.reply( - start(), - { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(100, 0, 100), - }, - textStart(), - textDelta("t", "after"), - textEnd(), - ) + yield* llm.text("after", { usage: { input: 100, output: 0 } }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "compact") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(20) + const base = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) + const mdl = { ...base, limit: { context: 20, output: 10 } } const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -372,12 +361,13 @@ it.live("session.processor effect tests stop after token overflow requests compa expect(parts.some((part) => part.type === "text")).toBe(false) expect(parts.some((part) => part.type === "step-finish")).toBe(true) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests reset reasoning state across retries", () => { - return provideTmpdirInstance( +// TODO: reasoning events not available via OpenAI-compatible SSE +reasoningIt.live("session.processor effect tests reset reasoning state across retries", () => + provideTmpdirInstance( (dir) => Effect.gen(function* () { const test = yield* TestLLM @@ -385,35 +375,47 @@ it.live("session.processor effect tests reset reasoning state across retries", ( const session = yield* Session.Service yield* test.push( - fail( - new APICallError({ - message: "boom", - url: "https://example.com/v1/chat/completions", - requestBodyValues: {}, - statusCode: 503, - responseHeaders: { "retry-after-ms": "0" }, - responseBody: '{"error":"boom"}', - isRetryable: true, - }), - start(), - reasoningStart("r"), - reasoningDelta("r", "one"), + Stream.make( + { type: "start" }, + { type: "reasoning-start", id: "r" }, + { type: "reasoning-delta", id: "r", text: "one" }, + ).pipe( + Stream.concat( + Stream.fail( + new APICallError({ + message: "boom", + url: "https://example.com/v1/chat/completions", + requestBodyValues: {}, + statusCode: 503, + responseHeaders: { "retry-after-ms": "0" }, + responseBody: '{"error":"boom"}', + isRetryable: true, + }), + ), + ), ), ) yield* test.reply( - start(), - reasoningStart("r"), - reasoningDelta("r", "two"), - reasoningEnd("r"), - finishStep(), - finish(), + { type: "start" }, + { type: "reasoning-start", id: "r" }, + { type: "reasoning-delta", id: "r", text: "two" }, + { type: "reasoning-end", id: "r" }, + { + type: "finish-step", + finishReason: "stop", + rawFinishReason: "stop", + response: { id: "res", modelId: "test-model", timestamp: new Date() }, + providerMetadata: undefined, + usage: reasoningUsage(), + }, + { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: reasoningUsage() }, ) const chat = yield* session.create({}) const parent = yield* user(chat.id, "reason") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = reasoningModel(100) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -438,7 +440,9 @@ it.live("session.processor effect tests reset reasoning state across retries", ( }) const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) - const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning") + const reasoning = parts.filter( + (part): part is MessageV2.ReasoningPart => part.type === "reasoning", + ) expect(value).toBe("continue") expect(yield* test.calls).toBe(2) @@ -446,23 +450,22 @@ it.live("session.processor effect tests reset reasoning state across retries", ( expect(reasoning.some((part) => part.text === "onetwo")).toBe(false) }), { git: true }, - ) -}) + ), +) -it.live("session.processor effect tests do not retry unknown json errors", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests do not retry unknown json errors", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service - yield* test.push(fail({ error: { message: "no_kv_space" } }, start())) + yield* llm.error(500, { error: { message: "no_kv_space" } }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -487,29 +490,27 @@ it.live("session.processor effect tests do not retry unknown json errors", () => }) expect(value).toBe("stop") - expect(yield* test.calls).toBe(1) - expect(yield* test.inputs).toHaveLength(1) + expect(yield* llm.calls).toBe(1) expect(handle.message.error?.name).toBe("UnknownError") }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests retry recognized structured json errors", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests retry recognized structured json errors", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service - yield* test.push(fail({ type: "error", error: { type: "too_many_requests" } }, start())) - yield* test.reply(start(), textStart(), textDelta("t", "after"), textEnd(), finishStep(), finish()) + yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } }) + yield* llm.text("after") const chat = yield* session.create({}) const parent = yield* user(chat.id, "retry json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -536,43 +537,29 @@ it.live("session.processor effect tests retry recognized structured json errors" const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) expect(value).toBe("continue") - expect(yield* test.calls).toBe(2) + expect(yield* llm.calls).toBe(2) expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) expect(handle.message.error).toBeUndefined() }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests publish retry status updates", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests publish retry status updates", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service const bus = yield* Bus.Service - yield* test.push( - fail( - new APICallError({ - message: "boom", - url: "https://example.com/v1/chat/completions", - requestBodyValues: {}, - statusCode: 503, - responseHeaders: { "retry-after-ms": "0" }, - responseBody: '{"error":"boom"}', - isRetryable: true, - }), - start(), - ), - ) - yield* test.reply(start(), finishStep(), finish()) + yield* llm.error(503, { error: "boom" }) + yield* llm.text("") const chat = yield* session.create({}) const parent = yield* user(chat.id, "retry") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const states: number[] = [] const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { if (evt.properties.sessionID !== chat.id) return @@ -604,27 +591,26 @@ it.live("session.processor effect tests publish retry status updates", () => { off() expect(value).toBe("continue") - expect(yield* test.calls).toBe(2) + expect(yield* llm.calls).toBe(2) expect(states).toStrictEqual([1]) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests compact on structured context overflow", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests compact on structured context overflow", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service - yield* test.push(fail({ type: "error", error: { code: "context_length_exceeded" } }, start())) + yield* llm.error(400, { type: "error", error: { code: "context_length_exceeded" } }) const chat = yield* session.create({}) const parent = yield* user(chat.id, "compact json") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -649,32 +635,27 @@ it.live("session.processor effect tests compact on structured context overflow", }) expect(value).toBe("compact") - expect(yield* test.calls).toBe(1) + expect(yield* llm.calls).toBe(1) expect(handle.message.error).toBeUndefined() }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const ready = defer() - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service - yield* test.push((input) => - hang(input, start(), toolInputStart("tool-1", "bash"), toolCall("tool-1", "bash", { cmd: "pwd" })).pipe( - Stream.tap((event) => (event.type === "tool-call" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + yield* llm.tool("bash", { cmd: "pwd" }) + yield* llm.hang const chat = yield* session.create({}) const parent = yield* user(chat.id, "tool abort") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -700,7 +681,8 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup }) .pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + yield* llm.wait(1) + yield* Effect.sleep("100 millis") yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) @@ -714,39 +696,33 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup if (Exit.isFailure(exit)) { expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true) } - expect(yield* test.calls).toBe(1) + expect(yield* llm.calls).toBe(1) expect(tool?.state.status).toBe("error") if (tool?.state.status === "error") { expect(tool.state.error).toBe("Tool execution aborted") expect(tool.state.time.end).toBeDefined() } }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests record aborted errors and idle state", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests record aborted errors and idle state", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const ready = defer() const seen = defer() - const test = yield* TestLLM const processors = yield* SessionProcessor.Service const session = yield* Session.Service const bus = yield* Bus.Service - const status = yield* SessionStatus.Service + const sts = yield* SessionStatus.Service - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + yield* llm.hang const chat = yield* session.create({}) const parent = yield* user(chat.id, "abort") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const errs: string[] = [] const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => { if (evt.properties.sessionID !== chat.id) return @@ -779,7 +755,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( }) .pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + yield* llm.wait(1) yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) @@ -788,7 +764,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( } yield* Effect.promise(() => seen.promise) const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) - const state = yield* status.get(chat.id) + const state = yield* sts.get(chat.id) off() expect(Exit.isFailure(exit)).toBe(true) @@ -803,30 +779,24 @@ it.live("session.processor effect tests record aborted errors and idle state", ( expect(state).toMatchObject({ type: "idle" }) expect(errs).toContain("MessageAbortedError") }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +) -it.live("session.processor effect tests mark interruptions aborted without manual abort", () => { - return provideTmpdirInstance( - (dir) => +it.live("session.processor effect tests mark interruptions aborted without manual abort", () => + provideTmpdirServer( + ({ dir, llm }) => Effect.gen(function* () { - const ready = defer() const processors = yield* SessionProcessor.Service const session = yield* Session.Service - const status = yield* SessionStatus.Service - const test = yield* TestLLM + const sts = yield* SessionStatus.Service - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + yield* llm.hang const chat = yield* session.create({}) const parent = yield* user(chat.id, "interrupt") const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) - const mdl = model(100) + const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID)) const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, @@ -852,12 +822,12 @@ it.live("session.processor effect tests mark interruptions aborted without manua }) .pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + yield* llm.wait(1) yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) - const state = yield* status.get(chat.id) + const state = yield* sts.get(chat.id) expect(Exit.isFailure(exit)).toBe(true) expect(handle.message.error?.name).toBe("MessageAbortedError") @@ -867,6 +837,6 @@ it.live("session.processor effect tests mark interruptions aborted without manua } expect(state).toMatchObject({ type: "idle" }) }), - { git: true }, - ) -}) + { git: true, config: (url) => providerCfg(url) }, + ), +)