From 97c15a087d34f40f4cc09c5c347fbc49b7c7af38 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 25 Mar 2026 20:19:24 -0400 Subject: [PATCH] effectify Bus service: migrate to Effect PubSub + InstanceState (#18579) --- packages/opencode/AGENTS.md | 14 +- packages/opencode/specs/effect-migration.md | 47 +-- packages/opencode/src/account/index.ts | 4 +- packages/opencode/src/agent/agent.ts | 4 +- packages/opencode/src/auth/index.ts | 4 +- packages/opencode/src/bus/index.ts | 235 ++++++++----- packages/opencode/src/command/index.ts | 4 +- .../opencode/src/effect/instance-context.ts | 14 - packages/opencode/src/effect/run-service.ts | 12 +- packages/opencode/src/file/index.ts | 4 +- packages/opencode/src/file/time.ts | 4 +- packages/opencode/src/file/watcher.ts | 4 +- packages/opencode/src/format/index.ts | 93 +++--- packages/opencode/src/git/index.ts | 4 +- packages/opencode/src/installation/index.ts | 4 +- packages/opencode/src/permission/index.ts | 4 +- packages/opencode/src/plugin/index.ts | 33 +- packages/opencode/src/project/project.ts | 4 +- packages/opencode/src/project/vcs.ts | 43 +-- packages/opencode/src/provider/auth.ts | 4 +- packages/opencode/src/pty/index.ts | 4 +- packages/opencode/src/question/index.ts | 4 +- packages/opencode/src/session/status.ts | 11 +- packages/opencode/src/skill/index.ts | 4 +- packages/opencode/src/snapshot/index.ts | 4 +- packages/opencode/src/tool/apply_patch.ts | 6 +- packages/opencode/src/tool/edit.ts | 11 +- packages/opencode/src/tool/registry.ts | 4 +- packages/opencode/src/tool/truncate.ts | 4 +- packages/opencode/src/tool/write.ts | 6 +- packages/opencode/src/worktree/index.ts | 4 +- packages/opencode/test/bus/bus-effect.test.ts | 164 +++++++++ .../opencode/test/bus/bus-integration.test.ts | 87 +++++ packages/opencode/test/bus/bus.test.ts | 219 ++++++++++++ .../opencode/test/effect/run-service.test.ts | 8 +- packages/opencode/test/file/watcher.test.ts | 32 +- packages/opencode/test/fixture/fixture.ts | 68 ++++ packages/opencode/test/fixture/instance.ts | 51 --- packages/opencode/test/format/format.test.ts | 316 +++++++++--------- packages/opencode/test/project/vcs.test.ts | 72 ++-- packages/opencode/test/sync/index.test.ts | 10 +- packages/opencode/test/tool/edit.test.ts | 7 - 42 files changed, 1113 insertions(+), 522 deletions(-) delete mode 100644 packages/opencode/src/effect/instance-context.ts create mode 100644 packages/opencode/test/bus/bus-effect.test.ts create mode 100644 packages/opencode/test/bus/bus-integration.test.ts create mode 100644 packages/opencode/test/bus/bus.test.ts delete mode 100644 packages/opencode/test/fixture/instance.ts diff --git a/packages/opencode/AGENTS.md b/packages/opencode/AGENTS.md index e2a0c918dd..3e4c309ce2 100644 --- a/packages/opencode/AGENTS.md +++ b/packages/opencode/AGENTS.md @@ -31,12 +31,14 @@ See `specs/effect-migration.md` for the compact pattern reference and examples. - Use `Schema.Defect` instead of `unknown` for defect-like causes. - In `Effect.gen` / `Effect.fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches. -## Runtime vs Instances +## Runtime vs InstanceState -- Use the shared runtime for process-wide services with one lifecycle for the whole app. -- Use `src/effect/instances.ts` for per-directory or per-project services that need `InstanceContext`, per-instance state, or per-instance cleanup. -- If two open directories should not share one copy of the service, it belongs in `Instances`. -- Instance-scoped services should read context from `InstanceContext`, not `Instance.*` globals. +- Use `makeRuntime` (from `src/effect/run-service.ts`) for all services. It returns `{ runPromise, runFork, runCallback }` backed by a shared `memoMap` that deduplicates layers. +- Use `InstanceState` (from `src/effect/instance-state.ts`) for per-directory or per-project state that needs per-instance cleanup. It uses `ScopedCache` keyed by directory — each open project gets its own state, automatically cleaned up on disposal. +- If two open directories should not share one copy of the service, it needs `InstanceState`. +- Do the work directly in the `InstanceState.make` closure — `ScopedCache` handles run-once semantics. Don't add fibers, `ensure()` callbacks, or `started` flags on top. +- Use `Effect.addFinalizer` or `Effect.acquireRelease` inside the `InstanceState.make` closure for cleanup (subscriptions, process teardown, etc.). +- Use `Effect.forkScoped` inside the closure for background stream consumers — the fiber is interrupted when the instance is disposed. ## Preferred Effect services @@ -51,7 +53,7 @@ See `specs/effect-migration.md` for the compact pattern reference and examples. `Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and restores it synchronously when called. -Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`. +Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish` or anything that reads `Instance.directory`. You do not need it for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers. diff --git a/packages/opencode/specs/effect-migration.md b/packages/opencode/specs/effect-migration.md index d98750eac9..c95d131dc5 100644 --- a/packages/opencode/specs/effect-migration.md +++ b/packages/opencode/specs/effect-migration.md @@ -6,7 +6,7 @@ Practical reference for new and migrated Effect code in `packages/opencode`. Use `InstanceState` (from `src/effect/instance-state.ts`) for services that need per-directory state, per-instance cleanup, or project-bound background work. InstanceState uses a `ScopedCache` keyed by directory, so each open project gets its own copy of the state that is automatically cleaned up on disposal. -Use `makeRunPromise` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. +Use `makeRuntime` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. Returns `{ runPromise, runFork, runCallback }`. - Global services (no per-directory state): Account, Auth, Installation, Truncate - Instance-scoped (per-directory state via InstanceState): File, FileTime, FileWatcher, Format, Permission, Question, Skill, Snapshot, Vcs, ProviderAuth @@ -46,7 +46,7 @@ export namespace Foo { export const defaultLayer = layer.pipe(Layer.provide(FooDep.layer)) // Per-service runtime (inside the namespace) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) // Async facade functions export async function get(id: FooID) { @@ -79,29 +79,34 @@ See `Auth.ZodInfo` for the canonical example. The `InstanceState.make` init callback receives a `Scope`, so you can use `Effect.acquireRelease`, `Effect.addFinalizer`, and `Effect.forkScoped` inside it. Resources acquired this way are automatically cleaned up when the instance is disposed or invalidated by `ScopedCache`. This makes it the right place for: -- **Subscriptions**: Use `Effect.acquireRelease` to subscribe and auto-unsubscribe: +- **Subscriptions**: Yield `Bus.Service` at the layer level, then use `Stream` + `forkScoped` inside the init closure. The fiber is automatically interrupted when the instance scope closes: ```ts -const cache = - yield * - InstanceState.make( - Effect.fn("Foo.state")(function* (ctx) { - // ... load state ... +const bus = yield* Bus.Service - yield* Effect.acquireRelease( - Effect.sync(() => - Bus.subscribeAll((event) => { - /* handle */ - }), - ), - (unsub) => Effect.sync(unsub), +const cache = yield* InstanceState.make( + Effect.fn("Foo.state")(function* (ctx) { + // ... load state ... + + yield* bus + .subscribeAll() + .pipe( + Stream.runForEach((event) => Effect.sync(() => { /* handle */ })), + Effect.forkScoped, ) - return { - /* state */ - } - }), - ) + return { /* state */ } + }), +) +``` + +- **Resource cleanup**: Use `Effect.acquireRelease` or `Effect.addFinalizer` for resources that need teardown (native watchers, process handles, etc.): + +```ts +yield* Effect.acquireRelease( + Effect.sync(() => nativeAddon.watch(dir)), + (watcher) => Effect.sync(() => watcher.close()), +) ``` - **Background fibers**: Use `Effect.forkScoped` — the fiber is interrupted on disposal. @@ -165,7 +170,7 @@ Still open and likely worth migrating: - [x] `ToolRegistry` - [ ] `Pty` - [x] `Worktree` -- [ ] `Bus` +- [x] `Bus` - [x] `Command` - [ ] `Config` - [ ] `Session` diff --git a/packages/opencode/src/account/index.ts b/packages/opencode/src/account/index.ts index 0a8d3687a3..82b166ef2a 100644 --- a/packages/opencode/src/account/index.ts +++ b/packages/opencode/src/account/index.ts @@ -1,7 +1,7 @@ import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { withTransientReadRetry } from "@/util/effect-http-client" import { AccountRepo, type AccountRow } from "./repo" import { @@ -379,7 +379,7 @@ export namespace Account { export const defaultLayer = layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(FetchHttpClient.layer)) - export const runPromise = makeRunPromise(Service, defaultLayer) + export const { runPromise } = makeRuntime(Service, defaultLayer) export async function active(): Promise { return Option.getOrUndefined(await runPromise((service) => service.active())) diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index 2ae18aaaed..622537e3c1 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -21,7 +21,7 @@ import { Plugin } from "@/plugin" import { Skill } from "../skill" import { Effect, ServiceMap, Layer } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" export namespace Agent { export const Info = z @@ -393,7 +393,7 @@ export namespace Agent { export const defaultLayer = layer.pipe(Layer.provide(Auth.layer)) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function get(agent: string) { return runPromise((svc) => svc.get(agent)) diff --git a/packages/opencode/src/auth/index.ts b/packages/opencode/src/auth/index.ts index 2238d57f5d..2ccc1edff1 100644 --- a/packages/opencode/src/auth/index.ts +++ b/packages/opencode/src/auth/index.ts @@ -1,6 +1,6 @@ import path from "path" import { Effect, Layer, Record, Result, Schema, ServiceMap } from "effect" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { zod } from "@/util/effect-zod" import { Global } from "../global" import { Filesystem } from "../util/filesystem" @@ -95,7 +95,7 @@ export namespace Auth { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export async function get(providerID: string) { return runPromise((service) => service.get(providerID)) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 625f296622..db6327c82e 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -1,12 +1,14 @@ import z from "zod" +import { Effect, Exit, Layer, PubSub, Scope, ServiceMap, Stream } from "effect" import { Log } from "../util/log" import { Instance } from "../project/instance" import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" +import { InstanceState } from "@/effect/instance-state" +import { makeRuntime } from "@/effect/run-service" export namespace Bus { const log = Log.create({ service: "bus" }) - type Subscription = (event: any) => void export const InstanceDisposed = BusEvent.define( "server.instance.disposed", @@ -15,91 +17,168 @@ export namespace Bus { }), ) - const state = Instance.state( - () => { - const subscriptions = new Map() + type Payload = { + type: D["type"] + properties: z.infer + } - return { - subscriptions, + type State = { + wildcard: PubSub.PubSub + typed: Map> + } + + export interface Interface { + readonly publish: ( + def: D, + properties: z.output, + ) => Effect.Effect + readonly subscribe: (def: D) => Stream.Stream> + readonly subscribeAll: () => Stream.Stream + readonly subscribeCallback: ( + def: D, + callback: (event: Payload) => unknown, + ) => Effect.Effect<() => void> + readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void> + } + + export class Service extends ServiceMap.Service()("@opencode/Bus") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const cache = yield* InstanceState.make( + Effect.fn("Bus.state")(function* (ctx) { + const wildcard = yield* PubSub.unbounded() + const typed = new Map>() + + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + // Publish InstanceDisposed before shutting down so subscribers see it + yield* PubSub.publish(wildcard, { + type: InstanceDisposed.type, + properties: { directory: ctx.directory }, + }) + yield* PubSub.shutdown(wildcard) + for (const ps of typed.values()) { + yield* PubSub.shutdown(ps) + } + }), + ) + + return { wildcard, typed } + }), + ) + + function getOrCreate(state: State, def: D) { + return Effect.gen(function* () { + let ps = state.typed.get(def.type) + if (!ps) { + ps = yield* PubSub.unbounded() + state.typed.set(def.type, ps) + } + return ps as unknown as PubSub.PubSub> + }) } - }, - async (entry) => { - const wildcard = entry.subscriptions.get("*") - if (!wildcard) return - const event = { - type: InstanceDisposed.type, - properties: { - directory: Instance.directory, - }, + + function publish(def: D, properties: z.output) { + return Effect.gen(function* () { + const state = yield* InstanceState.get(cache) + const payload: Payload = { type: def.type, properties } + log.info("publishing", { type: def.type }) + + const ps = state.typed.get(def.type) + if (ps) yield* PubSub.publish(ps, payload) + yield* PubSub.publish(state.wildcard, payload) + + GlobalBus.emit("event", { + directory: Instance.directory, + payload, + }) + }) } - for (const sub of [...wildcard]) { - sub(event) + + function subscribe(def: D): Stream.Stream> { + log.info("subscribing", { type: def.type }) + return Stream.unwrap( + Effect.gen(function* () { + const state = yield* InstanceState.get(cache) + const ps = yield* getOrCreate(state, def) + return Stream.fromPubSub(ps) + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) } - }, + + function subscribeAll(): Stream.Stream { + log.info("subscribing", { type: "*" }) + return Stream.unwrap( + Effect.gen(function* () { + const state = yield* InstanceState.get(cache) + return Stream.fromPubSub(state.wildcard) + }), + ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" })))) + } + + function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { + return Effect.gen(function* () { + log.info("subscribing", { type }) + const scope = yield* Scope.make() + const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub)) + + yield* Scope.provide(scope)( + Stream.fromSubscription(subscription).pipe( + Stream.runForEach((msg) => + Effect.tryPromise({ + try: () => Promise.resolve().then(() => callback(msg)), + catch: (cause) => { + log.error("subscriber failed", { type, cause }) + }, + }).pipe(Effect.ignore), + ), + Effect.forkScoped, + ), + ) + + return () => { + log.info("unsubscribing", { type }) + Effect.runFork(Scope.close(scope, Exit.void)) + } + }) + } + + const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* ( + def: D, + callback: (event: Payload) => unknown, + ) { + const state = yield* InstanceState.get(cache) + const ps = yield* getOrCreate(state, def) + return yield* on(ps, def.type, callback) + }) + + const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) { + const state = yield* InstanceState.get(cache) + return yield* on(state.wildcard, "*", callback) + }) + + return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback }) + }), ) - export async function publish( - def: Definition, - properties: z.output, + const { runPromise, runSync } = makeRuntime(Service, layer) + + // runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe, + // Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw. + export async function publish(def: D, properties: z.output) { + return runPromise((svc) => svc.publish(def, properties)) + } + + export function subscribe( + def: D, + callback: (event: { type: D["type"]; properties: z.infer }) => unknown, ) { - const payload = { - type: def.type, - properties, - } - log.info("publishing", { - type: def.type, - }) - const pending = [] - for (const key of [def.type, "*"]) { - const match = [...(state().subscriptions.get(key) ?? [])] - for (const sub of match) { - pending.push(sub(payload)) - } - } - GlobalBus.emit("event", { - directory: Instance.directory, - payload, - }) - return Promise.all(pending) + return runSync((svc) => svc.subscribeCallback(def, callback)) } - export function subscribe( - def: Definition, - callback: (event: { type: Definition["type"]; properties: z.infer }) => void, - ) { - return raw(def.type, callback) - } - - export function once( - def: Definition, - callback: (event: { - type: Definition["type"] - properties: z.infer - }) => "done" | undefined, - ) { - const unsub = subscribe(def, (event) => { - if (callback(event)) unsub() - }) - } - - export function subscribeAll(callback: (event: any) => void) { - return raw("*", callback) - } - - function raw(type: string, callback: (event: any) => void) { - log.info("subscribing", { type }) - const subscriptions = state().subscriptions - let match = subscriptions.get(type) ?? [] - match.push(callback) - subscriptions.set(type, match) - - return () => { - log.info("unsubscribing", { type }) - const match = subscriptions.get(type) - if (!match) return - const index = match.indexOf(callback) - if (index === -1) return - match.splice(index, 1) - } + export function subscribeAll(callback: (event: any) => unknown) { + return runSync((svc) => svc.subscribeAllCallback(callback)) } } diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts index ff93826103..a2407982a3 100644 --- a/packages/opencode/src/command/index.ts +++ b/packages/opencode/src/command/index.ts @@ -1,6 +1,6 @@ import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { SessionID, MessageID } from "@/session/schema" import { Effect, Layer, ServiceMap } from "effect" import z from "zod" @@ -173,7 +173,7 @@ export namespace Command { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export async function get(name: string) { return runPromise((svc) => svc.get(name)) diff --git a/packages/opencode/src/effect/instance-context.ts b/packages/opencode/src/effect/instance-context.ts deleted file mode 100644 index fd45901904..0000000000 --- a/packages/opencode/src/effect/instance-context.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { ServiceMap } from "effect" -import type { Project } from "@/project/project" - -export declare namespace InstanceContext { - export interface Shape { - readonly directory: string - readonly worktree: string - readonly project: Project.Info - } -} - -export class InstanceContext extends ServiceMap.Service()( - "opencode/InstanceContext", -) {} diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 226c276ead..76248ca88f 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -3,11 +3,15 @@ import * as ServiceMap from "effect/ServiceMap" export const memoMap = Layer.makeMemoMapUnsafe() -export function makeRunPromise(service: ServiceMap.Service, layer: Layer.Layer) { +export function makeRuntime(service: ServiceMap.Service, layer: Layer.Layer) { let rt: ManagedRuntime.ManagedRuntime | undefined + const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap })) - return (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => { - rt ??= ManagedRuntime.make(layer, { memoMap }) - return rt.runPromise(service.use(fn), options) + return { + runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(service.use(fn)), + runPromise: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => + getRuntime().runPromise(service.use(fn), options), + runFork: (fn: (svc: S) => Effect.Effect) => getRuntime().runFork(service.use(fn)), + runCallback: (fn: (svc: S) => Effect.Effect) => getRuntime().runCallback(service.use(fn)), } } diff --git a/packages/opencode/src/file/index.ts b/packages/opencode/src/file/index.ts index 7dc36e9c3d..86f7bb0dce 100644 --- a/packages/opencode/src/file/index.ts +++ b/packages/opencode/src/file/index.ts @@ -1,6 +1,6 @@ import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { Git } from "@/git" import { Effect, Fiber, Layer, Scope, ServiceMap } from "effect" import { formatPatch, structuredPatch } from "diff" @@ -688,7 +688,7 @@ export namespace File { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export function init() { return runPromise((svc) => svc.init()) diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts index 4962ef0c9e..d33848000d 100644 --- a/packages/opencode/src/file/time.ts +++ b/packages/opencode/src/file/time.ts @@ -1,6 +1,6 @@ import { DateTime, Effect, Layer, Semaphore, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { Flag } from "@/flag/flag" import type { SessionID } from "@/session/schema" import { Filesystem } from "../util/filesystem" @@ -108,7 +108,7 @@ export namespace FileTime { }), ).pipe(Layer.orDie) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export function read(sessionID: SessionID, file: string) { return runPromise((s) => s.read(sessionID, file)) diff --git a/packages/opencode/src/file/watcher.ts b/packages/opencode/src/file/watcher.ts index ba70791433..42ece35827 100644 --- a/packages/opencode/src/file/watcher.ts +++ b/packages/opencode/src/file/watcher.ts @@ -8,7 +8,7 @@ import z from "zod" import { Bus } from "@/bus" import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { Flag } from "@/flag/flag" import { Git } from "@/git" import { Instance } from "@/project/instance" @@ -159,7 +159,7 @@ export namespace FileWatcher { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export function init() { return runPromise((svc) => svc.init()) diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 39e0630cfc..316ea5ba5c 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -1,12 +1,10 @@ import { Effect, Layer, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import path from "path" import { mergeDeep } from "remeda" import z from "zod" -import { Bus } from "../bus" import { Config } from "../config/config" -import { File } from "../file" import { Instance } from "../project/instance" import { Process } from "../util/process" import { Log } from "../util/log" @@ -29,6 +27,7 @@ export namespace Format { export interface Interface { readonly init: () => Effect.Effect readonly status: () => Effect.Effect + readonly file: (filepath: string) => Effect.Effect } export class Service extends ServiceMap.Service()("@opencode/Format") {} @@ -97,53 +96,46 @@ export namespace Format { return checks.filter((x) => x.enabled).map((x) => x.item) } - yield* Effect.acquireRelease( - Effect.sync(() => - Bus.subscribe( - File.Event.Edited, - Instance.bind(async (payload) => { - const file = payload.properties.file - log.info("formatting", { file }) - const ext = path.extname(file) + async function formatFile(filepath: string) { + log.info("formatting", { file: filepath }) + const ext = path.extname(filepath) + + for (const item of await getFormatter(ext)) { + log.info("running", { command: item.command }) + try { + const proc = Process.spawn( + item.command.map((x) => x.replace("$FILE", filepath)), + { + cwd: Instance.directory, + env: { ...process.env, ...item.environment }, + stdout: "ignore", + stderr: "ignore", + }, + ) + const exit = await proc.exited + if (exit !== 0) { + log.error("failed", { + command: item.command, + ...item.environment, + }) + } + } catch (error) { + log.error("failed to format file", { + error, + command: item.command, + ...item.environment, + file: filepath, + }) + } + } + } - for (const item of await getFormatter(ext)) { - log.info("running", { command: item.command }) - try { - const proc = Process.spawn( - item.command.map((x) => x.replace("$FILE", file)), - { - cwd: Instance.directory, - env: { ...process.env, ...item.environment }, - stdout: "ignore", - stderr: "ignore", - }, - ) - const exit = await proc.exited - if (exit !== 0) { - log.error("failed", { - command: item.command, - ...item.environment, - }) - } - } catch (error) { - log.error("failed to format file", { - error, - command: item.command, - ...item.environment, - file, - }) - } - } - }), - ), - ), - (unsubscribe) => Effect.sync(unsubscribe), - ) log.info("init") return { formatters, isEnabled, + formatFile, } }), ) @@ -166,11 +158,16 @@ export namespace Format { return result }) - return Service.of({ init, status }) + const file = Effect.fn("Format.file")(function* (filepath: string) { + const { formatFile } = yield* InstanceState.get(state) + yield* Effect.promise(() => formatFile(filepath)) + }) + + return Service.of({ init, status, file }) }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export async function init() { return runPromise((s) => s.init()) @@ -179,4 +176,8 @@ export namespace Format { export async function status() { return runPromise((s) => s.status()) } + + export async function file(filepath: string) { + return runPromise((s) => s.file(filepath)) + } } diff --git a/packages/opencode/src/git/index.ts b/packages/opencode/src/git/index.ts index 1442b8cb65..521643e9f1 100644 --- a/packages/opencode/src/git/index.ts +++ b/packages/opencode/src/git/index.ts @@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import { Effect, Layer, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" export namespace Git { const cfg = [ @@ -264,7 +264,7 @@ export namespace Git { Layer.provide(NodePath.layer), ) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export function run(args: string[], opts: Options) { return runPromise((git) => git.run(args, opts)) diff --git a/packages/opencode/src/installation/index.ts b/packages/opencode/src/installation/index.ts index 912951a0ba..76f3d0c9e1 100644 --- a/packages/opencode/src/installation/index.ts +++ b/packages/opencode/src/installation/index.ts @@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node" import { Effect, Layer, Schema, ServiceMap, Stream } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { withTransientReadRetry } from "@/util/effect-http-client" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import path from "path" @@ -346,7 +346,7 @@ export namespace Installation { Layer.provide(NodePath.layer), ) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function info(): Promise { return runPromise((svc) => svc.info()) diff --git a/packages/opencode/src/permission/index.ts b/packages/opencode/src/permission/index.ts index 63e6570189..1a7bd2c610 100644 --- a/packages/opencode/src/permission/index.ts +++ b/packages/opencode/src/permission/index.ts @@ -2,7 +2,7 @@ import { Bus } from "@/bus" import { BusEvent } from "@/bus/bus-event" import { Config } from "@/config/config" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { ProjectID } from "@/project/schema" import { Instance } from "@/project/instance" import { MessageID, SessionID } from "@/session/schema" @@ -306,7 +306,7 @@ export namespace Permission { return result } - export const runPromise = makeRunPromise(Service, layer) + export const { runPromise } = makeRuntime(Service, layer) export async function ask(input: z.infer) { return runPromise((s) => s.ask(input)) diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 09e991c5a4..cd4b917992 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -11,9 +11,9 @@ import { NamedError } from "@opencode-ai/util/error" import { CopilotAuthPlugin } from "./copilot" import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth" import { PoeAuthPlugin } from "opencode-poe-auth" -import { Effect, Layer, ServiceMap } from "effect" +import { Effect, Layer, ServiceMap, Stream } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" export namespace Plugin { const log = Log.create({ service: "plugin" }) @@ -52,6 +52,8 @@ export namespace Plugin { export const layer = Layer.effect( Service, Effect.gen(function* () { + const bus = yield* Bus.Service + const cache = yield* InstanceState.make( Effect.fn("Plugin.state")(function* (ctx) { const hooks: Hooks[] = [] @@ -146,17 +148,19 @@ export namespace Plugin { } }) - // Subscribe to bus events, clean up when scope is closed - yield* Effect.acquireRelease( - Effect.sync(() => - Bus.subscribeAll(async (input) => { - for (const hook of hooks) { - hook["event"]?.({ event: input }) - } - }), - ), - (unsub) => Effect.sync(unsub), - ) + // Subscribe to bus events, fiber interrupted when scope closes + yield* bus + .subscribeAll() + .pipe( + Stream.runForEach((input) => + Effect.sync(() => { + for (const hook of hooks) { + hook["event"]?.({ event: input as any }) + } + }), + ), + Effect.forkScoped, + ) return { hooks } }), @@ -192,7 +196,8 @@ export namespace Plugin { }), ) - const runPromise = makeRunPromise(Service, layer) + const defaultLayer = layer.pipe(Layer.provide(Bus.layer)) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function trigger< Name extends TriggerName, diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts index 256be36959..b8de639e76 100644 --- a/packages/opencode/src/project/project.ts +++ b/packages/opencode/src/project/project.ts @@ -11,7 +11,7 @@ import { ProjectID } from "./schema" import { Effect, Layer, Path, Scope, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import { NodeFileSystem, NodePath } from "@effect/platform-node" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { AppFileSystem } from "@/filesystem" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" @@ -462,7 +462,7 @@ export namespace Project { Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer), ) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) // --------------------------------------------------------------------------- // Promise-based API (delegates to Effect service via runPromise) diff --git a/packages/opencode/src/project/vcs.ts b/packages/opencode/src/project/vcs.ts index e3243ba8eb..7df9dfb6f1 100644 --- a/packages/opencode/src/project/vcs.ts +++ b/packages/opencode/src/project/vcs.ts @@ -1,9 +1,9 @@ -import { Effect, Layer, ServiceMap } from "effect" +import { Effect, Layer, ServiceMap, Stream } from "effect" import path from "path" import { Bus } from "@/bus" import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { AppFileSystem } from "@/filesystem" import { FileWatcher } from "@/file/watcher" import { Git } from "@/git" @@ -139,11 +139,12 @@ export namespace Vcs { export class Service extends ServiceMap.Service()("@opencode/Vcs") {} - export const layer: Layer.Layer = Layer.effect( + export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { const fs = yield* AppFileSystem.Service const git = yield* Git.Service + const bus = yield* Bus.Service const state = yield* InstanceState.make( Effect.fn("Vcs.state")((ctx) => Effect.gen(function* () { @@ -158,22 +159,22 @@ export namespace Vcs { const value = { current, root } log.info("initialized", { branch: value.current, default_branch: value.root?.name }) - yield* Effect.acquireRelease( - Effect.sync(() => - Bus.subscribe( - FileWatcher.Event.Updated, - Instance.bind(async (evt) => { - if (!evt.properties.file.endsWith("HEAD")) return - const next = await get() - if (next === value.current) return - log.info("branch changed", { from: value.current, to: next }) - value.current = next - Bus.publish(Event.BranchUpdated, { branch: next }) + yield* bus + .subscribe(FileWatcher.Event.Updated) + .pipe( + Stream.filter((evt) => evt.properties.file.endsWith("HEAD")), + Stream.runForEach((_evt) => + Effect.gen(function* () { + const next = yield* Effect.promise(() => get()) + if (next !== value.current) { + log.info("branch changed", { from: value.current, to: next }) + value.current = next + yield* bus.publish(Event.BranchUpdated, { branch: next }) + } }), ), - ), - (unsubscribe) => Effect.sync(unsubscribe), - ) + Effect.forkScoped, + ) return value }), @@ -212,9 +213,13 @@ export namespace Vcs { }), ) - export const defaultLayer = layer.pipe(Layer.provide(Git.defaultLayer), Layer.provide(AppFileSystem.defaultLayer)) + export const defaultLayer = layer.pipe( + Layer.provide(Git.defaultLayer), + Layer.provide(AppFileSystem.defaultLayer), + Layer.provide(Bus.layer), + ) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export function init() { return runPromise((svc) => svc.init()) diff --git a/packages/opencode/src/provider/auth.ts b/packages/opencode/src/provider/auth.ts index 99184c48a5..759f8803ae 100644 --- a/packages/opencode/src/provider/auth.ts +++ b/packages/opencode/src/provider/auth.ts @@ -2,7 +2,7 @@ import type { AuthOuathResult, Hooks } from "@opencode-ai/plugin" import { NamedError } from "@opencode-ai/util/error" import { Auth } from "@/auth" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { Plugin } from "../plugin" import { ProviderID } from "./schema" import { Array as Arr, Effect, Layer, Record, Result, ServiceMap } from "effect" @@ -231,7 +231,7 @@ export namespace ProviderAuth { export const defaultLayer = layer.pipe(Layer.provide(Auth.layer)) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function methods() { return runPromise((svc) => svc.methods()) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index f866b18adc..1ba87126bb 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -1,7 +1,7 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { Instance } from "@/project/instance" import { type IPty } from "bun-pty" import z from "zod" @@ -361,7 +361,7 @@ export namespace Pty { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export async function list() { return runPromise((svc) => svc.list()) diff --git a/packages/opencode/src/question/index.ts b/packages/opencode/src/question/index.ts index a0d62d94b8..f46cdd1081 100644 --- a/packages/opencode/src/question/index.ts +++ b/packages/opencode/src/question/index.ts @@ -2,7 +2,7 @@ import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect" import { Bus } from "@/bus" import { BusEvent } from "@/bus/bus-event" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { SessionID, MessageID } from "@/session/schema" import { Log } from "@/util/log" import z from "zod" @@ -197,7 +197,7 @@ export namespace Question { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export async function ask(input: { sessionID: SessionID diff --git a/packages/opencode/src/session/status.ts b/packages/opencode/src/session/status.ts index 462d5ded48..34a79eed11 100644 --- a/packages/opencode/src/session/status.ts +++ b/packages/opencode/src/session/status.ts @@ -1,7 +1,7 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { SessionID } from "./schema" import { Effect, Layer, ServiceMap } from "effect" import z from "zod" @@ -55,6 +55,8 @@ export namespace SessionStatus { export const layer = Layer.effect( Service, Effect.gen(function* () { + const bus = yield* Bus.Service + const state = yield* InstanceState.make( Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map())), ) @@ -70,9 +72,9 @@ export namespace SessionStatus { const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) { const data = yield* InstanceState.get(state) - yield* Effect.promise(() => Bus.publish(Event.Status, { sessionID, status })) + yield* bus.publish(Event.Status, { sessionID, status }) if (status.type === "idle") { - yield* Effect.promise(() => Bus.publish(Event.Idle, { sessionID })) + yield* bus.publish(Event.Idle, { sessionID }) data.delete(sessionID) return } @@ -83,7 +85,8 @@ export namespace SessionStatus { }), ) - const runPromise = makeRunPromise(Service, layer) + const defaultLayer = layer.pipe(Layer.provide(Bus.layer)) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function get(sessionID: SessionID) { return runPromise((svc) => svc.get(sessionID)) diff --git a/packages/opencode/src/skill/index.ts b/packages/opencode/src/skill/index.ts index 43a22219ed..239549a1af 100644 --- a/packages/opencode/src/skill/index.ts +++ b/packages/opencode/src/skill/index.ts @@ -7,7 +7,7 @@ import { NamedError } from "@opencode-ai/util/error" import type { Agent } from "@/agent/agent" import { Bus } from "@/bus" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { Flag } from "@/flag/flag" import { Global } from "@/global" import { Permission } from "@/permission" @@ -242,7 +242,7 @@ export namespace Skill { return ["## Available Skills", ...list.map((skill) => `- **${skill.name}**: ${skill.description}`)].join("\n") } - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function get(name: string) { return runPromise((skill) => skill.get(name)) diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts index 98a9d322c5..d6bdf8a3c1 100644 --- a/packages/opencode/src/snapshot/index.ts +++ b/packages/opencode/src/snapshot/index.ts @@ -5,7 +5,7 @@ import path from "path" import z from "zod" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { AppFileSystem } from "@/filesystem" import { Hash } from "@/util/hash" import { Config } from "../config/config" @@ -459,7 +459,7 @@ export namespace Snapshot { Layer.provide(NodePath.layer), ) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function init() { return runPromise((svc) => svc.init()) diff --git a/packages/opencode/src/tool/apply_patch.ts b/packages/opencode/src/tool/apply_patch.ts index 06293b6eba..c23c0dd3d0 100644 --- a/packages/opencode/src/tool/apply_patch.ts +++ b/packages/opencode/src/tool/apply_patch.ts @@ -13,6 +13,7 @@ import { LSP } from "../lsp" import { Filesystem } from "../util/filesystem" import DESCRIPTION from "./apply_patch.txt" import { File } from "../file" +import { Format } from "../format" const PatchParams = z.object({ patchText: z.string().describe("The full patch text that describes all changes to be made"), @@ -220,9 +221,8 @@ export const ApplyPatchTool = Tool.define("apply_patch", { } if (edited) { - await Bus.publish(File.Event.Edited, { - file: edited, - }) + await Format.file(edited) + Bus.publish(File.Event.Edited, { file: edited }) } } diff --git a/packages/opencode/src/tool/edit.ts b/packages/opencode/src/tool/edit.ts index 1a7614fc17..554d547d05 100644 --- a/packages/opencode/src/tool/edit.ts +++ b/packages/opencode/src/tool/edit.ts @@ -12,6 +12,7 @@ import DESCRIPTION from "./edit.txt" import { File } from "../file" import { FileWatcher } from "../file/watcher" import { Bus } from "../bus" +import { Format } from "../format" import { FileTime } from "../file/time" import { Filesystem } from "../util/filesystem" import { Instance } from "../project/instance" @@ -71,9 +72,8 @@ export const EditTool = Tool.define("edit", { }, }) await Filesystem.write(filePath, params.newString) - await Bus.publish(File.Event.Edited, { - file: filePath, - }) + await Format.file(filePath) + Bus.publish(File.Event.Edited, { file: filePath }) await Bus.publish(FileWatcher.Event.Updated, { file: filePath, event: existed ? "change" : "add", @@ -108,9 +108,8 @@ export const EditTool = Tool.define("edit", { }) await Filesystem.write(filePath, contentNew) - await Bus.publish(File.Event.Edited, { - file: filePath, - }) + await Format.file(filePath) + Bus.publish(File.Event.Edited, { file: filePath }) await Bus.publish(FileWatcher.Event.Updated, { file: filePath, event: "change", diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts index 6381fcfbc0..ada761fd50 100644 --- a/packages/opencode/src/tool/registry.ts +++ b/packages/opencode/src/tool/registry.ts @@ -31,7 +31,7 @@ import { Glob } from "../util/glob" import { pathToFileURL } from "url" import { Effect, Layer, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" export namespace ToolRegistry { const log = Log.create({ service: "tool.registry" }) @@ -198,7 +198,7 @@ export namespace ToolRegistry { }), ) - const runPromise = makeRunPromise(Service, layer) + const { runPromise } = makeRuntime(Service, layer) export async function register(tool: Tool.Info) { return runPromise((svc) => svc.register(tool)) diff --git a/packages/opencode/src/tool/truncate.ts b/packages/opencode/src/tool/truncate.ts index fa1d0a4aed..5cddacefc6 100644 --- a/packages/opencode/src/tool/truncate.ts +++ b/packages/opencode/src/tool/truncate.ts @@ -2,7 +2,7 @@ import { NodePath } from "@effect/platform-node" import { Cause, Duration, Effect, Layer, Schedule, ServiceMap } from "effect" import path from "path" import type { Agent } from "../agent/agent" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import { AppFileSystem } from "@/filesystem" import { evaluate } from "@/permission/evaluate" import { Identifier } from "../id/id" @@ -136,7 +136,7 @@ export namespace Truncate { export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer)) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function output(text: string, options: Options = {}, agent?: Agent.Info): Promise { return runPromise((s) => s.output(text, options, agent)) diff --git a/packages/opencode/src/tool/write.ts b/packages/opencode/src/tool/write.ts index 83474a543c..6b134e5253 100644 --- a/packages/opencode/src/tool/write.ts +++ b/packages/opencode/src/tool/write.ts @@ -7,6 +7,7 @@ import DESCRIPTION from "./write.txt" import { Bus } from "../bus" import { File } from "../file" import { FileWatcher } from "../file/watcher" +import { Format } from "../format" import { FileTime } from "../file/time" import { Filesystem } from "../util/filesystem" import { Instance } from "../project/instance" @@ -42,9 +43,8 @@ export const WriteTool = Tool.define("write", { }) await Filesystem.write(filepath, params.content) - await Bus.publish(File.Event.Edited, { - file: filepath, - }) + await Format.file(filepath) + Bus.publish(File.Event.Edited, { file: filepath }) await Bus.publish(FileWatcher.Event.Updated, { file: filepath, event: exists ? "change" : "add", diff --git a/packages/opencode/src/worktree/index.ts b/packages/opencode/src/worktree/index.ts index 0a8f0f00a6..41f11ca012 100644 --- a/packages/opencode/src/worktree/index.ts +++ b/packages/opencode/src/worktree/index.ts @@ -15,7 +15,7 @@ import { Git } from "@/git" import { Effect, FileSystem, Layer, Path, Scope, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import { NodeFileSystem, NodePath } from "@effect/platform-node" -import { makeRunPromise } from "@/effect/run-service" +import { makeRuntime } from "@/effect/run-service" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" export namespace Worktree { @@ -576,7 +576,7 @@ export namespace Worktree { Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer), ) - const runPromise = makeRunPromise(Service, defaultLayer) + const { runPromise } = makeRuntime(Service, defaultLayer) export async function makeWorktreeInfo(name?: string) { return runPromise((svc) => svc.makeWorktreeInfo(name)) diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts new file mode 100644 index 0000000000..642763e90f --- /dev/null +++ b/packages/opencode/test/bus/bus-effect.test.ts @@ -0,0 +1,164 @@ +import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { describe, expect } from "bun:test" +import { Deferred, Effect, Layer, Stream } from "effect" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { Instance } from "../../src/project/instance" +import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture" +import { testEffect } from "../lib/effect" + +const TestEvent = { + Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })), + Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })), +} + +const node = NodeChildProcessSpawner.layer.pipe( + Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)), +) + +const live = Layer.mergeAll(Bus.layer, node) + +const it = testEffect(live) + +describe("Bus (Effect-native)", () => { + it.effect("publish + subscribe stream delivers events", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const received: number[] = [] + const done = yield* Deferred.make() + + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + received.push(evt.properties.value) + if (received.length === 2) Deferred.doneUnsafe(done, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* bus.publish(TestEvent.Ping, { value: 2 }) + yield* Deferred.await(done) + + expect(received).toEqual([1, 2]) + }), + ), + ) + + it.effect("subscribe filters by event type", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const pings: number[] = [] + const done = yield* Deferred.make() + + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + pings.push(evt.properties.value) + Deferred.doneUnsafe(done, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Pong, { message: "ignored" }) + yield* bus.publish(TestEvent.Ping, { value: 42 }) + yield* Deferred.await(done) + + expect(pings).toEqual([42]) + }), + ), + ) + + it.effect("subscribeAll receives all types", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const types: string[] = [] + const done = yield* Deferred.make() + + yield* Stream.runForEach(bus.subscribeAll(), (evt) => + Effect.sync(() => { + types.push(evt.type) + if (types.length === 2) Deferred.doneUnsafe(done, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* bus.publish(TestEvent.Pong, { message: "hi" }) + yield* Deferred.await(done) + + expect(types).toContain("test.effect.ping") + expect(types).toContain("test.effect.pong") + }), + ), + ) + + it.effect("multiple subscribers each receive the event", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const bus = yield* Bus.Service + const a: number[] = [] + const b: number[] = [] + const doneA = yield* Deferred.make() + const doneB = yield* Deferred.make() + + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + a.push(evt.properties.value) + Deferred.doneUnsafe(doneA, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + Effect.sync(() => { + b.push(evt.properties.value) + Deferred.doneUnsafe(doneB, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 99 }) + yield* Deferred.await(doneA) + yield* Deferred.await(doneB) + + expect(a).toEqual([99]) + expect(b).toEqual([99]) + }), + ), + ) + + it.effect("subscribeAll stream sees InstanceDisposed on disposal", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped() + const types: string[] = [] + const seen = yield* Deferred.make() + const disposed = yield* Deferred.make() + + // Set up subscriber inside the instance + yield* Effect.gen(function* () { + const bus = yield* Bus.Service + + yield* Stream.runForEach(bus.subscribeAll(), (evt) => + Effect.sync(() => { + types.push(evt.type) + if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void) + if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void) + }), + ).pipe(Effect.forkScoped) + + yield* Effect.sleep("10 millis") + yield* bus.publish(TestEvent.Ping, { value: 1 }) + yield* Deferred.await(seen) + }).pipe(provideInstance(dir)) + + // Dispose from OUTSIDE the instance scope + yield* Effect.promise(() => Instance.disposeAll()) + yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds")) + + expect(types).toContain("test.effect.ping") + expect(types).toContain(Bus.InstanceDisposed.type) + }), + ) +}) diff --git a/packages/opencode/test/bus/bus-integration.test.ts b/packages/opencode/test/bus/bus-integration.test.ts new file mode 100644 index 0000000000..e42bd5299e --- /dev/null +++ b/packages/opencode/test/bus/bus-integration.test.ts @@ -0,0 +1,87 @@ +import { afterEach, describe, expect, test } from "bun:test" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +const TestEvent = BusEvent.define("test.integration", z.object({ value: z.number() })) + +function withInstance(directory: string, fn: () => Promise) { + return Instance.provide({ directory, fn }) +} + +describe("Bus integration: acquireRelease subscriber pattern", () => { + afterEach(() => Instance.disposeAll()) + + test("subscriber via callback facade receives events and cleans up on unsub", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent, (evt) => { + received.push(evt.properties.value) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent, { value: 1 }) + await Bus.publish(TestEvent, { value: 2 }) + await Bun.sleep(10) + + expect(received).toEqual([1, 2]) + + unsub() + await Bun.sleep(10) + await Bus.publish(TestEvent, { value: 3 }) + await Bun.sleep(10) + + expect(received).toEqual([1, 2]) + }) + }) + + test("subscribeAll receives events from multiple types", async () => { + await using tmp = await tmpdir() + const received: Array<{ type: string; value?: number }> = [] + + const OtherEvent = BusEvent.define("test.other", z.object({ value: z.number() })) + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + received.push({ type: evt.type, value: evt.properties.value }) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent, { value: 10 }) + await Bus.publish(OtherEvent, { value: 20 }) + await Bun.sleep(10) + }) + + expect(received).toEqual([ + { type: "test.integration", value: 10 }, + { type: "test.other", value: 20 }, + ]) + }) + + test("subscriber cleanup on instance disposal interrupts the stream", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + let disposed = false + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + if (evt.type === Bus.InstanceDisposed.type) { + disposed = true + return + } + received.push(evt.properties.value) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent, { value: 1 }) + await Bun.sleep(10) + }) + + await Instance.disposeAll() + await Bun.sleep(50) + + expect(received).toEqual([1]) + expect(disposed).toBe(true) + }) +}) diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts new file mode 100644 index 0000000000..3df179787d --- /dev/null +++ b/packages/opencode/test/bus/bus.test.ts @@ -0,0 +1,219 @@ +import { afterEach, describe, expect, test } from "bun:test" +import z from "zod" +import { Bus } from "../../src/bus" +import { BusEvent } from "../../src/bus/bus-event" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" + +const TestEvent = { + Ping: BusEvent.define("test.ping", z.object({ value: z.number() })), + Pong: BusEvent.define("test.pong", z.object({ message: z.string() })), +} + +function withInstance(directory: string, fn: () => Promise) { + return Instance.provide({ directory, fn }) +} + +describe("Bus", () => { + afterEach(() => Instance.disposeAll()) + + describe("publish + subscribe", () => { + test("subscriber is live immediately after subscribe returns", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bus.publish(TestEvent.Ping, { value: 42 }) + await Bun.sleep(10) + }) + + expect(received).toEqual([42]) + }) + + test("subscriber receives matching events", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + // Give the subscriber fiber time to start consuming + await Bun.sleep(10) + await Bus.publish(TestEvent.Ping, { value: 42 }) + await Bus.publish(TestEvent.Ping, { value: 99 }) + // Give subscriber time to process + await Bun.sleep(10) + }) + + expect(received).toEqual([42, 99]) + }) + + test("subscriber does not receive events of other types", async () => { + await using tmp = await tmpdir() + const pings: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + pings.push(evt.properties.value) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent.Pong, { message: "hello" }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bun.sleep(10) + }) + + expect(pings).toEqual([1]) + }) + + test("publish with no subscribers does not throw", async () => { + await using tmp = await tmpdir() + + await withInstance(tmp.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + }) + }) + }) + + describe("unsubscribe", () => { + test("unsubscribe stops delivery", async () => { + await using tmp = await tmpdir() + const received: number[] = [] + + await withInstance(tmp.path, async () => { + const unsub = Bus.subscribe(TestEvent.Ping, (evt) => { + received.push(evt.properties.value) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bun.sleep(10) + unsub() + await Bun.sleep(10) + await Bus.publish(TestEvent.Ping, { value: 2 }) + await Bun.sleep(10) + }) + + expect(received).toEqual([1]) + }) + }) + + describe("subscribeAll", () => { + test("subscribeAll is live immediately after subscribe returns", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + received.push(evt.type) + }) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bun.sleep(10) + }) + + expect(received).toEqual(["test.ping"]) + }) + + test("receives all event types", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + received.push(evt.type) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bus.publish(TestEvent.Pong, { message: "hi" }) + await Bun.sleep(10) + }) + + expect(received).toContain("test.ping") + expect(received).toContain("test.pong") + }) + }) + + describe("multiple subscribers", () => { + test("all subscribers for same event type are called", async () => { + await using tmp = await tmpdir() + const a: number[] = [] + const b: number[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + a.push(evt.properties.value) + }) + Bus.subscribe(TestEvent.Ping, (evt) => { + b.push(evt.properties.value) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent.Ping, { value: 7 }) + await Bun.sleep(10) + }) + + expect(a).toEqual([7]) + expect(b).toEqual([7]) + }) + }) + + describe("instance isolation", () => { + test("events in one directory do not reach subscribers in another", async () => { + await using tmpA = await tmpdir() + await using tmpB = await tmpdir() + const receivedA: number[] = [] + const receivedB: number[] = [] + + await withInstance(tmpA.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + receivedA.push(evt.properties.value) + }) + await Bun.sleep(10) + }) + + await withInstance(tmpB.path, async () => { + Bus.subscribe(TestEvent.Ping, (evt) => { + receivedB.push(evt.properties.value) + }) + await Bun.sleep(10) + }) + + await withInstance(tmpA.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bun.sleep(10) + }) + + await withInstance(tmpB.path, async () => { + await Bus.publish(TestEvent.Ping, { value: 2 }) + await Bun.sleep(10) + }) + + expect(receivedA).toEqual([1]) + expect(receivedB).toEqual([2]) + }) + }) + + describe("instance disposal", () => { + test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => { + await using tmp = await tmpdir() + const received: string[] = [] + + await withInstance(tmp.path, async () => { + Bus.subscribeAll((evt) => { + received.push(evt.type) + }) + await Bun.sleep(10) + await Bus.publish(TestEvent.Ping, { value: 1 }) + await Bun.sleep(10) + }) + + // Instance.disposeAll triggers the finalizer which publishes InstanceDisposed + await Instance.disposeAll() + await Bun.sleep(50) + + expect(received).toContain("test.ping") + expect(received).toContain(Bus.InstanceDisposed.type) + }) + }) +}) diff --git a/packages/opencode/test/effect/run-service.test.ts b/packages/opencode/test/effect/run-service.test.ts index c9f630585e..b2004fb664 100644 --- a/packages/opencode/test/effect/run-service.test.ts +++ b/packages/opencode/test/effect/run-service.test.ts @@ -1,10 +1,10 @@ import { expect, test } from "bun:test" import { Effect, Layer, ServiceMap } from "effect" -import { makeRunPromise } from "../../src/effect/run-service" +import { makeRuntime } from "../../src/effect/run-service" class Shared extends ServiceMap.Service()("@test/Shared") {} -test("makeRunPromise shares dependent layers through the shared memo map", async () => { +test("makeRuntime shares dependent layers through the shared memo map", async () => { let n = 0 const shared = Layer.effect( @@ -37,8 +37,8 @@ test("makeRunPromise shares dependent layers through the shared memo map", async }), ).pipe(Layer.provide(shared)) - const runOne = makeRunPromise(One, one) - const runTwo = makeRunPromise(Two, two) + const { runPromise: runOne } = makeRuntime(One, one) + const { runPromise: runTwo } = makeRuntime(Two, two) expect(await runOne((svc) => svc.get())).toBe(1) expect(await runTwo((svc) => svc.get())).toBe(1) diff --git a/packages/opencode/test/file/watcher.test.ts b/packages/opencode/test/file/watcher.test.ts index 6658634e54..f98a580f62 100644 --- a/packages/opencode/test/file/watcher.test.ts +++ b/packages/opencode/test/file/watcher.test.ts @@ -2,9 +2,8 @@ import { $ } from "bun" import { afterEach, describe, expect, test } from "bun:test" import fs from "fs/promises" import path from "path" -import { Deferred, Effect, Option } from "effect" +import { ConfigProvider, Deferred, Effect, Layer, ManagedRuntime, Option } from "effect" import { tmpdir } from "../fixture/fixture" -import { watcherConfigLayer, withServices } from "../fixture/instance" import { Bus } from "../../src/bus" import { FileWatcher } from "../../src/file/watcher" import { Instance } from "../../src/project/instance" @@ -16,20 +15,33 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc // Helpers // --------------------------------------------------------------------------- +const watcherConfigLayer = ConfigProvider.layer( + ConfigProvider.fromUnknown({ + OPENCODE_EXPERIMENTAL_FILEWATCHER: "true", + OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false", + }), +) + type WatcherEvent = { file: string; event: "add" | "change" | "unlink" } /** Run `body` with a live FileWatcher service. */ function withWatcher(directory: string, body: Effect.Effect) { - return withServices( + return Instance.provide({ directory, - FileWatcher.layer, - async (rt) => { - await rt.runPromise(FileWatcher.Service.use((s) => s.init())) - await Effect.runPromise(ready(directory)) - await Effect.runPromise(body) + fn: async () => { + const layer: Layer.Layer = FileWatcher.layer.pipe( + Layer.provide(watcherConfigLayer), + ) + const rt = ManagedRuntime.make(layer) + try { + await rt.runPromise(FileWatcher.Service.use((s) => s.init())) + await Effect.runPromise(ready(directory)) + await Effect.runPromise(body) + } finally { + await rt.dispose() + } }, - { provide: [watcherConfigLayer] }, - ) + }) } function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) { diff --git a/packages/opencode/test/fixture/fixture.ts b/packages/opencode/test/fixture/fixture.ts index f2f864e8b1..a36a3f9d84 100644 --- a/packages/opencode/test/fixture/fixture.ts +++ b/packages/opencode/test/fixture/fixture.ts @@ -2,7 +2,10 @@ import { $ } from "bun" import * as fs from "fs/promises" import os from "os" import path from "path" +import { Effect, FileSystem, ServiceMap } from "effect" +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import type { Config } from "../../src/config/config" +import { Instance } from "../../src/project/instance" // Strip null bytes from paths (defensive fix for CI environment issues) function sanitizePath(p: string): string { @@ -71,3 +74,68 @@ export async function tmpdir(options?: TmpDirOptions) { } return result } + +/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */ +export function tmpdirScoped(options?: { git?: boolean; config?: Partial }) { + return Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner + const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" }) + + const git = (...args: string[]) => + spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode)) + + if (options?.git) { + yield* git("init") + yield* git("config", "core.fsmonitor", "false") + yield* git("config", "user.email", "test@opencode.test") + yield* git("config", "user.name", "Test") + yield* git("commit", "--allow-empty", "-m", "root commit") + } + + if (options?.config) { + yield* fs.writeFileString( + path.join(dir, "opencode.json"), + JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }), + ) + } + + return dir + }) +} + +export const provideInstance = + (directory: string) => + (self: Effect.Effect): Effect.Effect => + Effect.servicesWith((services: ServiceMap.ServiceMap) => + Effect.promise(async () => + Instance.provide({ + directory, + fn: () => Effect.runPromiseWith(services)(self), + }), + ), + ) + +export function provideTmpdirInstance( + self: (path: string) => Effect.Effect, + options?: { git?: boolean; config?: Partial }, +) { + return Effect.gen(function* () { + const path = yield* tmpdirScoped(options) + let provided = false + + yield* Effect.addFinalizer(() => + provided + ? Effect.promise(() => + Instance.provide({ + directory: path, + fn: () => Instance.dispose(), + }), + ).pipe(Effect.ignore) + : Effect.void, + ) + + provided = true + return yield* self(path).pipe(provideInstance(path)) + }) +} diff --git a/packages/opencode/test/fixture/instance.ts b/packages/opencode/test/fixture/instance.ts deleted file mode 100644 index 67af82fc8b..0000000000 --- a/packages/opencode/test/fixture/instance.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { ConfigProvider, Layer, ManagedRuntime } from "effect" -import { InstanceContext } from "../../src/effect/instance-context" -import { Instance } from "../../src/project/instance" - -/** ConfigProvider that enables the experimental file watcher. */ -export const watcherConfigLayer = ConfigProvider.layer( - ConfigProvider.fromUnknown({ - OPENCODE_EXPERIMENTAL_FILEWATCHER: "true", - OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false", - }), -) - -/** - * Boot an Instance with the given service layers and run `body` with - * the ManagedRuntime. Cleanup is automatic — the runtime is disposed - * and Instance context is torn down when `body` completes. - * - * Layers may depend on InstanceContext (provided automatically). - * Pass extra layers via `options.provide` (e.g. ConfigProvider.layer). - */ -export function withServices( - directory: string, - layer: Layer.Layer, - body: (rt: ManagedRuntime.ManagedRuntime) => Promise, - options?: { provide?: Layer.Layer[] }, -) { - return Instance.provide({ - directory, - fn: async () => { - const ctx = Layer.sync(InstanceContext, () => - InstanceContext.of({ - directory: Instance.directory, - worktree: Instance.worktree, - project: Instance.project, - }), - ) - let resolved: Layer.Layer = layer.pipe(Layer.provide(ctx)) as any - if (options?.provide) { - for (const l of options.provide) { - resolved = resolved.pipe(Layer.provide(l)) as any - } - } - const rt = ManagedRuntime.make(resolved) - try { - await body(rt) - } finally { - await rt.dispose() - } - }, - }) -} diff --git a/packages/opencode/test/format/format.test.ts b/packages/opencode/test/format/format.test.ts index 68fe71e03f..c718c13e8b 100644 --- a/packages/opencode/test/format/format.test.ts +++ b/packages/opencode/test/format/format.test.ts @@ -1,172 +1,182 @@ -import { Effect } from "effect" -import { afterEach, describe, expect, test } from "bun:test" -import { tmpdir } from "../fixture/fixture" -import { withServices } from "../fixture/instance" -import { Bus } from "../../src/bus" -import { File } from "../../src/file" +import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { describe, expect } from "bun:test" +import { Effect, Layer } from "effect" +import { provideTmpdirInstance } from "../fixture/fixture" +import { testEffect } from "../lib/effect" import { Format } from "../../src/format" import * as Formatter from "../../src/format/formatter" -import { Instance } from "../../src/project/instance" + +const node = NodeChildProcessSpawner.layer.pipe( + Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)), +) + +const it = testEffect(Layer.mergeAll(Format.layer, node)) describe("Format", () => { - afterEach(async () => { - await Instance.disposeAll() - }) + it.effect("status() returns built-in formatters when no config overrides", () => + provideTmpdirInstance(() => + Format.Service.use((fmt) => + Effect.gen(function* () { + const statuses = yield* fmt.status() + expect(Array.isArray(statuses)).toBe(true) + expect(statuses.length).toBeGreaterThan(0) - test("status() returns built-in formatters when no config overrides", async () => { - await using tmp = await tmpdir() + for (const item of statuses) { + expect(typeof item.name).toBe("string") + expect(Array.isArray(item.extensions)).toBe(true) + expect(typeof item.enabled).toBe("boolean") + } - await withServices(tmp.path, Format.layer, async (rt) => { - const statuses = await rt.runPromise(Format.Service.use((s) => s.status())) - expect(Array.isArray(statuses)).toBe(true) - expect(statuses.length).toBeGreaterThan(0) + const gofmt = statuses.find((item) => item.name === "gofmt") + expect(gofmt).toBeDefined() + expect(gofmt!.extensions).toContain(".go") + }), + ), + ), + ) - for (const s of statuses) { - expect(typeof s.name).toBe("string") - expect(Array.isArray(s.extensions)).toBe(true) - expect(typeof s.enabled).toBe("boolean") - } + it.effect("status() returns empty list when formatter is disabled", () => + provideTmpdirInstance( + () => + Format.Service.use((fmt) => + Effect.gen(function* () { + expect(yield* fmt.status()).toEqual([]) + }), + ), + { config: { formatter: false } }, + ), + ) - const gofmt = statuses.find((s) => s.name === "gofmt") - expect(gofmt).toBeDefined() - expect(gofmt!.extensions).toContain(".go") - }) - }) - - test("status() returns empty list when formatter is disabled", async () => { - await using tmp = await tmpdir({ - config: { formatter: false }, - }) - - await withServices(tmp.path, Format.layer, async (rt) => { - const statuses = await rt.runPromise(Format.Service.use((s) => s.status())) - expect(statuses).toEqual([]) - }) - }) - - test("status() excludes formatters marked as disabled in config", async () => { - await using tmp = await tmpdir({ - config: { - formatter: { - gofmt: { disabled: true }, + it.effect("status() excludes formatters marked as disabled in config", () => + provideTmpdirInstance( + () => + Format.Service.use((fmt) => + Effect.gen(function* () { + const statuses = yield* fmt.status() + const gofmt = statuses.find((item) => item.name === "gofmt") + expect(gofmt).toBeUndefined() + }), + ), + { + config: { + formatter: { + gofmt: { disabled: true }, + }, }, }, - }) + ), + ) - await withServices(tmp.path, Format.layer, async (rt) => { - const statuses = await rt.runPromise(Format.Service.use((s) => s.status())) - const gofmt = statuses.find((s) => s.name === "gofmt") - expect(gofmt).toBeUndefined() - }) - }) + it.effect("service initializes without error", () => + provideTmpdirInstance(() => Format.Service.use(() => Effect.void)), + ) - test("service initializes without error", async () => { - await using tmp = await tmpdir() - - await withServices(tmp.path, Format.layer, async (rt) => { - await rt.runPromise(Format.Service.use(() => Effect.void)) - }) - }) - - test("status() initializes formatter state per directory", async () => { - await using off = await tmpdir({ - config: { formatter: false }, - }) - await using on = await tmpdir() - - const a = await Instance.provide({ - directory: off.path, - fn: () => Format.status(), - }) - const b = await Instance.provide({ - directory: on.path, - fn: () => Format.status(), - }) - - expect(a).toEqual([]) - expect(b.length).toBeGreaterThan(0) - }) - - test("runs enabled checks for matching formatters in parallel", async () => { - await using tmp = await tmpdir() - - const file = `${tmp.path}/test.parallel` - await Bun.write(file, "x") - - const one = { - extensions: Formatter.gofmt.extensions, - enabled: Formatter.gofmt.enabled, - command: Formatter.gofmt.command, - } - const two = { - extensions: Formatter.mix.extensions, - enabled: Formatter.mix.enabled, - command: Formatter.mix.command, - } - - let active = 0 - let max = 0 - - Formatter.gofmt.extensions = [".parallel"] - Formatter.mix.extensions = [".parallel"] - Formatter.gofmt.command = ["sh", "-c", "true"] - Formatter.mix.command = ["sh", "-c", "true"] - Formatter.gofmt.enabled = async () => { - active++ - max = Math.max(max, active) - await Bun.sleep(20) - active-- - return true - } - Formatter.mix.enabled = async () => { - active++ - max = Math.max(max, active) - await Bun.sleep(20) - active-- - return true - } - - try { - await withServices(tmp.path, Format.layer, async (rt) => { - await rt.runPromise(Format.Service.use((s) => s.init())) - await Bus.publish(File.Event.Edited, { file }) + it.effect("status() initializes formatter state per directory", () => + Effect.gen(function* () { + const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), { + config: { formatter: false }, }) - } finally { - Formatter.gofmt.extensions = one.extensions - Formatter.gofmt.enabled = one.enabled - Formatter.gofmt.command = one.command - Formatter.mix.extensions = two.extensions - Formatter.mix.enabled = two.enabled - Formatter.mix.command = two.command - } + const b = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status())) - expect(max).toBe(2) - }) + expect(a).toEqual([]) + expect(b.length).toBeGreaterThan(0) + }), + ) - test("runs matching formatters sequentially for the same file", async () => { - await using tmp = await tmpdir({ - config: { - formatter: { - first: { - command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"], - extensions: [".seq"], - }, - second: { - command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"], - extensions: [".seq"], + it.effect("runs enabled checks for matching formatters in parallel", () => + provideTmpdirInstance((path) => + Effect.gen(function* () { + const file = `${path}/test.parallel` + yield* Effect.promise(() => Bun.write(file, "x")) + + const one = { + extensions: Formatter.gofmt.extensions, + enabled: Formatter.gofmt.enabled, + command: Formatter.gofmt.command, + } + const two = { + extensions: Formatter.mix.extensions, + enabled: Formatter.mix.enabled, + command: Formatter.mix.command, + } + + let active = 0 + let max = 0 + + yield* Effect.acquireUseRelease( + Effect.sync(() => { + Formatter.gofmt.extensions = [".parallel"] + Formatter.mix.extensions = [".parallel"] + Formatter.gofmt.command = ["sh", "-c", "true"] + Formatter.mix.command = ["sh", "-c", "true"] + Formatter.gofmt.enabled = async () => { + active++ + max = Math.max(max, active) + await Bun.sleep(20) + active-- + return true + } + Formatter.mix.enabled = async () => { + active++ + max = Math.max(max, active) + await Bun.sleep(20) + active-- + return true + } + }), + () => + Format.Service.use((fmt) => + Effect.gen(function* () { + yield* fmt.init() + yield* fmt.file(file) + }), + ), + () => + Effect.sync(() => { + Formatter.gofmt.extensions = one.extensions + Formatter.gofmt.enabled = one.enabled + Formatter.gofmt.command = one.command + Formatter.mix.extensions = two.extensions + Formatter.mix.enabled = two.enabled + Formatter.mix.command = two.command + }), + ) + + expect(max).toBe(2) + }), + ), + ) + + it.effect("runs matching formatters sequentially for the same file", () => + provideTmpdirInstance( + (path) => + Effect.gen(function* () { + const file = `${path}/test.seq` + yield* Effect.promise(() => Bun.write(file, "x")) + + yield* Format.Service.use((fmt) => + Effect.gen(function* () { + yield* fmt.init() + yield* fmt.file(file) + }), + ) + + expect(yield* Effect.promise(() => Bun.file(file).text())).toBe("xAB") + }), + { + config: { + formatter: { + first: { + command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"], + extensions: [".seq"], + }, + second: { + command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"], + extensions: [".seq"], + }, }, }, }, - }) - - const file = `${tmp.path}/test.seq` - await Bun.write(file, "x") - - await withServices(tmp.path, Format.layer, async (rt) => { - await rt.runPromise(Format.Service.use((s) => s.init())) - await Bus.publish(File.Event.Edited, { file }) - }) - - expect(await Bun.file(file).text()).toBe("xAB") - }) + ), + ) }) diff --git a/packages/opencode/test/project/vcs.test.ts b/packages/opencode/test/project/vcs.test.ts index f55989caff..50282b5f6c 100644 --- a/packages/opencode/test/project/vcs.test.ts +++ b/packages/opencode/test/project/vcs.test.ts @@ -2,9 +2,7 @@ import { $ } from "bun" import { afterEach, describe, expect, test } from "bun:test" import fs from "fs/promises" import path from "path" -import { Effect, Layer, ManagedRuntime } from "effect" import { tmpdir } from "../fixture/fixture" -import { watcherConfigLayer, withServices } from "../fixture/instance" import { FileWatcher } from "../../src/file/watcher" import { Instance } from "../../src/project/instance" import { GlobalBus } from "../../src/bus/global" @@ -17,28 +15,26 @@ const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe // Helpers // --------------------------------------------------------------------------- -function withVcs( - directory: string, - body: (rt: ManagedRuntime.ManagedRuntime) => Promise, -) { - return withServices( +async function withVcs(directory: string, body: () => Promise) { + return Instance.provide({ directory, - Layer.merge(FileWatcher.layer, Vcs.defaultLayer), - async (rt) => { - await rt.runPromise(FileWatcher.Service.use((s) => s.init())) - await rt.runPromise(Vcs.Service.use((s) => s.init())) + fn: async () => { + FileWatcher.init() + Vcs.init() await Bun.sleep(500) - await body(rt) + await body() }, - { provide: [watcherConfigLayer] }, - ) + }) } -function withVcsOnly( - directory: string, - body: (rt: ManagedRuntime.ManagedRuntime) => Promise, -) { - return withServices(directory, Vcs.defaultLayer, body) +function withVcsOnly(directory: string, body: () => Promise) { + return Instance.provide({ + directory, + fn: async () => { + Vcs.init() + await body() + }, + }) } type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } } @@ -82,8 +78,8 @@ describeVcs("Vcs", () => { test("branch() returns current branch name", async () => { await using tmp = await tmpdir({ git: true }) - await withVcs(tmp.path, async (rt) => { - const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch())) + await withVcs(tmp.path, async () => { + const branch = await Vcs.branch() expect(branch).toBeDefined() expect(typeof branch).toBe("string") }) @@ -92,8 +88,8 @@ describeVcs("Vcs", () => { test("branch() returns undefined for non-git directories", async () => { await using tmp = await tmpdir() - await withVcs(tmp.path, async (rt) => { - const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch())) + await withVcs(tmp.path, async () => { + const branch = await Vcs.branch() expect(branch).toBeUndefined() }) }) @@ -119,14 +115,14 @@ describeVcs("Vcs", () => { const branch = `test-${Math.random().toString(36).slice(2)}` await $`git branch ${branch}`.cwd(tmp.path).quiet() - await withVcs(tmp.path, async (rt) => { + await withVcs(tmp.path, async () => { const pending = nextBranchUpdate(tmp.path) const head = path.join(tmp.path, ".git", "HEAD") await fs.writeFile(head, `ref: refs/heads/${branch}\n`) await pending - const current = await rt.runPromise(Vcs.Service.use((s) => s.branch())) + const current = await Vcs.branch() expect(current).toBe(branch) }) }) @@ -141,8 +137,8 @@ describe("Vcs diff", () => { await using tmp = await tmpdir({ git: true }) await $`git branch -M main`.cwd(tmp.path).quiet() - await withVcsOnly(tmp.path, async (rt) => { - const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch())) + await withVcsOnly(tmp.path, async () => { + const branch = await Vcs.defaultBranch() expect(branch).toBe("main") }) }) @@ -152,8 +148,8 @@ describe("Vcs diff", () => { await $`git branch -M trunk`.cwd(tmp.path).quiet() await $`git config init.defaultBranch trunk`.cwd(tmp.path).quiet() - await withVcsOnly(tmp.path, async (rt) => { - const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch())) + await withVcsOnly(tmp.path, async () => { + const branch = await Vcs.defaultBranch() expect(branch).toBe("trunk") }) }) @@ -165,10 +161,10 @@ describe("Vcs diff", () => { const dir = path.join(wt.path, "feature") await $`git worktree add -b feature/test ${dir} HEAD`.cwd(tmp.path).quiet() - await withVcsOnly(dir, async (rt) => { + await withVcsOnly(dir, async () => { const [branch, base] = await Promise.all([ - rt.runPromise(Vcs.Service.use((s) => s.branch())), - rt.runPromise(Vcs.Service.use((s) => s.defaultBranch())), + Vcs.branch(), + Vcs.defaultBranch(), ]) expect(branch).toBe("feature/test") expect(base).toBe("main") @@ -182,8 +178,8 @@ describe("Vcs diff", () => { await $`git commit --no-gpg-sign -m "add file"`.cwd(tmp.path).quiet() await fs.writeFile(path.join(tmp.path, "file.txt"), "changed\n", "utf-8") - await withVcsOnly(tmp.path, async (rt) => { - const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git"))) + await withVcsOnly(tmp.path, async () => { + const diff = await Vcs.diff("git") expect(diff).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -199,8 +195,8 @@ describe("Vcs diff", () => { await using tmp = await tmpdir({ git: true }) await fs.writeFile(path.join(tmp.path, weird), "hello\n", "utf-8") - await withVcsOnly(tmp.path, async (rt) => { - const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git"))) + await withVcsOnly(tmp.path, async () => { + const diff = await Vcs.diff("git") expect(diff).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -220,8 +216,8 @@ describe("Vcs diff", () => { await $`git add .`.cwd(tmp.path).quiet() await $`git commit --no-gpg-sign -m "branch file"`.cwd(tmp.path).quiet() - await withVcsOnly(tmp.path, async (rt) => { - const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("branch"))) + await withVcsOnly(tmp.path, async () => { + const diff = await Vcs.diff("branch") expect(diff).toEqual( expect.arrayContaining([ expect.objectContaining({ diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index f96750d7d9..5304f4ea8b 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -110,10 +110,16 @@ describe("SyncEvent", () => { type: string properties: { id: string; name: string } }> = [] - const unsub = Bus.subscribeAll((event) => events.push(event)) + const received = new Promise((resolve) => { + Bus.subscribeAll((event) => { + events.push(event) + resolve() + }) + }) SyncEvent.run(Created, { id: "evt_1", name: "test" }) + await received expect(events).toHaveLength(1) expect(events[0]).toEqual({ type: "item.created", @@ -122,8 +128,6 @@ describe("SyncEvent", () => { name: "test", }, }) - - unsub() }), ) }) diff --git a/packages/opencode/test/tool/edit.test.ts b/packages/opencode/test/tool/edit.test.ts index f6b1ee5c92..96d41400e3 100644 --- a/packages/opencode/test/tool/edit.test.ts +++ b/packages/opencode/test/tool/edit.test.ts @@ -89,7 +89,6 @@ describe("tool.edit", () => { const { FileWatcher } = await import("../../src/file/watcher") const events: string[] = [] - const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited")) const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated")) const edit = await EditTool.init() @@ -102,9 +101,7 @@ describe("tool.edit", () => { ctx, ) - expect(events).toContain("edited") expect(events).toContain("updated") - unsubEdited() unsubUpdated() }, }) @@ -305,11 +302,9 @@ describe("tool.edit", () => { await FileTime.read(ctx.sessionID, filepath) const { Bus } = await import("../../src/bus") - const { File } = await import("../../src/file") const { FileWatcher } = await import("../../src/file/watcher") const events: string[] = [] - const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited")) const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated")) const edit = await EditTool.init() @@ -322,9 +317,7 @@ describe("tool.edit", () => { ctx, ) - expect(events).toContain("edited") expect(events).toContain("updated") - unsubEdited() unsubUpdated() }, })