opencode/packages/opencode/test/session/prompt-effect.test.ts

1257 lines
40 KiB
TypeScript

import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import path from "path"
import z from "zod"
import { Agent as AgentSvc } from "../../src/agent/agent"
import { Bus } from "../../src/bus"
import { Command } from "../../src/command"
import { Config } from "../../src/config/config"
import { FileTime } from "../../src/file/time"
import { LSP } from "../../src/lsp"
import { MCP } from "../../src/mcp"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
import { Provider as ProviderSvc } from "../../src/provider/provider"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { Question } from "../../src/question"
import { Todo } from "../../src/session/todo"
import { Session } from "../../src/session"
import { LLM } from "../../src/session/llm"
import { MessageV2 } from "../../src/session/message-v2"
import { AppFileSystem } from "../../src/filesystem"
import { SessionCompaction } from "../../src/session/compaction"
import { Instruction } from "../../src/session/instruction"
import { SessionProcessor } from "../../src/session/processor"
import { SessionPrompt } from "../../src/session/prompt"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status"
import { Shell } from "../../src/shell/shell"
import { Snapshot } from "../../src/snapshot"
import { ToolRegistry } from "../../src/tool/registry"
import { Truncate } from "../../src/tool/truncate"
import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { reply, TestLLMServer } from "../lib/llm-server"
// Initialise logging with printing turned off for this test module.
Log.init({ print: false })
// Provider/model reference shared by the helpers below; the ids are the same
// strings used for the provider and model registered in `cfg`.
const ref = {
providerID: ProviderID.make("test"),
modelID: ModelID.make("test-model"),
}
/**
 * Creates a promise together with the function that settles it, so a test can
 * resolve the promise from outside the executor.
 *
 * @returns the pending `promise` and its `resolve` callback
 */
function defer<T>() {
  let settle!: (value: T | PromiseLike<T>) => void
  const promise = new Promise<T>((fulfil) => {
    // The executor runs synchronously, so `settle` is assigned before we return.
    settle = fulfil
  })
  return { promise, resolve: settle }
}
/**
 * Runs `fx` with `process.env.SHELL` set to "/bin/sh" and puts the previous
 * value back afterwards (deleting the variable if it was unset before).
 * `Shell.preferred.reset()` is called after each change to the variable.
 *
 * @param fx thunk producing the effect to run while SHELL is overridden
 */
function withSh<A, E, R>(fx: () => Effect.Effect<A, E, R>) {
  const override = Effect.sync(() => {
    const saved = process.env.SHELL
    process.env.SHELL = "/bin/sh"
    Shell.preferred.reset()
    return saved
  })

  const restore = (saved: string | undefined) =>
    Effect.sync(() => {
      if (saved !== undefined) {
        process.env.SHELL = saved
      } else {
        delete process.env.SHELL
      }
      Shell.preferred.reset()
    })

  return Effect.acquireUseRelease(override, () => fx(), restore)
}
/** Returns the first part whose `type` is "tool", or `undefined` when there is none. */
function toolPart(parts: MessageV2.Part[]) {
  const isTool = (candidate: MessageV2.Part): candidate is MessageV2.ToolPart => candidate.type === "tool"
  return parts.find(isTool)
}
// A tool part whose `state` is narrowed to the completed variant.
type CompletedToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateCompleted }
// A tool part whose `state` is narrowed to the error variant.
type ErrorToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateError }
/**
 * Asserts that the first tool part in `parts` has status "completed" and
 * returns it with the narrowed type (`undefined` when the status differs).
 */
function completedTool(parts: MessageV2.Part[]) {
  const found = toolPart(parts)
  expect(found?.state.status).toBe("completed")
  if (found?.state.status !== "completed") return undefined
  return found as CompletedToolPart
}
/**
 * Asserts that the first tool part in `parts` has status "error" and returns
 * it with the narrowed type (`undefined` when the status differs).
 */
function errorTool(parts: MessageV2.Part[]) {
  const found = toolPart(parts)
  expect(found?.state.status).toBe("error")
  if (found?.state.status !== "error") return undefined
  return found as ErrorToolPart
}
// Stub MCP service: lookups succeed with empty/undefined values, connection
// management is a no-op, and the auth entry points die because these tests
// are not expected to reach them.
const mcp = (() => {
  const unexpectedAuth = () => Effect.die("unexpected MCP auth in prompt-effect tests")

  return Layer.succeed(
    MCP.Service,
    MCP.Service.of({
      // Lookups: nothing is registered.
      status: () => Effect.succeed({}),
      clients: () => Effect.succeed({}),
      tools: () => Effect.succeed({}),
      prompts: () => Effect.succeed({}),
      resources: () => Effect.succeed({}),
      getPrompt: () => Effect.succeed(undefined),
      readResource: () => Effect.succeed(undefined),
      // Connection management.
      add: () => Effect.succeed({ status: { status: "disabled" as const } }),
      connect: () => Effect.void,
      disconnect: () => Effect.void,
      // Auth.
      startAuth: unexpectedAuth,
      authenticate: unexpectedAuth,
      finishAuth: unexpectedAuth,
      removeAuth: () => Effect.void,
      supportsOAuth: () => Effect.succeed(false),
      hasStoredTokens: () => Effect.succeed(false),
      getAuthStatus: () => Effect.succeed("not_authenticated" as const),
    }),
  )
})()
// Stub LSP service: reports no clients and answers every query with an empty
// result.
const lsp = (() => {
  const noResults = () => Effect.succeed([])

  return Layer.succeed(
    LSP.Service,
    LSP.Service.of({
      init: () => Effect.void,
      touchFile: () => Effect.void,
      hasClients: () => Effect.succeed(false),
      status: noResults,
      diagnostics: () => Effect.succeed({}),
      hover: () => Effect.succeed(undefined),
      definition: noResults,
      references: noResults,
      implementation: noResults,
      documentSymbol: noResults,
      workspaceSymbol: noResults,
      prepareCallHierarchy: noResults,
      incomingCalls: noResults,
      outgoingCalls: noResults,
    }),
  )
})()
// Stub FileTime service: `read`/`assert` do nothing, `get` yields undefined,
// and `withLock` simply runs the supplied callback as a promise.
const filetime = Layer.succeed(
  FileTime.Service,
  FileTime.Service.of({
    read: () => Effect.void,
    assert: () => Effect.void,
    get: () => Effect.succeed(undefined),
    withLock: (_target, task) => Effect.promise(task),
  }),
)
// Session status layer with the Bus layer provided to it (and merged into its output).
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
// Platform layers merged together: Node file system plus the cross-spawn spawner.
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
/**
 * Assembles the layer used by every test in this file: the test LLM server
 * merged with `SessionPrompt.layer`, wired to default service layers plus the
 * stubbed `filetime`, `lsp` and `mcp` layers defined above.
 */
