From ba41b6928f906bd3d8f0c554eb9c6899d4a51ded Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 6 Apr 2026 11:00:42 -0400 Subject: [PATCH] wip: wire local leto observability --- packages/opencode/src/agent/agent.ts | 2 +- packages/opencode/src/bus/index.ts | 2 +- packages/opencode/src/command/index.ts | 2 +- packages/opencode/src/config/config.ts | 2 +- packages/opencode/src/effect/observability.ts | 28 ++++ packages/opencode/src/effect/run-service.ts | 3 +- packages/opencode/src/file/index.ts | 8 +- packages/opencode/src/file/time.ts | 8 +- packages/opencode/src/file/watcher.ts | 2 +- packages/opencode/src/flag/flag.ts | 2 + packages/opencode/src/format/index.ts | 133 ++++++++++++------ packages/opencode/src/lsp/index.ts | 2 +- packages/opencode/src/mcp/index.ts | 2 +- packages/opencode/src/permission/index.ts | 2 +- packages/opencode/src/plugin/index.ts | 2 +- packages/opencode/src/project/vcs.ts | 58 ++++---- packages/opencode/src/provider/auth.ts | 2 +- packages/opencode/src/pty/index.ts | 2 +- packages/opencode/src/question/index.ts | 2 +- packages/opencode/src/session/compaction.ts | 4 +- packages/opencode/src/session/instruction.ts | 8 +- packages/opencode/src/session/processor.ts | 30 ++++ packages/opencode/src/session/prompt.ts | 32 ++++- packages/opencode/src/session/status.ts | 4 +- packages/opencode/src/session/summary.ts | 4 +- packages/opencode/src/share/share-next.ts | 2 +- packages/opencode/src/skill/index.ts | 2 +- packages/opencode/src/snapshot/index.ts | 63 ++++----- packages/opencode/src/tool/registry.ts | 2 +- packages/opencode/test/preload.ts | 1 + 30 files changed, 265 insertions(+), 151 deletions(-) create mode 100644 packages/opencode/src/effect/observability.ts diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index 0c6fe6ec91..5781ac85f4 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -78,7 +78,7 @@ export namespace Agent { const provider = yield* Provider.Service const state = yield* InstanceState.make( - Effect.fn("Agent.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const cfg = yield* config.get() const skillDirs = yield* skill.dirs() const whitelistedDirs = [Truncate.GLOB, ...skillDirs.map((dir) => path.join(dir, "*"))] diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index fe26a6672e..13856ba9f9 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -47,7 +47,7 @@ export namespace Bus { Service, Effect.gen(function* () { const state = yield* InstanceState.make( - Effect.fn("Bus.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const wildcard = yield* PubSub.unbounded() const typed = new Map>() diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts index 088d7c5659..6ad49c6547 100644 --- a/packages/opencode/src/command/index.ts +++ b/packages/opencode/src/command/index.ts @@ -79,7 +79,7 @@ export namespace Command { const mcp = yield* MCP.Service const skill = yield* Skill.Service - const init = Effect.fn("Command.state")(function* (ctx) { + const init = Effect.fnUntraced(function* (ctx) { const cfg = yield* config.get() const commands: Record = {} diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index b11ae83192..15b1121b89 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -1475,7 +1475,7 @@ export namespace Config { }) const state = yield* InstanceState.make( - Effect.fn("Config.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { return yield* loadInstanceState(ctx) }), ) diff --git a/packages/opencode/src/effect/observability.ts b/packages/opencode/src/effect/observability.ts new file mode 100644 index 0000000000..7e3f1607d3 --- /dev/null +++ b/packages/opencode/src/effect/observability.ts @@ -0,0 +1,28 @@ +import { Layer } from "effect" +import { FetchHttpClient } from "effect/unstable/http" +import { Otlp } from "effect/unstable/observability" +import { Flag } from "@/flag/flag" +import { CHANNEL, VERSION } from "@/installation/meta" + +export namespace Observability { + const base = Flag.OPENCODE_DISABLE_OTLP + ? undefined + : (Flag.OPENCODE_OTLP_BASE_URL ?? (CHANNEL === "local" ? "http://127.0.0.1:27686" : undefined)) + + export const enabled = !!base + + export const layer = !base + ? Layer.empty + : Otlp.layerJson({ + baseUrl: base, + loggerMergeWithExisting: false, + resource: { + serviceName: "opencode", + serviceVersion: VERSION, + attributes: { + "deployment.environment.name": CHANNEL === "local" ? "local" : CHANNEL, + "opencode.client": Flag.OPENCODE_CLIENT, + }, + }, + }).pipe(Layer.provide(FetchHttpClient.layer)) +} diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 619d5be6b5..9eae89c807 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -3,6 +3,7 @@ import * as ServiceMap from "effect/ServiceMap" import { Instance } from "@/project/instance" import { Context } from "@/util/context" import { InstanceRef } from "./instance-ref" +import { Observability } from "./observability" export const memoMap = Layer.makeMemoMapUnsafe() @@ -18,7 +19,7 @@ function attach(effect: Effect.Effect): Effect.Effect export function makeRuntime(service: ServiceMap.Service, layer: Layer.Layer) { let rt: ManagedRuntime.ManagedRuntime | undefined - const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap })) + const getRuntime = () => (rt ??= ManagedRuntime.make(Layer.merge(layer, Observability.layer), { memoMap })) return { runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(attach(service.use(fn))), diff --git a/packages/opencode/src/file/index.ts b/packages/opencode/src/file/index.ts index cdcf80a99e..6104bf831d 100644 --- a/packages/opencode/src/file/index.ts +++ b/packages/opencode/src/file/index.ts @@ -346,11 +346,11 @@ export namespace File { const appFs = yield* AppFileSystem.Service const state = yield* InstanceState.make( - Effect.fn("File.state")(() => - Effect.succeed({ + Effect.fnUntraced(function* () { + return { cache: { files: [], dirs: [] } as Entry, - }), - ), + } + }), ) const scan = Effect.fn("File.scan")(function* () { diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts index bd2b5f04f3..2754b0a8cb 100644 --- a/packages/opencode/src/file/time.ts +++ b/packages/opencode/src/file/time.ts @@ -54,12 +54,12 @@ export namespace FileTime { } }) const state = yield* InstanceState.make( - Effect.fn("FileTime.state")(() => - Effect.succeed({ + Effect.fnUntraced(function* () { + return { reads: new Map>(), locks: new Map(), - }), - ), + } + }), ) const getLock = Effect.fn("FileTime.lock")(function* (filepath: string) { diff --git a/packages/opencode/src/file/watcher.ts b/packages/opencode/src/file/watcher.ts index b78b3a33a0..39bf3dc2d2 100644 --- a/packages/opencode/src/file/watcher.ts +++ b/packages/opencode/src/file/watcher.ts @@ -73,7 +73,7 @@ export namespace FileWatcher { const config = yield* Config.Service const state = yield* InstanceState.make( - Effect.fn("FileWatcher.state")( + Effect.fnUntraced( function* () { if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts index 1ac52dd17f..0d67a64993 100644 --- a/packages/opencode/src/flag/flag.ts +++ b/packages/opencode/src/flag/flag.ts @@ -44,6 +44,8 @@ export namespace Flag { export const OPENCODE_SERVER_PASSWORD = process.env["OPENCODE_SERVER_PASSWORD"] export const OPENCODE_SERVER_USERNAME = process.env["OPENCODE_SERVER_USERNAME"] export const OPENCODE_ENABLE_QUESTION_TOOL = truthy("OPENCODE_ENABLE_QUESTION_TOOL") + export const OPENCODE_OTLP_BASE_URL = process.env["OPENCODE_OTLP_BASE_URL"] + export const OPENCODE_DISABLE_OTLP = truthy("OPENCODE_DISABLE_OTLP") // Experimental export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL") diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index c05c2bf454..59258fb0c6 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -40,7 +40,7 @@ export namespace Format { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const state = yield* InstanceState.make( - Effect.fn("Format.state")(function* (_ctx) { + Effect.fnUntraced(function* (_ctx) { const commands: Record = {} const formatters: Record = {} @@ -84,57 +84,106 @@ export namespace Format { return cmd !== false } - async function getFormatter(ext: string) { - const matching = Object.values(formatters).filter((item) => item.extensions.includes(ext)) - const checks = await Promise.all( - matching.map(async (item) => { - log.info("checking", { name: item.name, ext }) - const cmd = await getCommand(item) - if (cmd) { - log.info("enabled", { name: item.name, ext }) - } - return { - item, - cmd, - } - }), - ) - return checks.filter((x) => x.cmd).map((x) => ({ item: x.item, cmd: x.cmd! })) + function check(item: Formatter.Info, ext: string) { + return Effect.gen(function* () { + yield* Effect.annotateCurrentSpan({ + ext, + formatter: item.name, + }) + log.info("checking", { name: item.name, ext }) + const cmd = yield* Effect.promise(() => getCommand(item)) + if (cmd) { + log.info("enabled", { name: item.name, ext }) + } + yield* Effect.annotateCurrentSpan({ enabled: !!cmd }) + return { + item, + cmd, + } + }).pipe(Effect.withSpan("Format.checkFormatter")) } - function formatFile(filepath: string) { + function resolve(ext: string) { + return Effect.gen(function* () { + const matching = Object.values(formatters).filter((item) => item.extensions.includes(ext)) + const checks = yield* Effect.all(matching.map((item) => check(item, ext))) + const enabled = checks.filter((item) => item.cmd).map((item) => ({ item: item.item, cmd: item.cmd! })) + yield* Effect.annotateCurrentSpan({ + ext, + matched_formatters: matching.map((item) => item.name).join(",") || "none", + enabled_formatters: enabled.map((item) => item.item.name).join(",") || "none", + }) + return { + matching, + enabled, + } + }).pipe(Effect.withSpan("Format.resolveFormatters")) + } + + function spawn(item: Formatter.Info, command: string[], filepath: string) { + return Effect.gen(function* () { + const dir = yield* InstanceState.directory + yield* Effect.annotateCurrentSpan({ + file: filepath, + formatter: item.name, + command: command.join(" "), + }) + return yield* spawner.spawn( + ChildProcess.make(command[0]!, command.slice(1), { + cwd: dir, + env: item.environment, + extendEnv: true, + }), + ) + }).pipe(Effect.withSpan("Format.spawnFormatter")) + } + + function wait( + handle: ChildProcessSpawner.ChildProcessHandle, + item: Formatter.Info, + command: string[], + filepath: string, + ) { + return Effect.gen(function* () { + yield* Effect.annotateCurrentSpan({ + file: filepath, + formatter: item.name, + command: command.join(" "), + }) + return yield* handle.exitCode + }).pipe(Effect.withSpan("Format.waitFormatter")) + } + + function formatFile(filepath: string): Effect.Effect { return Effect.gen(function* () { log.info("formatting", { file: filepath }) const ext = path.extname(filepath) + yield* Effect.annotateCurrentSpan({ file: filepath, ext }) + const fmt = yield* resolve(ext) + yield* Effect.annotateCurrentSpan({ + matched_formatters: fmt.matching.map((item) => item.name).join(",") || "none", + enabled_formatters: fmt.enabled.map((item) => item.item.name).join(",") || "none", + }) - for (const { item, cmd } of yield* Effect.promise(() => getFormatter(ext))) { + for (const { item, cmd } of fmt.enabled) { if (cmd === false) continue log.info("running", { command: cmd }) const replaced = cmd.map((x) => x.replace("$FILE", filepath)) - const dir = yield* InstanceState.directory - const code = yield* spawner - .spawn( - ChildProcess.make(replaced[0]!, replaced.slice(1), { - cwd: dir, - env: item.environment, - extendEnv: true, + const code = yield* spawn(item, replaced, filepath).pipe( + Effect.flatMap((handle) => wait(handle, item, replaced, filepath)), + Effect.scoped, + Effect.catch(() => + Effect.sync(() => { + log.error("failed to format file", { + error: "spawn failed", + command: replaced, + ...item.environment, + file: filepath, + }) + return ChildProcessSpawner.ExitCode(1) }), - ) - .pipe( - Effect.flatMap((handle) => handle.exitCode), - Effect.scoped, - Effect.catch(() => - Effect.sync(() => { - log.error("failed to format file", { - error: "spawn failed", - command: cmd, - ...item.environment, - file: filepath, - }) - return ChildProcessSpawner.ExitCode(1) - }), - ), - ) + ), + ) if (code !== 0) { log.error("failed", { command: cmd, diff --git a/packages/opencode/src/lsp/index.ts b/packages/opencode/src/lsp/index.ts index de87e568f7..820dcff7ec 100644 --- a/packages/opencode/src/lsp/index.ts +++ b/packages/opencode/src/lsp/index.ts @@ -164,7 +164,7 @@ export namespace LSP { const config = yield* Config.Service const state = yield* InstanceState.make( - Effect.fn("LSP.state")(function* () { + Effect.fnUntraced(function* () { const cfg = yield* config.get() const servers: Record = {} diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 8c92bb6b2e..d009ce9d7e 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -478,7 +478,7 @@ export namespace MCP { } const state = yield* InstanceState.make( - Effect.fn("MCP.state")(function* () { + Effect.fnUntraced(function* () { const cfg = yield* cfgSvc.get() const config = cfg.mcp ?? {} const s: State = { diff --git a/packages/opencode/src/permission/index.ts b/packages/opencode/src/permission/index.ts index b2cc0f9bbc..33cca5e847 100644 --- a/packages/opencode/src/permission/index.ts +++ b/packages/opencode/src/permission/index.ts @@ -142,7 +142,7 @@ export namespace Permission { Effect.gen(function* () { const bus = yield* Bus.Service const state = yield* InstanceState.make( - Effect.fn("Permission.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const row = Database.use((db) => db.select().from(PermissionTable).where(eq(PermissionTable.project_id, ctx.project.id)).get(), ) diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index fb60fa096e..53d59b74a3 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -98,7 +98,7 @@ export namespace Plugin { const config = yield* Config.Service const state = yield* InstanceState.make( - Effect.fn("Plugin.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const hooks: Hooks[] = [] const { Server } = yield* Effect.promise(() => import("../server/server")) diff --git a/packages/opencode/src/project/vcs.ts b/packages/opencode/src/project/vcs.ts index 5142079b1d..95d6f363a0 100644 --- a/packages/opencode/src/project/vcs.ts +++ b/packages/opencode/src/project/vcs.ts @@ -147,39 +147,37 @@ export namespace Vcs { const bus = yield* Bus.Service const state = yield* InstanceState.make( - Effect.fn("Vcs.state")((ctx) => - Effect.gen(function* () { - if (ctx.project.vcs !== "git") { - return { current: undefined, root: undefined } - } + Effect.fnUntraced(function* (ctx) { + if (ctx.project.vcs !== "git") { + return { current: undefined, root: undefined } + } - const get = Effect.fnUntraced(function* () { - return yield* git.branch(ctx.directory) - }) - const [current, root] = yield* Effect.all([git.branch(ctx.directory), git.defaultBranch(ctx.directory)], { - concurrency: 2, - }) - const value = { current, root } - log.info("initialized", { branch: value.current, default_branch: value.root?.name }) + const get = Effect.fnUntraced(function* () { + return yield* git.branch(ctx.directory) + }) + const [current, root] = yield* Effect.all([git.branch(ctx.directory), git.defaultBranch(ctx.directory)], { + concurrency: 2, + }) + const value = { current, root } + log.info("initialized", { branch: value.current, default_branch: value.root?.name }) - yield* bus.subscribe(FileWatcher.Event.Updated).pipe( - Stream.filter((evt) => evt.properties.file.endsWith("HEAD")), - Stream.runForEach((_evt) => - Effect.gen(function* () { - const next = yield* get() - if (next !== value.current) { - log.info("branch changed", { from: value.current, to: next }) - value.current = next - yield* bus.publish(Event.BranchUpdated, { branch: next }) - } - }), - ), - Effect.forkScoped, - ) + yield* bus.subscribe(FileWatcher.Event.Updated).pipe( + Stream.filter((evt) => evt.properties.file.endsWith("HEAD")), + Stream.runForEach((_evt) => + Effect.gen(function* () { + const next = yield* get() + if (next !== value.current) { + log.info("branch changed", { from: value.current, to: next }) + value.current = next + yield* bus.publish(Event.BranchUpdated, { branch: next }) + } + }), + ), + Effect.forkScoped, + ) - return value - }), - ), + return value + }), ) return Service.of({ diff --git a/packages/opencode/src/provider/auth.ts b/packages/opencode/src/provider/auth.ts index 38ef4b11f4..209dfd8614 100644 --- a/packages/opencode/src/provider/auth.ts +++ b/packages/opencode/src/provider/auth.ts @@ -117,7 +117,7 @@ export namespace ProviderAuth { const auth = yield* Auth.Service const plugin = yield* Plugin.Service const state = yield* InstanceState.make( - Effect.fn("ProviderAuth.state")(function* () { + Effect.fnUntraced(function* () { const plugins = yield* plugin.list() return { hooks: Record.fromEntries( diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index a97f3373d1..7406cd7116 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -133,7 +133,7 @@ export namespace Pty { } const state = yield* InstanceState.make( - Effect.fn("Pty.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const state = { dir: ctx.directory, sessions: new Map(), diff --git a/packages/opencode/src/question/index.ts b/packages/opencode/src/question/index.ts index 615c699ce9..4a472b90a2 100644 --- a/packages/opencode/src/question/index.ts +++ b/packages/opencode/src/question/index.ts @@ -111,7 +111,7 @@ export namespace Question { Effect.gen(function* () { const bus = yield* Bus.Service const state = yield* InstanceState.make( - Effect.fn("Question.state")(function* () { + Effect.fnUntraced(function* () { const state = { pending: new Map(), } diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 3158393f11..2d216c3203 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -138,7 +138,7 @@ export namespace SessionCompaction { } }) - const processCompaction = Effect.fn("SessionCompaction.process")(function* (input: { + const process = Effect.fn("SessionCompaction.process")(function* (input: { parentID: MessageID messages: MessageV2.WithParts[] sessionID: SessionID @@ -374,7 +374,7 @@ When constructing the summary, try to stick to this template: return Service.of({ isOverflow, prune, - process: processCompaction, + process, create, }) }), diff --git a/packages/opencode/src/session/instruction.ts b/packages/opencode/src/session/instruction.ts index fc90093e99..4c442ba2d2 100644 --- a/packages/opencode/src/session/instruction.ts +++ b/packages/opencode/src/session/instruction.ts @@ -75,12 +75,12 @@ export namespace Instruction { const http = HttpClient.filterStatusOk(withTransientReadRetry(yield* HttpClient.HttpClient)) const state = yield* InstanceState.make( - Effect.fn("Instruction.state")(() => - Effect.succeed({ + Effect.fnUntraced(function* () { + return { // Track which instruction files have already been attached for a given assistant message. claims: new Map>(), - }), - ), + } + }), ) const relative = Effect.fnUntraced(function* (instruction: string) { diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 146c73f277..f86b999062 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -415,9 +415,20 @@ export namespace SessionProcessor { const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) { log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined }) + yield* Effect.logError("session processor failed", { + agent: ctx.assistantMessage.agent, + modelID: ctx.model.id, + providerID: ctx.model.providerID, + sessionID: ctx.sessionID, + }) const error = parse(e) if (MessageV2.ContextOverflowError.isInstance(error)) { ctx.needsCompaction = true + yield* Effect.logWarning("session processor requested compaction", { + modelID: ctx.model.id, + providerID: ctx.model.providerID, + sessionID: ctx.sessionID, + }) yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error }) return } @@ -446,6 +457,18 @@ export namespace SessionProcessor { log.info("process") ctx.needsCompaction = false ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true + yield* Effect.annotateCurrentSpan({ + agent: streamInput.agent.name, + modelID: streamInput.model.id, + providerID: streamInput.model.providerID, + sessionID: ctx.sessionID, + }) + yield* Effect.logInfo("session processor started", { + agent: streamInput.agent.name, + modelID: streamInput.model.id, + providerID: streamInput.model.providerID, + sessionID: ctx.sessionID, + }) return yield* Effect.gen(function* () { yield* Effect.gen(function* () { @@ -459,6 +482,7 @@ export namespace SessionProcessor { Stream.runDrain, ) }).pipe( + Effect.withSpan("SessionProcessor.stream"), Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))), Effect.catchCauseIf( (cause) => !Cause.hasInterruptsOnly(cause), @@ -483,6 +507,12 @@ export namespace SessionProcessor { if (aborted && !ctx.assistantMessage.error) { yield* abort() } + yield* Effect.logInfo("session processor finished", { + aborted, + blocked: ctx.blocked, + compact: ctx.needsCompaction, + sessionID: ctx.sessionID, + }) if (ctx.needsCompaction) return "compact" if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop" return "continue" diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 24996c8d4b..001eeee61c 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -103,7 +103,7 @@ export namespace SessionPrompt { const instruction = yield* Instruction.Service const state = yield* InstanceState.make( - Effect.fn("SessionPrompt.state")(function* () { + Effect.fnUntraced(function* () { const runners = new Map>() yield* Effect.addFinalizer( Effect.fnUntraced(function* () { @@ -1340,12 +1340,15 @@ NOTE: At any point in time through this workflow you should feel free to ask the let structured: unknown | undefined let step = 0 const session = yield* sessions.get(sessionID) + yield* Effect.annotateCurrentSpan({ sessionID }) while (true) { yield* status.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) - let msgs = yield* MessageV2.filterCompactedEffect(sessionID) + let msgs = yield* MessageV2.filterCompactedEffect(sessionID).pipe( + Effect.withSpan("SessionPrompt.loadMessages"), + ) let lastUser: MessageV2.User | undefined let lastAssistant: MessageV2.Assistant | undefined @@ -1398,13 +1401,20 @@ NOTE: At any point in time through this workflow you should feel free to ask the } if (task?.type === "compaction") { - const result = yield* compaction.process({ - messages: msgs, - parentID: lastUser.id, - sessionID, + yield* Effect.logWarning("session compaction task", { auto: task.auto, overflow: task.overflow, + sessionID, }) + const result = yield* compaction + .process({ + messages: msgs, + parentID: lastUser.id, + sessionID, + auto: task.auto, + overflow: task.overflow, + }) + .pipe(Effect.withSpan("SessionPrompt.compaction")) if (result === "stop") break continue } @@ -1414,6 +1424,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the lastFinished.summary !== true && (yield* compaction.isOverflow({ tokens: lastFinished.tokens, model })) ) { + yield* Effect.logWarning("session overflow detected", { modelID: model.id, sessionID, step }) yield* compaction.create({ sessionID, agent: lastUser.agent, model: lastUser.model, auto: true }) continue } @@ -1429,6 +1440,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the const maxSteps = agent.steps ?? Infinity const isLastStep = step >= maxSteps msgs = yield* insertReminders({ messages: msgs, agent, session }) + yield* Effect.logInfo("session turn", { + agent: agent.name, + modelID: model.id, + providerID: model.providerID, + sessionID, + step, + }) const msg: MessageV2.Assistant = { id: MessageID.ascending(), @@ -1503,7 +1521,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the Effect.promise(() => SystemPrompt.environment(model)), instruction.system().pipe(Effect.orDie), Effect.promise(() => MessageV2.toModelMessages(msgs, model)), - ]) + ]).pipe(Effect.withSpan("SessionPrompt.buildInput")) const system = [...env, ...(skills ? [skills] : []), ...instructions] const format = lastUser.format ?? { type: "text" as const } if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT) diff --git a/packages/opencode/src/session/status.ts b/packages/opencode/src/session/status.ts index 34a79eed11..bb4321b4f5 100644 --- a/packages/opencode/src/session/status.ts +++ b/packages/opencode/src/session/status.ts @@ -58,7 +58,9 @@ export namespace SessionStatus { const bus = yield* Bus.Service const state = yield* InstanceState.make( - Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map())), + Effect.fnUntraced(function* () { + return new Map() + }), ) const get = Effect.fn("SessionStatus.get")(function* (sessionID: SessionID) { diff --git a/packages/opencode/src/session/summary.ts b/packages/opencode/src/session/summary.ts index f2b53f3baf..045e253891 100644 --- a/packages/opencode/src/session/summary.ts +++ b/packages/opencode/src/session/summary.ts @@ -99,8 +99,8 @@ export namespace SessionSummary { if (part.type === "step-finish" && part.snapshot) to = part.snapshot } } - if (from && to) return yield* snapshot.diffFull(from, to) - return [] + if (!from || !to || from === to) return [] + return yield* snapshot.diffFull(from, to) }) const summarize = Effect.fn("SessionSummary.summarize")(function* (input: { diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 2eb9887ea4..8fa0661e80 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -144,7 +144,7 @@ export namespace ShareNext { } const state: InstanceState = yield* InstanceState.make( - Effect.fn("ShareNext.state")(function* (_ctx) { + Effect.fnUntraced(function* (_ctx) { const cache: State = { queue: new Map(), scope: yield* Scope.make() } yield* Effect.addFinalizer(() => diff --git a/packages/opencode/src/skill/index.ts b/packages/opencode/src/skill/index.ts index a2ac3d351c..9ea205326e 100644 --- a/packages/opencode/src/skill/index.ts +++ b/packages/opencode/src/skill/index.ts @@ -197,7 +197,7 @@ export namespace Skill { const bus = yield* Bus.Service const fsys = yield* AppFileSystem.Service const state = yield* InstanceState.make( - Effect.fn("Skill.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const s: State = { skills: {}, dirs: new Set() } yield* loadSkills(s, config, discovery, bus, fsys, ctx.directory, ctx.worktree) return s diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts index 2db67695ff..944cd01db1 100644 --- a/packages/opencode/src/snapshot/index.ts +++ b/packages/opencode/src/snapshot/index.ts @@ -1,4 +1,3 @@ -import { NodeFileSystem, NodePath } from "@effect/platform-node" import { Cause, Duration, Effect, Layer, Schedule, Semaphore, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import path from "path" @@ -82,7 +81,7 @@ export namespace Snapshot { } const state = yield* InstanceState.make( - Effect.fn("Snapshot.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const state = { directory: ctx.directory, worktree: ctx.worktree, @@ -150,7 +149,7 @@ export namespace Snapshot { yield* fs.writeFileString(target, text ? `${text}\n` : "").pipe(Effect.orDie) }) - const add = Effect.fnUntraced(function* () { + const add = Effect.fn("Snapshot.add")(function* () { yield* sync() const [diff, other] = yield* Effect.all( [ @@ -203,7 +202,7 @@ export namespace Snapshot { } }) - const cleanup = Effect.fnUntraced(function* () { + const cleanup = Effect.fn("Snapshot.cleanup")(function* () { return yield* locked( Effect.gen(function* () { if (!(yield* enabled())) return @@ -221,7 +220,7 @@ export namespace Snapshot { ) }) - const track = Effect.fnUntraced(function* () { + const track = Effect.fn("Snapshot.track")(function* () { return yield* locked( Effect.gen(function* () { if (!(yield* enabled())) return @@ -238,7 +237,9 @@ export namespace Snapshot { log.info("initialized") } yield* add() - const result = yield* git(args(["write-tree"]), { cwd: state.directory }) + const result = yield* git(args(["write-tree"]), { cwd: state.directory }).pipe( + Effect.withSpan("Snapshot.writeTree"), + ) const hash = result.text.trim() log.info("tracking", { hash, cwd: state.directory, git: state.gitdir }) return hash @@ -246,7 +247,7 @@ export namespace Snapshot { ) }) - const patch = Effect.fnUntraced(function* (hash: string) { + const patch = Effect.fn("Snapshot.patch")(function* (hash: string) { return yield* locked( Effect.gen(function* () { yield* add() @@ -273,7 +274,7 @@ export namespace Snapshot { ) }) - const restore = Effect.fnUntraced(function* (snapshot: string) { + const restore = Effect.fn("Snapshot.restore")(function* (snapshot: string) { return yield* locked( Effect.gen(function* () { log.info("restore", { commit: snapshot }) @@ -299,7 +300,7 @@ export namespace Snapshot { ) }) - const revert = Effect.fnUntraced(function* (patches: Snapshot.Patch[]) { + const revert = Effect.fn("Snapshot.revert")(function* (patches: Snapshot.Patch[]) { return yield* locked( Effect.gen(function* () { const ops: { hash: string; file: string; rel: string }[] = [] @@ -414,7 +415,7 @@ export namespace Snapshot { ) }) - const diff = Effect.fnUntraced(function* (hash: string) { + const diff = Effect.fn("Snapshot.diff")(function* (hash: string) { return yield* locked( Effect.gen(function* () { yield* add() @@ -434,7 +435,7 @@ export namespace Snapshot { ) }) - const diffFull = Effect.fnUntraced(function* (from: string, to: string) { + const diffFull = Effect.fn("Snapshot.diffFull")(function* (from: string, to: string) { return yield* locked( Effect.gen(function* () { type Row = { @@ -451,7 +452,7 @@ export namespace Snapshot { ref: string } - const show = Effect.fnUntraced(function* (row: Row) { + const show = Effect.fn("Snapshot.show")(function* (row: Row) { if (row.binary) return ["", ""] if (row.status === "added") { return [ @@ -478,7 +479,7 @@ export namespace Snapshot { ) }) - const load = Effect.fnUntraced( + const load = Effect.fn("Snapshot.load")( function* (rows: Row[]) { const refs = rows.flatMap((row) => { if (row.binary) return [] @@ -583,7 +584,7 @@ export namespace Snapshot { const statuses = yield* git( [...quote, ...args(["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."])], { cwd: state.directory }, - ) + ).pipe(Effect.withSpan("Snapshot.diffStatus")) for (const line of statuses.text.trim().split("\n")) { if (!line) continue @@ -597,7 +598,7 @@ export namespace Snapshot { { cwd: state.directory, }, - ) + ).pipe(Effect.withSpan("Snapshot.diffNumstat")) const rows = numstat.text .trim() @@ -660,30 +661,14 @@ export namespace Snapshot { ) return Service.of({ - init: Effect.fn("Snapshot.init")(function* () { - yield* InstanceState.get(state) - }), - cleanup: Effect.fn("Snapshot.cleanup")(function* () { - return yield* InstanceState.useEffect(state, (s) => s.cleanup()) - }), - track: Effect.fn("Snapshot.track")(function* () { - return yield* InstanceState.useEffect(state, (s) => s.track()) - }), - patch: Effect.fn("Snapshot.patch")(function* (hash: string) { - return yield* InstanceState.useEffect(state, (s) => s.patch(hash)) - }), - restore: Effect.fn("Snapshot.restore")(function* (snapshot: string) { - return yield* InstanceState.useEffect(state, (s) => s.restore(snapshot)) - }), - revert: Effect.fn("Snapshot.revert")(function* (patches: Snapshot.Patch[]) { - return yield* InstanceState.useEffect(state, (s) => s.revert(patches)) - }), - diff: Effect.fn("Snapshot.diff")(function* (hash: string) { - return yield* InstanceState.useEffect(state, (s) => s.diff(hash)) - }), - diffFull: Effect.fn("Snapshot.diffFull")(function* (from: string, to: string) { - return yield* InstanceState.useEffect(state, (s) => s.diffFull(from, to)) - }), + init: () => InstanceState.get(state).pipe(Effect.asVoid), + cleanup: () => InstanceState.useEffect(state, (s) => s.cleanup()), + track: () => InstanceState.useEffect(state, (s) => s.track()), + patch: (hash: string) => InstanceState.useEffect(state, (s) => s.patch(hash)), + restore: (snapshot: string) => InstanceState.useEffect(state, (s) => s.restore(snapshot)), + revert: (patches: Snapshot.Patch[]) => InstanceState.useEffect(state, (s) => s.revert(patches)), + diff: (hash: string) => InstanceState.useEffect(state, (s) => s.diff(hash)), + diffFull: (from: string, to: string) => InstanceState.useEffect(state, (s) => s.diffFull(from, to)), }) }), ) diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts index 9c045338ee..157b05caf5 100644 --- a/packages/opencode/src/tool/registry.ts +++ b/packages/opencode/src/tool/registry.ts @@ -82,7 +82,7 @@ export namespace ToolRegistry { Effect.isEffect(tool) ? tool : Effect.succeed(tool) const state = yield* InstanceState.make( - Effect.fn("ToolRegistry.state")(function* (ctx) { + Effect.fnUntraced(function* (ctx) { const custom: Tool.Info[] = [] function fromPlugin(id: string, def: ToolDefinition): Tool.Info { diff --git a/packages/opencode/test/preload.ts b/packages/opencode/test/preload.ts index 0ddc797faf..e6172c0940 100644 --- a/packages/opencode/test/preload.ts +++ b/packages/opencode/test/preload.ts @@ -33,6 +33,7 @@ process.env["XDG_DATA_HOME"] = path.join(dir, "share") process.env["XDG_CACHE_HOME"] = path.join(dir, "cache") process.env["XDG_CONFIG_HOME"] = path.join(dir, "config") process.env["XDG_STATE_HOME"] = path.join(dir, "state") +process.env["OPENCODE_DISABLE_OTLP"] = "1" process.env["OPENCODE_MODELS_PATH"] = path.join(import.meta.dir, "tool", "fixtures", "models-api.json") // Set test home directory to isolate tests from user's actual home directory