Merge 2423da194c into ae614d919f
commit
d3a2432969
|
|
@ -1036,6 +1036,24 @@ export namespace Config {
|
|||
.positive()
|
||||
.optional()
|
||||
.describe("Timeout in milliseconds for model context protocol (MCP) requests"),
|
||||
tool_timeout: z
|
||||
.number()
|
||||
.int()
|
||||
.positive()
|
||||
.optional()
|
||||
.describe("Maximum duration in milliseconds before the watchdog force-errors a stuck tool"),
|
||||
task_timeout: z
|
||||
.number()
|
||||
.int()
|
||||
.positive()
|
||||
.optional()
|
||||
.describe("Maximum duration in milliseconds before the watchdog force-errors a stuck task tool"),
|
||||
idle_timeout: z
|
||||
.number()
|
||||
.int()
|
||||
.positive()
|
||||
.optional()
|
||||
.describe("Duration in milliseconds of inactivity before the watchdog cancels an idle session"),
|
||||
})
|
||||
.optional(),
|
||||
})
|
||||
|
|
|
|||
|
|
@ -11,6 +11,18 @@ import { Command } from "../command"
|
|||
import { Instance } from "./instance"
|
||||
import { Log } from "@/util/log"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { Database, sql } from "../storage/db"
|
||||
import { PartTable, SessionTable } from "../session/session.sql"
|
||||
import { SessionPrompt } from "../session/prompt"
|
||||
import { SessionActivity } from "../session/activity"
|
||||
import { SessionID } from "../session/schema"
|
||||
import { Config } from "../config/config"
|
||||
|
||||
const log = Log.create({ service: "bootstrap" })
|
||||
|
||||
const WATCHDOG_INTERVAL = 60_000
|
||||
const MAX_RUNNING = 45 * 60 * 1_000
|
||||
const DEFAULT_IDLE = 5 * 60 * 1_000
|
||||
|
||||
export async function InstanceBootstrap() {
|
||||
Log.Default.info("bootstrapping", { directory: Instance.directory })
|
||||
|
|
@ -22,6 +34,9 @@ export async function InstanceBootstrap() {
|
|||
FileWatcher.init()
|
||||
Vcs.init()
|
||||
Snapshot.init()
|
||||
SessionActivity.init()
|
||||
cleanupOrphanedParts()
|
||||
watchdog()
|
||||
|
||||
Bus.subscribe(Command.Event.Executed, async (payload) => {
|
||||
if (payload.properties.name === Command.Default.INIT) {
|
||||
|
|
@ -29,3 +44,203 @@ export async function InstanceBootstrap() {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark any tool parts left in "running" state from a previous process as errored.
|
||||
* When the process exits (crash or clean shutdown), in-flight tool executions
|
||||
* are lost but their DB state remains "running" forever. This recovers them.
|
||||
*/
|
||||
function cleanupOrphanedParts() {
|
||||
const now = Date.now()
|
||||
Database.use((db) => {
|
||||
const orphaned = db
|
||||
.select({ id: PartTable.id })
|
||||
.from(PartTable)
|
||||
.where(
|
||||
sql`json_extract(${PartTable.data}, '$.type') = 'tool'
|
||||
AND json_extract(${PartTable.data}, '$.state.status') = 'running'`,
|
||||
)
|
||||
.all()
|
||||
if (orphaned.length === 0) return
|
||||
log.info("cleaning up orphaned tool parts", { count: orphaned.length })
|
||||
db.update(PartTable)
|
||||
.set({
|
||||
data: sql`json_set(
|
||||
json_set(
|
||||
json_set(${PartTable.data}, '$.state.status', 'error'),
|
||||
'$.state.error', 'Tool execution orphaned by process restart'
|
||||
),
|
||||
'$.state.time.end', ${now}
|
||||
)`,
|
||||
})
|
||||
.where(
|
||||
sql`json_extract(${PartTable.data}, '$.type') = 'tool'
|
||||
AND json_extract(${PartTable.data}, '$.state.status') = 'running'`,
|
||||
)
|
||||
.run()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Single watchdog tick: find tool parts stuck in "running" beyond the cutoff,
|
||||
* filter to leaf-level tools, cancel their sessions, and force-error the
|
||||
* DB rows as a safety net.
|
||||
*
|
||||
* Only cancels "leaf" stuck tools — i.e. non-task tools that are the actual
|
||||
* root cause. Task tools that are waiting on a child session with its own
|
||||
* stuck tool are left alone so the normal error-propagation path can run:
|
||||
* child cancel → task tool resolves → parent LLM processes the error.
|
||||
*
|
||||
* Exported for testing.
|
||||
*/
|
||||
export function watchdogTick(cutoff: number, idle?: number, taskCutoff?: number) {
|
||||
Database.use((db) => {
|
||||
const stuck = db
|
||||
.select({
|
||||
id: PartTable.id,
|
||||
session_id: PartTable.session_id,
|
||||
tool: sql<string>`json_extract(${PartTable.data}, '$.tool')`,
|
||||
child: sql<string | null>`json_extract(${PartTable.data}, '$.state.metadata.sessionId')`,
|
||||
start: sql<number>`json_extract(${PartTable.data}, '$.state.time.start')`,
|
||||
})
|
||||
.from(PartTable)
|
||||
.where(
|
||||
sql`json_extract(${PartTable.data}, '$.type') = 'tool'
|
||||
AND json_extract(${PartTable.data}, '$.state.status') = 'running'
|
||||
AND json_extract(${PartTable.data}, '$.state.time.start') < ${Math.max(cutoff, taskCutoff ?? cutoff)}`,
|
||||
)
|
||||
.all()
|
||||
// Apply per-tool-type cutoff: task tools use taskCutoff, others use cutoff
|
||||
.filter((r) => r.start < (r.tool === "task" ? (taskCutoff ?? cutoff) : cutoff))
|
||||
|
||||
const cancelled = new Set<SessionID>()
|
||||
|
||||
if (stuck.length > 0) {
|
||||
// Sessions that contain at least one stuck tool
|
||||
const stuckSessions = new Set(stuck.map((r) => SessionID.make(r.session_id)))
|
||||
|
||||
// A task tool whose child session also has stuck tools is just
|
||||
// waiting — it will resolve once the child is cancelled.
|
||||
// Everything else (non-task tools, or task tools whose child has
|
||||
// no stuck tools) is a leaf that we must force-error.
|
||||
const leaf = stuck.filter((r) => {
|
||||
if (r.tool !== "task") return true
|
||||
if (!r.child) return true
|
||||
return !stuckSessions.has(SessionID.make(r.child))
|
||||
})
|
||||
|
||||
log.warn("watchdog: found stuck tool parts", {
|
||||
total: stuck.length,
|
||||
leaf: leaf.length,
|
||||
ids: stuck.map((r) => r.id),
|
||||
})
|
||||
|
||||
if (leaf.length > 0) {
|
||||
// For task-tool leaves, cancel the *child* session so the task tool's
|
||||
// normal error-propagation path runs: child cancel → SessionPrompt.prompt()
|
||||
// resolves → task tool returns structured TIMEOUT to the parent LLM.
|
||||
// For non-task leaves, cancel the owning session directly.
|
||||
for (const r of leaf) {
|
||||
if (r.tool === "task" && r.child) {
|
||||
const sid = SessionID.make(r.child)
|
||||
if (cancelled.has(sid)) continue
|
||||
cancelled.add(sid)
|
||||
log.warn("watchdog: cancelling stuck child session", { child: r.child, parent: r.session_id })
|
||||
SessionPrompt.cancel(sid).catch(() => {})
|
||||
} else {
|
||||
const sid = SessionID.make(r.session_id)
|
||||
if (cancelled.has(sid)) continue
|
||||
cancelled.add(sid)
|
||||
log.warn("watchdog: cancelling stuck session", { sessionID: r.session_id })
|
||||
SessionPrompt.cancel(sid).catch(() => {})
|
||||
}
|
||||
}
|
||||
|
||||
// DB update as redundant safety net — only for leaf tools
|
||||
const now = Date.now()
|
||||
for (const r of leaf) {
|
||||
db.update(PartTable)
|
||||
.set({
|
||||
data: sql`json_set(
|
||||
json_set(
|
||||
json_set(${PartTable.data}, '$.state.status', 'error'),
|
||||
'$.state.error', 'Tool execution exceeded maximum allowed duration (watchdog)'
|
||||
),
|
||||
'$.state.time.end', ${now}
|
||||
)`,
|
||||
})
|
||||
.where(
|
||||
sql`${PartTable.id} = ${r.id}
|
||||
AND json_extract(${PartTable.data}, '$.state.status') = 'running'`,
|
||||
)
|
||||
.run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Independent idle detection sweep ---
|
||||
// Runs on every tick when idle param is provided, regardless of
|
||||
// whether any stuck tool parts were found above.
|
||||
// Only targets child (subagent) sessions — root sessions are never
|
||||
// idle-cancelled since the user controls their lifecycle.
|
||||
if (idle) {
|
||||
const stale = Object.entries(SessionActivity.list())
|
||||
.filter(([id]) => {
|
||||
if (cancelled.has(SessionID.make(id))) return false
|
||||
return SessionActivity.stale(id, idle)
|
||||
})
|
||||
.map(([id]) => id)
|
||||
if (stale.length > 0) {
|
||||
// Batch-check which stale sessions are children (have parent_id)
|
||||
const children = new Set(
|
||||
db
|
||||
.select({ id: SessionTable.id })
|
||||
.from(SessionTable)
|
||||
.where(
|
||||
sql`${SessionTable.id} IN (${sql.join(
|
||||
stale.map((id) => sql`${id}`),
|
||||
sql`, `,
|
||||
)})
|
||||
AND ${SessionTable.parent_id} IS NOT NULL`,
|
||||
)
|
||||
.all()
|
||||
.map((r) => r.id),
|
||||
)
|
||||
for (const id of stale) {
|
||||
const sid = SessionID.make(id)
|
||||
if (!children.has(sid)) continue
|
||||
const ts = SessionActivity.last(id)
|
||||
log.warn("watchdog: idle session detected", {
|
||||
sessionID: id,
|
||||
last: ts,
|
||||
threshold: idle,
|
||||
})
|
||||
cancelled.add(sid)
|
||||
SessionPrompt.cancel(sid).catch(() => {})
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodic scan for tool parts stuck in "running" beyond the configured timeout.
|
||||
* Safety net for cases where the bash hard-stop or abort signal also fails.
|
||||
* Respects both tool_timeout and task_timeout config to avoid killing
|
||||
* long-running but healthy Task tool executions.
|
||||
*/
|
||||
function watchdog() {
|
||||
const timer = setInterval(async () => {
|
||||
try {
|
||||
const cfg = await Config.get()
|
||||
const tool = cfg.experimental?.tool_timeout ?? MAX_RUNNING
|
||||
const task = cfg.experimental?.task_timeout ?? 1_800_000
|
||||
const idle = cfg.experimental?.idle_timeout ?? DEFAULT_IDLE
|
||||
const now = Date.now()
|
||||
watchdogTick(now - tool, idle, now - (task + 60_000))
|
||||
} catch {
|
||||
watchdogTick(Date.now() - MAX_RUNNING)
|
||||
}
|
||||
}, WATCHDOG_INTERVAL)
|
||||
timer.unref()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,78 @@
|
|||
import { Bus } from "@/bus"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { Log } from "@/util/log"
|
||||
|
||||
const log = Log.create({ service: "session.activity" })
|
||||
|
||||
/**
|
||||
* Tracks per-session last-activity timestamps via Bus events.
|
||||
*
|
||||
* Activity signals:
|
||||
* - message.part.delta (token streaming — highest frequency)
|
||||
* - message.part.updated (tool state changes, text completions)
|
||||
*
|
||||
* The watchdog queries `stale(sessionID, threshold)` to detect sessions
|
||||
* that have had no activity for longer than the threshold, distinguishing
|
||||
* "genuinely stuck / idle" from "actively streaming or executing tools".
|
||||
*
|
||||
* Root sessions (no parentID) are never subject to idle detection — only
|
||||
* subagent sessions spawned by the task tool are monitored.
|
||||
*/
|
||||
export namespace SessionActivity {
|
||||
const state = Instance.state(() => {
|
||||
const data: Record<string, number> = {}
|
||||
return data
|
||||
})
|
||||
|
||||
/** Update the last-activity timestamp for a session. */
|
||||
export function touch(id: string, now = Date.now()) {
|
||||
state()[id] = now
|
||||
}
|
||||
|
||||
/** Return the last-activity timestamp, or undefined if never recorded. */
|
||||
export function last(id: string): number | undefined {
|
||||
return state()[id]
|
||||
}
|
||||
|
||||
/**
|
||||
* True when the session has had no activity for longer than `threshold` ms.
|
||||
* Returns false for sessions with no recorded activity (they haven't
|
||||
* started yet, so they aren't stale).
|
||||
*/
|
||||
export function stale(id: string, threshold: number): boolean {
|
||||
const ts = state()[id]
|
||||
if (ts === undefined) return false
|
||||
return Date.now() - ts > threshold
|
||||
}
|
||||
|
||||
/** Remove tracking for a session (cleanup on cancel / completion). */
|
||||
export function remove(id: string) {
|
||||
delete state()[id]
|
||||
}
|
||||
|
||||
/** Snapshot of all tracked sessions — for diagnostics / logging. */
|
||||
export function list() {
|
||||
return { ...state() }
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to Bus events that indicate session activity.
|
||||
* Call once during bootstrap (idempotent per Instance lifecycle
|
||||
* since Instance.state is scoped to the current instance).
|
||||
*/
|
||||
export function init() {
|
||||
log.info("init")
|
||||
|
||||
// Token deltas — fires on every chunk from the LLM stream.
|
||||
// This is the highest-frequency signal and acts as a natural heartbeat.
|
||||
Bus.subscribe(MessageV2.Event.PartDelta, (evt) => {
|
||||
touch(evt.properties.sessionID)
|
||||
})
|
||||
|
||||
// Part state changes — tool start/complete/error, text start/end, etc.
|
||||
Bus.subscribe(MessageV2.Event.PartUpdated, (evt) => {
|
||||
touch(evt.properties.part.sessionID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,863 @@
|
|||
import { afterEach, describe, expect, mock, test } from "bun:test"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { Session } from "../../src/session"
|
||||
import { Identifier } from "../../src/id/id"
|
||||
import { Database, sql } from "../../src/storage/db"
|
||||
import { PartTable } from "../../src/session/session.sql"
|
||||
import { watchdogTick } from "../../src/project/bootstrap"
|
||||
import { SessionPrompt } from "../../src/session/prompt"
|
||||
import { SessionActivity } from "../../src/session/activity"
|
||||
import { SessionID } from "../../src/session/schema"
|
||||
import { Log } from "../../src/util/log"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
|
||||
/**
|
||||
* Tests for the watchdog's leaf-filtering logic.
|
||||
*
|
||||
* The watchdog scans for tool parts stuck in "running" beyond a cutoff.
|
||||
* The key behavior: task tools whose child session also has stuck tools
|
||||
* are NOT cancelled (they resolve naturally when the child is cancelled).
|
||||
* Only "leaf" tools — non-task tools, or task tools with no live child —
|
||||
* are force-errored.
|
||||
*/
|
||||
|
||||
afterEach(async () => {
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
Log.init({ print: false })
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Insert a message row via raw SQL (avoids type constraints for test data). */
|
||||
function insertMessage(id: string, session: string) {
|
||||
const now = Date.now()
|
||||
Database.use((db) => {
|
||||
db.run(
|
||||
sql.raw(
|
||||
`INSERT INTO message (id, session_id, time_created, time_updated, data)
|
||||
VALUES ('${id}', '${session}', ${now}, ${now},
|
||||
'${JSON.stringify({
|
||||
role: "assistant",
|
||||
time: { created: now },
|
||||
agent: "test",
|
||||
modelID: "test",
|
||||
providerID: "test",
|
||||
parentID: "",
|
||||
mode: "",
|
||||
path: { cwd: "/tmp", root: "/tmp" },
|
||||
cost: 0,
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
})}')`,
|
||||
),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/** Insert a tool part with "running" status via raw SQL. */
|
||||
function insertRunning(opts: {
|
||||
id: string
|
||||
session: string
|
||||
message: string
|
||||
tool: string
|
||||
start: number
|
||||
child?: string
|
||||
}) {
|
||||
const metadata = opts.child ? { sessionId: opts.child } : {}
|
||||
const data = JSON.stringify({
|
||||
type: "tool",
|
||||
callID: `call_${opts.id}`,
|
||||
tool: opts.tool,
|
||||
state: {
|
||||
status: "running",
|
||||
input: {},
|
||||
time: { start: opts.start },
|
||||
metadata,
|
||||
},
|
||||
})
|
||||
Database.use((db) => {
|
||||
db.run(
|
||||
sql.raw(
|
||||
`INSERT INTO part (id, message_id, session_id, time_created, time_updated, data)
|
||||
VALUES ('${opts.id}', '${opts.message}', '${opts.session}', ${opts.start}, ${opts.start}, '${data}')`,
|
||||
),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/** Read a part's status from the DB. */
|
||||
function partStatus(id: string): string {
|
||||
return Database.use((db) => {
|
||||
const row = db
|
||||
.select({
|
||||
status: sql<string>`json_extract(${PartTable.data}, '$.state.status')`,
|
||||
})
|
||||
.from(PartTable)
|
||||
.where(sql`${PartTable.id} = ${id}`)
|
||||
.get()
|
||||
return row!.status
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("watchdog: leaf-filtering", () => {
|
||||
test("single stuck non-task tool is force-errored", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const ses = await Session.create({})
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, ses.id)
|
||||
insertRunning({ id: prt, session: ses.id, message: msg, tool: "bash", start: 1000 })
|
||||
|
||||
watchdogTick(Date.now())
|
||||
|
||||
expect(partStatus(prt)).toBe("error")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("task tool waiting on child with stuck tool is NOT errored", async () => {
|
||||
// 3-level chain: top → child → grandchild
|
||||
// grandchild has a stuck "question" tool (the leaf)
|
||||
// child has a stuck "task" tool pointing at grandchild
|
||||
// top has a stuck "task" tool pointing at child
|
||||
//
|
||||
// Only the grandchild's "question" tool should be errored.
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const top = await Session.create({})
|
||||
const child = await Session.create({ parentID: top.id })
|
||||
const grand = await Session.create({ parentID: child.id })
|
||||
|
||||
const topMsg = Identifier.ascending("message")
|
||||
const childMsg = Identifier.ascending("message")
|
||||
const grandMsg = Identifier.ascending("message")
|
||||
const topPrt = Identifier.ascending("part")
|
||||
const childPrt = Identifier.ascending("part")
|
||||
const grandPrt = Identifier.ascending("part")
|
||||
|
||||
insertMessage(topMsg, top.id)
|
||||
insertMessage(childMsg, child.id)
|
||||
insertMessage(grandMsg, grand.id)
|
||||
|
||||
const old = 1000
|
||||
// Top-level: task tool waiting on child
|
||||
insertRunning({
|
||||
id: topPrt,
|
||||
session: top.id,
|
||||
message: topMsg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
// Child: task tool waiting on grandchild
|
||||
insertRunning({
|
||||
id: childPrt,
|
||||
session: child.id,
|
||||
message: childMsg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: grand.id,
|
||||
})
|
||||
// Grandchild: stuck "question" tool (leaf)
|
||||
insertRunning({
|
||||
id: grandPrt,
|
||||
session: grand.id,
|
||||
message: grandMsg,
|
||||
tool: "question",
|
||||
start: old,
|
||||
})
|
||||
|
||||
watchdogTick(Date.now())
|
||||
|
||||
// Only the grandchild's leaf tool should be errored
|
||||
expect(partStatus(grandPrt)).toBe("error")
|
||||
// Parent task tools should remain running
|
||||
expect(partStatus(childPrt)).toBe("running")
|
||||
expect(partStatus(topPrt)).toBe("running")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("task tool with no child metadata is treated as leaf", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const ses = await Session.create({})
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, ses.id)
|
||||
// Task tool but no child session ID in metadata
|
||||
insertRunning({ id: prt, session: ses.id, message: msg, tool: "task", start: 1000 })
|
||||
|
||||
watchdogTick(Date.now())
|
||||
|
||||
expect(partStatus(prt)).toBe("error")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("task tool whose child has no stuck tools is treated as leaf", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
|
||||
// Parent has task tool pointing at child, but child has NO stuck tools
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: 1000,
|
||||
child: child.id,
|
||||
})
|
||||
|
||||
watchdogTick(Date.now())
|
||||
|
||||
expect(partStatus(prt)).toBe("error")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("no stuck tools means no changes", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
// Nothing stuck — watchdog should be a no-op
|
||||
watchdogTick(Date.now())
|
||||
// No assertion needed; just verify it doesn't throw
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("tools within cutoff are not affected", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const ses = await Session.create({})
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, ses.id)
|
||||
|
||||
// Tool started recently
|
||||
const recent = Date.now()
|
||||
insertRunning({ id: prt, session: ses.id, message: msg, tool: "bash", start: recent })
|
||||
|
||||
// Cutoff is before the tool started — not stuck
|
||||
watchdogTick(recent - 1000)
|
||||
|
||||
expect(partStatus(prt)).toBe("running")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("multiple leaves across different sessions are all errored", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const ses1 = await Session.create({})
|
||||
const ses2 = await Session.create({})
|
||||
const msg1 = Identifier.ascending("message")
|
||||
const msg2 = Identifier.ascending("message")
|
||||
const prt1 = Identifier.ascending("part")
|
||||
const prt2 = Identifier.ascending("part")
|
||||
|
||||
insertMessage(msg1, ses1.id)
|
||||
insertMessage(msg2, ses2.id)
|
||||
|
||||
insertRunning({ id: prt1, session: ses1.id, message: msg1, tool: "bash", start: 1000 })
|
||||
insertRunning({ id: prt2, session: ses2.id, message: msg2, tool: "read", start: 1000 })
|
||||
|
||||
watchdogTick(Date.now())
|
||||
|
||||
expect(partStatus(prt1)).toBe("error")
|
||||
expect(partStatus(prt2)).toBe("error")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("mixed: leaf tools errored, waiting task tools preserved", async () => {
|
||||
// Two independent chains:
|
||||
// Chain A: parentA → task(childA) → childA has stuck bash
|
||||
// Chain B: standalone session with stuck read tool
|
||||
// Both leaf tools errored, parentA's task tool preserved.
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parentA = await Session.create({})
|
||||
const childA = await Session.create({ parentID: parentA.id })
|
||||
const sesB = await Session.create({})
|
||||
|
||||
const msgA = Identifier.ascending("message")
|
||||
const msgChild = Identifier.ascending("message")
|
||||
const msgB = Identifier.ascending("message")
|
||||
const prtA = Identifier.ascending("part")
|
||||
const prtChild = Identifier.ascending("part")
|
||||
const prtB = Identifier.ascending("part")
|
||||
|
||||
insertMessage(msgA, parentA.id)
|
||||
insertMessage(msgChild, childA.id)
|
||||
insertMessage(msgB, sesB.id)
|
||||
|
||||
const old = 1000
|
||||
insertRunning({
|
||||
id: prtA,
|
||||
session: parentA.id,
|
||||
message: msgA,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: childA.id,
|
||||
})
|
||||
insertRunning({ id: prtChild, session: childA.id, message: msgChild, tool: "bash", start: old })
|
||||
insertRunning({ id: prtB, session: sesB.id, message: msgB, tool: "read", start: old })
|
||||
|
||||
watchdogTick(Date.now())
|
||||
|
||||
// Leaf tools errored
|
||||
expect(partStatus(prtChild)).toBe("error")
|
||||
expect(partStatus(prtB)).toBe("error")
|
||||
// Parent's task tool preserved
|
||||
expect(partStatus(prtA)).toBe("running")
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("watchdog: cancel target", () => {
|
||||
test("task tool leaf cancels child session, not parent", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
|
||||
// Task tool whose child has NO stuck tools → leaf
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: 1000,
|
||||
child: child.id,
|
||||
})
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
// Don't call through — no real session running
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
watchdogTick(Date.now())
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Watchdog should cancel the child, not the parent
|
||||
expect(ids).toEqual([child.id])
|
||||
expect(ids).not.toContain(parent.id)
|
||||
// DB part still force-errored as safety net
|
||||
expect(partStatus(prt)).toBe("error")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("non-task leaf cancels owning session", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const ses = await Session.create({})
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, ses.id)
|
||||
insertRunning({ id: prt, session: ses.id, message: msg, tool: "bash", start: 1000 })
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
watchdogTick(Date.now())
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Non-task tool: cancel the owning session directly
|
||||
expect(ids).toEqual([ses.id])
|
||||
expect(partStatus(prt)).toBe("error")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("3-level chain cancels grandchild session only", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const top = await Session.create({})
|
||||
const child = await Session.create({ parentID: top.id })
|
||||
const grand = await Session.create({ parentID: child.id })
|
||||
|
||||
const topMsg = Identifier.ascending("message")
|
||||
const childMsg = Identifier.ascending("message")
|
||||
const grandMsg = Identifier.ascending("message")
|
||||
const topPrt = Identifier.ascending("part")
|
||||
const childPrt = Identifier.ascending("part")
|
||||
const grandPrt = Identifier.ascending("part")
|
||||
|
||||
insertMessage(topMsg, top.id)
|
||||
insertMessage(childMsg, child.id)
|
||||
insertMessage(grandMsg, grand.id)
|
||||
|
||||
const old = 1000
|
||||
insertRunning({
|
||||
id: topPrt,
|
||||
session: top.id,
|
||||
message: topMsg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
insertRunning({
|
||||
id: childPrt,
|
||||
session: child.id,
|
||||
message: childMsg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: grand.id,
|
||||
})
|
||||
insertRunning({
|
||||
id: grandPrt,
|
||||
session: grand.id,
|
||||
message: grandMsg,
|
||||
tool: "question",
|
||||
start: old,
|
||||
})
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
watchdogTick(Date.now())
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Only the grandchild's owning session should be cancelled
|
||||
// (non-task "question" tool → cancel owning session)
|
||||
expect(ids).toEqual([grand.id])
|
||||
expect(ids).not.toContain(top.id)
|
||||
expect(ids).not.toContain(child.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// SessionActivity unit tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("SessionActivity", () => {
|
||||
test("touch and last", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
expect(SessionActivity.last("a")).toBeUndefined()
|
||||
SessionActivity.touch("a", 1000)
|
||||
expect(SessionActivity.last("a")).toBe(1000)
|
||||
SessionActivity.touch("a", 2000)
|
||||
expect(SessionActivity.last("a")).toBe(2000)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("stale returns false for unknown session", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
// No activity recorded — not stale (hasn't started yet)
|
||||
expect(SessionActivity.stale("unknown", 1000)).toBe(false)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("stale detects inactivity", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const old = Date.now() - 10_000
|
||||
SessionActivity.touch("a", old)
|
||||
// 5s threshold — session was active 10s ago → stale
|
||||
expect(SessionActivity.stale("a", 5_000)).toBe(true)
|
||||
// 15s threshold — session was active 10s ago → not stale yet
|
||||
expect(SessionActivity.stale("a", 15_000)).toBe(false)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("remove clears tracking", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
SessionActivity.touch("a", 1000)
|
||||
expect(SessionActivity.last("a")).toBe(1000)
|
||||
SessionActivity.remove("a")
|
||||
expect(SessionActivity.last("a")).toBeUndefined()
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Idle detection integration tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("watchdog: idle detection", () => {
|
||||
test("idle subagent is cancelled when stale", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
|
||||
// Parent has a task tool pointing at child — child session has
|
||||
// stuck tools too (so it's NOT a leaf, but IS in the stuck set)
|
||||
const childMsg = Identifier.ascending("message")
|
||||
const childPrt = Identifier.ascending("part")
|
||||
insertMessage(childMsg, child.id)
|
||||
|
||||
const old = 1000
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
// Child has a running tool too — parent task tool is NOT a leaf
|
||||
insertRunning({
|
||||
id: childPrt,
|
||||
session: child.id,
|
||||
message: childMsg,
|
||||
tool: "bash",
|
||||
start: old,
|
||||
})
|
||||
|
||||
// Simulate the child having had activity a long time ago
|
||||
SessionActivity.touch(child.id, Date.now() - 600_000)
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
// The leaf filter will catch the bash tool (non-task leaf).
|
||||
// The idle check should ALSO cancel the child because it's stale.
|
||||
// But since the bash tool's cancel already targets child.id,
|
||||
// the idle path adds it too — deduplicated by the `cancelled` set.
|
||||
watchdogTick(Date.now(), 300_000)
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Child should be cancelled (by either leaf detection or idle — at least once)
|
||||
expect(ids).toContain(child.id)
|
||||
// Parent should NOT be cancelled
|
||||
expect(ids).not.toContain(parent.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("active subagent is NOT cancelled by idle check", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
|
||||
const old = 1000
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
|
||||
// Child had very recent activity — NOT stale
|
||||
SessionActivity.touch(child.id, Date.now() - 1000)
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
// The task tool IS a leaf (child has no stuck tools) so it
|
||||
// gets cancelled by the leaf filter. But the idle check
|
||||
// should NOT independently cancel it since it's active.
|
||||
watchdogTick(Date.now(), 300_000)
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Child cancelled by leaf filter (task tool whose child has no stuck tools)
|
||||
expect(ids).toContain(child.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("idle detection skipped when idle param is omitted", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
const childMsg = Identifier.ascending("message")
|
||||
const childPrt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
insertMessage(childMsg, child.id)
|
||||
|
||||
const old = 1000
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
insertRunning({
|
||||
id: childPrt,
|
||||
session: child.id,
|
||||
message: childMsg,
|
||||
tool: "bash",
|
||||
start: old,
|
||||
})
|
||||
|
||||
// Child is stale
|
||||
SessionActivity.touch(child.id, Date.now() - 600_000)
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
// No idle param — only leaf detection runs
|
||||
watchdogTick(Date.now())
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// The bash tool is the leaf, so child.id is cancelled by leaf filter.
|
||||
// But the parent task tool is NOT a leaf (child has stuck bash),
|
||||
// so it stays running. The idle path did NOT run (no idle param).
|
||||
expect(ids).toEqual([child.id])
|
||||
// Parent task tool preserved
|
||||
expect(partStatus(prt)).toBe("running")
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("stale session with running tool IS idle-cancelled (idle sweep has no running-tool guard)", async () => {
|
||||
// After restructuring, the idle sweep simply checks SessionActivity.stale()
|
||||
// for all tracked sessions — it does not exempt sessions with running tools.
|
||||
// The stuck-tool path handles running tools separately.
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
const childMsg = Identifier.ascending("message")
|
||||
const childPrt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
insertMessage(childMsg, child.id)
|
||||
|
||||
const old = 1000
|
||||
// Parent task tool pointing at child — both stuck past cutoff
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
// Child has a running bash tool
|
||||
insertRunning({
|
||||
id: childPrt,
|
||||
session: child.id,
|
||||
message: childMsg,
|
||||
tool: "bash",
|
||||
start: old,
|
||||
})
|
||||
|
||||
// Child's last Bus activity was 10 minutes ago — stale
|
||||
SessionActivity.touch(child.id, Date.now() - 600_000)
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
// cutoff=0: no stuck tools found. But idle sweep runs
|
||||
// independently and child is stale → cancelled.
|
||||
watchdogTick(0, 300_000)
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Child cancelled by idle sweep (stale activity)
|
||||
expect(ids).toEqual([child.id])
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("session with no recorded activity is not considered stale", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const parent = await Session.create({})
|
||||
const child = await Session.create({ parentID: parent.id })
|
||||
|
||||
const msg = Identifier.ascending("message")
|
||||
const prt = Identifier.ascending("part")
|
||||
insertMessage(msg, parent.id)
|
||||
|
||||
const old = 1000
|
||||
insertRunning({
|
||||
id: prt,
|
||||
session: parent.id,
|
||||
message: msg,
|
||||
tool: "task",
|
||||
start: old,
|
||||
child: child.id,
|
||||
})
|
||||
|
||||
// Deliberately do NOT touch SessionActivity for the child —
|
||||
// simulates a session that hasn't started streaming yet.
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
watchdogTick(Date.now(), 300_000)
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Child is cancelled by the leaf filter (task tool whose child
|
||||
// has no stuck tools), but the idle check should NOT have fired
|
||||
// because there's no recorded activity (stale() returns false).
|
||||
expect(ids).toEqual([child.id])
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("root session is never idle-cancelled even when stale", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const root = await Session.create({})
|
||||
|
||||
// Root session has stale activity — but idle sweep should skip it
|
||||
SessionActivity.touch(root.id, Date.now() - 600_000)
|
||||
|
||||
const ids: string[] = []
|
||||
const orig = SessionPrompt.cancel
|
||||
const spy = mock(async (id: string) => {
|
||||
ids.push(id)
|
||||
})
|
||||
SessionPrompt.cancel = spy as typeof SessionPrompt.cancel
|
||||
|
||||
try {
|
||||
watchdogTick(0, 300_000)
|
||||
} finally {
|
||||
SessionPrompt.cancel = orig
|
||||
}
|
||||
|
||||
// Root session must NOT be cancelled by idle sweep
|
||||
expect(ids).not.toContain(root.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
Loading…
Reference in New Issue