function makeHttp() {
  // Shared base services, backed by the platform layers in `infra`.
  const base = Layer.mergeAll(
    Session.defaultLayer,
    Snapshot.defaultLayer,
    LLM.defaultLayer,
    AgentSvc.defaultLayer,
    Command.defaultLayer,
    Permission.defaultLayer,
    Plugin.defaultLayer,
    Config.defaultLayer,
    ProviderSvc.defaultLayer,
    filetime,
    lsp,
    mcp,
    AppFileSystem.defaultLayer,
    status,
  ).pipe(Layer.provideMerge(infra))

  // Services that sit directly on top of the base.
  const questions = Question.layer.pipe(Layer.provideMerge(base))
  const todos = Todo.layer.pipe(Layer.provideMerge(base))
  const truncate = Truncate.layer.pipe(Layer.provideMerge(base))
  const processor = SessionProcessor.layer.pipe(Layer.provideMerge(base))

  // Services that need more than the base.
  const tools = ToolRegistry.layer.pipe(
    Layer.provideMerge(todos),
    Layer.provideMerge(questions),
    Layer.provideMerge(base),
  )
  const compaction = SessionCompaction.layer.pipe(Layer.provideMerge(processor), Layer.provideMerge(base))

  const promptLayer = SessionPrompt.layer.pipe(
    Layer.provideMerge(compaction),
    Layer.provideMerge(processor),
    Layer.provideMerge(tools),
    Layer.provideMerge(truncate),
    Layer.provide(Instruction.defaultLayer),
    Layer.provideMerge(base),
  )

  return Layer.mergeAll(TestLLMServer.layer, promptLayer)
}
// Test registrar bound to the layer built by makeHttp().
const it = testEffect(makeHttp())
// Registers a live test everywhere except on win32, where the test is skipped.
const unix = process.platform === "win32" ? it.live.skip : it.live
// Config that registers a custom "test" provider with a single "test-model"
// model, so that Provider.getModel("test", "test-model") succeeds inside the
// loop. The baseURL here is a placeholder; providerCfg() returns a copy with
// the baseURL replaced by the URL it is given.
const cfg = {
provider: {
test: {
name: "Test",
id: "test",
env: [],
npm: "@ai-sdk/openai-compatible",
models: {
"test-model": {
id: "test-model",
name: "Test Model",
attachment: false,
reasoning: false,
temperature: false,
tool_call: true,
release_date: "2025-01-01",
limit: { context: 100000, output: 10000 },
cost: { input: 0, output: 0 },
options: {},
},
},
options: {
apiKey: "test-key",
baseURL: "http://localhost:1/v1",
},
},
},
}
/**
 * Returns a copy of `cfg` in which the "test" provider's `options.baseURL` is
 * replaced by `url`; `cfg` itself is not modified.
 *
 * @param url base URL to use for the test provider
 */
function providerCfg(url: string) {
  const original = cfg.provider.test
  const test = {
    ...original,
    options: { ...original.options, baseURL: url },
  }
  return {
    ...cfg,
    provider: { ...cfg.provider, test },
  }
}
/**
 * Stores a user message for the "build" agent in `sessionID`, attaches one
 * text part containing `text`, and returns the stored message.
 */
const user = Effect.fn("test.user")(function* (sessionID: SessionID, text: string) {
  const store = yield* Session.Service

  const saved = yield* store.updateMessage({
    id: MessageID.ascending(),
    role: "user",
    sessionID,
    agent: "build",
    model: ref,
    time: { created: Date.now() },
  })

  yield* store.updatePart({
    id: PartID.ascending(),
    messageID: saved.id,
    sessionID,
    type: "text",
    text,
  })

  return saved
})
/**
 * Seeds a session with a user message ("hello") and an assistant reply
 * ("hi there"). When `opts.finish` is a non-empty string it is recorded as the
 * assistant message's `finish` value.
 *
 * @returns both stored messages as `{ user, assistant }`
 */
const seed = Effect.fn("test.seed")(function* (sessionID: SessionID, opts?: { finish?: string }) {
  const store = yield* Session.Service
  const question = yield* user(sessionID, "hello")

  const assistant: MessageV2.Assistant = {
    id: MessageID.ascending(),
    role: "assistant",
    parentID: question.id,
    sessionID,
    mode: "build",
    agent: "build",
    cost: 0,
    path: { cwd: "/tmp", root: "/tmp" },
    tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
    modelID: ref.modelID,
    providerID: ref.providerID,
    time: { created: Date.now() },
  }
  // Only set `finish` when the caller supplied a truthy value.
  if (opts?.finish) assistant.finish = opts.finish

  yield* store.updateMessage(assistant)
  yield* store.updatePart({
    id: PartID.ascending(),
    messageID: assistant.id,
    sessionID,
    type: "text",
    text: "hi there",
  })

  return { user: question, assistant }
})
/**
 * Attaches a "subtask" part (agent "general", fixed prompt and description)
 * to the given message.
 *
 * @param model model reference stored on the part; defaults to `ref`
 */
const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
  Effect.gen(function* () {
    const store = yield* Session.Service
    yield* store.updatePart({
      id: PartID.ascending(),
      messageID,
      sessionID,
      type: "subtask",
      prompt: "look into the cache key path",
      description: "inspect bug",
      agent: "general",
      model,
    })
  })
/**
 * Resolves the prompt and session services and creates a session, using
 * `{ title: "Pinned" }` when no input is given.
 *
 * @returns `{ prompt, sessions, chat }`
 */
