refactor(effect): effectify task tool execution
Move the task tool body onto named Effect helpers and keep the Promise bridge at the outer init/execute boundaries. This matches the read tool migration shape while preserving the existing SessionPrompt runtime flow.pull/21017/head
parent
78f7258c6d
commit
babb46327f
|
|
@ -5,11 +5,9 @@ import { Effect } from "effect"
|
|||
import { Session } from "../session"
|
||||
import { SessionID, MessageID } from "../session/schema"
|
||||
import { MessageV2 } from "../session/message-v2"
|
||||
import { Identifier } from "../id/id"
|
||||
import { Agent } from "../agent/agent"
|
||||
import { SessionPrompt } from "../session/prompt"
|
||||
import { iife } from "@/util/iife"
|
||||
import { defer } from "@/util/defer"
|
||||
import { Config } from "../config/config"
|
||||
import { Permission } from "@/permission"
|
||||
|
||||
|
|
@ -32,148 +30,169 @@ export const TaskTool = Tool.defineEffect(
|
|||
const agent = yield* Agent.Service
|
||||
const config = yield* Config.Service
|
||||
|
||||
return async (ctx) => {
|
||||
const agents = await agent.list().pipe(
|
||||
Effect.map((x) => x.filter((a) => a.mode !== "primary")),
|
||||
Effect.runPromise,
|
||||
)
|
||||
const list = Effect.fn("TaskTool.list")(function* (caller?: Tool.InitContext["agent"]) {
|
||||
const items = yield* agent.list().pipe(Effect.map((items) => items.filter((item) => item.mode !== "primary")))
|
||||
const filtered = caller
|
||||
? items.filter((item) => Permission.evaluate("task", item.name, caller.permission).action !== "deny")
|
||||
: items
|
||||
return filtered.toSorted((a, b) => a.name.localeCompare(b.name))
|
||||
})
|
||||
|
||||
const caller = ctx?.agent
|
||||
const accessibleAgents = caller
|
||||
? agents.filter((a) => Permission.evaluate("task", a.name, caller.permission).action !== "deny")
|
||||
: agents
|
||||
const list = accessibleAgents.toSorted((a, b) => a.name.localeCompare(b.name))
|
||||
|
||||
const description = DESCRIPTION.replace(
|
||||
const desc = Effect.fn("TaskTool.desc")(function* (caller?: Tool.InitContext["agent"]) {
|
||||
const items = yield* list(caller)
|
||||
return DESCRIPTION.replace(
|
||||
"{agents}",
|
||||
list
|
||||
.map((a) => `- ${a.name}: ${a.description ?? "This subagent should only be called manually by the user."}`)
|
||||
items
|
||||
.map(
|
||||
(item) =>
|
||||
`- ${item.name}: ${item.description ?? "This subagent should only be called manually by the user."}`,
|
||||
)
|
||||
.join("\n"),
|
||||
)
|
||||
})
|
||||
|
||||
return {
|
||||
description,
|
||||
parameters,
|
||||
async execute(params: z.infer<typeof parameters>, ctx) {
|
||||
const cfg = await config.get().pipe(Effect.runPromise)
|
||||
const run = Effect.fn("TaskTool.execute")(function* (params: z.infer<typeof parameters>, ctx: Tool.Context) {
|
||||
const cfg = yield* config.get()
|
||||
|
||||
// Skip permission check when user explicitly invoked via @ or command subtask
|
||||
if (!ctx.extra?.bypassAgentCheck) {
|
||||
await ctx.ask({
|
||||
permission: "task",
|
||||
patterns: [params.subagent_type],
|
||||
always: ["*"],
|
||||
metadata: {
|
||||
description: params.description,
|
||||
subagent_type: params.subagent_type,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
const next = await agent
|
||||
.get(params.subagent_type)
|
||||
.pipe(Effect.runPromise)
|
||||
.catch(() => undefined)
|
||||
if (!next) throw new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`)
|
||||
|
||||
const hasTaskPermission = next.permission.some((rule) => rule.permission === "task")
|
||||
const hasTodoWritePermission = next.permission.some((rule) => rule.permission === "todowrite")
|
||||
|
||||
const session = await iife(async () => {
|
||||
if (params.task_id) {
|
||||
const found = await Session.get(SessionID.make(params.task_id)).catch(() => {})
|
||||
if (found) return found
|
||||
}
|
||||
|
||||
return await Session.create({
|
||||
parentID: ctx.sessionID,
|
||||
title: params.description + ` (@${next.name} subagent)`,
|
||||
permission: [
|
||||
...(hasTodoWritePermission
|
||||
? []
|
||||
: [
|
||||
{
|
||||
permission: "todowrite" as const,
|
||||
pattern: "*" as const,
|
||||
action: "deny" as const,
|
||||
},
|
||||
]),
|
||||
...(hasTaskPermission
|
||||
? []
|
||||
: [
|
||||
{
|
||||
permission: "task" as const,
|
||||
pattern: "*" as const,
|
||||
action: "deny" as const,
|
||||
},
|
||||
]),
|
||||
...(cfg.experimental?.primary_tools?.map((t) => ({
|
||||
pattern: "*",
|
||||
action: "allow" as const,
|
||||
permission: t,
|
||||
})) ?? []),
|
||||
],
|
||||
})
|
||||
})
|
||||
const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID })
|
||||
if (msg.info.role !== "assistant") throw new Error("Not an assistant message")
|
||||
|
||||
const model = next.model ?? {
|
||||
modelID: msg.info.modelID,
|
||||
providerID: msg.info.providerID,
|
||||
}
|
||||
|
||||
ctx.metadata({
|
||||
title: params.description,
|
||||
if (!ctx.extra?.bypassAgentCheck) {
|
||||
yield* Effect.promise(() =>
|
||||
ctx.ask({
|
||||
permission: "task",
|
||||
patterns: [params.subagent_type],
|
||||
always: ["*"],
|
||||
metadata: {
|
||||
sessionId: session.id,
|
||||
model,
|
||||
description: params.description,
|
||||
subagent_type: params.subagent_type,
|
||||
},
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const messageID = MessageID.ascending()
|
||||
const next = yield* agent.get(params.subagent_type).pipe(Effect.catch(() => Effect.succeed(undefined)))
|
||||
if (!next) {
|
||||
return yield* Effect.fail(new Error(`Unknown agent type: ${params.subagent_type} is not a valid agent type`))
|
||||
}
|
||||
|
||||
function cancel() {
|
||||
SessionPrompt.cancel(session.id)
|
||||
const hasTask = next.permission.some((rule) => rule.permission === "task")
|
||||
const hasTodo = next.permission.some((rule) => rule.permission === "todowrite")
|
||||
|
||||
const session = yield* Effect.promise(() =>
|
||||
iife(async () => {
|
||||
if (params.task_id) {
|
||||
const found = await Session.get(SessionID.make(params.task_id)).catch(() => {})
|
||||
if (found) return found
|
||||
}
|
||||
ctx.abort.addEventListener("abort", cancel)
|
||||
using _ = defer(() => ctx.abort.removeEventListener("abort", cancel))
|
||||
const promptParts = await SessionPrompt.resolvePromptParts(params.prompt)
|
||||
|
||||
const result = await SessionPrompt.prompt({
|
||||
messageID,
|
||||
sessionID: session.id,
|
||||
model: {
|
||||
modelID: model.modelID,
|
||||
providerID: model.providerID,
|
||||
},
|
||||
agent: next.name,
|
||||
tools: {
|
||||
...(hasTodoWritePermission ? {} : { todowrite: false }),
|
||||
...(hasTaskPermission ? {} : { task: false }),
|
||||
...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((t) => [t, false])),
|
||||
},
|
||||
parts: promptParts,
|
||||
return Session.create({
|
||||
parentID: ctx.sessionID,
|
||||
title: params.description + ` (@${next.name} subagent)`,
|
||||
permission: [
|
||||
...(hasTodo
|
||||
? []
|
||||
: [
|
||||
{
|
||||
permission: "todowrite" as const,
|
||||
pattern: "*" as const,
|
||||
action: "deny" as const,
|
||||
},
|
||||
]),
|
||||
...(hasTask
|
||||
? []
|
||||
: [
|
||||
{
|
||||
permission: "task" as const,
|
||||
pattern: "*" as const,
|
||||
action: "deny" as const,
|
||||
},
|
||||
]),
|
||||
...(cfg.experimental?.primary_tools?.map((item) => ({
|
||||
pattern: "*",
|
||||
action: "allow" as const,
|
||||
permission: item,
|
||||
})) ?? []),
|
||||
],
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
const text = result.parts.findLast((x) => x.type === "text")?.text ?? ""
|
||||
const msg = yield* Effect.sync(() => MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }))
|
||||
if (msg.info.role !== "assistant") return yield* Effect.fail(new Error("Not an assistant message"))
|
||||
|
||||
const output = [
|
||||
`task_id: ${session.id} (for resuming to continue this task if needed)`,
|
||||
"",
|
||||
"<task_result>",
|
||||
text,
|
||||
"</task_result>",
|
||||
].join("\n")
|
||||
const model = next.model ?? {
|
||||
modelID: msg.info.modelID,
|
||||
providerID: msg.info.providerID,
|
||||
}
|
||||
|
||||
ctx.metadata({
|
||||
title: params.description,
|
||||
metadata: {
|
||||
sessionId: session.id,
|
||||
model,
|
||||
},
|
||||
})
|
||||
|
||||
const messageID = MessageID.ascending()
|
||||
|
||||
function cancel() {
|
||||
SessionPrompt.cancel(session.id)
|
||||
}
|
||||
return yield* Effect.acquireUseRelease(
|
||||
Effect.sync(() => {
|
||||
ctx.abort.addEventListener("abort", cancel)
|
||||
}),
|
||||
() => Effect.promise(() => SessionPrompt.resolvePromptParts(params.prompt)),
|
||||
() =>
|
||||
Effect.sync(() => {
|
||||
ctx.abort.removeEventListener("abort", cancel)
|
||||
}),
|
||||
).pipe(
|
||||
Effect.flatMap((parts) =>
|
||||
Effect.promise(() =>
|
||||
SessionPrompt.prompt({
|
||||
messageID,
|
||||
sessionID: session.id,
|
||||
model: {
|
||||
modelID: model.modelID,
|
||||
providerID: model.providerID,
|
||||
},
|
||||
agent: next.name,
|
||||
tools: {
|
||||
...(hasTodo ? {} : { todowrite: false }),
|
||||
...(hasTask ? {} : { task: false }),
|
||||
...Object.fromEntries((cfg.experimental?.primary_tools ?? []).map((item) => [item, false])),
|
||||
},
|
||||
parts,
|
||||
}),
|
||||
),
|
||||
),
|
||||
Effect.map((result) => {
|
||||
const text = result.parts.findLast((item) => item.type === "text")?.text ?? ""
|
||||
return {
|
||||
title: params.description,
|
||||
metadata: {
|
||||
sessionId: session.id,
|
||||
model,
|
||||
},
|
||||
output,
|
||||
output: [
|
||||
`task_id: ${session.id} (for resuming to continue this task if needed)`,
|
||||
"",
|
||||
"<task_result>",
|
||||
text,
|
||||
"</task_result>",
|
||||
].join("\n"),
|
||||
}
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
return async (ctx) => {
|
||||
const description = await Effect.runPromise(desc(ctx?.agent))
|
||||
|
||||
return {
|
||||
description,
|
||||
parameters,
|
||||
async execute(params: z.infer<typeof parameters>, ctx) {
|
||||
return Effect.runPromise(run(params, ctx))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue