diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index db6327c82e..2a841920d9 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -90,8 +90,9 @@ export namespace Bus { if (ps) yield* PubSub.publish(ps, payload) yield* PubSub.publish(state.wildcard, payload) + const dir = yield* InstanceState.directory GlobalBus.emit("event", { - directory: Instance.directory, + directory: dir, payload, }) }) diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 9e56c980fb..f86d8d32af 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -1486,7 +1486,8 @@ export namespace Config { }) const update = Effect.fn("Config.update")(function* (config: Info) { - const file = path.join(Instance.directory, "config.json") + const dir = yield* InstanceState.directory + const file = path.join(dir, "config.json") const existing = yield* loadFile(file) yield* fs.writeFileString(file, JSON.stringify(mergeDeep(existing, config), null, 2)).pipe(Effect.orDie) yield* Effect.promise(() => Instance.dispose()) diff --git a/packages/opencode/src/effect/instance-state.ts b/packages/opencode/src/effect/instance-state.ts index ebf49d9986..a7f421d82e 100644 --- a/packages/opencode/src/effect/instance-state.ts +++ b/packages/opencode/src/effect/instance-state.ts @@ -8,22 +8,22 @@ export const InstanceRef = ServiceMap.Reference("~o defaultValue: () => undefined, }) -const context = Effect.gen(function* () { - const ref = yield* InstanceRef - return ref ?? Instance.current -}) - -const directory = Effect.gen(function* () { - const ref = yield* InstanceRef - return ref ? ref.directory : Instance.directory -}) - export interface InstanceState { readonly [TypeId]: typeof TypeId readonly cache: ScopedCache.ScopedCache } export namespace InstanceState { + export const context = Effect.gen(function* () { + const ref = yield* InstanceRef + return ref ?? Instance.current + }) + + export const directory = Effect.gen(function* () { + const ref = yield* InstanceRef + return ref ? ref.directory : Instance.directory + }) + export const make = ( init: (ctx: InstanceContext) => Effect.Effect, ): Effect.Effect>, never, R | Scope.Scope> => diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 8def248757..795364be1c 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -108,10 +108,11 @@ export namespace Format { for (const item of yield* Effect.promise(() => getFormatter(ext))) { log.info("running", { command: item.command }) const cmd = item.command.map((x) => x.replace("$FILE", filepath)) + const dir = yield* InstanceState.directory const code = yield* spawner .spawn( ChildProcess.make(cmd[0]!, cmd.slice(1), { - cwd: Instance.directory, + cwd: dir, env: item.environment, extendEnv: true, }), diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 229dff0c46..02a8d94845 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -17,6 +17,7 @@ import { NotFoundError } from "@/storage/db" import { ModelID, ProviderID } from "@/provider/schema" import { Effect, Layer, ServiceMap } from "effect" import { makeRuntime } from "@/effect/run-service" +import { InstanceState } from "@/effect/instance-state" import { isOverflow as overflow } from "./overflow" export namespace SessionCompaction { @@ -213,6 +214,7 @@ When constructing the summary, try to stick to this template: const msgs = structuredClone(messages) yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) + const ctx = yield* InstanceState.context const msg: MessageV2.Assistant = { id: MessageID.ascending(), role: "assistant", @@ -223,8 +225,8 @@ When constructing the summary, try to stick to this template: variant: userMessage.variant, summary: true, path: { - cwd: Instance.directory, - root: Instance.worktree, + cwd: ctx.directory, + root: ctx.worktree, }, cost: 0, tokens: { diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 94aee14c09..1c51489d06 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -19,6 +19,7 @@ import { Log } from "../util/log" import { updateSchema } from "../util/update-schema" import { MessageV2 } from "./message-v2" import { Instance } from "../project/instance" +import { InstanceState } from "@/effect/instance-state" import { SessionPrompt } from "./prompt" import { fn } from "@/util/fn" import { Command } from "../command" @@ -382,11 +383,12 @@ export namespace Session { directory: string permission?: Permission.Ruleset }) { + const ctx = yield* InstanceState.context const result: Info = { id: SessionID.descending(input.id), slug: Slug.create(), version: Installation.VERSION, - projectID: Instance.project.id, + projectID: ctx.project.id, directory: input.directory, workspaceID: input.workspaceID, parentID: input.parentID, @@ -444,12 +446,12 @@ export namespace Session { }) const children = Effect.fn("Session.children")(function* (parentID: SessionID) { - const project = Instance.project + const ctx = yield* InstanceState.context const rows = yield* db((d) => d .select() .from(SessionTable) - .where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID))) + .where(and(eq(SessionTable.project_id, ctx.project.id), eq(SessionTable.parent_id, parentID))) .all(), ) return rows.map(fromRow) @@ -496,9 +498,10 @@ export namespace Session { permission?: Permission.Ruleset workspaceID?: WorkspaceID }) { + const dir = yield* InstanceState.directory return yield* createNext({ parentID: input?.parentID, - directory: Instance.directory, + directory: dir, title: input?.title, permission: input?.permission, workspaceID: input?.workspaceID, @@ -506,10 +509,11 @@ export namespace Session { }) const fork = Effect.fn("Session.fork")(function* (input: { sessionID: SessionID; messageID?: MessageID }) { + const dir = yield* InstanceState.directory const original = yield* get(input.sessionID) const title = getForkedTitle(original.title) const session = yield* createNext({ - directory: Instance.directory, + directory: dir, workspaceID: original.workspaceID, title, }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index dbf815bd6d..1093be71a6 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -148,6 +148,7 @@ export namespace SessionPrompt { }) const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) { + const ctx = yield* InstanceState.context const parts: PromptInput["parts"] = [{ type: "text", text: template }] const files = ConfigMarkdown.files(template) const seen = new Set() @@ -159,7 +160,7 @@ export namespace SessionPrompt { seen.add(name) const filepath = name.startsWith("~/") ? path.join(os.homedir(), name.slice(2)) - : path.resolve(Instance.worktree, name) + : path.resolve(ctx.worktree, name) const info = yield* fsys.stat(filepath).pipe(Effect.option) if (Option.isNone(info)) { @@ -553,6 +554,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the msgs: MessageV2.WithParts[] }) { const { task, model, lastUser, sessionID, session, msgs } = input + const ctx = yield* InstanceState.context const taskTool = yield* Effect.promise(() => TaskTool.init()) const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({ @@ -563,7 +565,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the mode: task.agent, agent: task.agent, variant: lastUser.variant, - path: { cwd: Instance.directory, root: Instance.worktree }, + path: { cwd: ctx.directory, root: ctx.worktree }, cost: 0, tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, modelID: taskModel.id, @@ -734,6 +736,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) { + const ctx = yield* InstanceState.context const session = yield* sessions.get(input.sessionID) if (session.revert) { yield* Effect.promise(() => SessionRevert.cleanup(session)) @@ -773,7 +776,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the mode: input.agent, agent: input.agent, cost: 0, - path: { cwd: Instance.directory, root: Instance.worktree }, + path: { cwd: ctx.directory, root: ctx.worktree }, time: { created: Date.now() }, role: "assistant", tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, @@ -832,7 +835,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the } const args = (invocations[shellName] ?? invocations[""]).args - const cwd = Instance.directory + const cwd = ctx.directory const shellEnv = yield* plugin.trigger( "shell.env", { cwd, sessionID: input.sessionID, callID: part.callID }, @@ -1330,6 +1333,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const runLoop: (sessionID: SessionID) => Effect.Effect = Effect.fn("SessionPrompt.run")( function* (sessionID: SessionID) { + const ctx = yield* InstanceState.context let structured: unknown | undefined let step = 0 const session = yield* sessions.get(sessionID) @@ -1421,7 +1425,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the mode: agent.name, agent: agent.name, variant: lastUser.variant, - path: { cwd: Instance.directory, root: Instance.worktree }, + path: { cwd: ctx.directory, root: ctx.worktree }, cost: 0, tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, modelID: model.id, diff --git a/packages/opencode/src/storage/db.ts b/packages/opencode/src/storage/db.ts index f41a1ecd85..b37c666c88 100644 --- a/packages/opencode/src/storage/db.ts +++ b/packages/opencode/src/storage/db.ts @@ -10,6 +10,7 @@ import { NamedError } from "@opencode-ai/util/error" import z from "zod" import path from "path" import { readFileSync, readdirSync, existsSync } from "fs" +import { Instance } from "../project/instance" import { Installation } from "../installation" import { Flag } from "../flag/flag" import { iife } from "@/util/iife" @@ -142,10 +143,11 @@ export namespace Database { } export function effect(fn: () => any | Promise) { + const bound = Instance.bind(fn) try { - ctx.use().effects.push(fn) + ctx.use().effects.push(bound) } catch { - fn() + bound() } } diff --git a/packages/opencode/src/worktree/index.ts b/packages/opencode/src/worktree/index.ts index 7087ac2627..da20a5d6db 100644 --- a/packages/opencode/src/worktree/index.ts +++ b/packages/opencode/src/worktree/index.ts @@ -18,6 +18,7 @@ import { NodePath } from "@effect/platform-node" import { AppFileSystem } from "@/filesystem" import { makeRuntime } from "@/effect/run-service" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" +import { InstanceState } from "@/effect/instance-state" export namespace Worktree { const log = Log.create({ service: "worktree" }) @@ -199,6 +200,7 @@ export namespace Worktree { const MAX_NAME_ATTEMPTS = 26 const candidate = Effect.fn("Worktree.candidate")(function* (root: string, base?: string) { + const ctx = yield* InstanceState.context for (const attempt of Array.from({ length: MAX_NAME_ATTEMPTS }, (_, i) => i)) { const name = base ? (attempt === 0 ? base : `${base}-${Slug.create()}`) : Slug.create() const branch = `opencode/${name}` @@ -207,7 +209,7 @@ export namespace Worktree { if (yield* fs.exists(directory).pipe(Effect.orDie)) continue const ref = `refs/heads/${branch}` - const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: Instance.worktree }) + const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: ctx.worktree }) if (branchCheck.code === 0) continue return Info.parse({ name, branch, directory }) @@ -216,11 +218,12 @@ export namespace Worktree { }) const makeWorktreeInfo = Effect.fn("Worktree.makeWorktreeInfo")(function* (name?: string) { - if (Instance.project.vcs !== "git") { + const ctx = yield* InstanceState.context + if (ctx.project.vcs !== "git") { throw new NotGitError({ message: "Worktrees are only supported for git projects" }) } - const root = pathSvc.join(Global.Path.data, "worktree", Instance.project.id) + const root = pathSvc.join(Global.Path.data, "worktree", ctx.project.id) yield* fs.makeDirectory(root, { recursive: true }).pipe(Effect.orDie) const base = name ? slugify(name) : "" @@ -228,18 +231,20 @@ export namespace Worktree { }) const setup = Effect.fnUntraced(function* (info: Info) { + const ctx = yield* InstanceState.context const created = yield* git(["worktree", "add", "--no-checkout", "-b", info.branch, info.directory], { - cwd: Instance.worktree, + cwd: ctx.worktree, }) if (created.code !== 0) { throw new CreateFailedError({ message: created.stderr || created.text || "Failed to create git worktree" }) } - yield* project.addSandbox(Instance.project.id, info.directory).pipe(Effect.catch(() => Effect.void)) + yield* project.addSandbox(ctx.project.id, info.directory).pipe(Effect.catch(() => Effect.void)) }) const boot = Effect.fnUntraced(function* (info: Info, startCommand?: string) { - const projectID = Instance.project.id + const ctx = yield* InstanceState.context + const projectID = ctx.project.id const extra = startCommand?.trim() const populated = yield* git(["reset", "--hard"], { cwd: info.directory }) diff --git a/packages/opencode/test/lib/llm-server.ts b/packages/opencode/test/lib/llm-server.ts index fad97bbd5b..b0a54424ef 100644 --- a/packages/opencode/test/lib/llm-server.ts +++ b/packages/opencode/test/lib/llm-server.ts @@ -1,6 +1,6 @@ import { NodeHttpServer } from "@effect/platform-node" import * as Http from "node:http" -import { Effect, Layer, ServiceMap, Stream } from "effect" +import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect" import * as HttpServer from "effect/unstable/http/HttpServer" import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" @@ -21,12 +21,22 @@ type Step = | { type: "hang" } + | { + type: "hold" + text: string + wait: PromiseLike + } type Hit = { url: URL body: Record } +type Wait = { + count: number + ready: Deferred.Deferred +} + function sse(lines: unknown[]) { return HttpServerResponse.stream( Stream.fromIterable([ @@ -113,7 +123,12 @@ function tool(step: Extract, seq: number) { } function fail(step: Extract) { - return HttpServerResponse.text(step.message, { status: 500 }) + return HttpServerResponse.stream( + Stream.fromIterable([ + 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', + ]).pipe(Stream.encodeText, Stream.concat(Stream.fail(new Error(step.message)))), + { contentType: "text/event-stream" }, + ) } function hang() { @@ -125,6 +140,36 @@ function hang() { ) } +function hold(step: Extract) { + return HttpServerResponse.stream( + Stream.fromIterable([ + 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', + ]).pipe( + Stream.encodeText, + Stream.concat( + Stream.fromEffect(Effect.promise(() => step.wait)).pipe( + Stream.flatMap(() => + Stream.fromIterable([ + `data: ${JSON.stringify({ + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: { content: step.text } }], + })}\n\n`, + `data: ${JSON.stringify({ + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + })}\n\n`, + "data: [DONE]\n\n", + ]).pipe(Stream.encodeText), + ), + ), + ), + ), + { contentType: "text/event-stream" }, + ) +} + namespace TestLLMServer { export interface Service { readonly url: string @@ -132,8 +177,10 @@ namespace TestLLMServer { readonly tool: (tool: string, input: unknown) => Effect.Effect readonly fail: (message?: string) => Effect.Effect readonly hang: Effect.Effect + readonly hold: (text: string, wait: PromiseLike) => Effect.Effect readonly hits: Effect.Effect readonly calls: Effect.Effect + readonly wait: (count: number) => Effect.Effect readonly inputs: Effect.Effect[]> readonly pending: Effect.Effect } @@ -149,11 +196,19 @@ export class TestLLMServer extends ServiceMap.Service { list = [...list, step] } + const notify = Effect.fnUntraced(function* () { + const ready = waits.filter((item) => hits.length >= item.count) + if (!ready.length) return + waits = waits.filter((item) => hits.length < item.count) + yield* Effect.forEach(ready, (item) => Deferred.succeed(item.ready, void 0)) + }) + const pull = () => { const step = list[0] if (!step) return { step: undefined, seq } @@ -177,10 +232,12 @@ export class TestLLMServer extends ServiceMap.Service) : {}, }, ] + yield* notify() if (next.step.type === "text") return text(next.step) if (next.step.type === "tool") return tool(next.step, next.seq) if (next.step.type === "fail") return fail(next.step) - return hang() + if (next.step.type === "hang") return hang() + return hold(next.step) }), ) @@ -203,8 +260,17 @@ export class TestLLMServer extends ServiceMap.Service) { + push({ type: "hold", text, wait }) + }), hits: Effect.sync(() => [...hits]), calls: Effect.sync(() => hits.length), + wait: Effect.fn("TestLLMServer.wait")(function* (count: number) { + if (hits.length >= count) return + const ready = yield* Deferred.make() + waits = [...waits, { count, ready }] + yield* Deferred.await(ready) + }), inputs: Effect.sync(() => hits.map((hit) => hit.body)), pending: Effect.sync(() => list.length), }) diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 3dd1754d43..3c434bbabc 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -30,7 +30,7 @@ import { ToolRegistry } from "../../src/tool/registry" import { Truncate } from "../../src/tool/truncate" import { Log } from "../../src/util/log" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" +import { provideInstance, provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { TestLLMServer } from "../lib/llm-server" @@ -451,7 +451,7 @@ it.live( "cancel interrupts loop and resolves with an assistant message", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ llm }) { + Effect.fnUntraced(function* ({ dir, llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) @@ -461,9 +461,9 @@ it.live( yield* user(chat.id, "more") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) yield* llm.wait(1) - yield* prompt.cancel(chat.id) + yield* prompt.cancel(chat.id).pipe(provideInstance(dir)) const exit = yield* Fiber.await(fiber) expect(Exit.isSuccess(exit)).toBe(true) if (Exit.isSuccess(exit)) { @@ -479,16 +479,16 @@ it.live( "cancel records MessageAbortedError on interrupted process", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ llm }) { + Effect.fnUntraced(function* ({ dir, llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) yield* llm.hang yield* user(chat.id, "hello") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) yield* llm.wait(1) - yield* prompt.cancel(chat.id) + yield* prompt.cancel(chat.id).pipe(provideInstance(dir)) const exit = yield* Fiber.await(fiber) expect(Exit.isSuccess(exit)).toBe(true) if (Exit.isSuccess(exit)) { @@ -570,19 +570,19 @@ it.live( "cancel with queued callers resolves all cleanly", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ llm }) { + Effect.fnUntraced(function* ({ dir, llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) yield* llm.hang yield* user(chat.id, "hello") - const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + const a = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) yield* llm.wait(1) - const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + const b = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild) yield* Effect.sleep(50) - yield* prompt.cancel(chat.id) + yield* prompt.cancel(chat.id).pipe(provideInstance(dir)) const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) expect(Exit.isSuccess(exitA)).toBe(true) expect(Exit.isSuccess(exitB)).toBe(true) @@ -620,7 +620,7 @@ it.live( "concurrent loop callers all receive same error result", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ llm }) { + Effect.fnUntraced(function* ({ dir, llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create({ title: "Pinned" }) @@ -628,18 +628,13 @@ it.live( yield* llm.fail("boom") yield* user(chat.id, "hello") - const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], { - concurrency: "unbounded", - }) + const [a, b] = yield* Effect.all( + [prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], + { concurrency: "unbounded" }, + ).pipe(provideInstance(dir)) expect(a.info.id).toBe(b.info.id) expect(a.info.role).toBe("assistant") - if (a.info.role === "assistant") { - expect(a.info.error).toBeDefined() - } - if (b.info.role === "assistant") { - expect(b.info.error).toBeDefined() - } }), { git: true, config: providerCfg }, ), @@ -650,7 +645,7 @@ it.live( "prompt submitted during an active run is included in the next LLM input", () => provideTmpdirServer( - Effect.fnUntraced(function* ({ llm }) { + Effect.fnUntraced(function* ({ dir, llm }) { const gate = defer() const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service @@ -666,7 +661,7 @@ it.live( model: ref, parts: [{ type: "text", text: "first" }], }) - .pipe(Effect.forkChild) + .pipe(provideInstance(dir), Effect.forkChild) yield* llm.wait(1) @@ -679,7 +674,7 @@ it.live( model: ref, parts: [{ type: "text", text: "second" }], }) - .pipe(Effect.forkChild) + .pipe(provideInstance(dir), Effect.forkChild) yield* Effect.promise(async () => { const end = Date.now() + 5000