const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
  const prompt = yield* SessionPrompt.Service
  const sessions = yield* Session.Service
  const createInput = input ?? { title: "Pinned" }
  const chat = yield* sessions.create(createInput)
  return { prompt, sessions, chat }
})
// Loop semantics
// A session seeded with an assistant message that finished with "stop" is
// returned as-is, and the test LLM server sees no calls.
it.live("loop exits immediately when last assistant has stop finish", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const promptSvc = yield* SessionPrompt.Service
      const sessionSvc = yield* Session.Service
      const thread = yield* sessionSvc.create({ title: "Pinned" })
      yield* seed(thread.id, { finish: "stop" })

      const outcome = yield* promptSvc.loop({ sessionID: thread.id })

      expect(outcome.info.role).toBe("assistant")
      if (outcome.info.role === "assistant") expect(outcome.info.finish).toBe("stop")
      const calls = yield* llm.calls
      expect(calls).toBe(0)
    }),
    { git: true, config: providerCfg },
  ),
)
// After a noReply prompt is stored, the loop makes one LLM request and returns
// the queued text reply.
it.live("loop calls LLM and returns assistant message", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const promptSvc = yield* SessionPrompt.Service
      const sessionSvc = yield* Session.Service
      const thread = yield* sessionSvc.create({
        title: "Pinned",
        permission: [{ permission: "*", pattern: "*", action: "allow" }],
      })
      yield* promptSvc.prompt({
        sessionID: thread.id,
        agent: "build",
        noReply: true,
        parts: [{ type: "text", text: "hello" }],
      })
      yield* llm.text("world")

      const outcome = yield* promptSvc.loop({ sessionID: thread.id })

      expect(outcome.info.role).toBe("assistant")
      const sawWorld = outcome.parts.some((part) => part.type === "text" && part.text === "world")
      expect(sawWorld).toBe(true)
      const hits = yield* llm.hits
      expect(hits).toHaveLength(1)
    }),
    { git: true, config: providerCfg },
  ),
)
// Same flow as the service-based loop test, but driven through the
// promise-returning static API (Session.create / SessionPrompt.prompt / loop).
it.live("static loop returns assistant text through local provider", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const created = yield* Effect.promise(() =>
        Session.create({
          title: "Prompt provider",
          permission: [{ permission: "*", pattern: "*", action: "allow" }],
        }),
      )
      yield* Effect.promise(() =>
        SessionPrompt.prompt({
          sessionID: created.id,
          agent: "build",
          noReply: true,
          parts: [{ type: "text", text: "hello" }],
        }),
      )
      yield* llm.text("world")

      const outcome = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: created.id }))

      expect(outcome.info.role).toBe("assistant")
      const sawWorld = outcome.parts.some((part) => part.type === "text" && part.text === "world")
      expect(sawWorld).toBe(true)
      const hits = yield* llm.hits
      expect(hits).toHaveLength(1)
      const pending = yield* llm.pending
      expect(pending).toBe(0)
    }),
    { git: true, config: providerCfg },
  ),
)
// Two prompt/loop rounds through the static API: each round consumes the
// reply queued for it, leaving two hits and nothing pending.
it.live("static loop consumes queued replies across turns", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const created = yield* Effect.promise(() =>
        Session.create({
          title: "Prompt provider turns",
          permission: [{ permission: "*", pattern: "*", action: "allow" }],
        }),
      )

      // Turn one.
      yield* Effect.promise(() =>
        SessionPrompt.prompt({
          sessionID: created.id,
          agent: "build",
          noReply: true,
          parts: [{ type: "text", text: "hello one" }],
        }),
      )
      yield* llm.text("world one")
      const turnOne = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: created.id }))
      expect(turnOne.info.role).toBe("assistant")
      expect(turnOne.parts.some((part) => part.type === "text" && part.text === "world one")).toBe(true)

      // Turn two.
      yield* Effect.promise(() =>
        SessionPrompt.prompt({
          sessionID: created.id,
          agent: "build",
          noReply: true,
          parts: [{ type: "text", text: "hello two" }],
        }),
      )
      yield* llm.text("world two")
      const turnTwo = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: created.id }))
      expect(turnTwo.info.role).toBe("assistant")
      expect(turnTwo.parts.some((part) => part.type === "text" && part.text === "world two")).toBe(true)

      const hits = yield* llm.hits
      expect(hits).toHaveLength(2)
      const pending = yield* llm.pending
      expect(pending).toBe(0)
    }),
    { git: true, config: providerCfg },
  ),
)
// A tool-call reply followed by a text reply results in two LLM calls, and the
// returned message carries the second reply with finish "stop".
it.live("loop continues when finish is tool-calls", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const promptSvc = yield* SessionPrompt.Service
      const sessionSvc = yield* Session.Service
      const thread = yield* sessionSvc.create({
        title: "Pinned",
        permission: [{ permission: "*", pattern: "*", action: "allow" }],
      })
      yield* promptSvc.prompt({
        sessionID: thread.id,
        agent: "build",
        noReply: true,
        parts: [{ type: "text", text: "hello" }],
      })
      yield* llm.tool("first", { value: "first" })
      yield* llm.text("second")

      const outcome = yield* promptSvc.loop({ sessionID: thread.id })

      const calls = yield* llm.calls
      expect(calls).toBe(2)
      expect(outcome.info.role).toBe("assistant")
      if (outcome.info.role === "assistant") {
        const sawSecond = outcome.parts.some((part) => part.type === "text" && part.text === "second")
        expect(sawSecond).toBe(true)
        expect(outcome.info.finish).toBe("stop")
      }
    }),
    { git: true, config: providerCfg },
  ),
)
// The first queued reply contains a tool call but ends with stop(); the loop
// is still expected to make a second LLM call and return its text.
it.live("loop continues when finish is stop but assistant has tool parts", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const promptSvc = yield* SessionPrompt.Service
      const sessionSvc = yield* Session.Service
      const thread = yield* sessionSvc.create({
        title: "Pinned",
        permission: [{ permission: "*", pattern: "*", action: "allow" }],
      })
      yield* promptSvc.prompt({
        sessionID: thread.id,
        agent: "build",
        noReply: true,
        parts: [{ type: "text", text: "hello" }],
      })
      const toolThenStop = reply().tool("first", { value: "first" }).stop()
      yield* llm.push(toolThenStop)
      yield* llm.text("second")

      const outcome = yield* promptSvc.loop({ sessionID: thread.id })

      const calls = yield* llm.calls
      expect(calls).toBe(2)
      expect(outcome.info.role).toBe("assistant")
      if (outcome.info.role === "assistant") {
        const sawSecond = outcome.parts.some((part) => part.type === "text" && part.text === "second")
        expect(sawSecond).toBe(true)
        expect(outcome.info.finish).toBe("stop")
      }
    }),
    { git: true, config: providerCfg },
  ),
)
// The "general" agent is configured with model "test/missing-model", which is
// not among the models in the provider config. The subtask's tool part is
// expected to end in the error state while still carrying session and model
// metadata.
it.live("failed subtask preserves metadata on error tool state", () =>
  provideTmpdirServer(
    Effect.fnUntraced(function* ({ llm }) {
      const promptSvc = yield* SessionPrompt.Service
      const sessionSvc = yield* Session.Service
      const thread = yield* sessionSvc.create({ title: "Pinned" })
      yield* llm.tool("task", {
        description: "inspect bug",
        prompt: "look into the cache key path",
        subagent_type: "general",
      })
      yield* llm.text("done")
      const question = yield* user(thread.id, "hello")
      yield* addSubtask(thread.id, question.id)

      const outcome = yield* promptSvc.loop({ sessionID: thread.id })

      expect(outcome.info.role).toBe("assistant")
      const calls = yield* llm.calls
      expect(calls).toBe(2)

      const history = yield* MessageV2.filterCompactedEffect(thread.id)
      const subtaskMsg = history.find((entry) => entry.info.role === "assistant" && entry.info.agent === "general")
      expect(subtaskMsg?.info.role).toBe("assistant")
      if (!subtaskMsg || subtaskMsg.info.role !== "assistant") return

      const failed = errorTool(subtaskMsg.parts)
      if (!failed) return
      const { error, metadata } = failed.state
      expect(error).toContain("Tool execution failed")
      expect(metadata).toBeDefined()
      expect(metadata?.sessionId).toBeDefined()
      expect(metadata?.model).toEqual({
        providerID: ProviderID.make("test"),
        modelID: ModelID.make("missing-model"),
      })
    }),
    {
      git: true,
      config: (url) => {
        const withProvider = providerCfg(url)
        return {
          ...withProvider,
          agent: {
            general: {
              model: "test/missing-model",
            },
          },
        }
      },
    },
  ),
)
// While the LLM request hangs the session status is "busy"; after cancel and
// fiber completion it is "idle".
it.live(
  "loop sets status to busy then idle",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const statusSvc = yield* SessionStatus.Service
        yield* llm.hang
        const thread = yield* sessionSvc.create({})
        yield* user(thread.id, "hi")

        const running = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* llm.wait(1)
        const during = yield* statusSvc.get(thread.id)
        expect(during.type).toBe("busy")

        yield* promptSvc.cancel(thread.id)
        yield* Fiber.await(running)
        const after = yield* statusSvc.get(thread.id)
        expect(after.type).toBe("idle")
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Cancel semantics
// Cancelling while the LLM request hangs makes the forked loop exit
// successfully with an assistant message.
it.live(
  "cancel interrupts loop and resolves with an assistant message",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({ title: "Pinned" })
        yield* seed(thread.id)
        yield* llm.hang
        yield* user(thread.id, "more")

        const running = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* llm.wait(1)
        yield* promptSvc.cancel(thread.id)
        const outcome = yield* Fiber.await(running)

        expect(Exit.isSuccess(outcome)).toBe(true)
        if (!Exit.isSuccess(outcome)) return
        expect(outcome.value.info.role).toBe("assistant")
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Cancelling a loop whose LLM request hangs must leave a MessageAbortedError
// on the assistant message returned by the loop.
it.live(
  "cancel records MessageAbortedError on interrupted process",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const prompt = yield* SessionPrompt.Service
        const sessions = yield* Session.Service
        const chat = yield* sessions.create({ title: "Pinned" })
        yield* llm.hang
        yield* user(chat.id, "hello")
        const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
        yield* llm.wait(1)
        yield* prompt.cancel(chat.id)
        const exit = yield* Fiber.await(fiber)
        expect(Exit.isSuccess(exit)).toBe(true)
        if (!Exit.isSuccess(exit)) return
        const info = exit.value.info
        // Assert the role explicitly: without this the error check below would
        // be skipped (and the test would pass vacuously) whenever the loop
        // returned something other than an assistant message.
        expect(info.role).toBe("assistant")
        if (info.role !== "assistant") return
        expect(info.error?.name).toBe("MessageAbortedError")
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Replaces the registry's `task` tool with a stub whose execute() publishes
// metadata, signals `ready`, and then never completes. The test cancels the
// session once the stub is running and checks that the subtask's tool part
// ends in the error state ("Cancelled") with its input and metadata intact.
it.live(
"cancel finalizes subtask tool state",
() =>
provideTmpdirInstance(
() =>
Effect.gen(function* () {
// `ready` resolves once the stub tool has started; `aborted` resolves when
// the stub observes the abort signal.
const ready = defer<void>()
const aborted = defer<void>()
const registry = yield* ToolRegistry.Service
// Keep the original init so the finalizer below can restore it.
const init = registry.named.task.init
registry.named.task.init = async () => ({
description: "task",
parameters: z.object({
description: z.string(),
prompt: z.string(),
subagent_type: z.string(),
task_id: z.string().optional(),
command: z.string().optional(),
}),
execute: async (_args, ctx) => {
ctx.metadata({
title: "inspect bug",
metadata: {
sessionId: SessionID.make("task"),
model: ref,
},
})
ready.resolve()
ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true })
// Never settles: the stub can only end by being cancelled.
await new Promise<void>(() => {})
return {
title: "",
metadata: {
sessionId: SessionID.make("task"),
model: ref,
},
output: "",
}
},
})
// Undo the monkeypatch when the enclosing scope closes.
yield* Effect.addFinalizer(() => Effect.sync(() => void (registry.named.task.init = init)))
const { prompt, chat } = yield* boot()
const msg = yield* user(chat.id, "hello")
yield* addSubtask(chat.id, msg.id)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
// Wait until the stub is executing before cancelling, then wait for it to
// see the abort.
yield* Effect.promise(() => ready.promise)
yield* prompt.cancel(chat.id)
yield* Effect.promise(() => aborted.promise)
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
const tool = errorTool(taskMsg.parts)
if (!tool) return
expect(tool.state.error).toBe("Cancelled")
// Input matches what addSubtask() stored on the subtask part.
expect(tool.state.input).toEqual({
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
})
// Metadata matches what the stub published through ctx.metadata().
expect(tool.state.metadata).toEqual({
sessionId: SessionID.make("task"),
model: ref,
})
expect(taskMsg.info.time.completed).toBeDefined()
expect(taskMsg.info.finish).toBeDefined()
}),
{ git: true, config: cfg },
),
30_000,
)
// A second loop call is forked while the first is blocked on the hanging LLM;
// after cancel both fibers succeed and report the same message id.
it.live(
  "cancel with queued callers resolves all cleanly",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({ title: "Pinned" })
        yield* llm.hang
        yield* user(thread.id, "hello")

        const first = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* llm.wait(1)
        const second = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* Effect.sleep(50)
        yield* promptSvc.cancel(thread.id)

        const [firstExit, secondExit] = yield* Effect.all([Fiber.await(first), Fiber.await(second)])
        expect(Exit.isSuccess(firstExit)).toBe(true)
        expect(Exit.isSuccess(secondExit)).toBe(true)
        if (Exit.isSuccess(firstExit) && Exit.isSuccess(secondExit)) {
          expect(firstExit.value.info.id).toBe(secondExit.value.info.id)
        }
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Queue semantics
// Two loop calls started concurrently on a session that is already finished
// return the same assistant message, and the session is not busy afterwards.
it.live("concurrent loop callers get same result", () =>
  provideTmpdirInstance(
    (_dir) =>
      Effect.gen(function* () {
        const { prompt: promptSvc, chat: thread } = yield* boot()
        yield* seed(thread.id, { finish: "stop" })

        const run = () => promptSvc.loop({ sessionID: thread.id })
        const [first, second] = yield* Effect.all([run(), run()], {
          concurrency: "unbounded",
        })

        expect(first.info.id).toBe(second.info.id)
        expect(first.info.role).toBe("assistant")
        yield* promptSvc.assertNotBusy(thread.id)
      }),
    { git: true },
  ),
)
// With the LLM configured to fail, two concurrent loop calls still resolve to
// the same assistant message.
it.live(
  "concurrent loop callers all receive same error result",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({ title: "Pinned" })
        yield* llm.fail("boom")
        yield* user(thread.id, "hello")

        const run = () => promptSvc.loop({ sessionID: thread.id })
        const [first, second] = yield* Effect.all([run(), run()], {
          concurrency: "unbounded",
        })

        expect(first.info.id).toBe(second.info.id)
        expect(first.info.role).toBe("assistant")
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// A second prompt submitted while the first LLM request is held open must be
// saved, and must then appear in the input of the next LLM request once the
// first request is released.
it.live(
  "prompt submitted during an active run is included in the next LLM input",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const gate = defer<void>()
        const prompt = yield* SessionPrompt.Service
        const sessions = yield* Session.Service
        const chat = yield* sessions.create({ title: "Pinned" })
        // The first reply is held until `gate` resolves; the second is queued behind it.
        yield* llm.hold("first", gate.promise)
        yield* llm.text("second")
        const a = yield* prompt
          .prompt({
            sessionID: chat.id,
            agent: "build",
            model: ref,
            parts: [{ type: "text", text: "first" }],
          })
          .pipe(Effect.forkChild)
        yield* llm.wait(1)
        const id = MessageID.ascending()
        const b = yield* prompt
          .prompt({
            sessionID: chat.id,
            messageID: id,
            agent: "build",
            model: ref,
            parts: [{ type: "text", text: "second" }],
          })
          .pipe(Effect.forkChild)

        // Poll inside the test fiber (rather than via Effect.runPromise in a
        // detached promise) until the second user message has been saved.
        const deadline = Date.now() + 5000
        while (true) {
          const saved = yield* sessions.messages({ sessionID: chat.id })
          if (saved.some((msg) => msg.info.role === "user" && msg.info.id === id)) break
          if (Date.now() >= deadline) {
            return yield* Effect.die(new Error("timed out waiting for second prompt to save"))
          }
          yield* Effect.sleep(20)
        }

        gate.resolve()
        const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
        expect(Exit.isSuccess(ea)).toBe(true)
        expect(Exit.isSuccess(eb)).toBe(true)
        expect(yield* llm.calls).toBe(2)
        const msgs = yield* sessions.messages({ sessionID: chat.id })
        const assistants = msgs.filter((msg) => msg.info.role === "assistant")
        expect(assistants).toHaveLength(2)
        const last = assistants.at(-1)
        if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
        expect(last.info.parentID).toBe(id)
        expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
        const inputs = yield* llm.inputs
        expect(inputs).toHaveLength(2)
        expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
      }),
      { git: true, config: providerCfg },
    ),
  // Must exceed the 5000 ms polling deadline above, otherwise the descriptive
  // "timed out waiting..." failure can never be reported.
  10_000,
)
// While a loop is blocked on the hanging LLM, assertNotBusy fails and the
// squashed cause is a Session.BusyError.
it.live(
  "assertNotBusy throws BusyError when loop running",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        yield* llm.hang
        const thread = yield* sessionSvc.create({})
        yield* user(thread.id, "hi")

        const running = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* llm.wait(1)

        const check = yield* promptSvc.assertNotBusy(thread.id).pipe(Effect.exit)
        expect(Exit.isFailure(check)).toBe(true)
        if (Exit.isFailure(check)) {
          const reason = Cause.squash(check.cause)
          expect(reason).toBeInstanceOf(Session.BusyError)
        }

        // Clean up the blocked loop.
        yield* promptSvc.cancel(thread.id)
        yield* Fiber.await(running)
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// On a freshly created session with nothing running, assertNotBusy succeeds.
it.live("assertNotBusy succeeds when idle", () =>
  provideTmpdirInstance(
    (_dir) =>
      Effect.gen(function* () {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({})

        const check = yield* promptSvc.assertNotBusy(thread.id).pipe(Effect.exit)

        expect(Exit.isSuccess(check)).toBe(true)
      }),
    { git: true },
  ),
)
// Shell semantics
// A shell command issued while a loop is blocked on the hanging LLM fails, and
// the squashed cause is a Session.BusyError.
it.live(
  "shell rejects with BusyError when loop running",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({ title: "Pinned" })
        yield* llm.hang
        yield* user(thread.id, "hi")

        const running = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* llm.wait(1)

        const attempt = yield* promptSvc
          .shell({ sessionID: thread.id, agent: "build", command: "echo hi" })
          .pipe(Effect.exit)
        expect(Exit.isFailure(attempt)).toBe(true)
        if (Exit.isFailure(attempt)) {
          const reason = Cause.squash(attempt.cause)
          expect(reason).toBeInstanceOf(Session.BusyError)
        }

        // Clean up the blocked loop.
        yield* promptSvc.cancel(thread.id)
        yield* Fiber.await(running)
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Text written to stdout ("out") and stderr ("err") both appear in the tool
// output and in its metadata output.
unix("shell captures stdout and stderr in completed tool output", () =>
  provideTmpdirInstance(
    (_dir) =>
      Effect.gen(function* () {
        const { prompt: promptSvc, chat: thread } = yield* boot()

        const outcome = yield* promptSvc.shell({
          sessionID: thread.id,
          agent: "build",
          command: "printf out && printf err >&2",
        })

        expect(outcome.info.role).toBe("assistant")
        const finished = completedTool(outcome.parts)
        if (!finished) return
        for (const text of ["out", "err"]) {
          expect(finished.state.output).toContain(text)
        }
        for (const text of ["out", "err"]) {
          expect(finished.state.metadata.output).toContain(text)
        }
        yield* promptSvc.assertNotBusy(thread.id)
      }),
    { git: true, config: cfg },
  ),
)
// Running `pwd` records the command as tool input and its output contains the
// temporary project directory.
unix("shell completes a fast command on the preferred shell", () =>
  provideTmpdirInstance(
    (dir) =>
      Effect.gen(function* () {
        const { prompt: promptSvc, chat: thread } = yield* boot()

        const outcome = yield* promptSvc.shell({
          sessionID: thread.id,
          agent: "build",
          command: "pwd",
        })

        expect(outcome.info.role).toBe("assistant")
        const finished = completedTool(outcome.parts)
        if (!finished) return
        const { input, output, metadata } = finished.state
        expect(input.command).toBe("pwd")
        expect(output).toContain(dir)
        expect(metadata.output).toContain(dir)
        yield* promptSvc.assertNotBusy(thread.id)
      }),
    { git: true, config: cfg },
  ),
)
// After README.md is written into the project directory, `command ls` lists it.
unix("shell lists files from the project directory", () =>
  provideTmpdirInstance(
    (dir) =>
      Effect.gen(function* () {
        const { prompt: promptSvc, chat: thread } = yield* boot()
        const readme = path.join(dir, "README.md")
        yield* Effect.promise(() => Bun.write(readme, "# e2e\n"))

        const outcome = yield* promptSvc.shell({
          sessionID: thread.id,
          agent: "build",
          command: "command ls",
        })

        expect(outcome.info.role).toBe("assistant")
        const finished = completedTool(outcome.parts)
        if (!finished) return
        const { input, output, metadata } = finished.state
        expect(input.command).toBe("command ls")
        expect(output).toContain("README.md")
        expect(metadata.output).toContain("README.md")
        yield* promptSvc.assertNotBusy(thread.id)
      }),
    { git: true, config: cfg },
  ),
)
// A command that writes 'not found' to stderr and exits with status 1 still
// yields a completed tool part containing that stderr text.
unix("shell captures stderr from a failing command", () =>
  provideTmpdirInstance(
    (_dir) =>
      Effect.gen(function* () {
        const { prompt: promptSvc, chat: thread } = yield* boot()

        const outcome = yield* promptSvc.shell({
          sessionID: thread.id,
          agent: "build",
          command: "command -v __nonexistent_cmd_e2e__ || echo 'not found' >&2; exit 1",
        })

        expect(outcome.info.role).toBe("assistant")
        const finished = completedTool(outcome.parts)
        if (!finished) return
        const { output, metadata } = finished.state
        expect(output).toContain("not found")
        expect(metadata.output).toContain("not found")
        yield* promptSvc.assertNotBusy(thread.id)
      }),
    { git: true, config: cfg },
  ),
)
// While the command is still running (between "first" and "second"), the
// stored tool part must already be in the running state with "first" in its
// metadata output. Polls stored messages for up to 5 seconds.
unix(
  "shell updates running metadata before process exit",
  () =>
    withSh(() =>
      provideTmpdirInstance(
        (_dir) =>
          Effect.gen(function* () {
            const { prompt: promptSvc, chat: thread } = yield* boot()
            const running = yield* promptSvc
              .shell({
                sessionID: thread.id,
                agent: "build",
                command: "printf first && sleep 0.2 && printf second",
              })
              .pipe(Effect.forkChild)

            yield* Effect.promise(async () => {
              const began = Date.now()
              while (Date.now() - began < 5000) {
                const history = await MessageV2.filterCompacted(MessageV2.stream(thread.id))
                const assistantMsg = history.find((entry) => entry.info.role === "assistant")
                const part = assistantMsg ? toolPart(assistantMsg.parts) : undefined
                const sawFirst = part?.state.status === "running" && part.state.metadata?.output.includes("first")
                if (sawFirst) return
                await new Promise((wake) => setTimeout(wake, 20))
              }
              throw new Error("timed out waiting for running shell metadata")
            })

            const outcome = yield* Fiber.await(running)
            expect(Exit.isSuccess(outcome)).toBe(true)
          }),
        { git: true, config: cfg },
      ),
    ),
  30_000,
)
// A loop forked while a shell command is running makes no LLM call until the
// shell fiber finishes; afterwards it returns the queued "after-shell" reply.
it.live(
  "loop waits while shell runs and starts after shell exits",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({
          title: "Pinned",
          permission: [{ permission: "*", pattern: "*", action: "allow" }],
        })
        yield* llm.text("after-shell")

        const shellFiber = yield* promptSvc
          .shell({ sessionID: thread.id, agent: "build", command: "sleep 0.2" })
          .pipe(Effect.forkChild)
        yield* Effect.sleep(50)
        const loopFiber = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* Effect.sleep(50)
        const callsWhileShellRuns = yield* llm.calls
        expect(callsWhileShellRuns).toBe(0)

        yield* Fiber.await(shellFiber)
        const outcome = yield* Fiber.await(loopFiber)
        expect(Exit.isSuccess(outcome)).toBe(true)
        if (Exit.isSuccess(outcome)) {
          const { info, parts } = outcome.value
          expect(info.role).toBe("assistant")
          expect(parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true)
        }
        const callsAfter = yield* llm.calls
        expect(callsAfter).toBe(1)
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Two loop callers queued behind a running shell command both resolve to the
// same assistant message after the shell finishes, using a single LLM call.
it.live(
  "shell completion resumes queued loop callers",
  () =>
    provideTmpdirServer(
      Effect.fnUntraced(function* ({ llm }) {
        const promptSvc = yield* SessionPrompt.Service
        const sessionSvc = yield* Session.Service
        const thread = yield* sessionSvc.create({
          title: "Pinned",
          permission: [{ permission: "*", pattern: "*", action: "allow" }],
        })
        yield* llm.text("done")

        const shellFiber = yield* promptSvc
          .shell({ sessionID: thread.id, agent: "build", command: "sleep 0.2" })
          .pipe(Effect.forkChild)
        yield* Effect.sleep(50)
        const first = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        const second = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
        yield* Effect.sleep(50)
        const callsWhileShellRuns = yield* llm.calls
        expect(callsWhileShellRuns).toBe(0)

        yield* Fiber.await(shellFiber)
        const [firstExit, secondExit] = yield* Effect.all([Fiber.await(first), Fiber.await(second)])
        expect(Exit.isSuccess(firstExit)).toBe(true)
        expect(Exit.isSuccess(secondExit)).toBe(true)
        if (Exit.isSuccess(firstExit) && Exit.isSuccess(secondExit)) {
          expect(firstExit.value.info.id).toBe(secondExit.value.info.id)
          expect(firstExit.value.info.role).toBe("assistant")
        }
        const callsAfter = yield* llm.calls
        expect(callsAfter).toBe(1)
      }),
      { git: true, config: providerCfg },
    ),
  3_000,
)
// Cancelling a running `sleep 30` leaves the session idle and not busy, and
// the shell fiber succeeds with a completed tool part whose output mentions
// the user abort.
unix(
  "cancel interrupts shell and resolves cleanly",
  () =>
    withSh(() =>
      provideTmpdirInstance(
        (_dir) =>
          Effect.gen(function* () {
            const { prompt: promptSvc, chat: thread } = yield* boot()
            const shellFiber = yield* promptSvc
              .shell({ sessionID: thread.id, agent: "build", command: "sleep 30" })
              .pipe(Effect.forkChild)
            yield* Effect.sleep(50)
            yield* promptSvc.cancel(thread.id)

            const statusSvc = yield* SessionStatus.Service
            const current = yield* statusSvc.get(thread.id)
            expect(current.type).toBe("idle")
            const busyCheck = yield* promptSvc.assertNotBusy(thread.id).pipe(Effect.exit)
            expect(Exit.isSuccess(busyCheck)).toBe(true)

            const outcome = yield* Fiber.await(shellFiber)
            expect(Exit.isSuccess(outcome)).toBe(true)
            if (!Exit.isSuccess(outcome)) return
            expect(outcome.value.info.role).toBe("assistant")
            const finished = completedTool(outcome.value.parts)
            if (!finished) return
            expect(finished.state.output).toContain("User aborted the command")
          }),
        { git: true, config: cfg },
      ),
    ),
  30_000,
)
// Same as the plain cancel test, but the command traps (ignores) TERM first;
// the aborted result must still be recorded on a completed tool part.
unix(
  "cancel persists aborted shell result when shell ignores TERM",
  () =>
    withSh(() =>
      provideTmpdirInstance(
        (_dir) =>
          Effect.gen(function* () {
            const { prompt: promptSvc, chat: thread } = yield* boot()
            const shellFiber = yield* promptSvc
              .shell({ sessionID: thread.id, agent: "build", command: "trap '' TERM; sleep 30" })
              .pipe(Effect.forkChild)
            yield* Effect.sleep(50)
            yield* promptSvc.cancel(thread.id)

            const outcome = yield* Fiber.await(shellFiber)
            expect(Exit.isSuccess(outcome)).toBe(true)
            if (!Exit.isSuccess(outcome)) return
            expect(outcome.value.info.role).toBe("assistant")
            const finished = completedTool(outcome.value.parts)
            if (!finished) return
            expect(finished.state.output).toContain("User aborted the command")
          }),
        { git: true, config: cfg },
      ),
    ),
  30_000,
)
// A loop forked behind a running `sleep 30` shell command exits successfully
// once the session is cancelled.
unix(
  "cancel interrupts loop queued behind shell",
  () =>
    provideTmpdirInstance(
      (_dir) =>
        Effect.gen(function* () {
          const { prompt: promptSvc, chat: thread } = yield* boot()
          const shellFiber = yield* promptSvc
            .shell({ sessionID: thread.id, agent: "build", command: "sleep 30" })
            .pipe(Effect.forkChild)
          yield* Effect.sleep(50)
          const loopFiber = yield* promptSvc.loop({ sessionID: thread.id }).pipe(Effect.forkChild)
          yield* Effect.sleep(50)

          yield* promptSvc.cancel(thread.id)

          const loopExit = yield* Fiber.await(loopFiber)
          expect(Exit.isSuccess(loopExit)).toBe(true)
          // Also wait for the shell fiber so nothing is left running.
          yield* Fiber.await(shellFiber)
        }),
      { git: true, config: cfg },
    ),
  30_000,
)
// A second shell command issued while `sleep 30` is still running fails, and
// the squashed cause is a Session.BusyError.
unix(
  "shell rejects when another shell is already running",
  () =>
    withSh(() =>
      provideTmpdirInstance(
        (_dir) =>
          Effect.gen(function* () {
            const { prompt: promptSvc, chat: thread } = yield* boot()
            const longRunning = yield* promptSvc
              .shell({ sessionID: thread.id, agent: "build", command: "sleep 30" })
              .pipe(Effect.forkChild)
            yield* Effect.sleep(50)

            const attempt = yield* promptSvc
              .shell({ sessionID: thread.id, agent: "build", command: "echo hi" })
              .pipe(Effect.exit)
            expect(Exit.isFailure(attempt)).toBe(true)
            if (Exit.isFailure(attempt)) {
              const reason = Cause.squash(attempt.cause)
              expect(reason).toBeInstanceOf(Session.BusyError)
            }

            // Clean up the long-running command.
            yield* promptSvc.cancel(thread.id)
            yield* Fiber.await(longRunning)
          }),
        { git: true, config: cfg },
      ),
    ),
  30_000,
)