Compare commits

...

2 Commits

Author SHA1 Message Date
Kit Langton 6fe0f52a05 fix(task): preserve subtask cancellation during startup 2026-04-07 11:12:29 -04:00
Kit Langton a3bf978919 refactor(prompt): remove prompt abort-signal plumbing
Keep prompt reads, subtask execution, and shell runner wiring on shared Effect services so prompt no longer carries its own abort-signal adapters or duplicate task flow.
2026-04-05 11:56:39 -04:00
10 changed files with 632 additions and 515 deletions

View File

@ -1,10 +1,10 @@
import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect" import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect"
export interface Runner<A, E = never> { export interface Runner<A, E = never> {
readonly state: Runner.State<A, E> readonly state: Runner.State<A, E>
readonly busy: boolean readonly busy: boolean
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E> readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E> readonly startShell: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly cancel: Effect.Effect<void> readonly cancel: Effect.Effect<void>
} }
@ -20,7 +20,6 @@ export namespace Runner {
interface ShellHandle<A, E> { interface ShellHandle<A, E> {
id: number id: number
fiber: Fiber.Fiber<A, E> fiber: Fiber.Fiber<A, E>
abort: AbortController
} }
interface PendingHandle<A, E> { interface PendingHandle<A, E> {
@ -102,9 +101,7 @@ export namespace Runner {
const stopShell = (shell: ShellHandle<A, E>) => const stopShell = (shell: ShellHandle<A, E>) =>
Effect.gen(function* () { Effect.gen(function* () {
shell.abort.abort() yield* Fiber.interrupt(shell.fiber)
const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid) yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
}) })
@ -138,7 +135,7 @@ export namespace Runner {
), ),
) )
const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) => const startShell = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect( SynchronizedRef.modifyEffect(
ref, ref,
Effect.fnUntraced(function* (st) { Effect.fnUntraced(function* (st) {
@ -153,9 +150,8 @@ export namespace Runner {
} }
yield* busy yield* busy
const id = next() const id = next()
const abort = new AbortController() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) const shell = { id, fiber } satisfies ShellHandle<A, E>
const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
return [ return [
Effect.gen(function* () { Effect.gen(function* () {
const exit = yield* Fiber.await(fiber) const exit = yield* Fiber.await(fiber)

View File

@ -24,7 +24,6 @@ import { ToolRegistry } from "../tool/registry"
import { Runner } from "@/effect/runner" import { Runner } from "@/effect/runner"
import { MCP } from "../mcp" import { MCP } from "../mcp"
import { LSP } from "../lsp" import { LSP } from "../lsp"
import { ReadTool } from "../tool/read"
import { FileTime } from "../file/time" import { FileTime } from "../file/time"
import { Flag } from "../flag/flag" import { Flag } from "../flag/flag"
import { ulid } from "ulid" import { ulid } from "ulid"
@ -33,11 +32,11 @@ import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import * as Stream from "effect/Stream" import * as Stream from "effect/Stream"
import { Command } from "../command" import { Command } from "../command"
import { pathToFileURL, fileURLToPath } from "url" import { pathToFileURL, fileURLToPath } from "url"
import { Config } from "../config/config"
import { ConfigMarkdown } from "../config/markdown" import { ConfigMarkdown } from "../config/markdown"
import { SessionSummary } from "./summary" import { SessionSummary } from "./summary"
import { NamedError } from "@opencode-ai/util/error" import { NamedError } from "@opencode-ai/util/error"
import { SessionProcessor } from "./processor" import { SessionProcessor } from "./processor"
import { TaskTool } from "@/tool/task"
import { Tool } from "@/tool/tool" import { Tool } from "@/tool/tool"
import { Permission } from "@/permission" import { Permission } from "@/permission"
import { SessionStatus } from "./status" import { SessionStatus } from "./status"
@ -47,6 +46,8 @@ import { AppFileSystem } from "@/filesystem"
import { Truncate } from "@/tool/truncate" import { Truncate } from "@/tool/truncate"
import { decodeDataUrl } from "@/util/data-url" import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process" import { Process } from "@/util/process"
import { run as read } from "@/tool/read"
import { output as subtaskOutput, run as subtask } from "@/tool/subtask"
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect" import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state" import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service" import { makeRuntime } from "@/effect/run-service"
@ -101,6 +102,7 @@ export namespace SessionPrompt {
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const scope = yield* Scope.Scope const scope = yield* Scope.Scope
const instruction = yield* Instruction.Service const instruction = yield* Instruction.Service
const llm = yield* LLM.Service
const state = yield* InstanceState.make( const state = yield* InstanceState.make(
Effect.fn("SessionPrompt.state")(function* () { Effect.fn("SessionPrompt.state")(function* () {
@ -218,26 +220,29 @@ export namespace SessionPrompt {
const msgs = onlySubtasks const msgs = onlySubtasks
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
: yield* MessageV2.toModelMessagesEffect(context, mdl) : yield* MessageV2.toModelMessagesEffect(context, mdl)
const text = yield* Effect.promise(async (signal) => { const text = yield* llm
const result = await LLM.stream({ .stream({
agent: ag, agent: ag,
user: firstInfo, user: firstInfo,
system: [], system: [],
small: true, small: true,
tools: {}, tools: {},
model: mdl, model: mdl,
abort: signal,
sessionID: input.session.id, sessionID: input.session.id,
retries: 2, retries: 2,
messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs], messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs],
}) })
return result.text .pipe(
}) Stream.runFold(
() => "",
(text: string, event: LLM.Event) => (event.type === "text-delta" ? text + event.text : text),
),
)
const cleaned = text const cleaned = text
.replace(/<think>[\s\S]*?<\/think>\s*/g, "") .replace(/<think>[\s\S]*?<\/think>\s*/g, "")
.split("\n") .split("\n")
.map((line) => line.trim()) .map((line: string) => line.trim())
.find((line) => line.length > 0) .find((line: string) => line.length > 0)
if (!cleaned) return if (!cleaned) return
const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
yield* sessions yield* sessions
@ -397,41 +402,42 @@ NOTE: At any point in time through this workflow you should feel free to ask the
using _ = log.time("resolveTools") using _ = log.time("resolveTools")
const tools: Record<string, AITool> = {} const tools: Record<string, AITool> = {}
const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({ const context = (args: any, options: ToolExecutionOptions): Tool.Context =>
sessionID: input.session.id, Tool.context({
abort: options.abortSignal!, abort: options.abortSignal,
messageID: input.processor.message.id, callID: options.toolCallId,
callID: options.toolCallId, sessionID: input.session.id,
extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck }, messageID: input.processor.message.id,
agent: input.agent.name, extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck },
messages: input.messages, agent: input.agent.name,
metadata: (val) => messages: input.messages,
Effect.runPromise( metadata: (val) =>
Effect.gen(function* () { Effect.runPromise(
const match = input.processor.partFromToolCall(options.toolCallId) Effect.gen(function* () {
if (!match || !["running", "pending"].includes(match.state.status)) return const match = input.processor.partFromToolCall(options.toolCallId)
yield* sessions.updatePart({ if (!match || !["running", "pending"].includes(match.state.status)) return
...match, yield* sessions.updatePart({
state: { ...match,
title: val.title, state: {
metadata: val.metadata, title: val.title,
status: "running", metadata: val.metadata,
input: args, status: "running",
time: { start: Date.now() }, input: args,
}, time: { start: Date.now() },
}) },
}), })
), }),
ask: (req) => ),
Effect.runPromise( ask: (req) =>
permission.ask({ Effect.runPromise(
...req, permission.ask({
sessionID: input.session.id, ...req,
tool: { messageID: input.processor.message.id, callID: options.toolCallId }, sessionID: input.session.id,
ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []), tool: { messageID: input.processor.message.id, callID: options.toolCallId },
}), ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []),
), }),
}) ),
})
for (const item of yield* registry.tools( for (const item of yield* registry.tools(
{ modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID }, { modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID },
@ -555,13 +561,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the
model: Provider.Model model: Provider.Model
lastUser: MessageV2.User lastUser: MessageV2.User
sessionID: SessionID sessionID: SessionID
session: Session.Info
msgs: MessageV2.WithParts[]
}) { }) {
const { task, model, lastUser, sessionID, session, msgs } = input const { task, model, lastUser, sessionID } = input
const ctx = yield* InstanceState.context const ctx = yield* InstanceState.context
const taskTool = yield* Effect.promise(() => registry.named.task.init()) const taskAgent = yield* agents.get(task.agent)
if (!taskAgent) {
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` })
yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
throw error
}
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
const taskRef = { providerID: taskModel.providerID, modelID: taskModel.id }
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({ const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
id: MessageID.ascending(), id: MessageID.ascending(),
role: "assistant", role: "assistant",
@ -601,57 +614,71 @@ NOTE: At any point in time through this workflow you should feel free to ask the
subagent_type: task.agent, subagent_type: task.agent,
command: task.command, command: task.command,
} }
yield* plugin.trigger("tool.execute.before", { tool: "task", sessionID, callID: part.id }, { args: taskArgs }) yield* plugin.trigger(
"tool.execute.before",
const taskAgent = yield* agents.get(task.agent) { tool: "task", sessionID, callID: part.callID },
if (!taskAgent) { { args: taskArgs },
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) )
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` })
yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
throw error
}
let child: SessionID | undefined
let error: Error | undefined let error: Error | undefined
const result = yield* Effect.promise((signal) => const result = yield* subtask(
taskTool {
.execute(taskArgs, { cfg: Effect.promise(() => Config.get()),
agent: task.agent, get: (taskID) => sessions.get(SessionID.make(taskID)).pipe(Effect.catch(() => Effect.succeed(undefined))),
messageID: assistantMessage.id, create: (input) => sessions.create(input),
sessionID, resolve: resolvePromptParts,
abort: signal, prompt: (input) => prompt({ ...input, messageID: MessageID.ascending() }),
callID: part.callID, },
extra: { bypassAgentCheck: true }, {
messages: msgs, parentID: sessionID,
metadata(val: { title?: string; metadata?: Record<string, any> }) { description: task.description,
return Effect.runPromise( prompt: task.prompt,
Effect.gen(function* () { agent: taskAgent,
part = yield* sessions.updatePart({ model: taskRef,
...part, start(sessionID, model) {
type: "tool", child = sessionID
state: { ...part.state, ...val }, const metadata = { sessionId: sessionID, model }
} satisfies MessageV2.ToolPart) return Effect.runPromise(
}), sessions.updatePart({
) ...part,
}, state: {
ask(req: any) { status: "running",
return Effect.runPromise( input: part.state.input,
permission.ask({ time: part.state.status === "running" ? part.state.time : { start: Date.now() },
...req, title: task.description,
sessionID, metadata,
ruleset: Permission.merge(taskAgent.permission, session.permission ?? []), },
}), } satisfies MessageV2.ToolPart),
) ).then((next) => {
}, part = next
}) })
.catch((e) => { },
error = e instanceof Error ? e : new Error(String(e)) },
log.error("subtask execution failed", { error, agent: task.agent, description: task.description })
return undefined
}),
).pipe( ).pipe(
Effect.flatMap((sub) =>
truncate.output(subtaskOutput(sub.sessionID, sub.text), {}).pipe(
Effect.map((truncated) => ({
title: task.description,
metadata: {
sessionId: sub.sessionID,
model: sub.model,
truncated: truncated.truncated,
...(truncated.truncated && { outputPath: truncated.outputPath }),
},
output: truncated.content,
})),
),
),
Effect.catchCause((cause) => {
const err = Cause.squash(cause)
error = err instanceof Error ? err : new Error(String(err))
log.error("subtask execution failed", { error, agent: task.agent, description: task.description })
return Effect.succeed(undefined)
}),
Effect.onInterrupt(() => Effect.onInterrupt(() =>
Effect.gen(function* () { Effect.gen(function* () {
if (child) yield* cancel(child)
assistantMessage.finish = "tool-calls" assistantMessage.finish = "tool-calls"
assistantMessage.time.completed = Date.now() assistantMessage.time.completed = Date.now()
yield* sessions.updateMessage(assistantMessage) yield* sessions.updateMessage(assistantMessage)
@ -671,16 +698,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
), ),
) )
const attachments = result?.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID,
messageID: assistantMessage.id,
}))
yield* plugin.trigger( yield* plugin.trigger(
"tool.execute.after", "tool.execute.after",
{ tool: "task", sessionID, callID: part.id, args: taskArgs }, { tool: "task", sessionID, callID: part.callID, args: taskArgs },
result, result,
) )
@ -697,7 +717,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
title: result.title, title: result.title,
metadata: result.metadata, metadata: result.metadata,
output: result.output, output: result.output,
attachments,
time: { ...part.state.time, end: Date.now() }, time: { ...part.state.time, end: Date.now() },
}, },
} satisfies MessageV2.ToolPart) } satisfies MessageV2.ToolPart)
@ -740,7 +759,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
} satisfies MessageV2.TextPart) } satisfies MessageV2.TextPart)
}) })
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) { const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) {
const ctx = yield* InstanceState.context const ctx = yield* InstanceState.context
const session = yield* sessions.get(input.sessionID) const session = yield* sessions.get(input.sessionID)
if (session.revert) { if (session.revert) {
@ -1073,6 +1092,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the
log.info("file", { mime: part.mime }) log.info("file", { mime: part.mime })
const filepath = fileURLToPath(part.url) const filepath = fileURLToPath(part.url)
if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory" if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory"
const readCtx = Tool.context({
sessionID: input.sessionID,
agent: info.agent,
messageID: info.id,
extra: { bypassCwdCheck: true },
messages: [],
metadata: () => {},
ask: async () => {},
})
if (part.mime === "text/plain") { if (part.mime === "text/plain") {
let offset: number | undefined let offset: number | undefined
@ -1110,29 +1138,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
text: `Called the Read tool with the following input: ${JSON.stringify(args)}`, text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
}, },
] ]
const read = yield* Effect.promise(() => registry.named.read.init()).pipe( const readResult = yield* read(
Effect.flatMap((t) => { fs: fsys, instruction, lsp, time: filetime, scope },
provider.getModel(info.model.providerID, info.model.modelID).pipe( args,
Effect.flatMap((mdl) => readCtx,
Effect.promise(() => ).pipe(Effect.exit)
t.execute(args, { if (Exit.isSuccess(readResult)) {
sessionID: input.sessionID, const result = readResult.value
abort: new AbortController().signal,
agent: input.agent!,
messageID: info.id,
extra: { bypassCwdCheck: true, model: mdl },
messages: [],
metadata: async () => {},
ask: async () => {},
}),
),
),
),
),
Effect.exit,
)
if (Exit.isSuccess(read)) {
const result = read.value
pieces.push({ pieces.push({
messageID: info.id, messageID: info.id,
sessionID: input.sessionID, sessionID: input.sessionID,
@ -1145,7 +1157,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
...result.attachments.map((a) => ({ ...result.attachments.map((a) => ({
...a, ...a,
synthetic: true, synthetic: true,
filename: a.filename ?? part.filename, filename: "filename" in a && typeof a.filename === "string" ? a.filename : part.filename,
messageID: info.id, messageID: info.id,
sessionID: input.sessionID, sessionID: input.sessionID,
})), })),
@ -1154,7 +1166,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID }) pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
} }
} else { } else {
const error = Cause.squash(read.cause) const error = Cause.squash(readResult.cause)
log.error("failed to read file", { error }) log.error("failed to read file", { error })
const message = error instanceof Error ? error.message : String(error) const message = error instanceof Error ? error.message : String(error)
yield* bus.publish(Session.Event.Error, { yield* bus.publish(Session.Event.Error, {
@ -1174,21 +1186,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the
if (part.mime === "application/x-directory") { if (part.mime === "application/x-directory") {
const args = { filePath: filepath } const args = { filePath: filepath }
const result = yield* Effect.promise(() => registry.named.read.init()).pipe( const result = yield* read({ fs: fsys, instruction, lsp, time: filetime, scope }, args, readCtx).pipe(
Effect.flatMap((t) => Effect.orDie,
Effect.promise(() =>
t.execute(args, {
sessionID: input.sessionID,
abort: new AbortController().signal,
agent: input.agent!,
messageID: info.id,
extra: { bypassCwdCheck: true },
messages: [],
metadata: async () => {},
ask: async () => {},
}),
),
),
) )
return [ return [
{ {
@ -1332,7 +1331,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
} }
if (latest) return latest if (latest) return latest
throw new Error("Impossible") throw new Error("Impossible")
}) }).pipe(Effect.orDie)
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")( const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
function* (sessionID: SessionID) { function* (sessionID: SessionID) {
@ -1393,7 +1392,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const task = tasks.pop() const task = tasks.pop()
if (task?.type === "subtask") { if (task?.type === "subtask") {
yield* handleSubtask({ task, model, lastUser, sessionID, session, msgs }) yield* handleSubtask({ task, model, lastUser, sessionID })
continue continue
} }
@ -1577,7 +1576,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
function* (input: ShellInput) { function* (input: ShellInput) {
const s = yield* InstanceState.get(state) const s = yield* InstanceState.get(state)
const runner = getRunner(s.runners, input.sessionID) const runner = getRunner(s.runners, input.sessionID)
return yield* runner.startShell((signal) => shellImpl(input, signal)) return yield* runner.startShell(shellImpl(input))
}, },
) )
@ -1722,6 +1721,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
Layer.provide(FileTime.defaultLayer), Layer.provide(FileTime.defaultLayer),
Layer.provide(ToolRegistry.defaultLayer), Layer.provide(ToolRegistry.defaultLayer),
Layer.provide(Truncate.layer), Layer.provide(Truncate.layer),
Layer.provide(LLM.defaultLayer),
Layer.provide(Provider.defaultLayer), Layer.provide(Provider.defaultLayer),
Layer.provide(Instruction.defaultLayer), Layer.provide(Instruction.defaultLayer),
Layer.provide(AppFileSystem.defaultLayer), Layer.provide(AppFileSystem.defaultLayer),

View File

@ -11,7 +11,9 @@ type Options = {
kind?: Kind kind?: Kind
} }
export async function assertExternalDirectory(ctx: Tool.Context, target?: string, options?: Options) { type Ctx = Pick<Tool.Context, "ask">
export async function assertExternalDirectory(ctx: Ctx, target?: string, options?: Options) {
if (!target) return if (!target) return
if (options?.bypass) return if (options?.bypass) return
@ -38,7 +40,7 @@ export async function assertExternalDirectory(ctx: Tool.Context, target?: string
} }
export const assertExternalDirectoryEffect = Effect.fn("Tool.assertExternalDirectory")(function* ( export const assertExternalDirectoryEffect = Effect.fn("Tool.assertExternalDirectory")(function* (
ctx: Tool.Context, ctx: Ctx,
target?: string, target?: string,
options?: Options, options?: Options,
) { ) {

View File

@ -25,6 +25,197 @@ const parameters = z.object({
limit: z.coerce.number().describe("The maximum number of lines to read (defaults to 2000)").optional(), limit: z.coerce.number().describe("The maximum number of lines to read (defaults to 2000)").optional(),
}) })
type Ctx = Omit<Tool.Context, "abort">
type Deps = {
fs: AppFileSystem.Interface
instruction: Instruction.Interface
lsp: LSP.Interface
time: FileTime.Interface
scope: Scope.Scope
}
export const run = Effect.fn("ReadTool.run")(function* (deps: Deps, params: z.infer<typeof parameters>, ctx: Ctx) {
const miss = Effect.fn("ReadTool.miss")(function* (filepath: string) {
const dir = path.dirname(filepath)
const base = path.basename(filepath)
const items = yield* deps.fs.readDirectory(dir).pipe(
Effect.map((items) =>
items
.filter(
(item) =>
item.toLowerCase().includes(base.toLowerCase()) || base.toLowerCase().includes(item.toLowerCase()),
)
.map((item) => path.join(dir, item))
.slice(0, 3),
),
Effect.catch(() => Effect.succeed([] as string[])),
)
if (items.length > 0) {
return yield* Effect.fail(
new Error(`File not found: ${filepath}\n\nDid you mean one of these?\n${items.join("\n")}`),
)
}
return yield* Effect.fail(new Error(`File not found: ${filepath}`))
})
const list = Effect.fn("ReadTool.list")(function* (filepath: string) {
const items = yield* deps.fs.readDirectoryEntries(filepath)
return yield* Effect.forEach(
items,
Effect.fnUntraced(function* (item) {
if (item.type === "directory") return item.name + "/"
if (item.type !== "symlink") return item.name
const target = yield* deps.fs
.stat(path.join(filepath, item.name))
.pipe(Effect.catch(() => Effect.succeed(undefined)))
if (target?.type === "Directory") return item.name + "/"
return item.name
}),
{ concurrency: "unbounded" },
).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b))))
})
const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, sessionID: Ctx["sessionID"]) {
yield* deps.lsp.touchFile(filepath, false).pipe(Effect.ignore, Effect.forkIn(deps.scope))
yield* deps.time.read(sessionID, filepath)
})
if (params.offset !== undefined && params.offset < 1) {
return yield* Effect.fail(new Error("offset must be greater than or equal to 1"))
}
let filepath = params.filePath
if (!path.isAbsolute(filepath)) {
filepath = path.resolve(Instance.directory, filepath)
}
if (process.platform === "win32") {
filepath = AppFileSystem.normalizePath(filepath)
}
const title = path.relative(Instance.worktree, filepath)
const stat = yield* deps.fs.stat(filepath).pipe(
Effect.catchIf(
(err) => "reason" in err && err.reason._tag === "NotFound",
() => Effect.succeed(undefined),
),
)
yield* assertExternalDirectoryEffect(ctx, filepath, {
bypass: Boolean(ctx.extra?.["bypassCwdCheck"]),
kind: stat?.type === "Directory" ? "directory" : "file",
})
yield* Effect.promise(() =>
ctx.ask({
permission: "read",
patterns: [filepath],
always: ["*"],
metadata: {},
}),
)
if (!stat) return yield* miss(filepath)
if (stat.type === "Directory") {
const items = yield* list(filepath)
const limit = params.limit ?? DEFAULT_READ_LIMIT
const offset = params.offset ?? 1
const start = offset - 1
const sliced = items.slice(start, start + limit)
const truncated = start + sliced.length < items.length
return {
title,
output: [
`<path>${filepath}</path>`,
`<type>directory</type>`,
`<entries>`,
sliced.join("\n"),
truncated
? `\n(Showing ${sliced.length} of ${items.length} entries. Use 'offset' parameter to read beyond entry ${offset + sliced.length})`
: `\n(${items.length} entries)`,
`</entries>`,
].join("\n"),
metadata: {
preview: sliced.slice(0, 20).join("\n"),
truncated,
loaded: [] as string[],
},
}
}
const loaded = yield* deps.instruction.resolve(ctx.messages, filepath, ctx.messageID)
const mime = AppFileSystem.mimeType(filepath)
const isImage = mime.startsWith("image/") && mime !== "image/svg+xml" && mime !== "image/vnd.fastbidsheet"
const isPdf = mime === "application/pdf"
if (isImage || isPdf) {
const msg = `${isImage ? "Image" : "PDF"} read successfully`
return {
title,
output: msg,
metadata: {
preview: msg,
truncated: false,
loaded: loaded.map((item) => item.filepath),
},
attachments: [
{
type: "file" as const,
mime,
url: `data:${mime};base64,${Buffer.from(yield* deps.fs.readFile(filepath)).toString("base64")}`,
},
],
}
}
if (yield* Effect.promise(() => isBinaryFile(filepath, Number(stat.size)))) {
return yield* Effect.fail(new Error(`Cannot read binary file: ${filepath}`))
}
const file = yield* Effect.promise(() =>
lines(filepath, { limit: params.limit ?? DEFAULT_READ_LIMIT, offset: params.offset ?? 1 }),
)
if (file.count < file.offset && !(file.count === 0 && file.offset === 1)) {
return yield* Effect.fail(new Error(`Offset ${file.offset} is out of range for this file (${file.count} lines)`))
}
let output = [`<path>${filepath}</path>`, `<type>file</type>`, "<content>"].join("\n")
output += file.raw.map((line, i) => `${i + file.offset}: ${line}`).join("\n")
const last = file.offset + file.raw.length - 1
const next = last + 1
const truncated = file.more || file.cut
if (file.cut) {
output += `\n\n(Output capped at ${MAX_BYTES_LABEL}. Showing lines ${file.offset}-${last}. Use offset=${next} to continue.)`
} else if (file.more) {
output += `\n\n(Showing lines ${file.offset}-${last} of ${file.count}. Use offset=${next} to continue.)`
} else {
output += `\n\n(End of file - total ${file.count} lines)`
}
output += "\n</content>"
yield* warm(filepath, ctx.sessionID)
if (loaded.length > 0) {
output += `\n\n<system-reminder>\n${loaded.map((item) => item.content).join("\n\n")}\n</system-reminder>`
}
return {
title,
output,
metadata: {
preview: file.raw.slice(0, 20).join("\n"),
truncated,
loaded: loaded.map((item) => item.filepath),
},
}
})
export const ReadTool = Tool.defineEffect( export const ReadTool = Tool.defineEffect(
"read", "read",
Effect.gen(function* () { Effect.gen(function* () {
@ -33,195 +224,13 @@ export const ReadTool = Tool.defineEffect(
const lsp = yield* LSP.Service const lsp = yield* LSP.Service
const time = yield* FileTime.Service const time = yield* FileTime.Service
const scope = yield* Scope.Scope const scope = yield* Scope.Scope
const deps = { fs, instruction, lsp, time, scope } satisfies Deps
const miss = Effect.fn("ReadTool.miss")(function* (filepath: string) {
const dir = path.dirname(filepath)
const base = path.basename(filepath)
const items = yield* fs.readDirectory(dir).pipe(
Effect.map((items) =>
items
.filter(
(item) =>
item.toLowerCase().includes(base.toLowerCase()) || base.toLowerCase().includes(item.toLowerCase()),
)
.map((item) => path.join(dir, item))
.slice(0, 3),
),
Effect.catch(() => Effect.succeed([] as string[])),
)
if (items.length > 0) {
return yield* Effect.fail(
new Error(`File not found: ${filepath}\n\nDid you mean one of these?\n${items.join("\n")}`),
)
}
return yield* Effect.fail(new Error(`File not found: ${filepath}`))
})
const list = Effect.fn("ReadTool.list")(function* (filepath: string) {
const items = yield* fs.readDirectoryEntries(filepath)
return yield* Effect.forEach(
items,
Effect.fnUntraced(function* (item) {
if (item.type === "directory") return item.name + "/"
if (item.type !== "symlink") return item.name
const target = yield* fs
.stat(path.join(filepath, item.name))
.pipe(Effect.catch(() => Effect.succeed(undefined)))
if (target?.type === "Directory") return item.name + "/"
return item.name
}),
{ concurrency: "unbounded" },
).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b))))
})
const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, sessionID: Tool.Context["sessionID"]) {
yield* lsp.touchFile(filepath, false).pipe(Effect.ignore, Effect.forkIn(scope))
yield* time.read(sessionID, filepath)
})
const run = Effect.fn("ReadTool.execute")(function* (params: z.infer<typeof parameters>, ctx: Tool.Context) {
if (params.offset !== undefined && params.offset < 1) {
return yield* Effect.fail(new Error("offset must be greater than or equal to 1"))
}
let filepath = params.filePath
if (!path.isAbsolute(filepath)) {
filepath = path.resolve(Instance.directory, filepath)
}
if (process.platform === "win32") {
filepath = AppFileSystem.normalizePath(filepath)
}
const title = path.relative(Instance.worktree, filepath)
const stat = yield* fs.stat(filepath).pipe(
Effect.catchIf(
(err) => "reason" in err && err.reason._tag === "NotFound",
() => Effect.succeed(undefined),
),
)
yield* assertExternalDirectoryEffect(ctx, filepath, {
bypass: Boolean(ctx.extra?.["bypassCwdCheck"]),
kind: stat?.type === "Directory" ? "directory" : "file",
})
yield* Effect.promise(() =>
ctx.ask({
permission: "read",
patterns: [filepath],
always: ["*"],
metadata: {},
}),
)
if (!stat) return yield* miss(filepath)
if (stat.type === "Directory") {
const items = yield* list(filepath)
const limit = params.limit ?? DEFAULT_READ_LIMIT
const offset = params.offset ?? 1
const start = offset - 1
const sliced = items.slice(start, start + limit)
const truncated = start + sliced.length < items.length
return {
title,
output: [
`<path>${filepath}</path>`,
`<type>directory</type>`,
`<entries>`,
sliced.join("\n"),
truncated
? `\n(Showing ${sliced.length} of ${items.length} entries. Use 'offset' parameter to read beyond entry ${offset + sliced.length})`
: `\n(${items.length} entries)`,
`</entries>`,
].join("\n"),
metadata: {
preview: sliced.slice(0, 20).join("\n"),
truncated,
loaded: [] as string[],
},
}
}
const loaded = yield* instruction.resolve(ctx.messages, filepath, ctx.messageID)
const mime = AppFileSystem.mimeType(filepath)
const isImage = mime.startsWith("image/") && mime !== "image/svg+xml" && mime !== "image/vnd.fastbidsheet"
const isPdf = mime === "application/pdf"
if (isImage || isPdf) {
const msg = `${isImage ? "Image" : "PDF"} read successfully`
return {
title,
output: msg,
metadata: {
preview: msg,
truncated: false,
loaded: loaded.map((item) => item.filepath),
},
attachments: [
{
type: "file" as const,
mime,
url: `data:${mime};base64,${Buffer.from(yield* fs.readFile(filepath)).toString("base64")}`,
},
],
}
}
if (yield* Effect.promise(() => isBinaryFile(filepath, Number(stat.size)))) {
return yield* Effect.fail(new Error(`Cannot read binary file: ${filepath}`))
}
const file = yield* Effect.promise(() =>
lines(filepath, { limit: params.limit ?? DEFAULT_READ_LIMIT, offset: params.offset ?? 1 }),
)
if (file.count < file.offset && !(file.count === 0 && file.offset === 1)) {
return yield* Effect.fail(
new Error(`Offset ${file.offset} is out of range for this file (${file.count} lines)`),
)
}
let output = [`<path>${filepath}</path>`, `<type>file</type>`, "<content>"].join("\n")
output += file.raw.map((line, i) => `${i + file.offset}: ${line}`).join("\n")
const last = file.offset + file.raw.length - 1
const next = last + 1
const truncated = file.more || file.cut
if (file.cut) {
output += `\n\n(Output capped at ${MAX_BYTES_LABEL}. Showing lines ${file.offset}-${last}. Use offset=${next} to continue.)`
} else if (file.more) {
output += `\n\n(Showing lines ${file.offset}-${last} of ${file.count}. Use offset=${next} to continue.)`
} else {
output += `\n\n(End of file - total ${file.count} lines)`
}
output += "\n</content>"
yield* warm(filepath, ctx.sessionID)
if (loaded.length > 0) {
output += `\n\n<system-reminder>\n${loaded.map((item) => item.content).join("\n\n")}\n</system-reminder>`
}
return {
title,
output,
metadata: {
preview: file.raw.slice(0, 20).join("\n"),
truncated,
loaded: loaded.map((item) => item.filepath),
},
}
})
return { return {
description: DESCRIPTION, description: DESCRIPTION,
parameters, parameters,
async execute(params: z.infer<typeof parameters>, ctx) { async execute(params: z.infer<typeof parameters>, ctx) {
return Effect.runPromise(run(params, ctx).pipe(Effect.orDie)) return Effect.runPromise(run(deps, params, ctx).pipe(Effect.orDie))
}, },
} }
}), }),

