refactor(prompt): remove prompt abort-signal plumbing
Keep prompt reads, subtask execution, and shell runner wiring on shared Effect services so prompt no longer carries its own abort-signal adapters or duplicate task flow.pull/21116/head
parent
8b8d4fa066
commit
a3bf978919
|
|
@ -1,10 +1,10 @@
|
||||||
import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect"
|
import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect"
|
||||||
|
|
||||||
export interface Runner<A, E = never> {
|
export interface Runner<A, E = never> {
|
||||||
readonly state: Runner.State<A, E>
|
readonly state: Runner.State<A, E>
|
||||||
readonly busy: boolean
|
readonly busy: boolean
|
||||||
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||||
readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E>
|
readonly startShell: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||||
readonly cancel: Effect.Effect<void>
|
readonly cancel: Effect.Effect<void>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -20,7 +20,6 @@ export namespace Runner {
|
||||||
interface ShellHandle<A, E> {
|
interface ShellHandle<A, E> {
|
||||||
id: number
|
id: number
|
||||||
fiber: Fiber.Fiber<A, E>
|
fiber: Fiber.Fiber<A, E>
|
||||||
abort: AbortController
|
|
||||||
}
|
}
|
||||||
|
|
||||||
interface PendingHandle<A, E> {
|
interface PendingHandle<A, E> {
|
||||||
|
|
@ -102,9 +101,7 @@ export namespace Runner {
|
||||||
|
|
||||||
const stopShell = (shell: ShellHandle<A, E>) =>
|
const stopShell = (shell: ShellHandle<A, E>) =>
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
shell.abort.abort()
|
yield* Fiber.interrupt(shell.fiber)
|
||||||
const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
|
|
||||||
if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
|
|
||||||
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
|
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -138,7 +135,7 @@ export namespace Runner {
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) =>
|
const startShell = (work: Effect.Effect<A, E>) =>
|
||||||
SynchronizedRef.modifyEffect(
|
SynchronizedRef.modifyEffect(
|
||||||
ref,
|
ref,
|
||||||
Effect.fnUntraced(function* (st) {
|
Effect.fnUntraced(function* (st) {
|
||||||
|
|
@ -153,9 +150,8 @@ export namespace Runner {
|
||||||
}
|
}
|
||||||
yield* busy
|
yield* busy
|
||||||
const id = next()
|
const id = next()
|
||||||
const abort = new AbortController()
|
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
|
||||||
const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
|
const shell = { id, fiber } satisfies ShellHandle<A, E>
|
||||||
const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
|
|
||||||
return [
|
return [
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
const exit = yield* Fiber.await(fiber)
|
const exit = yield* Fiber.await(fiber)
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import { ToolRegistry } from "../tool/registry"
|
||||||
import { Runner } from "@/effect/runner"
|
import { Runner } from "@/effect/runner"
|
||||||
import { MCP } from "../mcp"
|
import { MCP } from "../mcp"
|
||||||
import { LSP } from "../lsp"
|
import { LSP } from "../lsp"
|
||||||
import { ReadTool } from "../tool/read"
|
|
||||||
import { FileTime } from "../file/time"
|
import { FileTime } from "../file/time"
|
||||||
import { Flag } from "../flag/flag"
|
import { Flag } from "../flag/flag"
|
||||||
import { ulid } from "ulid"
|
import { ulid } from "ulid"
|
||||||
|
|
@ -33,11 +32,11 @@ import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||||
import * as Stream from "effect/Stream"
|
import * as Stream from "effect/Stream"
|
||||||
import { Command } from "../command"
|
import { Command } from "../command"
|
||||||
import { pathToFileURL, fileURLToPath } from "url"
|
import { pathToFileURL, fileURLToPath } from "url"
|
||||||
|
import { Config } from "../config/config"
|
||||||
import { ConfigMarkdown } from "../config/markdown"
|
import { ConfigMarkdown } from "../config/markdown"
|
||||||
import { SessionSummary } from "./summary"
|
import { SessionSummary } from "./summary"
|
||||||
import { NamedError } from "@opencode-ai/util/error"
|
import { NamedError } from "@opencode-ai/util/error"
|
||||||
import { SessionProcessor } from "./processor"
|
import { SessionProcessor } from "./processor"
|
||||||
import { TaskTool } from "@/tool/task"
|
|
||||||
import { Tool } from "@/tool/tool"
|
import { Tool } from "@/tool/tool"
|
||||||
import { Permission } from "@/permission"
|
import { Permission } from "@/permission"
|
||||||
import { SessionStatus } from "./status"
|
import { SessionStatus } from "./status"
|
||||||
|
|
@ -47,6 +46,8 @@ import { AppFileSystem } from "@/filesystem"
|
||||||
import { Truncate } from "@/tool/truncate"
|
import { Truncate } from "@/tool/truncate"
|
||||||
import { decodeDataUrl } from "@/util/data-url"
|
import { decodeDataUrl } from "@/util/data-url"
|
||||||
import { Process } from "@/util/process"
|
import { Process } from "@/util/process"
|
||||||
|
import { run as read } from "@/tool/read"
|
||||||
|
import { output as subtaskOutput, run as subtask } from "@/tool/subtask"
|
||||||
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
|
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
|
||||||
import { InstanceState } from "@/effect/instance-state"
|
import { InstanceState } from "@/effect/instance-state"
|
||||||
import { makeRuntime } from "@/effect/run-service"
|
import { makeRuntime } from "@/effect/run-service"
|
||||||
|
|
@ -101,6 +102,7 @@ export namespace SessionPrompt {
|
||||||
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
|
||||||
const scope = yield* Scope.Scope
|
const scope = yield* Scope.Scope
|
||||||
const instruction = yield* Instruction.Service
|
const instruction = yield* Instruction.Service
|
||||||
|
const llm = yield* LLM.Service
|
||||||
|
|
||||||
const state = yield* InstanceState.make(
|
const state = yield* InstanceState.make(
|
||||||
Effect.fn("SessionPrompt.state")(function* () {
|
Effect.fn("SessionPrompt.state")(function* () {
|
||||||
|
|
@ -218,26 +220,29 @@ export namespace SessionPrompt {
|
||||||
const msgs = onlySubtasks
|
const msgs = onlySubtasks
|
||||||
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
|
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
|
||||||
: yield* MessageV2.toModelMessagesEffect(context, mdl)
|
: yield* MessageV2.toModelMessagesEffect(context, mdl)
|
||||||
const text = yield* Effect.promise(async (signal) => {
|
const text = yield* llm
|
||||||
const result = await LLM.stream({
|
.stream({
|
||||||
agent: ag,
|
agent: ag,
|
||||||
user: firstInfo,
|
user: firstInfo,
|
||||||
system: [],
|
system: [],
|
||||||
small: true,
|
small: true,
|
||||||
tools: {},
|
tools: {},
|
||||||
model: mdl,
|
model: mdl,
|
||||||
abort: signal,
|
|
||||||
sessionID: input.session.id,
|
sessionID: input.session.id,
|
||||||
retries: 2,
|
retries: 2,
|
||||||
messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs],
|
messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs],
|
||||||
})
|
})
|
||||||
return result.text
|
.pipe(
|
||||||
})
|
Stream.runFold(
|
||||||
|
() => "",
|
||||||
|
(text: string, event: LLM.Event) => (event.type === "text-delta" ? text + event.text : text),
|
||||||
|
),
|
||||||
|
)
|
||||||
const cleaned = text
|
const cleaned = text
|
||||||
.replace(/<think>[\s\S]*?<\/think>\s*/g, "")
|
.replace(/<think>[\s\S]*?<\/think>\s*/g, "")
|
||||||
.split("\n")
|
.split("\n")
|
||||||
.map((line) => line.trim())
|
.map((line: string) => line.trim())
|
||||||
.find((line) => line.length > 0)
|
.find((line: string) => line.length > 0)
|
||||||
if (!cleaned) return
|
if (!cleaned) return
|
||||||
const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
|
const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
|
||||||
yield* sessions
|
yield* sessions
|
||||||
|
|
@ -397,41 +402,42 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
using _ = log.time("resolveTools")
|
using _ = log.time("resolveTools")
|
||||||
const tools: Record<string, AITool> = {}
|
const tools: Record<string, AITool> = {}
|
||||||
|
|
||||||
const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({
|
const context = (args: any, options: ToolExecutionOptions): Tool.Context =>
|
||||||
sessionID: input.session.id,
|
Tool.context({
|
||||||
abort: options.abortSignal!,
|
abort: options.abortSignal,
|
||||||
messageID: input.processor.message.id,
|
callID: options.toolCallId,
|
||||||
callID: options.toolCallId,
|
sessionID: input.session.id,
|
||||||
extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck },
|
messageID: input.processor.message.id,
|
||||||
agent: input.agent.name,
|
extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck },
|
||||||
messages: input.messages,
|
agent: input.agent.name,
|
||||||
metadata: (val) =>
|
messages: input.messages,
|
||||||
Effect.runPromise(
|
metadata: (val) =>
|
||||||
Effect.gen(function* () {
|
Effect.runPromise(
|
||||||
const match = input.processor.partFromToolCall(options.toolCallId)
|
Effect.gen(function* () {
|
||||||
if (!match || !["running", "pending"].includes(match.state.status)) return
|
const match = input.processor.partFromToolCall(options.toolCallId)
|
||||||
yield* sessions.updatePart({
|
if (!match || !["running", "pending"].includes(match.state.status)) return
|
||||||
...match,
|
yield* sessions.updatePart({
|
||||||
state: {
|
...match,
|
||||||
title: val.title,
|
state: {
|
||||||
metadata: val.metadata,
|
title: val.title,
|
||||||
status: "running",
|
metadata: val.metadata,
|
||||||
input: args,
|
status: "running",
|
||||||
time: { start: Date.now() },
|
input: args,
|
||||||
},
|
time: { start: Date.now() },
|
||||||
})
|
},
|
||||||
}),
|
})
|
||||||
),
|
}),
|
||||||
ask: (req) =>
|
),
|
||||||
Effect.runPromise(
|
ask: (req) =>
|
||||||
permission.ask({
|
Effect.runPromise(
|
||||||
...req,
|
permission.ask({
|
||||||
sessionID: input.session.id,
|
...req,
|
||||||
tool: { messageID: input.processor.message.id, callID: options.toolCallId },
|
sessionID: input.session.id,
|
||||||
ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []),
|
tool: { messageID: input.processor.message.id, callID: options.toolCallId },
|
||||||
}),
|
ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []),
|
||||||
),
|
}),
|
||||||
})
|
),
|
||||||
|
})
|
||||||
|
|
||||||
for (const item of yield* registry.tools(
|
for (const item of yield* registry.tools(
|
||||||
{ modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID },
|
{ modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID },
|
||||||
|
|
@ -555,13 +561,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
model: Provider.Model
|
model: Provider.Model
|
||||||
lastUser: MessageV2.User
|
lastUser: MessageV2.User
|
||||||
sessionID: SessionID
|
sessionID: SessionID
|
||||||
session: Session.Info
|
|
||||||
msgs: MessageV2.WithParts[]
|
|
||||||
}) {
|
}) {
|
||||||
const { task, model, lastUser, sessionID, session, msgs } = input
|
const { task, model, lastUser, sessionID } = input
|
||||||
const ctx = yield* InstanceState.context
|
const ctx = yield* InstanceState.context
|
||||||
const taskTool = yield* Effect.promise(() => registry.named.task.init())
|
const taskAgent = yield* agents.get(task.agent)
|
||||||
|
if (!taskAgent) {
|
||||||
|
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
|
||||||
|
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
|
||||||
|
const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` })
|
||||||
|
yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
|
||||||
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
|
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
|
||||||
|
const taskRef = { providerID: taskModel.providerID, modelID: taskModel.id }
|
||||||
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
|
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
|
||||||
id: MessageID.ascending(),
|
id: MessageID.ascending(),
|
||||||
role: "assistant",
|
role: "assistant",
|
||||||
|
|
@ -601,57 +614,71 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
subagent_type: task.agent,
|
subagent_type: task.agent,
|
||||||
command: task.command,
|
command: task.command,
|
||||||
}
|
}
|
||||||
yield* plugin.trigger("tool.execute.before", { tool: "task", sessionID, callID: part.id }, { args: taskArgs })
|
yield* plugin.trigger(
|
||||||
|
"tool.execute.before",
|
||||||
const taskAgent = yield* agents.get(task.agent)
|
{ tool: "task", sessionID, callID: part.callID },
|
||||||
if (!taskAgent) {
|
{ args: taskArgs },
|
||||||
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
|
)
|
||||||
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
|
|
||||||
const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` })
|
|
||||||
yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
|
|
||||||
throw error
|
|
||||||
}
|
|
||||||
|
|
||||||
|
let child: SessionID | undefined
|
||||||
let error: Error | undefined
|
let error: Error | undefined
|
||||||
const result = yield* Effect.promise((signal) =>
|
const result = yield* subtask(
|
||||||
taskTool
|
{
|
||||||
.execute(taskArgs, {
|
cfg: Effect.promise(() => Config.get()),
|
||||||
agent: task.agent,
|
get: (taskID) => sessions.get(SessionID.make(taskID)).pipe(Effect.catch(() => Effect.succeed(undefined))),
|
||||||
messageID: assistantMessage.id,
|
create: (input) => sessions.create(input),
|
||||||
sessionID,
|
resolve: resolvePromptParts,
|
||||||
abort: signal,
|
prompt: (input) => prompt({ ...input, messageID: MessageID.ascending() }),
|
||||||
callID: part.callID,
|
},
|
||||||
extra: { bypassAgentCheck: true },
|
{
|
||||||
messages: msgs,
|
parentID: sessionID,
|
||||||
metadata(val: { title?: string; metadata?: Record<string, any> }) {
|
description: task.description,
|
||||||
return Effect.runPromise(
|
prompt: task.prompt,
|
||||||
Effect.gen(function* () {
|
agent: taskAgent,
|
||||||
part = yield* sessions.updatePart({
|
model: taskRef,
|
||||||
...part,
|
start(sessionID, model) {
|
||||||
type: "tool",
|
child = sessionID
|
||||||
state: { ...part.state, ...val },
|
const metadata = { sessionId: sessionID, model }
|
||||||
} satisfies MessageV2.ToolPart)
|
return Effect.runPromise(
|
||||||
}),
|
sessions.updatePart({
|
||||||
)
|
...part,
|
||||||
},
|
state: {
|
||||||
ask(req: any) {
|
status: "running",
|
||||||
return Effect.runPromise(
|
input: part.state.input,
|
||||||
permission.ask({
|
time: part.state.status === "running" ? part.state.time : { start: Date.now() },
|
||||||
...req,
|
title: task.description,
|
||||||
sessionID,
|
metadata,
|
||||||
ruleset: Permission.merge(taskAgent.permission, session.permission ?? []),
|
},
|
||||||
}),
|
} satisfies MessageV2.ToolPart),
|
||||||
)
|
).then((next) => {
|
||||||
},
|
part = next
|
||||||
})
|
})
|
||||||
.catch((e) => {
|
},
|
||||||
error = e instanceof Error ? e : new Error(String(e))
|
},
|
||||||
log.error("subtask execution failed", { error, agent: task.agent, description: task.description })
|
|
||||||
return undefined
|
|
||||||
}),
|
|
||||||
).pipe(
|
).pipe(
|
||||||
|
Effect.flatMap((sub) =>
|
||||||
|
truncate.output(subtaskOutput(sub.sessionID, sub.text), {}).pipe(
|
||||||
|
Effect.map((truncated) => ({
|
||||||
|
title: task.description,
|
||||||
|
metadata: {
|
||||||
|
sessionId: sub.sessionID,
|
||||||
|
model: sub.model,
|
||||||
|
truncated: truncated.truncated,
|
||||||
|
...(truncated.truncated && { outputPath: truncated.outputPath }),
|
||||||
|
},
|
||||||
|
output: truncated.content,
|
||||||
|
})),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
Effect.catchCause((cause) => {
|
||||||
|
const err = Cause.squash(cause)
|
||||||
|
error = err instanceof Error ? err : new Error(String(err))
|
||||||
|
log.error("subtask execution failed", { error, agent: task.agent, description: task.description })
|
||||||
|
return Effect.succeed(undefined)
|
||||||
|
}),
|
||||||
Effect.onInterrupt(() =>
|
Effect.onInterrupt(() =>
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
|
if (child) yield* cancel(child)
|
||||||
assistantMessage.finish = "tool-calls"
|
assistantMessage.finish = "tool-calls"
|
||||||
assistantMessage.time.completed = Date.now()
|
assistantMessage.time.completed = Date.now()
|
||||||
yield* sessions.updateMessage(assistantMessage)
|
yield* sessions.updateMessage(assistantMessage)
|
||||||
|
|
@ -671,16 +698,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
const attachments = result?.attachments?.map((attachment) => ({
|
|
||||||
...attachment,
|
|
||||||
id: PartID.ascending(),
|
|
||||||
sessionID,
|
|
||||||
messageID: assistantMessage.id,
|
|
||||||
}))
|
|
||||||
|
|
||||||
yield* plugin.trigger(
|
yield* plugin.trigger(
|
||||||
"tool.execute.after",
|
"tool.execute.after",
|
||||||
{ tool: "task", sessionID, callID: part.id, args: taskArgs },
|
{ tool: "task", sessionID, callID: part.callID, args: taskArgs },
|
||||||
result,
|
result,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -697,7 +717,6 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
title: result.title,
|
title: result.title,
|
||||||
metadata: result.metadata,
|
metadata: result.metadata,
|
||||||
output: result.output,
|
output: result.output,
|
||||||
attachments,
|
|
||||||
time: { ...part.state.time, end: Date.now() },
|
time: { ...part.state.time, end: Date.now() },
|
||||||
},
|
},
|
||||||
} satisfies MessageV2.ToolPart)
|
} satisfies MessageV2.ToolPart)
|
||||||
|
|
@ -740,7 +759,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
} satisfies MessageV2.TextPart)
|
} satisfies MessageV2.TextPart)
|
||||||
})
|
})
|
||||||
|
|
||||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
|
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) {
|
||||||
const ctx = yield* InstanceState.context
|
const ctx = yield* InstanceState.context
|
||||||
const session = yield* sessions.get(input.sessionID)
|
const session = yield* sessions.get(input.sessionID)
|
||||||
if (session.revert) {
|
if (session.revert) {
|
||||||
|
|
@ -1073,6 +1092,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
log.info("file", { mime: part.mime })
|
log.info("file", { mime: part.mime })
|
||||||
const filepath = fileURLToPath(part.url)
|
const filepath = fileURLToPath(part.url)
|
||||||
if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory"
|
if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory"
|
||||||
|
const readCtx = Tool.context({
|
||||||
|
sessionID: input.sessionID,
|
||||||
|
agent: info.agent,
|
||||||
|
messageID: info.id,
|
||||||
|
extra: { bypassCwdCheck: true },
|
||||||
|
messages: [],
|
||||||
|
metadata: () => {},
|
||||||
|
ask: async () => {},
|
||||||
|
})
|
||||||
|
|
||||||
if (part.mime === "text/plain") {
|
if (part.mime === "text/plain") {
|
||||||
let offset: number | undefined
|
let offset: number | undefined
|
||||||
|
|
@ -1110,29 +1138,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
|
text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
const read = yield* Effect.promise(() => registry.named.read.init()).pipe(
|
const readResult = yield* read(
|
||||||
Effect.flatMap((t) =>
|
{ fs: fsys, instruction, lsp, time: filetime, scope },
|
||||||
provider.getModel(info.model.providerID, info.model.modelID).pipe(
|
args,
|
||||||
Effect.flatMap((mdl) =>
|
readCtx,
|
||||||
Effect.promise(() =>
|
).pipe(Effect.exit)
|
||||||
t.execute(args, {
|
if (Exit.isSuccess(readResult)) {
|
||||||
sessionID: input.sessionID,
|
const result = readResult.value
|
||||||
abort: new AbortController().signal,
|
|
||||||
agent: input.agent!,
|
|
||||||
messageID: info.id,
|
|
||||||
extra: { bypassCwdCheck: true, model: mdl },
|
|
||||||
messages: [],
|
|
||||||
metadata: async () => {},
|
|
||||||
ask: async () => {},
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
Effect.exit,
|
|
||||||
)
|
|
||||||
if (Exit.isSuccess(read)) {
|
|
||||||
const result = read.value
|
|
||||||
pieces.push({
|
pieces.push({
|
||||||
messageID: info.id,
|
messageID: info.id,
|
||||||
sessionID: input.sessionID,
|
sessionID: input.sessionID,
|
||||||
|
|
@ -1145,7 +1157,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
...result.attachments.map((a) => ({
|
...result.attachments.map((a) => ({
|
||||||
...a,
|
...a,
|
||||||
synthetic: true,
|
synthetic: true,
|
||||||
filename: a.filename ?? part.filename,
|
filename: "filename" in a && typeof a.filename === "string" ? a.filename : part.filename,
|
||||||
messageID: info.id,
|
messageID: info.id,
|
||||||
sessionID: input.sessionID,
|
sessionID: input.sessionID,
|
||||||
})),
|
})),
|
||||||
|
|
@ -1154,7 +1166,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
|
pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const error = Cause.squash(read.cause)
|
const error = Cause.squash(readResult.cause)
|
||||||
log.error("failed to read file", { error })
|
log.error("failed to read file", { error })
|
||||||
const message = error instanceof Error ? error.message : String(error)
|
const message = error instanceof Error ? error.message : String(error)
|
||||||
yield* bus.publish(Session.Event.Error, {
|
yield* bus.publish(Session.Event.Error, {
|
||||||
|
|
@ -1174,21 +1186,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
|
|
||||||
if (part.mime === "application/x-directory") {
|
if (part.mime === "application/x-directory") {
|
||||||
const args = { filePath: filepath }
|
const args = { filePath: filepath }
|
||||||
const result = yield* Effect.promise(() => registry.named.read.init()).pipe(
|
const result = yield* read({ fs: fsys, instruction, lsp, time: filetime, scope }, args, readCtx).pipe(
|
||||||
Effect.flatMap((t) =>
|
Effect.orDie,
|
||||||
Effect.promise(() =>
|
|
||||||
t.execute(args, {
|
|
||||||
sessionID: input.sessionID,
|
|
||||||
abort: new AbortController().signal,
|
|
||||||
agent: input.agent!,
|
|
||||||
messageID: info.id,
|
|
||||||
extra: { bypassCwdCheck: true },
|
|
||||||
messages: [],
|
|
||||||
metadata: async () => {},
|
|
||||||
ask: async () => {},
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
|
|
@ -1332,7 +1331,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
}
|
}
|
||||||
if (latest) return latest
|
if (latest) return latest
|
||||||
throw new Error("Impossible")
|
throw new Error("Impossible")
|
||||||
})
|
}).pipe(Effect.orDie)
|
||||||
|
|
||||||
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
|
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
|
||||||
function* (sessionID: SessionID) {
|
function* (sessionID: SessionID) {
|
||||||
|
|
@ -1393,7 +1392,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
const task = tasks.pop()
|
const task = tasks.pop()
|
||||||
|
|
||||||
if (task?.type === "subtask") {
|
if (task?.type === "subtask") {
|
||||||
yield* handleSubtask({ task, model, lastUser, sessionID, session, msgs })
|
yield* handleSubtask({ task, model, lastUser, sessionID })
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1577,7 +1576,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
function* (input: ShellInput) {
|
function* (input: ShellInput) {
|
||||||
const s = yield* InstanceState.get(state)
|
const s = yield* InstanceState.get(state)
|
||||||
const runner = getRunner(s.runners, input.sessionID)
|
const runner = getRunner(s.runners, input.sessionID)
|
||||||
return yield* runner.startShell((signal) => shellImpl(input, signal))
|
return yield* runner.startShell(shellImpl(input))
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -1722,6 +1721,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||||
Layer.provide(FileTime.defaultLayer),
|
Layer.provide(FileTime.defaultLayer),
|
||||||
Layer.provide(ToolRegistry.defaultLayer),
|
Layer.provide(ToolRegistry.defaultLayer),
|
||||||
Layer.provide(Truncate.layer),
|
Layer.provide(Truncate.layer),
|
||||||
|
Layer.provide(LLM.defaultLayer),
|
||||||
Layer.provide(Provider.defaultLayer),
|
Layer.provide(Provider.defaultLayer),
|
||||||
Layer.provide(Instruction.defaultLayer),
|
Layer.provide(Instruction.defaultLayer),
|
||||||
Layer.provide(AppFileSystem.defaultLayer),
|
Layer.provide(AppFileSystem.defaultLayer),
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,9 @@ type Options = {
|
||||||
kind?: Kind
|
kind?: Kind
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function assertExternalDirectory(ctx: Tool.Context, target?: string, options?: Options) {
|
type Ctx = Pick<Tool.Context, "ask">
|
||||||
|
|
||||||
|
export async function assertExternalDirectory(ctx: Ctx, target?: string, options?: Options) {
|
||||||
if (!target) return
|
if (!target) return
|
||||||
|
|
||||||
if (options?.bypass) return
|
if (options?.bypass) return
|
||||||
|
|
@ -38,7 +40,7 @@ export async function assertExternalDirectory(ctx: Tool.Context, target?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
export const assertExternalDirectoryEffect = Effect.fn("Tool.assertExternalDirectory")(function* (
|
export const assertExternalDirectoryEffect = Effect.fn("Tool.assertExternalDirectory")(function* (
|
||||||
ctx: Tool.Context,
|
ctx: Ctx,
|
||||||
target?: string,
|
target?: string,
|
||||||
options?: Options,
|
options?: Options,
|
||||||
) {
|
) {
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,197 @@ const parameters = z.object({
|
||||||
limit: z.coerce.number().describe("The maximum number of lines to read (defaults to 2000)").optional(),
|
limit: z.coerce.number().describe("The maximum number of lines to read (defaults to 2000)").optional(),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
type Ctx = Omit<Tool.Context, "abort">
|
||||||
|
|
||||||
|
type Deps = {
|
||||||
|
fs: AppFileSystem.Interface
|
||||||
|
instruction: Instruction.Interface
|
||||||
|
lsp: LSP.Interface
|
||||||
|
time: FileTime.Interface
|
||||||
|
scope: Scope.Scope
|
||||||
|
}
|
||||||
|
|
||||||
|
export const run = Effect.fn("ReadTool.run")(function* (deps: Deps, params: z.infer<typeof parameters>, ctx: Ctx) {
|
||||||
|
const miss = Effect.fn("ReadTool.miss")(function* (filepath: string) {
|
||||||
|
const dir = path.dirname(filepath)
|
||||||
|
const base = path.basename(filepath)
|
||||||
|
const items = yield* deps.fs.readDirectory(dir).pipe(
|
||||||
|
Effect.map((items) =>
|
||||||
|
items
|
||||||
|
.filter(
|
||||||
|
(item) =>
|
||||||
|
item.toLowerCase().includes(base.toLowerCase()) || base.toLowerCase().includes(item.toLowerCase()),
|
||||||
|
)
|
||||||
|
.map((item) => path.join(dir, item))
|
||||||
|
.slice(0, 3),
|
||||||
|
),
|
||||||
|
Effect.catch(() => Effect.succeed([] as string[])),
|
||||||
|
)
|
||||||
|
|
||||||
|
if (items.length > 0) {
|
||||||
|
return yield* Effect.fail(
|
||||||
|
new Error(`File not found: ${filepath}\n\nDid you mean one of these?\n${items.join("\n")}`),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return yield* Effect.fail(new Error(`File not found: ${filepath}`))
|
||||||
|
})
|
||||||
|
|
||||||
|
const list = Effect.fn("ReadTool.list")(function* (filepath: string) {
|
||||||
|
const items = yield* deps.fs.readDirectoryEntries(filepath)
|
||||||
|
return yield* Effect.forEach(
|
||||||
|
items,
|
||||||
|
Effect.fnUntraced(function* (item) {
|
||||||
|
if (item.type === "directory") return item.name + "/"
|
||||||
|
if (item.type !== "symlink") return item.name
|
||||||
|
|
||||||
|
const target = yield* deps.fs
|
||||||
|
.stat(path.join(filepath, item.name))
|
||||||
|
.pipe(Effect.catch(() => Effect.succeed(undefined)))
|
||||||
|
if (target?.type === "Directory") return item.name + "/"
|
||||||
|
return item.name
|
||||||
|
}),
|
||||||
|
{ concurrency: "unbounded" },
|
||||||
|
).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b))))
|
||||||
|
})
|
||||||
|
|
||||||
|
const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, sessionID: Ctx["sessionID"]) {
|
||||||
|
yield* deps.lsp.touchFile(filepath, false).pipe(Effect.ignore, Effect.forkIn(deps.scope))
|
||||||
|
yield* deps.time.read(sessionID, filepath)
|
||||||
|
})
|
||||||
|
|
||||||
|
if (params.offset !== undefined && params.offset < 1) {
|
||||||
|
return yield* Effect.fail(new Error("offset must be greater than or equal to 1"))
|
||||||
|
}
|
||||||
|
|
||||||
|
let filepath = params.filePath
|
||||||
|
if (!path.isAbsolute(filepath)) {
|
||||||
|
filepath = path.resolve(Instance.directory, filepath)
|
||||||
|
}
|
||||||
|
if (process.platform === "win32") {
|
||||||
|
filepath = AppFileSystem.normalizePath(filepath)
|
||||||
|
}
|
||||||
|
const title = path.relative(Instance.worktree, filepath)
|
||||||
|
|
||||||
|
const stat = yield* deps.fs.stat(filepath).pipe(
|
||||||
|
Effect.catchIf(
|
||||||
|
(err) => "reason" in err && err.reason._tag === "NotFound",
|
||||||
|
() => Effect.succeed(undefined),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
yield* assertExternalDirectoryEffect(ctx, filepath, {
|
||||||
|
bypass: Boolean(ctx.extra?.["bypassCwdCheck"]),
|
||||||
|
kind: stat?.type === "Directory" ? "directory" : "file",
|
||||||
|
})
|
||||||
|
|
||||||
|
yield* Effect.promise(() =>
|
||||||
|
ctx.ask({
|
||||||
|
permission: "read",
|
||||||
|
patterns: [filepath],
|
||||||
|
always: ["*"],
|
||||||
|
metadata: {},
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
if (!stat) return yield* miss(filepath)
|
||||||
|
|
||||||
|
if (stat.type === "Directory") {
|
||||||
|
const items = yield* list(filepath)
|
||||||
|
const limit = params.limit ?? DEFAULT_READ_LIMIT
|
||||||
|
const offset = params.offset ?? 1
|
||||||
|
const start = offset - 1
|
||||||
|
const sliced = items.slice(start, start + limit)
|
||||||
|
const truncated = start + sliced.length < items.length
|
||||||
|
|
||||||
|
return {
|
||||||
|
title,
|
||||||
|
output: [
|
||||||
|
`<path>${filepath}</path>`,
|
||||||
|
`<type>directory</type>`,
|
||||||
|
`<entries>`,
|
||||||
|
sliced.join("\n"),
|
||||||
|
truncated
|
||||||
|
? `\n(Showing ${sliced.length} of ${items.length} entries. Use 'offset' parameter to read beyond entry ${offset + sliced.length})`
|
||||||
|
: `\n(${items.length} entries)`,
|
||||||
|
`</entries>`,
|
||||||
|
].join("\n"),
|
||||||
|
metadata: {
|
||||||
|
preview: sliced.slice(0, 20).join("\n"),
|
||||||
|
truncated,
|
||||||
|
loaded: [] as string[],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const loaded = yield* deps.instruction.resolve(ctx.messages, filepath, ctx.messageID)
|
||||||
|
|
||||||
|
const mime = AppFileSystem.mimeType(filepath)
|
||||||
|
const isImage = mime.startsWith("image/") && mime !== "image/svg+xml" && mime !== "image/vnd.fastbidsheet"
|
||||||
|
const isPdf = mime === "application/pdf"
|
||||||
|
if (isImage || isPdf) {
|
||||||
|
const msg = `${isImage ? "Image" : "PDF"} read successfully`
|
||||||
|
return {
|
||||||
|
title,
|
||||||
|
output: msg,
|
||||||
|
metadata: {
|
||||||
|
preview: msg,
|
||||||
|
truncated: false,
|
||||||
|
loaded: loaded.map((item) => item.filepath),
|
||||||
|
},
|
||||||
|
attachments: [
|
||||||
|
{
|
||||||
|
type: "file" as const,
|
||||||
|
mime,
|
||||||
|
url: `data:${mime};base64,${Buffer.from(yield* deps.fs.readFile(filepath)).toString("base64")}`,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (yield* Effect.promise(() => isBinaryFile(filepath, Number(stat.size)))) {
|
||||||
|
return yield* Effect.fail(new Error(`Cannot read binary file: ${filepath}`))
|
||||||
|
}
|
||||||
|
|
||||||
|
const file = yield* Effect.promise(() =>
|
||||||
|
lines(filepath, { limit: params.limit ?? DEFAULT_READ_LIMIT, offset: params.offset ?? 1 }),
|
||||||
|
)
|
||||||
|
if (file.count < file.offset && !(file.count === 0 && file.offset === 1)) {
|
||||||
|
return yield* Effect.fail(new Error(`Offset ${file.offset} is out of range for this file (${file.count} lines)`))
|
||||||
|
}
|
||||||
|
|
||||||
|
let output = [`<path>${filepath}</path>`, `<type>file</type>`, "<content>"].join("\n")
|
||||||
|
output += file.raw.map((line, i) => `${i + file.offset}: ${line}`).join("\n")
|
||||||
|
|
||||||
|
const last = file.offset + file.raw.length - 1
|
||||||
|
const next = last + 1
|
||||||
|
const truncated = file.more || file.cut
|
||||||
|
if (file.cut) {
|
||||||
|
output += `\n\n(Output capped at ${MAX_BYTES_LABEL}. Showing lines ${file.offset}-${last}. Use offset=${next} to continue.)`
|
||||||
|
} else if (file.more) {
|
||||||
|
output += `\n\n(Showing lines ${file.offset}-${last} of ${file.count}. Use offset=${next} to continue.)`
|
||||||
|
} else {
|
||||||
|
output += `\n\n(End of file - total ${file.count} lines)`
|
||||||
|
}
|
||||||
|
output += "\n</content>"
|
||||||
|
|
||||||
|
yield* warm(filepath, ctx.sessionID)
|
||||||
|
|
||||||
|
if (loaded.length > 0) {
|
||||||
|
output += `\n\n<system-reminder>\n${loaded.map((item) => item.content).join("\n\n")}\n</system-reminder>`
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
title,
|
||||||
|
output,
|
||||||
|
metadata: {
|
||||||
|
preview: file.raw.slice(0, 20).join("\n"),
|
||||||
|
truncated,
|
||||||
|
loaded: loaded.map((item) => item.filepath),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
export const ReadTool = Tool.defineEffect(
|
export const ReadTool = Tool.defineEffect(
|
||||||
"read",
|
"read",
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
|
|
@ -33,195 +224,13 @@ export const ReadTool = Tool.defineEffect(
|
||||||
const lsp = yield* LSP.Service
|
const lsp = yield* LSP.Service
|
||||||
const time = yield* FileTime.Service
|
const time = yield* FileTime.Service
|
||||||
const scope = yield* Scope.Scope
|
const scope = yield* Scope.Scope
|
||||||
|
const deps = { fs, instruction, lsp, time, scope } satisfies Deps
|
||||||
const miss = Effect.fn("ReadTool.miss")(function* (filepath: string) {
|
|
||||||
const dir = path.dirname(filepath)
|
|
||||||
const base = path.basename(filepath)
|
|
||||||
const items = yield* fs.readDirectory(dir).pipe(
|
|
||||||
Effect.map((items) =>
|
|
||||||
items
|
|
||||||
.filter(
|
|
||||||
(item) =>
|
|
||||||
item.toLowerCase().includes(base.toLowerCase()) || base.toLowerCase().includes(item.toLowerCase()),
|
|
||||||
)
|
|
||||||
.map((item) => path.join(dir, item))
|
|
||||||
.slice(0, 3),
|
|
||||||
),
|
|
||||||
Effect.catch(() => Effect.succeed([] as string[])),
|
|
||||||
)
|
|
||||||
|
|
||||||
if (items.length > 0) {
|
|
||||||
return yield* Effect.fail(
|
|
||||||
new Error(`File not found: ${filepath}\n\nDid you mean one of these?\n${items.join("\n")}`),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return yield* Effect.fail(new Error(`File not found: ${filepath}`))
|
|
||||||
})
|
|
||||||
|
|
||||||
const list = Effect.fn("ReadTool.list")(function* (filepath: string) {
|
|
||||||
const items = yield* fs.readDirectoryEntries(filepath)
|
|
||||||
return yield* Effect.forEach(
|
|
||||||
items,
|
|
||||||
Effect.fnUntraced(function* (item) {
|
|
||||||
if (item.type === "directory") return item.name + "/"
|
|
||||||
if (item.type !== "symlink") return item.name
|
|
||||||
|
|
||||||
const target = yield* fs
|
|
||||||
.stat(path.join(filepath, item.name))
|
|
||||||
.pipe(Effect.catch(() => Effect.succeed(undefined)))
|
|
||||||
if (target?.type === "Directory") return item.name + "/"
|
|
||||||
return item.name
|
|
||||||
}),
|
|
||||||
{ concurrency: "unbounded" },
|
|
||||||
).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b))))
|
|
||||||
})
|
|
||||||
|
|
||||||
const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, sessionID: Tool.Context["sessionID"]) {
|
|
||||||
yield* lsp.touchFile(filepath, false).pipe(Effect.ignore, Effect.forkIn(scope))
|
|
||||||
yield* time.read(sessionID, filepath)
|
|
||||||
})
|
|
||||||
|
|
||||||
const run = Effect.fn("ReadTool.execute")(function* (params: z.infer<typeof parameters>, ctx: Tool.Context) {
|
|
||||||
if (params.offset !== undefined && params.offset < 1) {
|
|
||||||
return yield* Effect.fail(new Error("offset must be greater than or equal to 1"))
|
|
||||||
}
|
|
||||||
|
|
||||||
let filepath = params.filePath
|
|
||||||
if (!path.isAbsolute(filepath)) {
|
|
||||||
filepath = path.resolve(Instance.directory, filepath)
|
|
||||||
}
|
|
||||||
if (process.platform === "win32") {
|
|
||||||
filepath = AppFileSystem.normalizePath(filepath)
|
|
||||||
}
|
|
||||||
const title = path.relative(Instance.worktree, filepath)
|
|
||||||
|
|
||||||
const stat = yield* fs.stat(filepath).pipe(
|
|
||||||
Effect.catchIf(
|
|
||||||
(err) => "reason" in err && err.reason._tag === "NotFound",
|
|
||||||
() => Effect.succeed(undefined),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
yield* assertExternalDirectoryEffect(ctx, filepath, {
|
|
||||||
bypass: Boolean(ctx.extra?.["bypassCwdCheck"]),
|
|
||||||
kind: stat?.type === "Directory" ? "directory" : "file",
|
|
||||||
})
|
|
||||||
|
|
||||||
yield* Effect.promise(() =>
|
|
||||||
ctx.ask({
|
|
||||||
permission: "read",
|
|
||||||
patterns: [filepath],
|
|
||||||
always: ["*"],
|
|
||||||
metadata: {},
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
if (!stat) return yield* miss(filepath)
|
|
||||||
|
|
||||||
if (stat.type === "Directory") {
|
|
||||||
const items = yield* list(filepath)
|
|
||||||
const limit = params.limit ?? DEFAULT_READ_LIMIT
|
|
||||||
const offset = params.offset ?? 1
|
|
||||||
const start = offset - 1
|
|
||||||
const sliced = items.slice(start, start + limit)
|
|
||||||
const truncated = start + sliced.length < items.length
|
|
||||||
|
|
||||||
return {
|
|
||||||
title,
|
|
||||||
output: [
|
|
||||||
`<path>${filepath}</path>`,
|
|
||||||
`<type>directory</type>`,
|
|
||||||
`<entries>`,
|
|
||||||
sliced.join("\n"),
|
|
||||||
truncated
|
|
||||||
? `\n(Showing ${sliced.length} of ${items.length} entries. Use 'offset' parameter to read beyond entry ${offset + sliced.length})`
|
|
||||||
: `\n(${items.length} entries)`,
|
|
||||||
`</entries>`,
|
|
||||||
].join("\n"),
|
|
||||||
metadata: {
|
|
||||||
preview: sliced.slice(0, 20).join("\n"),
|
|
||||||
truncated,
|
|
||||||
loaded: [] as string[],
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const loaded = yield* instruction.resolve(ctx.messages, filepath, ctx.messageID)
|
|
||||||
|
|
||||||
const mime = AppFileSystem.mimeType(filepath)
|
|
||||||
const isImage = mime.startsWith("image/") && mime !== "image/svg+xml" && mime !== "image/vnd.fastbidsheet"
|
|
||||||
const isPdf = mime === "application/pdf"
|
|
||||||
if (isImage || isPdf) {
|
|
||||||
const msg = `${isImage ? "Image" : "PDF"} read successfully`
|
|
||||||
return {
|
|
||||||
title,
|
|
||||||
output: msg,
|
|
||||||
metadata: {
|
|
||||||
preview: msg,
|
|
||||||
truncated: false,
|
|
||||||
loaded: loaded.map((item) => item.filepath),
|
|
||||||
},
|
|
||||||
attachments: [
|
|
||||||
{
|
|
||||||
type: "file" as const,
|
|
||||||
mime,
|
|
||||||
url: `data:${mime};base64,${Buffer.from(yield* fs.readFile(filepath)).toString("base64")}`,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (yield* Effect.promise(() => isBinaryFile(filepath, Number(stat.size)))) {
|
|
||||||
return yield* Effect.fail(new Error(`Cannot read binary file: ${filepath}`))
|
|
||||||
}
|
|
||||||
|
|
||||||
const file = yield* Effect.promise(() =>
|
|
||||||
lines(filepath, { limit: params.limit ?? DEFAULT_READ_LIMIT, offset: params.offset ?? 1 }),
|
|
||||||
)
|
|
||||||
if (file.count < file.offset && !(file.count === 0 && file.offset === 1)) {
|
|
||||||
return yield* Effect.fail(
|
|
||||||
new Error(`Offset ${file.offset} is out of range for this file (${file.count} lines)`),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
let output = [`<path>${filepath}</path>`, `<type>file</type>`, "<content>"].join("\n")
|
|
||||||
output += file.raw.map((line, i) => `${i + file.offset}: ${line}`).join("\n")
|
|
||||||
|
|
||||||
const last = file.offset + file.raw.length - 1
|
|
||||||
const next = last + 1
|
|
||||||
const truncated = file.more || file.cut
|
|
||||||
if (file.cut) {
|
|
||||||
output += `\n\n(Output capped at ${MAX_BYTES_LABEL}. Showing lines ${file.offset}-${last}. Use offset=${next} to continue.)`
|
|
||||||
} else if (file.more) {
|
|
||||||
output += `\n\n(Showing lines ${file.offset}-${last} of ${file.count}. Use offset=${next} to continue.)`
|
|
||||||
} else {
|
|
||||||
output += `\n\n(End of file - total ${file.count} lines)`
|
|
||||||
}
|
|
||||||
output += "\n</content>"
|
|
||||||
|
|
||||||
yield* warm(filepath, ctx.sessionID)
|
|
||||||
|
|
||||||
if (loaded.length > 0) {
|
|
||||||
output += `\n\n<system-reminder>\n${loaded.map((item) => item.content).join("\n\n")}\n</system-reminder>`
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
title,
|
|
||||||
output,
|
|
||||||
metadata: {
|
|
||||||
preview: file.raw.slice(0, 20).join("\n"),
|
|
||||||
truncated,
|
|
||||||
loaded: loaded.map((item) => item.filepath),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
description: DESCRIPTION,
|
description: DESCRIPTION,
|
||||||
parameters,
|
parameters,
|
||||||
async execute(params: z.infer<typeof parameters>, ctx) {
|
async execute(params: z.infer<typeof parameters>, ctx) {
|
||||||
return Effect.runPromise(run(params, ctx).pipe(Effect.orDie))
|
return Effect.runPromise(run(deps, params, ctx).pipe(Effect.orDie))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,87 @@
|
||||||
|
import type { Agent } from "../agent/agent"
|
||||||
|
import { Config } from "../config/config"
|
||||||
|
import { Session } from "../session"
|
||||||
|
import { SessionPrompt } from "../session/prompt"
|
||||||
|
import { SessionID } from "../session/schema"
|
||||||
|
import type { ModelID, ProviderID } from "../provider/schema"
|
||||||
|
import { Effect } from "effect"
|
||||||
|
|
||||||
|
type Ref = {
|
||||||
|
providerID: ProviderID
|
||||||
|
modelID: ModelID
|
||||||
|
}
|
||||||
|
|
||||||
|
type Parts = Awaited<ReturnType<typeof SessionPrompt.resolvePromptParts>>
|
||||||
|
type Reply = Awaited<ReturnType<typeof SessionPrompt.prompt>>
|
||||||
|
|
||||||
|
type Deps = {
|
||||||
|
cfg: Effect.Effect<Config.Info>
|
||||||
|
get: (taskID: string) => Effect.Effect<Session.Info | undefined>
|
||||||
|
create: (input: { parentID: SessionID; title: string }) => Effect.Effect<Session.Info>
|
||||||
|
resolve: (prompt: string) => Effect.Effect<Parts>
|
||||||
|
prompt: (input: {
|
||||||
|
sessionID: SessionID
|
||||||
|
model: Ref
|
||||||
|
agent: string
|
||||||
|
tools: Record<string, boolean>
|
||||||
|
parts: Parts
|
||||||
|
}) => Effect.Effect<Reply>
|
||||||
|
}
|
||||||
|
|
||||||
|
type Input = {
|
||||||
|
parentID: SessionID
|
||||||
|
taskID?: string
|
||||||
|
description: string
|
||||||
|
prompt: string
|
||||||
|
agent: Agent.Info
|
||||||
|
model: Ref
|
||||||
|
start?: (sessionID: SessionID, model: Ref) => Promise<void> | void
|
||||||
|
}
|
||||||
|
|
||||||
|
export function tools(agent: Agent.Info, cfg: Config.Info) {
|
||||||
|
const task = agent.permission.some((rule) => rule.permission === "task")
|
||||||
|
const todo = agent.permission.some((rule) => rule.permission === "todowrite")
|
||||||
|
return {
|
||||||
|
...(todo ? {} : { todowrite: false }),
|
||||||
|
...(task ? {} : { task: false }),
|
||||||
|
...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((tool) => [tool, false])),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function output(sessionID: SessionID, text: string) {
|
||||||
|
return [
|
||||||
|
`task_id: ${sessionID} (for resuming to continue this task if needed)`,
|
||||||
|
"",
|
||||||
|
"<task_result>",
|
||||||
|
text,
|
||||||
|
"</task_result>",
|
||||||
|
].join("\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
export const run = Effect.fn("Subtask.run")(function* (deps: Deps, input: Input) {
|
||||||
|
const cfg = yield* deps.cfg
|
||||||
|
const model = input.agent.model ?? input.model
|
||||||
|
const found = input.taskID ? yield* deps.get(input.taskID) : undefined
|
||||||
|
const session = found
|
||||||
|
? found
|
||||||
|
: yield* deps.create({
|
||||||
|
parentID: input.parentID,
|
||||||
|
title: input.description + ` (@${input.agent.name} subagent)`,
|
||||||
|
})
|
||||||
|
|
||||||
|
yield* Effect.promise(() => Promise.resolve(input.start?.(session.id, model)))
|
||||||
|
|
||||||
|
const result = yield* deps.prompt({
|
||||||
|
sessionID: session.id,
|
||||||
|
model,
|
||||||
|
agent: input.agent.name,
|
||||||
|
tools: tools(input.agent, cfg),
|
||||||
|
parts: yield* deps.resolve(input.prompt),
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
sessionID: session.id,
|
||||||
|
model,
|
||||||
|
text: result.parts.findLast((part) => part.type === "text")?.text ?? "",
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
@ -1,16 +1,17 @@
|
||||||
import { Tool } from "./tool"
|
import { Tool } from "./tool"
|
||||||
import DESCRIPTION from "./task.txt"
|
import DESCRIPTION from "./task.txt"
|
||||||
import z from "zod"
|
import z from "zod"
|
||||||
import { Session } from "../session"
|
import { Effect } from "effect"
|
||||||
import { SessionID, MessageID } from "../session/schema"
|
|
||||||
import { MessageV2 } from "../session/message-v2"
|
|
||||||
import { Identifier } from "../id/id"
|
|
||||||
import { Agent } from "../agent/agent"
|
|
||||||
import { SessionPrompt } from "../session/prompt"
|
|
||||||
import { iife } from "@/util/iife"
|
|
||||||
import { defer } from "@/util/defer"
|
|
||||||
import { Config } from "../config/config"
|
import { Config } from "../config/config"
|
||||||
|
import { Session } from "../session"
|
||||||
|
import { SessionPrompt } from "../session/prompt"
|
||||||
|
import { MessageV2 } from "../session/message-v2"
|
||||||
|
import { Agent } from "../agent/agent"
|
||||||
|
import type { SessionID } from "../session/schema"
|
||||||
|
import { MessageID, SessionID as SessionRef } from "../session/schema"
|
||||||
|
import { defer } from "@/util/defer"
|
||||||
import { Permission } from "@/permission"
|
import { Permission } from "@/permission"
|
||||||
|
import { output, run } from "./subtask"
|
||||||
|
|
||||||
const parameters = z.object({
|
const parameters = z.object({
|
||||||
description: z.string().describe("A short (3-5 words) description of the task"),
|
description: z.string().describe("A short (3-5 words) description of the task"),
|
||||||
|
|
@ -45,8 +46,6 @@ export const TaskTool = Tool.define("task", async (ctx) => {
|
||||||
description,
|
description,
|
||||||
parameters,
|
parameters,
|
||||||
async execute(params: z.infer<typeof parameters>, ctx) {
|
async execute(params: z.infer<typeof parameters>, ctx) {
|
||||||
const config = await Config.get()
|
|
||||||
|
|
||||||
// Skip permission check when user explicitly invoked via @ or command subtask
|
// Skip permission check when user explicitly invoked via @ or command subtask
|
||||||
if (!ctx.extra?.bypassAgentCheck) {
|
if (!ctx.extra?.bypassAgentCheck) {
|
||||||
await ctx.ask({
|
await ctx.ask({
|
||||||
|
|
@ -62,104 +61,58 @@ export const TaskTool = Tool.define("task", async (ctx) => {
|
||||||
|
|
||||||
const agent = await Agent.get(params.subagent_type)
|
const agent = await Agent.get(params.subagent_type)
|
||||||
if (!agent) throw new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`)
|
if (!agent) throw new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`)
|
||||||
|
|
||||||
const hasTaskPermission = agent.permission.some((rule) => rule.permission === "task")
|
|
||||||
const hasTodoWritePermission = agent.permission.some((rule) => rule.permission === "todowrite")
|
|
||||||
|
|
||||||
const session = await iife(async () => {
|
|
||||||
if (params.task_id) {
|
|
||||||
const found = await Session.get(SessionID.make(params.task_id)).catch(() => {})
|
|
||||||
if (found) return found
|
|
||||||
}
|
|
||||||
|
|
||||||
return await Session.create({
|
|
||||||
parentID: ctx.sessionID,
|
|
||||||
title: params.description + ` (@${agent.name} subagent)`,
|
|
||||||
permission: [
|
|
||||||
...(hasTodoWritePermission
|
|
||||||
? []
|
|
||||||
: [
|
|
||||||
{
|
|
||||||
permission: "todowrite" as const,
|
|
||||||
pattern: "*" as const,
|
|
||||||
action: "deny" as const,
|
|
||||||
},
|
|
||||||
]),
|
|
||||||
...(hasTaskPermission
|
|
||||||
? []
|
|
||||||
: [
|
|
||||||
{
|
|
||||||
permission: "task" as const,
|
|
||||||
pattern: "*" as const,
|
|
||||||
action: "deny" as const,
|
|
||||||
},
|
|
||||||
]),
|
|
||||||
...(config.experimental?.primary_tools?.map((t) => ({
|
|
||||||
pattern: "*",
|
|
||||||
action: "allow" as const,
|
|
||||||
permission: t,
|
|
||||||
})) ?? []),
|
|
||||||
],
|
|
||||||
})
|
|
||||||
})
|
|
||||||
const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID })
|
const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID })
|
||||||
if (msg.info.role !== "assistant") throw new Error("Not an assistant message")
|
if (msg.info.role !== "assistant") throw new Error("Not an assistant message")
|
||||||
|
|
||||||
const model = agent.model ?? {
|
let child: SessionID | undefined
|
||||||
modelID: msg.info.modelID,
|
const cancel = () => {
|
||||||
providerID: msg.info.providerID,
|
if (!child) return
|
||||||
}
|
SessionPrompt.cancel(child)
|
||||||
|
|
||||||
ctx.metadata({
|
|
||||||
title: params.description,
|
|
||||||
metadata: {
|
|
||||||
sessionId: session.id,
|
|
||||||
model,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
const messageID = MessageID.ascending()
|
|
||||||
|
|
||||||
function cancel() {
|
|
||||||
SessionPrompt.cancel(session.id)
|
|
||||||
}
|
}
|
||||||
ctx.abort.addEventListener("abort", cancel)
|
ctx.abort.addEventListener("abort", cancel)
|
||||||
using _ = defer(() => ctx.abort.removeEventListener("abort", cancel))
|
using _ = defer(() => ctx.abort.removeEventListener("abort", cancel))
|
||||||
const promptParts = await SessionPrompt.resolvePromptParts(params.prompt)
|
|
||||||
|
|
||||||
const result = await SessionPrompt.prompt({
|
const task = await Effect.runPromise(
|
||||||
messageID,
|
run(
|
||||||
sessionID: session.id,
|
{
|
||||||
model: {
|
cfg: Effect.promise(() => Config.get()),
|
||||||
modelID: model.modelID,
|
get: (taskID) => Effect.promise(() => Session.get(SessionRef.make(taskID)).catch(() => undefined)),
|
||||||
providerID: model.providerID,
|
create: (input) => Effect.promise(() => Session.create(input)),
|
||||||
},
|
resolve: (prompt) => Effect.promise(() => SessionPrompt.resolvePromptParts(prompt)),
|
||||||
agent: agent.name,
|
prompt: (input) =>
|
||||||
tools: {
|
Effect.promise(() => SessionPrompt.prompt({ ...input, messageID: MessageID.ascending() })),
|
||||||
...(hasTodoWritePermission ? {} : { todowrite: false }),
|
},
|
||||||
...(hasTaskPermission ? {} : { task: false }),
|
{
|
||||||
...Object.fromEntries((config.experimental?.primary_tools ?? []).map((t) => [t, false])),
|
parentID: ctx.sessionID,
|
||||||
},
|
taskID: params.task_id,
|
||||||
parts: promptParts,
|
description: params.description,
|
||||||
})
|
prompt: params.prompt,
|
||||||
|
agent,
|
||||||
const text = result.parts.findLast((x) => x.type === "text")?.text ?? ""
|
model: {
|
||||||
|
modelID: msg.info.modelID,
|
||||||
const output = [
|
providerID: msg.info.providerID,
|
||||||
`task_id: ${session.id} (for resuming to continue this task if needed)`,
|
},
|
||||||
"",
|
start(sessionID, model) {
|
||||||
"<task_result>",
|
child = sessionID
|
||||||
text,
|
ctx.metadata({
|
||||||
"</task_result>",
|
title: params.description,
|
||||||
].join("\n")
|
metadata: {
|
||||||
|
sessionId: sessionID,
|
||||||
|
model,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
title: params.description,
|
title: params.description,
|
||||||
metadata: {
|
metadata: {
|
||||||
sessionId: session.id,
|
sessionId: task.sessionID,
|
||||||
model,
|
model: task.model,
|
||||||
},
|
},
|
||||||
output,
|
output: output(task.sessionID, task.text),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,20 @@ export namespace Tool {
|
||||||
metadata(input: { title?: string; metadata?: M }): void
|
metadata(input: { title?: string; metadata?: M }): void
|
||||||
ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void>
|
ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function context<M extends Metadata = Metadata>(
|
||||||
|
input: Omit<Context<M>, "abort" | "callID"> & {
|
||||||
|
abort?: AbortSignal
|
||||||
|
callID?: string
|
||||||
|
},
|
||||||
|
): Context<M> {
|
||||||
|
return {
|
||||||
|
...input,
|
||||||
|
abort: input.abort ?? new AbortController().signal,
|
||||||
|
callID: input.callID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export interface Def<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
|
export interface Def<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
|
||||||
description: string
|
description: string
|
||||||
parameters: Parameters
|
parameters: Parameters
|
||||||
|
|
|
||||||
|
|
@ -250,7 +250,7 @@ describe("Runner", () => {
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
const s = yield* Scope.Scope
|
const s = yield* Scope.Scope
|
||||||
const runner = Runner.make<string>(s)
|
const runner = Runner.make<string>(s)
|
||||||
const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done"))
|
const result = yield* runner.startShell(Effect.succeed("shell-done"))
|
||||||
expect(result).toBe("shell-done")
|
expect(result).toBe("shell-done")
|
||||||
expect(runner.busy).toBe(false)
|
expect(runner.busy).toBe(false)
|
||||||
}),
|
}),
|
||||||
|
|
@ -264,7 +264,7 @@ describe("Runner", () => {
|
||||||
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
|
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
|
|
||||||
const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit)
|
const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
|
||||||
expect(Exit.isFailure(exit)).toBe(true)
|
expect(Exit.isFailure(exit)).toBe(true)
|
||||||
|
|
||||||
yield* runner.cancel
|
yield* runner.cancel
|
||||||
|
|
@ -279,12 +279,10 @@ describe("Runner", () => {
|
||||||
const runner = Runner.make<string>(s)
|
const runner = Runner.make<string>(s)
|
||||||
const gate = yield* Deferred.make<void>()
|
const gate = yield* Deferred.make<void>()
|
||||||
|
|
||||||
const sh = yield* runner
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
|
||||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
|
|
||||||
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
|
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
|
||||||
expect(Exit.isFailure(exit)).toBe(true)
|
expect(Exit.isFailure(exit)).toBe(true)
|
||||||
|
|
||||||
yield* Deferred.succeed(gate, undefined)
|
yield* Deferred.succeed(gate, undefined)
|
||||||
|
|
@ -302,37 +300,26 @@ describe("Runner", () => {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
const sh = yield* runner
|
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
|
||||||
.startShell((signal) =>
|
|
||||||
Effect.promise(
|
|
||||||
() =>
|
|
||||||
new Promise<string>((resolve) => {
|
|
||||||
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
|
|
||||||
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
|
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
|
||||||
expect(Exit.isFailure(exit)).toBe(true)
|
expect(Exit.isFailure(exit)).toBe(true)
|
||||||
|
|
||||||
yield* runner.cancel
|
yield* runner.cancel
|
||||||
const done = yield* Fiber.await(sh)
|
const done = yield* Fiber.await(sh)
|
||||||
expect(Exit.isSuccess(done)).toBe(true)
|
expect(Exit.isFailure(done)).toBe(true)
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
it.live(
|
it.live(
|
||||||
"cancel interrupts shell that ignores abort signal",
|
"cancel interrupts shell",
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
const s = yield* Scope.Scope
|
const s = yield* Scope.Scope
|
||||||
const runner = Runner.make<string>(s)
|
const runner = Runner.make<string>(s)
|
||||||
const gate = yield* Deferred.make<void>()
|
const gate = yield* Deferred.make<void>()
|
||||||
|
|
||||||
const sh = yield* runner
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
|
||||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
|
|
||||||
const stop = yield* runner.cancel.pipe(Effect.forkChild)
|
const stop = yield* runner.cancel.pipe(Effect.forkChild)
|
||||||
|
|
@ -356,9 +343,7 @@ describe("Runner", () => {
|
||||||
const runner = Runner.make<string>(s)
|
const runner = Runner.make<string>(s)
|
||||||
const gate = yield* Deferred.make<void>()
|
const gate = yield* Deferred.make<void>()
|
||||||
|
|
||||||
const sh = yield* runner
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
|
||||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
expect(runner.state._tag).toBe("Shell")
|
expect(runner.state._tag).toBe("Shell")
|
||||||
|
|
||||||
|
|
@ -384,9 +369,7 @@ describe("Runner", () => {
|
||||||
const calls = yield* Ref.make(0)
|
const calls = yield* Ref.make(0)
|
||||||
const gate = yield* Deferred.make<void>()
|
const gate = yield* Deferred.make<void>()
|
||||||
|
|
||||||
const sh = yield* runner
|
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
|
||||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
|
|
||||||
const work = Effect.gen(function* () {
|
const work = Effect.gen(function* () {
|
||||||
|
|
@ -414,16 +397,7 @@ describe("Runner", () => {
|
||||||
const runner = Runner.make<string>(s)
|
const runner = Runner.make<string>(s)
|
||||||
const gate = yield* Deferred.make<void>()
|
const gate = yield* Deferred.make<void>()
|
||||||
|
|
||||||
const sh = yield* runner
|
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
|
||||||
.startShell((signal) =>
|
|
||||||
Effect.promise(
|
|
||||||
() =>
|
|
||||||
new Promise<string>((resolve) => {
|
|
||||||
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
|
|
||||||
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
|
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
|
||||||
|
|
@ -478,7 +452,7 @@ describe("Runner", () => {
|
||||||
const runner = Runner.make<string>(s, {
|
const runner = Runner.make<string>(s, {
|
||||||
onBusy: Ref.update(count, (n) => n + 1),
|
onBusy: Ref.update(count, (n) => n + 1),
|
||||||
})
|
})
|
||||||
yield* runner.startShell((_signal) => Effect.succeed("done"))
|
yield* runner.startShell(Effect.succeed("done"))
|
||||||
expect(yield* Ref.get(count)).toBe(1)
|
expect(yield* Ref.get(count)).toBe(1)
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
@ -509,9 +483,7 @@ describe("Runner", () => {
|
||||||
const runner = Runner.make<string>(s)
|
const runner = Runner.make<string>(s)
|
||||||
const gate = yield* Deferred.make<void>()
|
const gate = yield* Deferred.make<void>()
|
||||||
|
|
||||||
const fiber = yield* runner
|
const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
|
||||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
|
|
||||||
.pipe(Effect.forkChild)
|
|
||||||
yield* Effect.sleep("10 millis")
|
yield* Effect.sleep("10 millis")
|
||||||
expect(runner.busy).toBe(true)
|
expect(runner.busy).toBe(true)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,7 @@
|
||||||
import { NodeFileSystem } from "@effect/platform-node"
|
import { NodeFileSystem } from "@effect/platform-node"
|
||||||
import { expect, spyOn } from "bun:test"
|
import { expect } from "bun:test"
|
||||||
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
|
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
|
||||||
import path from "path"
|
import path from "path"
|
||||||
import z from "zod"
|
|
||||||
import { Agent as AgentSvc } from "../../src/agent/agent"
|
import { Agent as AgentSvc } from "../../src/agent/agent"
|
||||||
import { Bus } from "../../src/bus"
|
import { Bus } from "../../src/bus"
|
||||||
import { Command } from "../../src/command"
|
import { Command } from "../../src/command"
|
||||||
|
|
@ -29,7 +28,6 @@ import { MessageID, PartID, SessionID } from "../../src/session/schema"
|
||||||
import { SessionStatus } from "../../src/session/status"
|
import { SessionStatus } from "../../src/session/status"
|
||||||
import { Shell } from "../../src/shell/shell"
|
import { Shell } from "../../src/shell/shell"
|
||||||
import { Snapshot } from "../../src/snapshot"
|
import { Snapshot } from "../../src/snapshot"
|
||||||
import { TaskTool } from "../../src/tool/task"
|
|
||||||
import { ToolRegistry } from "../../src/tool/registry"
|
import { ToolRegistry } from "../../src/tool/registry"
|
||||||
import { Truncate } from "../../src/tool/truncate"
|
import { Truncate } from "../../src/tool/truncate"
|
||||||
import { Log } from "../../src/util/log"
|
import { Log } from "../../src/util/log"
|
||||||
|
|
@ -629,41 +627,26 @@ it.live(
|
||||||
provideTmpdirInstance(
|
provideTmpdirInstance(
|
||||||
(dir) =>
|
(dir) =>
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
const ready = defer<void>()
|
const { prompt, chat, sessions } = yield* boot()
|
||||||
const aborted = defer<void>()
|
const llm = yield* TestLLMServer
|
||||||
const init = spyOn(TaskTool, "init").mockImplementation(async () => ({
|
yield* llm.hang
|
||||||
description: "task",
|
|
||||||
parameters: z.object({
|
|
||||||
description: z.string(),
|
|
||||||
prompt: z.string(),
|
|
||||||
subagent_type: z.string(),
|
|
||||||
task_id: z.string().optional(),
|
|
||||||
command: z.string().optional(),
|
|
||||||
}),
|
|
||||||
execute: async (_args, ctx) => {
|
|
||||||
ready.resolve()
|
|
||||||
ctx.abort.addEventListener("abort", () => aborted.resolve(), { once: true })
|
|
||||||
await new Promise<void>(() => {})
|
|
||||||
return {
|
|
||||||
title: "",
|
|
||||||
metadata: {
|
|
||||||
sessionId: SessionID.make("task"),
|
|
||||||
model: ref,
|
|
||||||
},
|
|
||||||
output: "",
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
yield* Effect.addFinalizer(() => Effect.sync(() => init.mockRestore()))
|
|
||||||
|
|
||||||
const { prompt, chat } = yield* boot()
|
|
||||||
const msg = yield* user(chat.id, "hello")
|
const msg = yield* user(chat.id, "hello")
|
||||||
yield* addSubtask(chat.id, msg.id)
|
yield* addSubtask(chat.id, msg.id)
|
||||||
|
|
||||||
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
|
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
|
||||||
yield* Effect.promise(() => ready.promise)
|
|
||||||
|
yield* Effect.gen(function* () {
|
||||||
|
while (true) {
|
||||||
|
const [child] = yield* sessions.children(chat.id)
|
||||||
|
if (child) {
|
||||||
|
const msgs = yield* sessions.messages({ sessionID: child.id })
|
||||||
|
if (msgs.some((msg) => msg.info.role === "assistant")) return
|
||||||
|
}
|
||||||
|
yield* Effect.sleep("10 millis")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
yield* prompt.cancel(chat.id)
|
yield* prompt.cancel(chat.id)
|
||||||
yield* Effect.promise(() => aborted.promise)
|
|
||||||
|
|
||||||
const exit = yield* Fiber.await(fiber)
|
const exit = yield* Fiber.await(fiber)
|
||||||
expect(Exit.isSuccess(exit)).toBe(true)
|
expect(Exit.isSuccess(exit)).toBe(true)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue