test: migrate processor tests to HTTP mock LLM server

Replace the custom TestLLM Effect service with the real LLM layer +
TestLLMServer HTTP mock for 9 of 10 processor tests. Tests now exercise
the full HTTP→SSE→AI SDK→processor pipeline.

- Export Provider.defaultLayer for test layer composition
- Add boot() helper for common service access (processors, session, provider)
- Extend TestLLMServer with usage support and httpError step type
- Tool abort test registers a real tool with hanging execute
- Reasoning test stays with in-process TestLLM (needs fine-grained events)
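
For reviewers, the migrated tests share this rough shape (a condensed
sketch assembled from the diff below; provideTmpdirServer, boot, and the
llm.* step helpers are this repo's test-harness names, not new API):

    it.live("example", () =>
      provideTmpdirServer(
        ({ dir, llm }) =>
          Effect.gen(function* () {
            // boot() bundles the three services every test needs.
            const { processors, session, provider } = yield* boot()
            // Queue mock server steps: HTTP errors and text (optionally with usage).
            yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } })
            yield* llm.text("after", { usage: { input: 100, output: 0 } })
            // ...create a session/messages under dir, run processors.create(...),
            // then assert on what the mock server actually saw over HTTP/SSE.
            expect(yield* llm.calls).toBe(2)
          }),
        { git: true, config: (url) => providerCfg(url) },
      ))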
test/processor-mock-server
Kit Langton 2026-03-31 19:59:56 -04:00
parent 537cc32bf0
commit 7f6a5bb2c8
2 changed files with 92 additions and 52 deletions

View File

@@ -1541,10 +1541,9 @@ export namespace Provider {
}),
)
- const { runPromise } = makeRuntime(
-   Service,
-   layer.pipe(Layer.provide(Config.defaultLayer), Layer.provide(Auth.defaultLayer)),
- )
+ export const defaultLayer = layer.pipe(Layer.provide(Config.defaultLayer), Layer.provide(Auth.defaultLayer))
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function list() {
return runPromise((svc) => svc.list())

View File

@@ -1,6 +1,6 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
- import { APICallError } from "ai"
+ import { APICallError, jsonSchema, tool } from "ai"
import { Cause, Effect, Exit, Fiber, Layer, ServiceMap, Stream } from "effect"
import path from "path"
import type { Agent } from "../../src/agent/agent"
@@ -26,6 +26,13 @@ import { TestLLMServer } from "../lib/llm-server"
Log.init({ print: false })
+ const DEBUG = process.env.OPENCODE_TEST_DEBUG === "1"
+ function trace(label: string, value: unknown) {
+   if (!DEBUG) return
+   console.log(label, JSON.stringify(value, null, 2))
+ }
const ref = {
providerID: ProviderID.make("test"),
modelID: ModelID.make("test-model"),
@@ -154,12 +161,10 @@ const deps = Layer.mergeAll(
Plugin.defaultLayer,
Config.defaultLayer,
LLM.defaultLayer,
+ Provider.defaultLayer,
status,
).pipe(Layer.provideMerge(infra))
- const env = Layer.mergeAll(
-   TestLLMServer.layer,
-   SessionProcessor.layer.pipe(Layer.provideMerge(deps)),
- )
+ const env = Layer.mergeAll(TestLLMServer.layer, SessionProcessor.layer.pipe(Layer.provideMerge(deps)))
const it = testEffect(env)
@@ -266,6 +271,13 @@ function reasoningModel(context: number): Provider.Model {
} as Provider.Model
}
+ const boot = Effect.fn("test.boot")(function* () {
+   const processors = yield* SessionProcessor.Service
+   const session = yield* Session.Service
+   const provider = yield* Provider.Service
+   return { processors, session, provider }
+ })
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
@@ -274,15 +286,14 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
yield* llm.text("hello")
const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
@@ -322,15 +333,14 @@ it.live("session.processor effect tests stop after token overflow requests compa
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
yield* llm.text("after", { usage: { input: 100, output: 0 } })
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const base = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const base = yield* provider.getModel(ref.providerID, ref.modelID)
const mdl = { ...base, limit: { context: 20, output: 10 } }
const handle = yield* processors.create({
assistantMessage: msg,
@@ -357,8 +367,14 @@ it.live("session.processor effect tests stop after token overflow requests compa
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
trace("overflow", {
value,
parts: parts.map((part) => part.type),
inputs: yield* llm.inputs,
})
expect(value).toBe("compact")
- expect(parts.some((part) => part.type === "text")).toBe(false)
+ expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
+ expect(parts.some((part) => part.type === "step-finish")).toBe(true)
}),
{ git: true, config: (url) => providerCfg(url) },
@@ -375,11 +391,11 @@ reasoningIt.live("session.processor effect tests reset reasoning state across re
const session = yield* Session.Service
yield* test.push(
- Stream.make<LLM.Event>(
+ Stream.fromIterable<LLM.Event>([
{ type: "start" },
{ type: "reasoning-start", id: "r" },
{ type: "reasoning-delta", id: "r", text: "one" },
- ).pipe(
+ ]).pipe(
Stream.concat(
Stream.fail(
new APICallError({
@@ -440,9 +456,7 @@ reasoningIt.live("session.processor effect tests reset reasoning state across re
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
- const reasoning = parts.filter(
-   (part): part is MessageV2.ReasoningPart => part.type === "reasoning",
- )
+ const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
expect(value).toBe("continue")
expect(yield* test.calls).toBe(2)
@@ -457,15 +471,14 @@ it.live("session.processor effect tests do not retry unknown json errors", () =>
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
yield* llm.error(500, { error: { message: "no_kv_space" } })
yield* llm.error(400, { error: { message: "no_kv_space" } })
const chat = yield* session.create({})
const parent = yield* user(chat.id, "json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
@@ -489,9 +502,16 @@ it.live("session.processor effect tests do not retry unknown json errors", () =>
tools: {},
})
trace("unknown-error", {
value,
calls: yield* llm.calls,
inputs: yield* llm.inputs,
error: handle.message.error,
})
expect(value).toBe("stop")
+ expect(yield* llm.calls).toBe(1)
- expect(handle.message.error?.name).toBe("UnknownError")
+ expect(handle.message.error?.name).toBe("APIError")
}),
{ git: true, config: (url) => providerCfg(url) },
),
@@ -501,8 +521,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
yield* llm.error(429, { type: "error", error: { type: "too_many_requests" } })
yield* llm.text("after")
@@ -510,7 +529,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
@@ -549,8 +568,7 @@ it.live("session.processor effect tests publish retry status updates", () =>
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
const bus = yield* Bus.Service
yield* llm.error(503, { error: "boom" })
@@ -559,7 +577,7 @@ it.live("session.processor effect tests publish retry status updates", () =>
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const states: number[] = []
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
if (evt.properties.sessionID !== chat.id) return
@@ -602,15 +620,14 @@ it.live("session.processor effect tests compact on structured context overflow",
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
yield* llm.error(400, { type: "error", error: { code: "context_length_exceeded" } })
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
@@ -646,16 +663,15 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
const wait = new Promise<{ output: string }>(() => {})
yield* llm.tool("bash", { cmd: "pwd" })
yield* llm.hang
const chat = yield* session.create({})
const parent = yield* user(chat.id, "tool abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
@@ -677,12 +693,29 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
agent: agent(),
system: [],
messages: [{ role: "user", content: "tool abort" }],
- tools: {},
+ tools: {
+   bash: tool({
+     description: "Run shell commands",
+     inputSchema: jsonSchema({
+       type: "object",
+       properties: {
+         cmd: { type: "string" },
+       },
+       required: ["cmd"],
+     }),
+     execute: async () => wait,
+   }),
+ },
})
.pipe(Effect.forkChild)
yield* llm.wait(1)
- yield* Effect.sleep("100 millis")
+ yield* Effect.promise(async () => {
+   const end = Date.now() + 500
+   while (!handle.partFromToolCall("call_1") && Date.now() < end) {
+     await Bun.sleep(10)
+   }
+ })
yield* Fiber.interrupt(run)
const exit = yield* Fiber.await(run)
@@ -690,17 +723,27 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
yield* handle.abort()
}
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
- const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
+ const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
+ trace("tool-abort", {
+   calls: yield* llm.calls,
+   inputs: yield* llm.inputs,
+   pending: handle.partFromToolCall("call_1"),
+   parts: parts.map((part) => ({
+     type: part.type,
+     ...(part.type === "tool" ? { state: part.state.status } : {}),
+   })),
+ })
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
}
+ expect(yield* llm.calls).toBe(1)
- expect(tool?.state.status).toBe("error")
- if (tool?.state.status === "error") {
-   expect(tool.state.error).toBe("Tool execution aborted")
-   expect(tool.state.time.end).toBeDefined()
+ expect(call?.state.status).toBe("error")
+ if (call?.state.status === "error") {
+   expect(call.state.error).toBe("Tool execution aborted")
+   expect(call.state.time.end).toBeDefined()
}
}),
{ git: true, config: (url) => providerCfg(url) },
@@ -712,8 +755,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
({ dir, llm }) =>
Effect.gen(function* () {
const seen = defer<void>()
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
const bus = yield* Bus.Service
const sts = yield* SessionStatus.Service
@@ -722,7 +764,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
const chat = yield* session.create({})
const parent = yield* user(chat.id, "abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const errs: string[] = []
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
if (evt.properties.sessionID !== chat.id) return
@@ -787,8 +829,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
- const processors = yield* SessionProcessor.Service
- const session = yield* Session.Service
+ const { processors, session, provider } = yield* boot()
const sts = yield* SessionStatus.Service
yield* llm.hang
@@ -796,7 +837,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
const chat = yield* session.create({})
const parent = yield* user(chat.id, "interrupt")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const mdl = yield* Effect.promise(() => Provider.getModel(ref.providerID, ref.modelID))
+ const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,