View File

@ -0,0 +1,102 @@
import type { Agent } from "../agent/agent"
import { Config } from "../config/config"
import { Session } from "../session"
import { SessionPrompt } from "../session/prompt"
import { SessionID } from "../session/schema"
import type { ModelID, ProviderID } from "../provider/schema"
import { Effect } from "effect"
type Ref = {
providerID: ProviderID
modelID: ModelID
}
type Parts = Awaited<ReturnType<typeof SessionPrompt.resolvePromptParts>>
type Reply = Awaited<ReturnType<typeof SessionPrompt.prompt>>
type Deps = {
cfg: Effect.Effect<Config.Info>
get: (taskID: string) => Effect.Effect<Session.Info | undefined>
create: (input: { parentID: SessionID; title: string }) => Effect.Effect<Session.Info>
resolve: (prompt: string) => Effect.Effect<Parts>
prompt: (input: {
sessionID: SessionID
model: Ref
agent: string
tools: Record<string, boolean>
parts: Parts
}) => Effect.Effect<Reply>
}
type Input = {
parentID: SessionID
taskID?: string
description: string
prompt: string
agent: Agent.Info
model: Ref
abort?: AbortSignal
cancel?: (sessionID: SessionID) => Promise<void> | void
start?: (sessionID: SessionID, model: Ref) => Promise<void> | void
}
export function tools(agent: Agent.Info, cfg: Config.Info) {
const task = agent.permission.some((rule) => rule.permission === "task")
const todo = agent.permission.some((rule) => rule.permission === "todowrite")
return {
...(todo ? {} : { todowrite: false }),
...(task ? {} : { task: false }),
...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((tool) => [tool, false])),
}
}
export function output(sessionID: SessionID, text: string) {
return [
`task_id: ${sessionID} (for resuming to continue this task if needed)`,
"",
"<task_result>",
text,
"</task_result>",
].join("\n")
}
export const run = Effect.fn("Subtask.run")(function* (deps: Deps, input: Input) {
const cfg = yield* deps.cfg
const model = input.agent.model ?? input.model
const session = yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const found = input.taskID ? yield* restore(deps.get(input.taskID)) : undefined
const session = found
? found
: yield* restore(
deps.create({
parentID: input.parentID,
title: input.description + ` (@${input.agent.name} subagent)`,
}),
)
const start = input.start?.(session.id, model)
if (start) yield* Effect.promise(() => Promise.resolve(start))
return session
}),
)
if (input.abort?.aborted) {
const cancel = input.cancel?.(session.id)
if (cancel) yield* Effect.promise(() => Promise.resolve(cancel))
}
const result = yield* deps.prompt({
sessionID: session.id,
model,
agent: input.agent.name,
tools: tools(input.agent, cfg),
parts: yield* deps.resolve(input.prompt),
})
return {
sessionID: session.id,
model,
text: result.parts.findLast((part) => part.type === "text")?.text ?? "",
}
})

