refactor(truncation): effectify TruncateService, delete Scheduler

- TruncateService as global Effect service on ManagedRuntime with
  FileSystem for cleanup (readDirectory + remove)
- Hourly cleanup via Effect.forkScoped + Schedule.spaced
- Split into truncate-service.ts (Effect) + truncation.ts (facade)
  to avoid circular import with runtime.ts
- Shared TRUNCATION_DIR constant in truncation-dir.ts
- Delete Scheduler module (no remaining consumers)
- Remove Truncate.init() from bootstrap (cleanup starts automatically
  when runtime is created)
pull/18019/head
Kit Langton 2026-03-17 08:47:53 -04:00
parent ab89f84b0c
commit d2d8aaae22
7 changed files with 67 additions and 161 deletions

View File

@ -3,10 +3,13 @@ import { AccountService } from "@/account/service"
import { AuthService } from "@/auth/service"
import { Instances } from "@/effect/instances"
import type { InstanceServices } from "@/effect/instances"
import { TruncateService } from "@/tool/truncate-service"
import { Instance } from "@/project/instance"
export const runtime = ManagedRuntime.make(
Layer.mergeAll(AccountService.defaultLayer, Instances.layer).pipe(Layer.provideMerge(AuthService.defaultLayer)),
Layer.mergeAll(AccountService.defaultLayer, Instances.layer, TruncateService.layer).pipe(
Layer.provideMerge(AuthService.defaultLayer),
),
)
export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {

View File

@ -11,7 +11,6 @@ import { VcsService } from "./vcs"
import { Log } from "@/util/log"
import { ShareNext } from "@/share/share-next"
import { Snapshot } from "../snapshot"
import { Truncate } from "../tool/truncation"
import { runPromiseInstance } from "@/effect/runtime"
export async function InstanceBootstrap() {
@ -24,7 +23,6 @@ export async function InstanceBootstrap() {
File.init()
await runPromiseInstance(VcsService.use((s) => s.init()))
Snapshot.init()
Truncate.init()
Bus.subscribe(Command.Event.Executed, async (payload) => {
if (payload.properties.name === Command.Default.INIT) {

View File

@ -1,61 +0,0 @@
import { Instance } from "../project/instance"
import { Log } from "../util/log"
export namespace Scheduler {
const log = Log.create({ service: "scheduler" })
export type Task = {
id: string
interval: number
run: () => Promise<void>
scope?: "instance" | "global"
}
type Timer = ReturnType<typeof setInterval>
type Entry = {
tasks: Map<string, Task>
timers: Map<string, Timer>
}
const create = (): Entry => {
const tasks = new Map<string, Task>()
const timers = new Map<string, Timer>()
return { tasks, timers }
}
const shared = create()
const state = Instance.state(
() => create(),
async (entry) => {
for (const timer of entry.timers.values()) {
clearInterval(timer)
}
entry.tasks.clear()
entry.timers.clear()
},
)
export function register(task: Task) {
const scope = task.scope ?? "instance"
const entry = scope === "global" ? shared : state()
const current = entry.timers.get(task.id)
if (current && scope === "global") return
if (current) clearInterval(current)
entry.tasks.set(task.id, task)
void run(task)
const timer = setInterval(() => {
void run(task)
}, task.interval)
timer.unref()
entry.timers.set(task.id, timer)
}
async function run(task: Task) {
log.info("run", { id: task.id })
await task.run().catch((error) => {
log.error("run failed", { id: task.id, error })
})
}
}

View File

@ -0,0 +1,52 @@
import path from "path"
import { Log } from "../util/log"
import { TRUNCATION_DIR } from "./truncation-dir"
import { Identifier } from "../id/id"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { Cause, Duration, Effect, FileSystem, Layer, Schedule, ServiceMap } from "effect"
const log = Log.create({ service: "truncation" })
const RETENTION = Duration.days(7)
export namespace TruncateService {
export interface Service {
readonly cleanup: () => Effect.Effect<void>
}
}
export class TruncateService extends ServiceMap.Service<TruncateService, TruncateService.Service>()(
"@opencode/Truncate",
) {
static readonly layer = Layer.effect(
TruncateService,
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const cleanup = Effect.fn("TruncateService.cleanup")(function* () {
const cutoff = Identifier.timestamp(Identifier.create("tool", false, Date.now() - Duration.toMillis(RETENTION)))
const entries = yield* fs
.readDirectory(TRUNCATION_DIR)
.pipe(
Effect.map((all) => all.filter((name) => name.startsWith("tool_"))),
Effect.catch(() => Effect.succeed([])),
)
for (const entry of entries) {
if (Identifier.timestamp(entry) >= cutoff) continue
yield* fs.remove(path.join(TRUNCATION_DIR, entry)).pipe(Effect.catch(() => Effect.void))
}
})
// Start hourly cleanup — scoped to runtime lifetime
yield* cleanup().pipe(
Effect.catchCause((cause) => {
log.error("truncation cleanup failed", { cause: Cause.pretty(cause) })
return Effect.void
}),
Effect.repeat(Schedule.spaced(Duration.hours(1))),
Effect.forkScoped,
)
return TruncateService.of({ cleanup })
}),
).pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer))
}

View File

@ -0,0 +1,4 @@
import path from "path"
import { Global } from "../global"
export const TRUNCATION_DIR = path.join(Global.Path.data, "tool-output")

View File

@ -1,21 +1,18 @@
import fs from "fs/promises"
import path from "path"
import { Global } from "../global"
import { Identifier } from "../id/id"
import { PermissionNext } from "../permission/next"
import { TRUNCATION_DIR } from "./truncation-dir"
import type { Agent } from "../agent/agent"
import { Scheduler } from "../scheduler"
import { Filesystem } from "../util/filesystem"
import { Glob } from "../util/glob"
import { ToolID } from "./schema"
import { TruncateService } from "./truncate-service"
import { runtime } from "@/effect/runtime"
export namespace Truncate {
export const MAX_LINES = 2000
export const MAX_BYTES = 50 * 1024
export const DIR = path.join(Global.Path.data, "tool-output")
export const GLOB = path.join(DIR, "*")
const RETENTION_MS = 7 * 24 * 60 * 60 * 1000 // 7 days
const HOUR_MS = 60 * 60 * 1000
export const DIR = TRUNCATION_DIR
export const GLOB = path.join(TRUNCATION_DIR, "*")
export type Result = { content: string; truncated: false } | { content: string; truncated: true; outputPath: string }
@ -25,22 +22,8 @@ export namespace Truncate {
direction?: "head" | "tail"
}
export function init() {
Scheduler.register({
id: "tool.truncation.cleanup",
interval: HOUR_MS,
run: cleanup,
scope: "global",
})
}
export async function cleanup() {
const cutoff = Identifier.timestamp(Identifier.create("tool", false, Date.now() - RETENTION_MS))
const entries = await Glob.scan("tool_*", { cwd: DIR, include: "file" }).catch(() => [] as string[])
for (const entry of entries) {
if (Identifier.timestamp(entry) >= cutoff) continue
await fs.unlink(path.join(DIR, entry)).catch(() => {})
}
return runtime.runPromise(TruncateService.use((s) => s.cleanup()))
}
function hasTaskTool(agent?: Agent.Info): boolean {

View File

@ -1,73 +0,0 @@
import { describe, expect, test } from "bun:test"
import { Scheduler } from "../src/scheduler"
import { Instance } from "../src/project/instance"
import { tmpdir } from "./fixture/fixture"
describe("Scheduler.register", () => {
const hour = 60 * 60 * 1000
test("defaults to instance scope per directory", async () => {
await using one = await tmpdir({ git: true })
await using two = await tmpdir({ git: true })
const runs = { count: 0 }
const id = "scheduler.instance." + Math.random().toString(36).slice(2)
const task = {
id,
interval: hour,
run: async () => {
runs.count += 1
},
}
await Instance.provide({
directory: one.path,
fn: async () => {
Scheduler.register(task)
await Instance.dispose()
},
})
expect(runs.count).toBe(1)
await Instance.provide({
directory: two.path,
fn: async () => {
Scheduler.register(task)
await Instance.dispose()
},
})
expect(runs.count).toBe(2)
})
test("global scope runs once across instances", async () => {
await using one = await tmpdir({ git: true })
await using two = await tmpdir({ git: true })
const runs = { count: 0 }
const id = "scheduler.global." + Math.random().toString(36).slice(2)
const task = {
id,
interval: hour,
run: async () => {
runs.count += 1
},
scope: "global" as const,
}
await Instance.provide({
directory: one.path,
fn: async () => {
Scheduler.register(task)
await Instance.dispose()
},
})
expect(runs.count).toBe(1)
await Instance.provide({
directory: two.path,
fn: async () => {
Scheduler.register(task)
await Instance.dispose()
},
})
expect(runs.count).toBe(1)
})
})