From a3bf9789198e2b61d26135824f461a4514bba178 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sun, 5 Apr 2026 11:56:39 -0400 Subject: [PATCH] refactor(prompt): remove prompt abort-signal plumbing Keep prompt reads, subtask execution, and shell runner wiring on shared Effect services so prompt no longer carries its own abort-signal adapters or duplicate task flow. --- packages/opencode/src/effect/runner.ts | 16 +- packages/opencode/src/session/prompt.ts | 294 +++++++------- .../opencode/src/tool/external-directory.ts | 6 +- packages/opencode/src/tool/read.ts | 377 +++++++++--------- packages/opencode/src/tool/subtask.ts | 87 ++++ packages/opencode/src/tool/task.ts | 145 +++---- packages/opencode/src/tool/tool.ts | 14 + packages/opencode/test/effect/runner.test.ts | 56 +-- .../test/session/prompt-effect.test.ts | 49 +-- 9 files changed, 530 insertions(+), 514 deletions(-) create mode 100644 packages/opencode/src/tool/subtask.ts diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index cb12b4c52b..62fc42dfb4 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -1,10 +1,10 @@ -import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect" +import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect" export interface Runner { readonly state: Runner.State readonly busy: boolean readonly ensureRunning: (work: Effect.Effect) => Effect.Effect - readonly startShell: (work: (signal: AbortSignal) => Effect.Effect) => Effect.Effect + readonly startShell: (work: Effect.Effect) => Effect.Effect readonly cancel: Effect.Effect } @@ -20,7 +20,6 @@ export namespace Runner { interface ShellHandle { id: number fiber: Fiber.Fiber - abort: AbortController } interface PendingHandle { @@ -102,9 +101,7 @@ export namespace Runner { const stopShell = (shell: ShellHandle) => Effect.gen(function* () { - shell.abort.abort() - const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis")) - if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber) + yield* Fiber.interrupt(shell.fiber) yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid) }) @@ -138,7 +135,7 @@ export namespace Runner { ), ) - const startShell = (work: (signal: AbortSignal) => Effect.Effect) => + const startShell = (work: Effect.Effect) => SynchronizedRef.modifyEffect( ref, Effect.fnUntraced(function* (st) { @@ -153,9 +150,8 @@ export namespace Runner { } yield* busy const id = next() - const abort = new AbortController() - const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber, abort } satisfies ShellHandle + const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) + const shell = { id, fiber } satisfies ShellHandle return [ Effect.gen(function* () { const exit = yield* Fiber.await(fiber) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 24996c8d4b..8a01672ea4 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -24,7 +24,6 @@ import { ToolRegistry } from "../tool/registry" import { Runner } from "@/effect/runner" import { MCP } from "../mcp" import { LSP } from "../lsp" -import { ReadTool } from "../tool/read" import { FileTime } from "../file/time" import { Flag } from "../flag/flag" import { ulid } from "ulid" @@ -33,11 +32,11 @@ import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import * as Stream from "effect/Stream" import { Command } from "../command" import { pathToFileURL, fileURLToPath } from "url" +import { Config } from "../config/config" import { ConfigMarkdown } from "../config/markdown" import { SessionSummary } from "./summary" import { NamedError } from "@opencode-ai/util/error" import { SessionProcessor } from "./processor" -import { TaskTool } from "@/tool/task" import { Tool } from "@/tool/tool" import { Permission } from "@/permission" import { SessionStatus } from "./status" @@ -47,6 +46,8 @@ import { AppFileSystem } from "@/filesystem" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" +import { run as read } from "@/tool/read" +import { output as subtaskOutput, run as subtask } from "@/tool/subtask" import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" import { makeRuntime } from "@/effect/run-service" @@ -101,6 +102,7 @@ export namespace SessionPrompt { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const scope = yield* Scope.Scope const instruction = yield* Instruction.Service + const llm = yield* LLM.Service const state = yield* InstanceState.make( Effect.fn("SessionPrompt.state")(function* () { @@ -218,26 +220,29 @@ export namespace SessionPrompt { const msgs = onlySubtasks ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] : yield* MessageV2.toModelMessagesEffect(context, mdl) - const text = yield* Effect.promise(async (signal) => { - const result = await LLM.stream({ + const text = yield* llm + .stream({ agent: ag, user: firstInfo, system: [], small: true, tools: {}, model: mdl, - abort: signal, sessionID: input.session.id, retries: 2, messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs], }) - return result.text - }) + .pipe( + Stream.runFold( + () => "", + (text: string, event: LLM.Event) => (event.type === "text-delta" ? text + event.text : text), + ), + ) const cleaned = text .replace(/[\s\S]*?<\/think>\s*/g, "") .split("\n") - .map((line) => line.trim()) - .find((line) => line.length > 0) + .map((line: string) => line.trim()) + .find((line: string) => line.length > 0) if (!cleaned) return const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned yield* sessions @@ -397,41 +402,42 @@ NOTE: At any point in time through this workflow you should feel free to ask the using _ = log.time("resolveTools") const tools: Record = {} - const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({ - sessionID: input.session.id, - abort: options.abortSignal!, - messageID: input.processor.message.id, - callID: options.toolCallId, - extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck }, - agent: input.agent.name, - messages: input.messages, - metadata: (val) => - Effect.runPromise( - Effect.gen(function* () { - const match = input.processor.partFromToolCall(options.toolCallId) - if (!match || !["running", "pending"].includes(match.state.status)) return - yield* sessions.updatePart({ - ...match, - state: { - title: val.title, - metadata: val.metadata, - status: "running", - input: args, - time: { start: Date.now() }, - }, - }) - }), - ), - ask: (req) => - Effect.runPromise( - permission.ask({ - ...req, - sessionID: input.session.id, - tool: { messageID: input.processor.message.id, callID: options.toolCallId }, - ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []), - }), - ), - }) + const context = (args: any, options: ToolExecutionOptions): Tool.Context => + Tool.context({ + abort: options.abortSignal, + callID: options.toolCallId, + sessionID: input.session.id, + messageID: input.processor.message.id, + extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck }, + agent: input.agent.name, + messages: input.messages, + metadata: (val) => + Effect.runPromise( + Effect.gen(function* () { + const match = input.processor.partFromToolCall(options.toolCallId) + if (!match || !["running", "pending"].includes(match.state.status)) return + yield* sessions.updatePart({ + ...match, + state: { + title: val.title, + metadata: val.metadata, + status: "running", + input: args, + time: { start: Date.now() }, + }, + }) + }), + ), + ask: (req) => + Effect.runPromise( + permission.ask({ + ...req, + sessionID: input.session.id, + tool: { messageID: input.processor.message.id, callID: options.toolCallId }, + ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []), + }), + ), + }) for (const item of yield* registry.tools( { modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID }, @@ -555,13 +561,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the model: Provider.Model lastUser: MessageV2.User sessionID: SessionID - session: Session.Info - msgs: MessageV2.WithParts[] }) { - const { task, model, lastUser, sessionID, session, msgs } = input + const { task, model, lastUser, sessionID } = input const ctx = yield* InstanceState.context - const taskTool = yield* Effect.promise(() => registry.named.task.init()) + const taskAgent = yield* agents.get(task.agent) + if (!taskAgent) { + const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) + const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" + const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` }) + yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() }) + throw error + } + const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model + const taskRef = { providerID: taskModel.providerID, modelID: taskModel.id } const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({ id: MessageID.ascending(), role: "assistant", @@ -601,57 +614,71 @@ NOTE: At any point in time through this workflow you should feel free to ask the subagent_type: task.agent, command: task.command, } - yield* plugin.trigger("tool.execute.before", { tool: "task", sessionID, callID: part.id }, { args: taskArgs }) - - const taskAgent = yield* agents.get(task.agent) - if (!taskAgent) { - const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) - const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" - const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` }) - yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() }) - throw error - } + yield* plugin.trigger( + "tool.execute.before", + { tool: "task", sessionID, callID: part.callID }, + { args: taskArgs }, + ) + let child: SessionID | undefined let error: Error | undefined - const result = yield* Effect.promise((signal) => - taskTool - .execute(taskArgs, { - agent: task.agent, - messageID: assistantMessage.id, - sessionID, - abort: signal, - callID: part.callID, - extra: { bypassAgentCheck: true }, - messages: msgs, - metadata(val: { title?: string; metadata?: Record }) { - return Effect.runPromise( - Effect.gen(function* () { - part = yield* sessions.updatePart({ - ...part, - type: "tool", - state: { ...part.state, ...val }, - } satisfies MessageV2.ToolPart) - }), - ) - }, - ask(req: any) { - return Effect.runPromise( - permission.ask({ - ...req, - sessionID, - ruleset: Permission.merge(taskAgent.permission, session.permission ?? []), - }), - ) - }, - }) - .catch((e) => { - error = e instanceof Error ? e : new Error(String(e)) - log.error("subtask execution failed", { error, agent: task.agent, description: task.description }) - return undefined - }), + const result = yield* subtask( + { + cfg: Effect.promise(() => Config.get()), + get: (taskID) => sessions.get(SessionID.make(taskID)).pipe(Effect.catch(() => Effect.succeed(undefined))), + create: (input) => sessions.create(input), + resolve: resolvePromptParts, + prompt: (input) => prompt({ ...input, messageID: MessageID.ascending() }), + }, + { + parentID: sessionID, + description: task.description, + prompt: task.prompt, + agent: taskAgent, + model: taskRef, + start(sessionID, model) { + child = sessionID + const metadata = { sessionId: sessionID, model } + return Effect.runPromise( + sessions.updatePart({ + ...part, + state: { + status: "running", + input: part.state.input, + time: part.state.status === "running" ? part.state.time : { start: Date.now() }, + title: task.description, + metadata, + }, + } satisfies MessageV2.ToolPart), + ).then((next) => { + part = next + }) + }, + }, ).pipe( + Effect.flatMap((sub) => + truncate.output(subtaskOutput(sub.sessionID, sub.text), {}).pipe( + Effect.map((truncated) => ({ + title: task.description, + metadata: { + sessionId: sub.sessionID, + model: sub.model, + truncated: truncated.truncated, + ...(truncated.truncated && { outputPath: truncated.outputPath }), + }, + output: truncated.content, + })), + ), + ), + Effect.catchCause((cause) => { + const err = Cause.squash(cause) + error = err instanceof Error ? err : new Error(String(err)) + log.error("subtask execution failed", { error, agent: task.agent, description: task.description }) + return Effect.succeed(undefined) + }), Effect.onInterrupt(() => Effect.gen(function* () { + if (child) yield* cancel(child) assistantMessage.finish = "tool-calls" assistantMessage.time.completed = Date.now() yield* sessions.updateMessage(assistantMessage) @@ -671,16 +698,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the ), ) - const attachments = result?.attachments?.map((attachment) => ({ - ...attachment, - id: PartID.ascending(), - sessionID, - messageID: assistantMessage.id, - })) - yield* plugin.trigger( "tool.execute.after", - { tool: "task", sessionID, callID: part.id, args: taskArgs }, + { tool: "task", sessionID, callID: part.callID, args: taskArgs }, result, ) @@ -697,7 +717,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the title: result.title, metadata: result.metadata, output: result.output, - attachments, time: { ...part.state.time, end: Date.now() }, }, } satisfies MessageV2.ToolPart) @@ -740,7 +759,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the } satisfies MessageV2.TextPart) }) - const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) { + const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { const ctx = yield* InstanceState.context const session = yield* sessions.get(input.sessionID) if (session.revert) { @@ -1073,6 +1092,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the log.info("file", { mime: part.mime }) const filepath = fileURLToPath(part.url) if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory" + const readCtx = Tool.context({ + sessionID: input.sessionID, + agent: info.agent, + messageID: info.id, + extra: { bypassCwdCheck: true }, + messages: [], + metadata: () => {}, + ask: async () => {}, + }) if (part.mime === "text/plain") { let offset: number | undefined @@ -1110,29 +1138,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the text: `Called the Read tool with the following input: ${JSON.stringify(args)}`, }, ] - const read = yield* Effect.promise(() => registry.named.read.init()).pipe( - Effect.flatMap((t) => - provider.getModel(info.model.providerID, info.model.modelID).pipe( - Effect.flatMap((mdl) => - Effect.promise(() => - t.execute(args, { - sessionID: input.sessionID, - abort: new AbortController().signal, - agent: input.agent!, - messageID: info.id, - extra: { bypassCwdCheck: true, model: mdl }, - messages: [], - metadata: async () => {}, - ask: async () => {}, - }), - ), - ), - ), - ), - Effect.exit, - ) - if (Exit.isSuccess(read)) { - const result = read.value + const readResult = yield* read( + { fs: fsys, instruction, lsp, time: filetime, scope }, + args, + readCtx, + ).pipe(Effect.exit) + if (Exit.isSuccess(readResult)) { + const result = readResult.value pieces.push({ messageID: info.id, sessionID: input.sessionID, @@ -1145,7 +1157,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the ...result.attachments.map((a) => ({ ...a, synthetic: true, - filename: a.filename ?? part.filename, + filename: "filename" in a && typeof a.filename === "string" ? a.filename : part.filename, messageID: info.id, sessionID: input.sessionID, })), @@ -1154,7 +1166,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID }) } } else { - const error = Cause.squash(read.cause) + const error = Cause.squash(readResult.cause) log.error("failed to read file", { error }) const message = error instanceof Error ? error.message : String(error) yield* bus.publish(Session.Event.Error, { @@ -1174,21 +1186,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the if (part.mime === "application/x-directory") { const args = { filePath: filepath } - const result = yield* Effect.promise(() => registry.named.read.init()).pipe( - Effect.flatMap((t) => - Effect.promise(() => - t.execute(args, { - sessionID: input.sessionID, - abort: new AbortController().signal, - agent: input.agent!, - messageID: info.id, - extra: { bypassCwdCheck: true }, - messages: [], - metadata: async () => {}, - ask: async () => {}, - }), - ), - ), + const result = yield* read({ fs: fsys, instruction, lsp, time: filetime, scope }, args, readCtx).pipe( + Effect.orDie, ) return [ { @@ -1332,7 +1331,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the } if (latest) return latest throw new Error("Impossible") - }) + }).pipe(Effect.orDie) const runLoop: (sessionID: SessionID) => Effect.Effect = Effect.fn("SessionPrompt.run")( function* (sessionID: SessionID) { @@ -1393,7 +1392,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const task = tasks.pop() if (task?.type === "subtask") { - yield* handleSubtask({ task, model, lastUser, sessionID, session, msgs }) + yield* handleSubtask({ task, model, lastUser, sessionID }) continue } @@ -1577,7 +1576,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the function* (input: ShellInput) { const s = yield* InstanceState.get(state) const runner = getRunner(s.runners, input.sessionID) - return yield* runner.startShell((signal) => shellImpl(input, signal)) + return yield* runner.startShell(shellImpl(input)) }, ) @@ -1722,6 +1721,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the Layer.provide(FileTime.defaultLayer), Layer.provide(ToolRegistry.defaultLayer), Layer.provide(Truncate.layer), + Layer.provide(LLM.defaultLayer), Layer.provide(Provider.defaultLayer), Layer.provide(Instruction.defaultLayer), Layer.provide(AppFileSystem.defaultLayer), diff --git a/packages/opencode/src/tool/external-directory.ts b/packages/opencode/src/tool/external-directory.ts index f11455cf59..186949b041 100644 --- a/packages/opencode/src/tool/external-directory.ts +++ b/packages/opencode/src/tool/external-directory.ts @@ -11,7 +11,9 @@ type Options = { kind?: Kind } -export async function assertExternalDirectory(ctx: Tool.Context, target?: string, options?: Options) { +type Ctx = Pick + +export async function assertExternalDirectory(ctx: Ctx, target?: string, options?: Options) { if (!target) return if (options?.bypass) return @@ -38,7 +40,7 @@ export async function assertExternalDirectory(ctx: Tool.Context, target?: string } export const assertExternalDirectoryEffect = Effect.fn("Tool.assertExternalDirectory")(function* ( - ctx: Tool.Context, + ctx: Ctx, target?: string, options?: Options, ) { diff --git a/packages/opencode/src/tool/read.ts b/packages/opencode/src/tool/read.ts index 366993020b..a12c3a0616 100644 --- a/packages/opencode/src/tool/read.ts +++ b/packages/opencode/src/tool/read.ts @@ -25,6 +25,197 @@ const parameters = z.object({ limit: z.coerce.number().describe("The maximum number of lines to read (defaults to 2000)").optional(), }) +type Ctx = Omit + +type Deps = { + fs: AppFileSystem.Interface + instruction: Instruction.Interface + lsp: LSP.Interface + time: FileTime.Interface + scope: Scope.Scope +} + +export const run = Effect.fn("ReadTool.run")(function* (deps: Deps, params: z.infer, ctx: Ctx) { + const miss = Effect.fn("ReadTool.miss")(function* (filepath: string) { + const dir = path.dirname(filepath) + const base = path.basename(filepath) + const items = yield* deps.fs.readDirectory(dir).pipe( + Effect.map((items) => + items + .filter( + (item) => + item.toLowerCase().includes(base.toLowerCase()) || base.toLowerCase().includes(item.toLowerCase()), + ) + .map((item) => path.join(dir, item)) + .slice(0, 3), + ), + Effect.catch(() => Effect.succeed([] as string[])), + ) + + if (items.length > 0) { + return yield* Effect.fail( + new Error(`File not found: ${filepath}\n\nDid you mean one of these?\n${items.join("\n")}`), + ) + } + + return yield* Effect.fail(new Error(`File not found: ${filepath}`)) + }) + + const list = Effect.fn("ReadTool.list")(function* (filepath: string) { + const items = yield* deps.fs.readDirectoryEntries(filepath) + return yield* Effect.forEach( + items, + Effect.fnUntraced(function* (item) { + if (item.type === "directory") return item.name + "/" + if (item.type !== "symlink") return item.name + + const target = yield* deps.fs + .stat(path.join(filepath, item.name)) + .pipe(Effect.catch(() => Effect.succeed(undefined))) + if (target?.type === "Directory") return item.name + "/" + return item.name + }), + { concurrency: "unbounded" }, + ).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b)))) + }) + + const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, sessionID: Ctx["sessionID"]) { + yield* deps.lsp.touchFile(filepath, false).pipe(Effect.ignore, Effect.forkIn(deps.scope)) + yield* deps.time.read(sessionID, filepath) + }) + + if (params.offset !== undefined && params.offset < 1) { + return yield* Effect.fail(new Error("offset must be greater than or equal to 1")) + } + + let filepath = params.filePath + if (!path.isAbsolute(filepath)) { + filepath = path.resolve(Instance.directory, filepath) + } + if (process.platform === "win32") { + filepath = AppFileSystem.normalizePath(filepath) + } + const title = path.relative(Instance.worktree, filepath) + + const stat = yield* deps.fs.stat(filepath).pipe( + Effect.catchIf( + (err) => "reason" in err && err.reason._tag === "NotFound", + () => Effect.succeed(undefined), + ), + ) + + yield* assertExternalDirectoryEffect(ctx, filepath, { + bypass: Boolean(ctx.extra?.["bypassCwdCheck"]), + kind: stat?.type === "Directory" ? "directory" : "file", + }) + + yield* Effect.promise(() => + ctx.ask({ + permission: "read", + patterns: [filepath], + always: ["*"], + metadata: {}, + }), + ) + + if (!stat) return yield* miss(filepath) + + if (stat.type === "Directory") { + const items = yield* list(filepath) + const limit = params.limit ?? DEFAULT_READ_LIMIT + const offset = params.offset ?? 1 + const start = offset - 1 + const sliced = items.slice(start, start + limit) + const truncated = start + sliced.length < items.length + + return { + title, + output: [ + `${filepath}`, + `directory`, + ``, + sliced.join("\n"), + truncated + ? `\n(Showing ${sliced.length} of ${items.length} entries. Use 'offset' parameter to read beyond entry ${offset + sliced.length})` + : `\n(${items.length} entries)`, + ``, + ].join("\n"), + metadata: { + preview: sliced.slice(0, 20).join("\n"), + truncated, + loaded: [] as string[], + }, + } + } + + const loaded = yield* deps.instruction.resolve(ctx.messages, filepath, ctx.messageID) + + const mime = AppFileSystem.mimeType(filepath) + const isImage = mime.startsWith("image/") && mime !== "image/svg+xml" && mime !== "image/vnd.fastbidsheet" + const isPdf = mime === "application/pdf" + if (isImage || isPdf) { + const msg = `${isImage ? "Image" : "PDF"} read successfully` + return { + title, + output: msg, + metadata: { + preview: msg, + truncated: false, + loaded: loaded.map((item) => item.filepath), + }, + attachments: [ + { + type: "file" as const, + mime, + url: `data:${mime};base64,${Buffer.from(yield* deps.fs.readFile(filepath)).toString("base64")}`, + }, + ], + } + } + + if (yield* Effect.promise(() => isBinaryFile(filepath, Number(stat.size)))) { + return yield* Effect.fail(new Error(`Cannot read binary file: ${filepath}`)) + } + + const file = yield* Effect.promise(() => + lines(filepath, { limit: params.limit ?? DEFAULT_READ_LIMIT, offset: params.offset ?? 1 }), + ) + if (file.count < file.offset && !(file.count === 0 && file.offset === 1)) { + return yield* Effect.fail(new Error(`Offset ${file.offset} is out of range for this file (${file.count} lines)`)) + } + + let output = [`${filepath}`, `file`, ""].join("\n") + output += file.raw.map((line, i) => `${i + file.offset}: ${line}`).join("\n") + + const last = file.offset + file.raw.length - 1 + const next = last + 1 + const truncated = file.more || file.cut + if (file.cut) { + output += `\n\n(Output capped at ${MAX_BYTES_LABEL}. Showing lines ${file.offset}-${last}. Use offset=${next} to continue.)` + } else if (file.more) { + output += `\n\n(Showing lines ${file.offset}-${last} of ${file.count}. Use offset=${next} to continue.)` + } else { + output += `\n\n(End of file - total ${file.count} lines)` + } + output += "\n" + + yield* warm(filepath, ctx.sessionID) + + if (loaded.length > 0) { + output += `\n\n\n${loaded.map((item) => item.content).join("\n\n")}\n` + } + + return { + title, + output, + metadata: { + preview: file.raw.slice(0, 20).join("\n"), + truncated, + loaded: loaded.map((item) => item.filepath), + }, + } +}) + export const ReadTool = Tool.defineEffect( "read", Effect.gen(function* () { @@ -33,195 +224,13 @@ export const ReadTool = Tool.defineEffect( const lsp = yield* LSP.Service const time = yield* FileTime.Service const scope = yield* Scope.Scope - - const miss = Effect.fn("ReadTool.miss")(function* (filepath: string) { - const dir = path.dirname(filepath) - const base = path.basename(filepath) - const items = yield* fs.readDirectory(dir).pipe( - Effect.map((items) => - items - .filter( - (item) => - item.toLowerCase().includes(base.toLowerCase()) || base.toLowerCase().includes(item.toLowerCase()), - ) - .map((item) => path.join(dir, item)) - .slice(0, 3), - ), - Effect.catch(() => Effect.succeed([] as string[])), - ) - - if (items.length > 0) { - return yield* Effect.fail( - new Error(`File not found: ${filepath}\n\nDid you mean one of these?\n${items.join("\n")}`), - ) - } - - return yield* Effect.fail(new Error(`File not found: ${filepath}`)) - }) - - const list = Effect.fn("ReadTool.list")(function* (filepath: string) { - const items = yield* fs.readDirectoryEntries(filepath) - return yield* Effect.forEach( - items, - Effect.fnUntraced(function* (item) { - if (item.type === "directory") return item.name + "/" - if (item.type !== "symlink") return item.name - - const target = yield* fs - .stat(path.join(filepath, item.name)) - .pipe(Effect.catch(() => Effect.succeed(undefined))) - if (target?.type === "Directory") return item.name + "/" - return item.name - }), - { concurrency: "unbounded" }, - ).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b)))) - }) - - const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, sessionID: Tool.Context["sessionID"]) { - yield* lsp.touchFile(filepath, false).pipe(Effect.ignore, Effect.forkIn(scope)) - yield* time.read(sessionID, filepath) - }) - - const run = Effect.fn("ReadTool.execute")(function* (params: z.infer, ctx: Tool.Context) { - if (params.offset !== undefined && params.offset < 1) { - return yield* Effect.fail(new Error("offset must be greater than or equal to 1")) - } - - let filepath = params.filePath - if (!path.isAbsolute(filepath)) { - filepath = path.resolve(Instance.directory, filepath) - } - if (process.platform === "win32") { - filepath = AppFileSystem.normalizePath(filepath) - } - const title = path.relative(Instance.worktree, filepath) - - const stat = yield* fs.stat(filepath).pipe( - Effect.catchIf( - (err) => "reason" in err && err.reason._tag === "NotFound", - () => Effect.succeed(undefined), - ), - ) - - yield* assertExternalDirectoryEffect(ctx, filepath, { - bypass: Boolean(ctx.extra?.["bypassCwdCheck"]), - kind: stat?.type === "Directory" ? "directory" : "file", - }) - - yield* Effect.promise(() => - ctx.ask({ - permission: "read", - patterns: [filepath], - always: ["*"], - metadata: {}, - }), - ) - - if (!stat) return yield* miss(filepath) - - if (stat.type === "Directory") { - const items = yield* list(filepath) - const limit = params.limit ?? DEFAULT_READ_LIMIT - const offset = params.offset ?? 1 - const start = offset - 1 - const sliced = items.slice(start, start + limit) - const truncated = start + sliced.length < items.length - - return { - title, - output: [ - `${filepath}`, - `directory`, - ``, - sliced.join("\n"), - truncated - ? `\n(Showing ${sliced.length} of ${items.length} entries. Use 'offset' parameter to read beyond entry ${offset + sliced.length})` - : `\n(${items.length} entries)`, - ``, - ].join("\n"), - metadata: { - preview: sliced.slice(0, 20).join("\n"), - truncated, - loaded: [] as string[], - }, - } - } - - const loaded = yield* instruction.resolve(ctx.messages, filepath, ctx.messageID) - - const mime = AppFileSystem.mimeType(filepath) - const isImage = mime.startsWith("image/") && mime !== "image/svg+xml" && mime !== "image/vnd.fastbidsheet" - const isPdf = mime === "application/pdf" - if (isImage || isPdf) { - const msg = `${isImage ? "Image" : "PDF"} read successfully` - return { - title, - output: msg, - metadata: { - preview: msg, - truncated: false, - loaded: loaded.map((item) => item.filepath), - }, - attachments: [ - { - type: "file" as const, - mime, - url: `data:${mime};base64,${Buffer.from(yield* fs.readFile(filepath)).toString("base64")}`, - }, - ], - } - } - - if (yield* Effect.promise(() => isBinaryFile(filepath, Number(stat.size)))) { - return yield* Effect.fail(new Error(`Cannot read binary file: ${filepath}`)) - } - - const file = yield* Effect.promise(() => - lines(filepath, { limit: params.limit ?? DEFAULT_READ_LIMIT, offset: params.offset ?? 1 }), - ) - if (file.count < file.offset && !(file.count === 0 && file.offset === 1)) { - return yield* Effect.fail( - new Error(`Offset ${file.offset} is out of range for this file (${file.count} lines)`), - ) - } - - let output = [`${filepath}`, `file`, ""].join("\n") - output += file.raw.map((line, i) => `${i + file.offset}: ${line}`).join("\n") - - const last = file.offset + file.raw.length - 1 - const next = last + 1 - const truncated = file.more || file.cut - if (file.cut) { - output += `\n\n(Output capped at ${MAX_BYTES_LABEL}. Showing lines ${file.offset}-${last}. Use offset=${next} to continue.)` - } else if (file.more) { - output += `\n\n(Showing lines ${file.offset}-${last} of ${file.count}. Use offset=${next} to continue.)` - } else { - output += `\n\n(End of file - total ${file.count} lines)` - } - output += "\n" - - yield* warm(filepath, ctx.sessionID) - - if (loaded.length > 0) { - output += `\n\n\n${loaded.map((item) => item.content).join("\n\n")}\n` - } - - return { - title, - output, - metadata: { - preview: file.raw.slice(0, 20).join("\n"), - truncated, - loaded: loaded.map((item) => item.filepath), - }, - } - }) + const deps = { fs, instruction, lsp, time, scope } satisfies Deps return { description: DESCRIPTION, parameters, async execute(params: z.infer, ctx) { - return Effect.runPromise(run(params, ctx).pipe(Effect.orDie)) + return Effect.runPromise(run(deps, params, ctx).pipe(Effect.orDie)) }, } }), diff --git a/packages/opencode/src/tool/subtask.ts b/packages/opencode/src/tool/subtask.ts new file mode 100644 index 0000000000..1f7f75815f --- /dev/null +++ b/packages/opencode/src/tool/subtask.ts @@ -0,0 +1,87 @@ +import type { Agent } from "../agent/agent" +import { Config } from "../config/config" +import { Session } from "../session" +import { SessionPrompt } from "../session/prompt" +import { SessionID } from "../session/schema" +import type { ModelID, ProviderID } from "../provider/schema" +import { Effect } from "effect" + +type Ref = { + providerID: ProviderID + modelID: ModelID +} + +type Parts = Awaited> +type Reply = Awaited> + +type Deps = { + cfg: Effect.Effect + get: (taskID: string) => Effect.Effect + create: (input: { parentID: SessionID; title: string }) => Effect.Effect + resolve: (prompt: string) => Effect.Effect + prompt: (input: { + sessionID: SessionID + model: Ref + agent: string + tools: Record + parts: Parts + }) => Effect.Effect +} + +type Input = { + parentID: SessionID + taskID?: string + description: string + prompt: string + agent: Agent.Info + model: Ref + start?: (sessionID: SessionID, model: Ref) => Promise | void +} + +export function tools(agent: Agent.Info, cfg: Config.Info) { + const task = agent.permission.some((rule) => rule.permission === "task") + const todo = agent.permission.some((rule) => rule.permission === "todowrite") + return { + ...(todo ? {} : { todowrite: false }), + ...(task ? {} : { task: false }), + ...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((tool) => [tool, false])), + } +} + +export function output(sessionID: SessionID, text: string) { + return [ + `task_id: ${sessionID} (for resuming to continue this task if needed)`, + "", + "", + text, + "", + ].join("\n") +} + +export const run = Effect.fn("Subtask.run")(function* (deps: Deps, input: Input) { + const cfg = yield* deps.cfg + const model = input.agent.model ?? input.model + const found = input.taskID ? yield* deps.get(input.taskID) : undefined + const session = found + ? found + : yield* deps.create({ + parentID: input.parentID, + title: input.description + ` (@${input.agent.name} subagent)`, + }) + + yield* Effect.promise(() => Promise.resolve(input.start?.(session.id, model))) + + const result = yield* deps.prompt({ + sessionID: session.id, + model, + agent: input.agent.name, + tools: tools(input.agent, cfg), + parts: yield* deps.resolve(input.prompt), + }) + + return { + sessionID: session.id, + model, + text: result.parts.findLast((part) => part.type === "text")?.text ?? "", + } +}) diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index af130a70d9..280cc2a533 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -1,16 +1,17 @@ import { Tool } from "./tool" import DESCRIPTION from "./task.txt" import z from "zod" -import { Session } from "../session" -import { SessionID, MessageID } from "../session/schema" -import { MessageV2 } from "../session/message-v2" -import { Identifier } from "../id/id" -import { Agent } from "../agent/agent" -import { SessionPrompt } from "../session/prompt" -import { iife } from "@/util/iife" -import { defer } from "@/util/defer" +import { Effect } from "effect" import { Config } from "../config/config" +import { Session } from "../session" +import { SessionPrompt } from "../session/prompt" +import { MessageV2 } from "../session/message-v2" +import { Agent } from "../agent/agent" +import type { SessionID } from "../session/schema" +import { MessageID, SessionID as SessionRef } from "../session/schema" +import { defer } from "@/util/defer" import { Permission } from "@/permission" +import { output, run } from "./subtask" const parameters = z.object({ description: z.string().describe("A short (3-5 words) description of the task"), @@ -45,8 +46,6 @@ export const TaskTool = Tool.define("task", async (ctx) => { description, parameters, async execute(params: z.infer, ctx) { - const config = await Config.get() - // Skip permission check when user explicitly invoked via @ or command subtask if (!ctx.extra?.bypassAgentCheck) { await ctx.ask({ @@ -62,104 +61,58 @@ export const TaskTool = Tool.define("task", async (ctx) => { const agent = await Agent.get(params.subagent_type) if (!agent) throw new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`) - - const hasTaskPermission = agent.permission.some((rule) => rule.permission === "task") - const hasTodoWritePermission = agent.permission.some((rule) => rule.permission === "todowrite") - - const session = await iife(async () => { - if (params.task_id) { - const found = await Session.get(SessionID.make(params.task_id)).catch(() => {}) - if (found) return found - } - - return await Session.create({ - parentID: ctx.sessionID, - title: params.description + ` (@${agent.name} subagent)`, - permission: [ - ...(hasTodoWritePermission - ? [] - : [ - { - permission: "todowrite" as const, - pattern: "*" as const, - action: "deny" as const, - }, - ]), - ...(hasTaskPermission - ? [] - : [ - { - permission: "task" as const, - pattern: "*" as const, - action: "deny" as const, - }, - ]), - ...(config.experimental?.primary_tools?.map((t) => ({ - pattern: "*", - action: "allow" as const, - permission: t, - })) ?? []), - ], - }) - }) const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }) if (msg.info.role !== "assistant") throw new Error("Not an assistant message") - const model = agent.model ?? { - modelID: msg.info.modelID, - providerID: msg.info.providerID, - } - - ctx.metadata({ - title: params.description, - metadata: { - sessionId: session.id, - model, - }, - }) - - const messageID = MessageID.ascending() - - function cancel() { - SessionPrompt.cancel(session.id) + let child: SessionID | undefined + const cancel = () => { + if (!child) return + SessionPrompt.cancel(child) } ctx.abort.addEventListener("abort", cancel) using _ = defer(() => ctx.abort.removeEventListener("abort", cancel)) - const promptParts = await SessionPrompt.resolvePromptParts(params.prompt) - const result = await SessionPrompt.prompt({ - messageID, - sessionID: session.id, - model: { - modelID: model.modelID, - providerID: model.providerID, - }, - agent: agent.name, - tools: { - ...(hasTodoWritePermission ? {} : { todowrite: false }), - ...(hasTaskPermission ? {} : { task: false }), - ...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])), - }, - parts: promptParts, - }) - - const text = result.parts.findLast((x) => x.type === "text")?.text ?? "" - - const output = [ - `task_id: ${session.id} (for resuming to continue this task if needed)`, - "", - "", - text, - "", - ].join("\n") + const task = await Effect.runPromise( + run( + { + cfg: Effect.promise(() => Config.get()), + get: (taskID) => Effect.promise(() => Session.get(SessionRef.make(taskID)).catch(() => undefined)), + create: (input) => Effect.promise(() => Session.create(input)), + resolve: (prompt) => Effect.promise(() => SessionPrompt.resolvePromptParts(prompt)), + prompt: (input) => + Effect.promise(() => SessionPrompt.prompt({ ...input, messageID: MessageID.ascending() })), + }, + { + parentID: ctx.sessionID, + taskID: params.task_id, + description: params.description, + prompt: params.prompt, + agent, + model: { + modelID: msg.info.modelID, + providerID: msg.info.providerID, + }, + start(sessionID, model) { + child = sessionID + ctx.metadata({ + title: params.description, + metadata: { + sessionId: sessionID, + model, + }, + }) + }, + }, + ), + ) return { title: params.description, metadata: { - sessionId: session.id, - model, + sessionId: task.sessionID, + model: task.model, }, - output, + output: output(task.sessionID, task.text), } }, } diff --git a/packages/opencode/src/tool/tool.ts b/packages/opencode/src/tool/tool.ts index a107dad7e8..f49ffdb35b 100644 --- a/packages/opencode/src/tool/tool.ts +++ b/packages/opencode/src/tool/tool.ts @@ -26,6 +26,20 @@ export namespace Tool { metadata(input: { title?: string; metadata?: M }): void ask(input: Omit): Promise } + + export function context( + input: Omit, "abort" | "callID"> & { + abort?: AbortSignal + callID?: string + }, + ): Context { + return { + ...input, + abort: input.abort ?? new AbortController().signal, + callID: input.callID, + } + } + export interface Def { description: string parameters: Parameters diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index 9dc395876e..a91df76ebf 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -250,7 +250,7 @@ describe("Runner", () => { Effect.gen(function* () { const s = yield* Scope.Scope const runner = Runner.make(s) - const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done")) + const result = yield* runner.startShell(Effect.succeed("shell-done")) expect(result).toBe("shell-done") expect(runner.busy).toBe(false) }), @@ -264,7 +264,7 @@ describe("Runner", () => { const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") - const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit) + const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) yield* runner.cancel @@ -279,12 +279,10 @@ describe("Runner", () => { const runner = Runner.make(s) const gate = yield* Deferred.make() - const sh = yield* runner - .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first"))) - .pipe(Effect.forkChild) + const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") - const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit) + const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) yield* Deferred.succeed(gate, undefined) @@ -302,37 +300,26 @@ describe("Runner", () => { }, }) - const sh = yield* runner - .startShell((signal) => - Effect.promise( - () => - new Promise((resolve) => { - signal.addEventListener("abort", () => resolve("aborted"), { once: true }) - }), - ), - ) - .pipe(Effect.forkChild) + const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") - const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit) + const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit) expect(Exit.isFailure(exit)).toBe(true) yield* runner.cancel const done = yield* Fiber.await(sh) - expect(Exit.isSuccess(done)).toBe(true) + expect(Exit.isFailure(done)).toBe(true) }), ) it.live( - "cancel interrupts shell that ignores abort signal", + "cancel interrupts shell", Effect.gen(function* () { const s = yield* Scope.Scope const runner = Runner.make(s) const gate = yield* Deferred.make() - const sh = yield* runner - .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored"))) - .pipe(Effect.forkChild) + const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") const stop = yield* runner.cancel.pipe(Effect.forkChild) @@ -356,9 +343,7 @@ describe("Runner", () => { const runner = Runner.make(s) const gate = yield* Deferred.make() - const sh = yield* runner - .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result"))) - .pipe(Effect.forkChild) + const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") expect(runner.state._tag).toBe("Shell") @@ -384,9 +369,7 @@ describe("Runner", () => { const calls = yield* Ref.make(0) const gate = yield* Deferred.make() - const sh = yield* runner - .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell"))) - .pipe(Effect.forkChild) + const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") const work = Effect.gen(function* () { @@ -414,16 +397,7 @@ describe("Runner", () => { const runner = Runner.make(s) const gate = yield* Deferred.make() - const sh = yield* runner - .startShell((signal) => - Effect.promise( - () => - new Promise((resolve) => { - signal.addEventListener("abort", () => resolve("aborted"), { once: true }) - }), - ), - ) - .pipe(Effect.forkChild) + const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild) @@ -478,7 +452,7 @@ describe("Runner", () => { const runner = Runner.make(s, { onBusy: Ref.update(count, (n) => n + 1), }) - yield* runner.startShell((_signal) => Effect.succeed("done")) + yield* runner.startShell(Effect.succeed("done")) expect(yield* Ref.get(count)).toBe(1) }), ) @@ -509,9 +483,7 @@ describe("Runner", () => { const runner = Runner.make(s) const gate = yield* Deferred.make() - const fiber = yield* runner - .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok"))) - .pipe(Effect.forkChild) + const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild) yield* Effect.sleep("10 millis") expect(runner.busy).toBe(true) diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 17689cf274..6bc9583ce3 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -1,8 +1,7 @@ import { NodeFileSystem } from "@effect/platform-node" -import { expect, spyOn } from "bun:test" +import { expect } from "bun:test" import { Cause, Effect, Exit, Fiber, Layer } from "effect" import path from "path" -import z from "zod" import { Agent as AgentSvc } from "../../src/agent/agent" import { Bus } from "../../src/bus" import { Command } from "../../src/command" @@ -29,7 +28,6 @@ import { MessageID, PartID, SessionID } from "../../src/session/schema" import { SessionStatus } from "../../src/session/status" import { Shell } from "../../src/shell/shell" import { Snapshot } from "../../src/snapshot" -import { TaskTool } from "../../src/tool/task" import { ToolRegistry } from "../../src/tool/registry" import { Truncate } from "../../src/tool/truncate" import { Log } from "../../src/util/log" @@ -629,41 +627,26 @@ it.live( provideTmpdirInstance( (dir) => Effect.gen(function* () { - const ready = defer() - const aborted = defer() - const init = spyOn(TaskTool, "init").mockImplementation(async () => ({ - description: "task", - parameters: z.object({ - description: z.string(), - prompt: z.string(), - subagent_type: z.string(), - task_id: z.string().optional(), - command: z.string().optional(), - }), - execute: async (_args, ctx) => { - ready.resolve() - ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true }) - await new Promise(() => {}) - return { - title: "", - metadata: { - sessionId: SessionID.make("task"), - model: ref, - }, - output: "", - } - }, - })) - yield* Effect.addFinalizer(() => Effect.sync(() => init.mockRestore())) - - const { prompt, chat } = yield* boot() + const { prompt, chat, sessions } = yield* boot() + const llm = yield* TestLLMServer + yield* llm.hang const msg = yield* user(chat.id, "hello") yield* addSubtask(chat.id, msg.id) const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + + yield* Effect.gen(function* () { + while (true) { + const [child] = yield* sessions.children(chat.id) + if (child) { + const msgs = yield* sessions.messages({ sessionID: child.id }) + if (msgs.some((msg) => msg.info.role === "assistant")) return + } + yield* Effect.sleep("10 millis") + } + }) + yield* prompt.cancel(chat.id) - yield* Effect.promise(() => aborted.promise) const exit = yield* Fiber.await(fiber) expect(Exit.isSuccess(exit)).toBe(true)