View File

@ -1,16 +1,17 @@
import { Tool } from "./tool" import { Tool } from "./tool"
import DESCRIPTION from "./task.txt" import DESCRIPTION from "./task.txt"
import z from "zod" import z from "zod"
import { Session } from "../session" import { Effect } from "effect"
import { SessionID, MessageID } from "../session/schema"
import { MessageV2 } from "../session/message-v2"
import { Identifier } from "../id/id"
import { Agent } from "../agent/agent"
import { SessionPrompt } from "../session/prompt"
import { iife } from "@/util/iife"
import { defer } from "@/util/defer"
import { Config } from "../config/config" import { Config } from "../config/config"
import { Session } from "../session"
import { SessionPrompt } from "../session/prompt"
import { MessageV2 } from "../session/message-v2"
import { Agent } from "../agent/agent"
import type { SessionID } from "../session/schema"
import { MessageID, SessionID as SessionRef } from "../session/schema"
import { defer } from "@/util/defer"
import { Permission } from "@/permission" import { Permission } from "@/permission"
import { output, run } from "./subtask"
const parameters = z.object({ const parameters = z.object({
description: z.string().describe("A short (3-5 words) description of the task"), description: z.string().describe("A short (3-5 words) description of the task"),
@ -45,8 +46,6 @@ export const TaskTool = Tool.define("task", async (ctx) => {
description, description,
parameters, parameters,
async execute(params: z.infer<typeof parameters>, ctx) { async execute(params: z.infer<typeof parameters>, ctx) {
const config = await Config.get()
// Skip permission check when user explicitly invoked via @ or command subtask // Skip permission check when user explicitly invoked via @ or command subtask
if (!ctx.extra?.bypassAgentCheck) { if (!ctx.extra?.bypassAgentCheck) {
await ctx.ask({ await ctx.ask({
@ -62,104 +61,60 @@ export const TaskTool = Tool.define("task", async (ctx) => {
const agent = await Agent.get(params.subagent_type) const agent = await Agent.get(params.subagent_type)
if (!agent) throw new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`) if (!agent) throw new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`)
const hasTaskPermission = agent.permission.some((rule) => rule.permission === "task")
const hasTodoWritePermission = agent.permission.some((rule) => rule.permission === "todowrite")
const session = await iife(async () => {
if (params.task_id) {
const found = await Session.get(SessionID.make(params.task_id)).catch(() => {})
if (found) return found
}
return await Session.create({
parentID: ctx.sessionID,
title: params.description + ` (@${agent.name} subagent)`,
permission: [
...(hasTodoWritePermission
? []
: [
{
permission: "todowrite" as const,
pattern: "*" as const,
action: "deny" as const,
},
]),
...(hasTaskPermission
? []
: [
{
permission: "task" as const,
pattern: "*" as const,
action: "deny" as const,
},
]),
...(config.experimental?.primary_tools?.map((t) => ({
pattern: "*",
action: "allow" as const,
permission: t,
})) ?? []),
],
})
})
const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }) const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID })
if (msg.info.role !== "assistant") throw new Error("Not an assistant message") if (msg.info.role !== "assistant") throw new Error("Not an assistant message")
const model = agent.model ?? { let child: SessionID | undefined
modelID: msg.info.modelID, const cancel = () => {
providerID: msg.info.providerID, if (!child) return
} SessionPrompt.cancel(child)
ctx.metadata({
title: params.description,
metadata: {
sessionId: session.id,
model,
},
})
const messageID = MessageID.ascending()
function cancel() {
SessionPrompt.cancel(session.id)
} }
ctx.abort.addEventListener("abort", cancel) ctx.abort.addEventListener("abort", cancel)
using _ = defer(() => ctx.abort.removeEventListener("abort", cancel)) using _ = defer(() => ctx.abort.removeEventListener("abort", cancel))
const promptParts = await SessionPrompt.resolvePromptParts(params.prompt)
const result = await SessionPrompt.prompt({ const task = await Effect.runPromise(
messageID, run(
sessionID: session.id, {
model: { cfg: Effect.promise(() => Config.get()),
modelID: model.modelID, get: (taskID) => Effect.promise(() => Session.get(SessionRef.make(taskID)).catch(() => undefined)),
providerID: model.providerID, create: (input) => Effect.promise(() => Session.create(input)),
}, resolve: (prompt) => Effect.promise(() => SessionPrompt.resolvePromptParts(prompt)),
agent: agent.name, prompt: (input) =>
tools: { Effect.promise(() => SessionPrompt.prompt({ ...input, messageID: MessageID.ascending() })),
...(hasTodoWritePermission ? {} : { todowrite: false }), },
...(hasTaskPermission ? {} : { task: false }), {
...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])), parentID: ctx.sessionID,
}, taskID: params.task_id,
parts: promptParts, description: params.description,
}) prompt: params.prompt,
agent,
const text = result.parts.findLast((x) => x.type === "text")?.text ?? "" abort: ctx.abort,
cancel: SessionPrompt.cancel,
const output = [ model: {
`task_id: ${session.id} (for resuming to continue this task if needed)`, modelID: msg.info.modelID,
"", providerID: msg.info.providerID,
"<task_result>", },
text, start(sessionID, model) {
"</task_result>", child = sessionID
].join("\n") ctx.metadata({
title: params.description,
metadata: {
sessionId: sessionID,
model,
},
})
},
},
),
)
return { return {
title: params.description, title: params.description,
metadata: { metadata: {
sessionId: session.id, sessionId: task.sessionID,
model, model: task.model,
}, },
output, output: output(task.sessionID, task.text),
} }
}, },
} }

View File

@ -26,6 +26,20 @@ export namespace Tool {
metadata(input: { title?: string; metadata?: M }): void metadata(input: { title?: string; metadata?: M }): void
ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void> ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void>
} }
export function context<M extends Metadata = Metadata>(
input: Omit<Context<M>, "abort" | "callID"> & {
abort?: AbortSignal
callID?: string
},
): Context<M> {
return {
...input,
abort: input.abort ?? new AbortController().signal,
callID: input.callID,
}
}
export interface Def<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> { export interface Def<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
description: string description: string
parameters: Parameters parameters: Parameters

View File

@ -250,7 +250,7 @@ describe("Runner", () => {
Effect.gen(function* () { Effect.gen(function* () {
const s = yield* Scope.Scope const s = yield* Scope.Scope
const runner = Runner.make<string>(s) const runner = Runner.make<string>(s)
const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done")) const result = yield* runner.startShell(Effect.succeed("shell-done"))
expect(result).toBe("shell-done") expect(result).toBe("shell-done")
expect(runner.busy).toBe(false) expect(runner.busy).toBe(false)
}), }),
@ -264,7 +264,7 @@ describe("Runner", () => {
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild) const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit) const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true) expect(Exit.isFailure(exit)).toBe(true)
yield* runner.cancel yield* runner.cancel
@ -279,12 +279,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s) const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>() const gate = yield* Deferred.make<void>()
const sh = yield* runner const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit) const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true) expect(Exit.isFailure(exit)).toBe(true)
yield* Deferred.succeed(gate, undefined) yield* Deferred.succeed(gate, undefined)
@ -302,37 +300,26 @@ describe("Runner", () => {
}, },
}) })
const sh = yield* runner const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
.startShell((signal) =>
Effect.promise(
() =>
new Promise<string>((resolve) => {
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
}),
),
)
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit) const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true) expect(Exit.isFailure(exit)).toBe(true)
yield* runner.cancel yield* runner.cancel
const done = yield* Fiber.await(sh) const done = yield* Fiber.await(sh)
expect(Exit.isSuccess(done)).toBe(true) expect(Exit.isFailure(done)).toBe(true)
}), }),
) )
it.live( it.live(
"cancel interrupts shell that ignores abort signal", "cancel interrupts shell",
Effect.gen(function* () { Effect.gen(function* () {
const s = yield* Scope.Scope const s = yield* Scope.Scope
const runner = Runner.make<string>(s) const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>() const gate = yield* Deferred.make<void>()
const sh = yield* runner const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
const stop = yield* runner.cancel.pipe(Effect.forkChild) const stop = yield* runner.cancel.pipe(Effect.forkChild)
@ -356,9 +343,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s) const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>() const gate = yield* Deferred.make<void>()
const sh = yield* runner const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
expect(runner.state._tag).toBe("Shell") expect(runner.state._tag).toBe("Shell")
@ -384,9 +369,7 @@ describe("Runner", () => {
const calls = yield* Ref.make(0) const calls = yield* Ref.make(0)
const gate = yield* Deferred.make<void>() const gate = yield* Deferred.make<void>()
const sh = yield* runner const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
const work = Effect.gen(function* () { const work = Effect.gen(function* () {
@ -414,16 +397,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s) const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>() const gate = yield* Deferred.make<void>()
const sh = yield* runner const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
.startShell((signal) =>
Effect.promise(
() =>
new Promise<string>((resolve) => {
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
}),
),
)
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild) const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
@ -478,7 +452,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, { const runner = Runner.make<string>(s, {
onBusy: Ref.update(count, (n) => n + 1), onBusy: Ref.update(count, (n) => n + 1),
}) })
yield* runner.startShell((_signal) => Effect.succeed("done")) yield* runner.startShell(Effect.succeed("done"))
expect(yield* Ref.get(count)).toBe(1) expect(yield* Ref.get(count)).toBe(1)
}), }),
) )
@ -509,9 +483,7 @@ describe("Runner", () => {
const runner = Runner.make<string>(s) const runner = Runner.make<string>(s)
const gate = yield* Deferred.make<void>() const gate = yield* Deferred.make<void>()
const fiber = yield* runner const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
.pipe(Effect.forkChild)
yield* Effect.sleep("10 millis") yield* Effect.sleep("10 millis")
expect(runner.busy).toBe(true) expect(runner.busy).toBe(true)

View File

@ -1,8 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node" import { NodeFileSystem } from "@effect/platform-node"
import { expect, spyOn } from "bun:test" import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect" import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import path from "path" import path from "path"
import z from "zod"
import { Agent as AgentSvc } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent"
import { Bus } from "../../src/bus" import { Bus } from "../../src/bus"
import { Command } from "../../src/command" import { Command } from "../../src/command"
@ -29,7 +28,6 @@ import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { SessionStatus } from "../../src/session/status" import { SessionStatus } from "../../src/session/status"
import { Shell } from "../../src/shell/shell" import { Shell } from "../../src/shell/shell"
import { Snapshot } from "../../src/snapshot" import { Snapshot } from "../../src/snapshot"
import { TaskTool } from "../../src/tool/task"
import { ToolRegistry } from "../../src/tool/registry" import { ToolRegistry } from "../../src/tool/registry"
import { Truncate } from "../../src/tool/truncate" import { Truncate } from "../../src/tool/truncate"
import { Log } from "../../src/util/log" import { Log } from "../../src/util/log"
@ -629,41 +627,26 @@ it.live(
provideTmpdirInstance( provideTmpdirInstance(
(dir) => (dir) =>
Effect.gen(function* () { Effect.gen(function* () {
const ready = defer<void>() const { prompt, chat, sessions } = yield* boot()
const aborted = defer<void>() const llm = yield* TestLLMServer
const init = spyOn(TaskTool, "init").mockImplementation(async () => ({ yield* llm.hang
description: "task",
parameters: z.object({
description: z.string(),
prompt: z.string(),
subagent_type: z.string(),
task_id: z.string().optional(),
command: z.string().optional(),
}),
execute: async (_args, ctx) => {
ready.resolve()
ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true })
await new Promise<void>(() => {})
return {
title: "",
metadata: {
sessionId: SessionID.make("task"),
model: ref,
},
output: "",
}
},
}))
yield* Effect.addFinalizer(() => Effect.sync(() => init.mockRestore()))
const { prompt, chat } = yield* boot()
const msg = yield* user(chat.id, "hello") const msg = yield* user(chat.id, "hello")
yield* addSubtask(chat.id, msg.id) yield* addSubtask(chat.id, msg.id)
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
yield* Effect.gen(function* () {
while (true) {
const [child] = yield* sessions.children(chat.id)
if (child) {
const msgs = yield* sessions.messages({ sessionID: child.id })
if (msgs.some((msg) => msg.info.role === "assistant")) return
}
yield* Effect.sleep("10 millis")
}
})
yield* prompt.cancel(chat.id) yield* prompt.cancel(chat.id)
yield* Effect.promise(() => aborted.promise)
const exit = yield* Fiber.await(fiber) const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true) expect(Exit.isSuccess(exit)).toBe(true)

View File

@ -1,13 +1,28 @@
import { afterEach, describe, expect, test } from "bun:test" import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
import { Agent } from "../../src/agent/agent" import { Agent } from "../../src/agent/agent"
import { Config } from "../../src/config/config"
import { Instance } from "../../src/project/instance" import { Instance } from "../../src/project/instance"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { MessageV2 } from "../../src/session/message-v2"
import { SessionPrompt } from "../../src/session/prompt"
import { MessageID, SessionID } from "../../src/session/schema"
import { Session } from "../../src/session"
import { TaskTool } from "../../src/tool/task" import { TaskTool } from "../../src/tool/task"
import { tmpdir } from "../fixture/fixture" import { tmpdir } from "../fixture/fixture"
afterEach(async () => { afterEach(async () => {
mock.restore()
await Instance.disposeAll() await Instance.disposeAll()
}) })
function wait<T>() {
let done!: (value: T | PromiseLike<T>) => void
const promise = new Promise<T>((resolve) => {
done = resolve
})
return { promise, done }
}
describe("tool.task", () => { describe("tool.task", () => {
test("description sorts subagents by name and is stable across calls", async () => { test("description sorts subagents by name and is stable across calls", async () => {
await using tmp = await tmpdir({ await using tmp = await tmpdir({
@ -46,4 +61,73 @@ describe("tool.task", () => {
}, },
}) })
}) })
test("cancels child session when aborted during creation", async () => {
const started = wait<void>()
const gate = wait<void>()
const parent = SessionID.make("parent")
const child = SessionID.make("child")
const messageID = MessageID.ascending()
const abort = new AbortController()
const agent: Agent.Info = {
name: "general",
description: "General agent",
mode: "subagent",
options: {},
permission: [],
}
const ref = {
providerID: ProviderID.make("test"),
modelID: ModelID.make("test-model"),
}
spyOn(Agent, "list").mockResolvedValue([agent])
spyOn(Agent, "get").mockResolvedValue(agent)
spyOn(Config, "get").mockResolvedValue({ experimental: {} } as Awaited<ReturnType<typeof Config.get>>)
spyOn(MessageV2, "get").mockResolvedValue({
info: {
role: "assistant",
providerID: ref.providerID,
modelID: ref.modelID,
},
} as Awaited<ReturnType<typeof MessageV2.get>>)
spyOn(Session, "get").mockRejectedValue(new Error("missing"))
spyOn(Session, "create").mockImplementation(async () => {
started.done()
await gate.promise
return { id: child } as Awaited<ReturnType<typeof Session.create>>
})
const cancel = spyOn(SessionPrompt, "cancel").mockResolvedValue()
spyOn(SessionPrompt, "resolvePromptParts").mockResolvedValue(
[] as Awaited<ReturnType<typeof SessionPrompt.resolvePromptParts>>,
)
spyOn(SessionPrompt, "prompt").mockResolvedValue({
parts: [{ type: "text", text: "done" }],
} as Awaited<ReturnType<typeof SessionPrompt.prompt>>)
const tool = await TaskTool.init()
const run = tool.execute(
{
description: "inspect bug",
prompt: "check it",
subagent_type: "general",
},
{
sessionID: parent,
messageID,
agent: "build",
abort: abort.signal,
messages: [],
metadata: () => {},
ask: async () => {},
},
)
await started.promise
abort.abort()
gate.done()
await run
expect(cancel).toHaveBeenCalledWith(child)
})
}) })