From 41c77ccb33b26c09aca2ab96661dc31a5db70264 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 24 Mar 2026 10:35:24 -0400 Subject: [PATCH] fix: restore cross-spawn behavior for effect child processes (#18798) --- .../src/effect/cross-spawn-spawner.ts | 476 ++++++++++++++++ packages/opencode/src/installation/index.ts | 5 +- packages/opencode/src/snapshot/index.ts | 7 +- .../test/effect/cross-spawn-spawner.test.ts | 518 ++++++++++++++++++ 4 files changed, 1001 insertions(+), 5 deletions(-) create mode 100644 packages/opencode/src/effect/cross-spawn-spawner.ts create mode 100644 packages/opencode/test/effect/cross-spawn-spawner.test.ts diff --git a/packages/opencode/src/effect/cross-spawn-spawner.ts b/packages/opencode/src/effect/cross-spawn-spawner.ts new file mode 100644 index 0000000000..f7b8786d08 --- /dev/null +++ b/packages/opencode/src/effect/cross-spawn-spawner.ts @@ -0,0 +1,476 @@ +import type * as Arr from "effect/Array" +import { NodeSink, NodeStream } from "@effect/platform-node" +import * as Deferred from "effect/Deferred" +import * as Effect from "effect/Effect" +import * as Exit from "effect/Exit" +import * as FileSystem from "effect/FileSystem" +import * as Layer from "effect/Layer" +import * as Path from "effect/Path" +import * as PlatformError from "effect/PlatformError" +import * as Predicate from "effect/Predicate" +import type * as Scope from "effect/Scope" +import * as Sink from "effect/Sink" +import * as Stream from "effect/Stream" +import * as ChildProcess from "effect/unstable/process/ChildProcess" +import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner" +import { + ChildProcessSpawner, + ExitCode, + make as makeSpawner, + makeHandle, + ProcessId, +} from "effect/unstable/process/ChildProcessSpawner" +import * as NodeChildProcess from "node:child_process" +import { PassThrough } from "node:stream" +import launch from "cross-spawn" + +const toError = (err: unknown): Error => (err instanceof globalThis.Error ? err : new globalThis.Error(String(err))) + +const toTag = (err: NodeJS.ErrnoException): PlatformError.SystemErrorTag => { + switch (err.code) { + case "ENOENT": + return "NotFound" + case "EACCES": + return "PermissionDenied" + case "EEXIST": + return "AlreadyExists" + case "EISDIR": + return "BadResource" + case "ENOTDIR": + return "BadResource" + case "EBUSY": + return "Busy" + case "ELOOP": + return "BadResource" + default: + return "Unknown" + } +} + +const flatten = (command: ChildProcess.Command) => { + const commands: Array = [] + const opts: Array = [] + + const walk = (cmd: ChildProcess.Command): void => { + switch (cmd._tag) { + case "StandardCommand": + commands.push(cmd) + return + case "PipedCommand": + walk(cmd.left) + opts.push(cmd.options) + walk(cmd.right) + return + } + } + + walk(command) + if (commands.length === 0) throw new Error("flatten produced empty commands array") + const [head, ...tail] = commands + return { + commands: [head, ...tail] as Arr.NonEmptyReadonlyArray, + opts, + } +} + +const toPlatformError = ( + method: string, + err: NodeJS.ErrnoException, + command: ChildProcess.Command, +): PlatformError.PlatformError => { + const cmd = flatten(command) + .commands.map((x) => `${x.command} ${x.args.join(" ")}`) + .join(" | ") + return PlatformError.systemError({ + _tag: toTag(err), + module: "ChildProcess", + method, + pathOrDescriptor: cmd, + syscall: err.syscall, + cause: err, + }) +} + +type ExitSignal = Deferred.Deferred + +export const make = Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem + const path = yield* Path.Path + + const cwd = Effect.fnUntraced(function* (opts: ChildProcess.CommandOptions) { + if (Predicate.isUndefined(opts.cwd)) return undefined + yield* fs.access(opts.cwd) + return path.resolve(opts.cwd) + }) + + const env = (opts: ChildProcess.CommandOptions) => + opts.extendEnv ? { ...globalThis.process.env, ...opts.env } : opts.env + + const input = (x: ChildProcess.CommandInput | undefined): NodeChildProcess.IOType | undefined => + Stream.isStream(x) ? "pipe" : x + + const output = (x: ChildProcess.CommandOutput | undefined): NodeChildProcess.IOType | undefined => + Sink.isSink(x) ? "pipe" : x + + const stdin = (opts: ChildProcess.CommandOptions): ChildProcess.StdinConfig => { + const cfg: ChildProcess.StdinConfig = { stream: "pipe", encoding: "utf-8", endOnDone: true } + if (Predicate.isUndefined(opts.stdin)) return cfg + if (typeof opts.stdin === "string") return { ...cfg, stream: opts.stdin } + if (Stream.isStream(opts.stdin)) return { ...cfg, stream: opts.stdin } + return { + stream: opts.stdin.stream, + encoding: opts.stdin.encoding ?? cfg.encoding, + endOnDone: opts.stdin.endOnDone ?? cfg.endOnDone, + } + } + + const stdio = (opts: ChildProcess.CommandOptions, key: "stdout" | "stderr"): ChildProcess.StdoutConfig => { + const cfg = opts[key] + if (Predicate.isUndefined(cfg)) return { stream: "pipe" } + if (typeof cfg === "string") return { stream: cfg } + if (Sink.isSink(cfg)) return { stream: cfg } + return { stream: cfg.stream } + } + + const fds = (opts: ChildProcess.CommandOptions) => { + if (Predicate.isUndefined(opts.additionalFds)) return [] + return Object.entries(opts.additionalFds) + .flatMap(([name, config]) => { + const fd = ChildProcess.parseFdName(name) + return Predicate.isUndefined(fd) ? [] : [{ fd, config }] + }) + .toSorted((a, b) => a.fd - b.fd) + } + + const stdios = ( + sin: ChildProcess.StdinConfig, + sout: ChildProcess.StdoutConfig, + serr: ChildProcess.StderrConfig, + extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>, + ): NodeChildProcess.StdioOptions => { + const pipe = (x: NodeChildProcess.IOType | undefined) => + process.platform === "win32" && x === "pipe" ? "overlapped" : x + const arr: Array = [ + pipe(input(sin.stream)), + pipe(output(sout.stream)), + pipe(output(serr.stream)), + ] + if (extra.length === 0) return arr as NodeChildProcess.StdioOptions + const max = extra.reduce((acc, x) => Math.max(acc, x.fd), 2) + for (let i = 3; i <= max; i++) arr[i] = "ignore" + for (const x of extra) arr[x.fd] = pipe("pipe") + return arr as NodeChildProcess.StdioOptions + } + + const setupFds = Effect.fnUntraced(function* ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>, + ) { + if (extra.length === 0) { + return { + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + } + } + + const ins = new Map>() + const outs = new Map>() + + for (const x of extra) { + const node = proc.stdio[x.fd] + switch (x.config.type) { + case "input": { + let sink: Sink.Sink = Sink.drain + if (node && "write" in node) { + sink = NodeSink.fromWritable({ + evaluate: () => node, + onError: (err) => toPlatformError(`fromWritable(fd${x.fd})`, toError(err), command), + endOnDone: true, + }) + } + if (x.config.stream) yield* Effect.forkScoped(Stream.run(x.config.stream, sink)) + ins.set(x.fd, sink) + break + } + case "output": { + let stream: Stream.Stream = Stream.empty + if (node && "read" in node) { + const tap = new PassThrough() + node.on("error", (err) => tap.destroy(toError(err))) + node.pipe(tap) + stream = NodeStream.fromReadable({ + evaluate: () => tap, + onError: (err) => toPlatformError(`fromReadable(fd${x.fd})`, toError(err), command), + }) + } + if (x.config.sink) stream = Stream.transduce(stream, x.config.sink) + outs.set(x.fd, stream) + break + } + } + } + + return { + getInputFd: (fd: number) => ins.get(fd) ?? Sink.drain, + getOutputFd: (fd: number) => outs.get(fd) ?? Stream.empty, + } + }) + + const setupStdin = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + cfg: ChildProcess.StdinConfig, + ) => + Effect.suspend(() => { + let sink: Sink.Sink = Sink.drain + if (Predicate.isNotNull(proc.stdin)) { + sink = NodeSink.fromWritable({ + evaluate: () => proc.stdin!, + onError: (err) => toPlatformError("fromWritable(stdin)", toError(err), command), + endOnDone: cfg.endOnDone, + encoding: cfg.encoding, + }) + } + if (Stream.isStream(cfg.stream)) return Effect.as(Effect.forkScoped(Stream.run(cfg.stream, sink)), sink) + return Effect.succeed(sink) + }) + + const setupOutput = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + out: ChildProcess.StdoutConfig, + err: ChildProcess.StderrConfig, + ) => { + let stdout = proc.stdout + ? NodeStream.fromReadable({ + evaluate: () => proc.stdout!, + onError: (cause) => toPlatformError("fromReadable(stdout)", toError(cause), command), + }) + : Stream.empty + let stderr = proc.stderr + ? NodeStream.fromReadable({ + evaluate: () => proc.stderr!, + onError: (cause) => toPlatformError("fromReadable(stderr)", toError(cause), command), + }) + : Stream.empty + + if (Sink.isSink(out.stream)) stdout = Stream.transduce(stdout, out.stream) + if (Sink.isSink(err.stream)) stderr = Stream.transduce(stderr, err.stream) + + return { stdout, stderr, all: Stream.merge(stdout, stderr) } + } + + const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) => + Effect.callback((resume) => { + const signal = Deferred.makeUnsafe() + const proc = launch(command.command, command.args, opts) + let end = false + let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined + proc.on("error", (err) => { + resume(Effect.fail(toPlatformError("spawn", err, command))) + }) + proc.on("exit", (...args) => { + exit = args + }) + proc.on("close", (...args) => { + if (end) return + end = true + Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args)) + }) + proc.on("spawn", () => { + resume(Effect.succeed([proc, signal])) + }) + return Effect.sync(() => { + proc.kill("SIGTERM") + }) + }) + + const killGroup = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + signal: NodeJS.Signals, + ) => { + if (globalThis.process.platform === "win32") { + return Effect.callback((resume) => { + NodeChildProcess.exec(`taskkill /pid ${proc.pid} /T /F`, { windowsHide: true }, (err) => { + if (err) return resume(Effect.fail(toPlatformError("kill", toError(err), command))) + resume(Effect.void) + }) + }) + } + + return Effect.try({ + try: () => { + globalThis.process.kill(-proc.pid!, signal) + }, + catch: (err) => toPlatformError("kill", toError(err), command), + }) + } + + const killOne = ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + signal: NodeJS.Signals, + ) => + Effect.suspend(() => { + if (proc.kill(signal)) return Effect.void + return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command)) + }) + + const timeout = + ( + proc: NodeChildProcess.ChildProcess, + command: ChildProcess.StandardCommand, + opts: ChildProcess.KillOptions | undefined, + ) => + ( + f: ( + command: ChildProcess.StandardCommand, + proc: NodeChildProcess.ChildProcess, + signal: NodeJS.Signals, + ) => Effect.Effect, + ) => { + const signal = opts?.killSignal ?? "SIGTERM" + if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal) + return Effect.timeoutOrElse(f(command, proc, signal), { + duration: opts.forceKillAfter, + onTimeout: () => f(command, proc, "SIGKILL"), + }) + } + + const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => { + const opt = from ?? "stdout" + switch (opt) { + case "stdout": + return handle.stdout + case "stderr": + return handle.stderr + case "all": + return handle.all + default: { + const fd = ChildProcess.parseFdName(opt) + return Predicate.isNotUndefined(fd) ? handle.getOutputFd(fd) : handle.stdout + } + } + } + + const spawnCommand: ( + command: ChildProcess.Command, + ) => Effect.Effect = Effect.fnUntraced( + function* (command) { + switch (command._tag) { + case "StandardCommand": { + const sin = stdin(command.options) + const sout = stdio(command.options, "stdout") + const serr = stdio(command.options, "stderr") + const extra = fds(command.options) + const dir = yield* cwd(command.options) + + const [proc, signal] = yield* Effect.acquireRelease( + spawn(command, { + cwd: dir, + env: env(command.options), + stdio: stdios(sin, sout, serr, extra), + detached: command.options.detached ?? process.platform !== "win32", + shell: command.options.shell, + windowsHide: process.platform === "win32", + }), + Effect.fnUntraced(function* ([proc, signal]) { + const done = yield* Deferred.isDone(signal) + const kill = timeout(proc, command, command.options) + if (done) { + const [code] = yield* Deferred.await(signal) + if (process.platform === "win32") return yield* Effect.void + if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup)) + return yield* Effect.void + } + return yield* kill((command, proc, signal) => + Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)), + ).pipe(Effect.andThen(Deferred.await(signal)), Effect.ignore) + }), + ) + + const fd = yield* setupFds(command, proc, extra) + const out = setupOutput(command, proc, sout, serr) + return makeHandle({ + pid: ProcessId(proc.pid!), + stdin: yield* setupStdin(command, proc, sin), + stdout: out.stdout, + stderr: out.stderr, + all: out.all, + getInputFd: fd.getInputFd, + getOutputFd: fd.getOutputFd, + isRunning: Effect.map(Deferred.isDone(signal), (done) => !done), + exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => { + if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code)) + return Effect.fail( + toPlatformError( + "exitCode", + new Error(`Process interrupted due to receipt of signal: '${signal}'`), + command, + ), + ) + }), + kill: (opts?: ChildProcess.KillOptions) => + timeout( + proc, + command, + opts, + )((command, proc, signal) => + Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)), + ).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid), + }) + } + case "PipedCommand": { + const flat = flatten(command) + const [head, ...tail] = flat.commands + let handle = spawnCommand(head) + for (let i = 0; i < tail.length; i++) { + const next = tail[i] + const opts = flat.opts[i] ?? {} + const sin = stdin(next.options) + const stream = Stream.unwrap(Effect.map(handle, (x) => source(x, opts.from))) + const to = opts.to ?? "stdin" + if (to === "stdin") { + handle = spawnCommand( + ChildProcess.make(next.command, next.args, { + ...next.options, + stdin: { ...sin, stream }, + }), + ) + continue + } + const fd = ChildProcess.parseFdName(to) + if (Predicate.isUndefined(fd)) { + handle = spawnCommand( + ChildProcess.make(next.command, next.args, { + ...next.options, + stdin: { ...sin, stream }, + }), + ) + continue + } + handle = spawnCommand( + ChildProcess.make(next.command, next.args, { + ...next.options, + additionalFds: { + ...next.options.additionalFds, + [ChildProcess.fdName(fd) as `fd${number}`]: { type: "input", stream }, + }, + }), + ) + } + return yield* handle + } + } + }, + ) + + return makeSpawner(spawnCommand) +}) + +export const layer: Layer.Layer = Layer.effect( + ChildProcessSpawner, + make, +) diff --git a/packages/opencode/src/installation/index.ts b/packages/opencode/src/installation/index.ts index 3551c861e4..912951a0ba 100644 --- a/packages/opencode/src/installation/index.ts +++ b/packages/opencode/src/installation/index.ts @@ -1,6 +1,7 @@ -import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { NodeFileSystem, NodePath } from "@effect/platform-node" import { Effect, Layer, Schema, ServiceMap, Stream } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" +import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import { makeRunPromise } from "@/effect/run-service" import { withTransientReadRetry } from "@/util/effect-http-client" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" @@ -340,7 +341,7 @@ export namespace Installation { export const defaultLayer = layer.pipe( Layer.provide(FetchHttpClient.layer), - Layer.provide(NodeChildProcessSpawner.layer), + Layer.provide(CrossSpawnSpawner.layer), Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer), ) diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts index 5f8c5aeffd..7068545d26 100644 --- a/packages/opencode/src/snapshot/index.ts +++ b/packages/opencode/src/snapshot/index.ts @@ -1,8 +1,9 @@ -import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { NodeFileSystem, NodePath } from "@effect/platform-node" import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import path from "path" import z from "zod" +import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import { InstanceState } from "@/effect/instance-state" import { makeRunPromise } from "@/effect/run-service" import { AppFileSystem } from "@/filesystem" @@ -354,9 +355,9 @@ export namespace Snapshot { ) export const defaultLayer = layer.pipe( - Layer.provide(NodeChildProcessSpawner.layer), + Layer.provide(CrossSpawnSpawner.layer), Layer.provide(AppFileSystem.defaultLayer), - Layer.provide(NodeFileSystem.layer), // needed by NodeChildProcessSpawner + Layer.provide(NodeFileSystem.layer), // needed by CrossSpawnSpawner Layer.provide(NodePath.layer), ) diff --git a/packages/opencode/test/effect/cross-spawn-spawner.test.ts b/packages/opencode/test/effect/cross-spawn-spawner.test.ts new file mode 100644 index 0000000000..7fdcb61cd4 --- /dev/null +++ b/packages/opencode/test/effect/cross-spawn-spawner.test.ts @@ -0,0 +1,518 @@ +import { NodeFileSystem, NodePath } from "@effect/platform-node" +import { describe, expect } from "bun:test" +import fs from "node:fs/promises" +import path from "node:path" +import { Effect, Exit, Layer, Stream } from "effect" +import type * as PlatformError from "effect/PlatformError" +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" +import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" +import { tmpdir } from "../fixture/fixture" +import { testEffect } from "../lib/effect" + +const live = CrossSpawnSpawner.layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer)) +const fx = testEffect(live) + +function js(code: string, opts?: ChildProcess.CommandOptions) { + return ChildProcess.make("node", ["-e", code], opts) +} + +function decodeByteStream(stream: Stream.Stream) { + return Stream.runCollect(stream).pipe( + Effect.map((chunks) => { + const total = chunks.reduce((acc, x) => acc + x.length, 0) + const out = new Uint8Array(total) + let off = 0 + for (const chunk of chunks) { + out.set(chunk, off) + off += chunk.length + } + return new TextDecoder("utf-8").decode(out).trim() + }), + ) +} + +function alive(pid: number) { + try { + process.kill(pid, 0) + return true + } catch { + return false + } +} + +async function gone(pid: number, timeout = 5_000) { + const end = Date.now() + timeout + while (Date.now() < end) { + if (!alive(pid)) return true + await Bun.sleep(50) + } + return !alive(pid) +} + +describe("cross-spawn spawner", () => { + describe("basic spawning", () => { + fx.effect( + "captures stdout", + Effect.gen(function* () { + const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.string(ChildProcess.make(process.execPath, ["-e", 'process.stdout.write("ok")'])), + ) + expect(out).toBe("ok") + }), + ) + + fx.effect( + "captures multiple lines", + Effect.gen(function* () { + const handle = yield* js('console.log("line1"); console.log("line2"); console.log("line3")') + const out = yield* decodeByteStream(handle.stdout) + expect(out).toBe("line1\nline2\nline3") + }), + ) + + fx.effect( + "returns exit code", + Effect.gen(function* () { + const handle = yield* js("process.exit(0)") + const code = yield* handle.exitCode + expect(code).toBe(ChildProcessSpawner.ExitCode(0)) + }), + ) + + fx.effect( + "returns non-zero exit code", + Effect.gen(function* () { + const handle = yield* js("process.exit(42)") + const code = yield* handle.exitCode + expect(code).toBe(ChildProcessSpawner.ExitCode(42)) + }), + ) + }) + + describe("cwd option", () => { + fx.effect( + "uses cwd when spawning commands", + Effect.gen(function* () { + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.string( + ChildProcess.make(process.execPath, ["-e", "process.stdout.write(process.cwd())"], { cwd: tmp.path }), + ), + ) + expect(out).toBe(tmp.path) + }), + ) + + fx.effect( + "fails for invalid cwd", + Effect.gen(function* () { + const exit = yield* Effect.exit( + ChildProcess.make("echo", ["test"], { cwd: "/nonexistent/directory/path" }).asEffect(), + ) + expect(Exit.isFailure(exit)).toBe(true) + }), + ) + }) + + describe("env option", () => { + fx.effect( + "passes environment variables with extendEnv", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write(process.env.TEST_VAR ?? "")', { + env: { TEST_VAR: "test_value" }, + extendEnv: true, + }) + const out = yield* decodeByteStream(handle.stdout) + expect(out).toBe("test_value") + }), + ) + + fx.effect( + "passes multiple environment variables", + Effect.gen(function* () { + const handle = yield* js( + "process.stdout.write(`${process.env.VAR1}-${process.env.VAR2}-${process.env.VAR3}`)", + { + env: { VAR1: "one", VAR2: "two", VAR3: "three" }, + extendEnv: true, + }, + ) + const out = yield* decodeByteStream(handle.stdout) + expect(out).toBe("one-two-three") + }), + ) + }) + + describe("stderr", () => { + fx.effect( + "captures stderr output", + Effect.gen(function* () { + const handle = yield* js('process.stderr.write("error message")') + const err = yield* decodeByteStream(handle.stderr) + expect(err).toBe("error message") + }), + ) + + fx.effect( + "captures both stdout and stderr", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")') + const [stdout, stderr] = yield* Effect.all([decodeByteStream(handle.stdout), decodeByteStream(handle.stderr)]) + expect(stdout).toBe("stdout") + expect(stderr).toBe("stderr") + }), + ) + }) + + describe("combined output (all)", () => { + fx.effect( + "captures stdout via .all when no stderr", + Effect.gen(function* () { + const handle = yield* ChildProcess.make("echo", ["hello from stdout"]) + const all = yield* decodeByteStream(handle.all) + expect(all).toBe("hello from stdout") + }), + ) + + fx.effect( + "captures stderr via .all when no stdout", + Effect.gen(function* () { + const handle = yield* js('process.stderr.write("hello from stderr")') + const all = yield* decodeByteStream(handle.all) + expect(all).toBe("hello from stderr") + }), + ) + }) + + describe("stdin", () => { + fx.effect( + "allows providing standard input to a command", + Effect.gen(function* () { + const input = "a b c" + const stdin = Stream.make(Buffer.from(input, "utf-8")) + const handle = yield* js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + { stdin }, + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("a b c") + }), + ) + }) + + describe("process control", () => { + fx.effect( + "kills a running process", + Effect.gen(function* () { + const exit = yield* Effect.exit( + Effect.gen(function* () { + const handle = yield* js("setTimeout(() => {}, 10_000)") + yield* handle.kill() + return yield* handle.exitCode + }), + ) + expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true) + }), + ) + + fx.effect( + "kills a child when scope exits", + Effect.gen(function* () { + const pid = yield* Effect.scoped( + Effect.gen(function* () { + const handle = yield* js("setInterval(() => {}, 10_000)") + return Number(handle.pid) + }), + ) + const done = yield* Effect.promise(() => gone(pid)) + expect(done).toBe(true) + }), + ) + + fx.effect( + "forceKillAfter escalates for stubborn processes", + Effect.gen(function* () { + if (process.platform === "win32") return + + const started = Date.now() + const exit = yield* Effect.exit( + Effect.gen(function* () { + const handle = yield* js('process.on("SIGTERM", () => {}); setInterval(() => {}, 10_000)') + yield* handle.kill({ forceKillAfter: 100 }) + return yield* handle.exitCode + }), + ) + + expect(Date.now() - started).toBeLessThan(1_000) + expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true) + }), + ) + + fx.effect( + "isRunning reflects process state", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("done")') + yield* handle.exitCode + const running = yield* handle.isRunning + expect(running).toBe(false) + }), + ) + }) + + describe("error handling", () => { + fx.effect( + "fails for invalid command", + Effect.gen(function* () { + const exit = yield* Effect.exit( + Effect.gen(function* () { + const handle = yield* ChildProcess.make("nonexistent-command-12345") + return yield* handle.exitCode + }), + ) + expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true) + }), + ) + }) + + describe("pipeline", () => { + fx.effect( + "pipes stdout of one command to stdin of another", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("hello world")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))', + ), + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("HELLO WORLD") + }), + ) + + fx.effect( + "three-stage pipeline", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("hello world")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))', + ), + ), + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.replaceAll(" ", "-")))', + ), + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("HELLO-WORLD") + }), + ) + + fx.effect( + "pipes stderr with { from: 'stderr' }", + Effect.gen(function* () { + const handle = yield* js('process.stderr.write("error")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + ), + { from: "stderr" }, + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("error") + }), + ) + + fx.effect( + "pipes combined output with { from: 'all' }", + Effect.gen(function* () { + const handle = yield* js('process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")').pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + ), + { from: "all" }, + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toContain("stdout") + expect(out).toContain("stderr") + }), + ) + + fx.effect( + "pipes output fd3 with { from: 'fd3' }", + Effect.gen(function* () { + const handle = yield* js('require("node:fs").writeSync(3, "hello from fd3\\n")', { + additionalFds: { fd3: { type: "output" } }, + }).pipe( + ChildProcess.pipeTo( + js( + 'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))', + ), + { from: "fd3" }, + ), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("hello from fd3") + }), + ) + + fx.effect( + "pipes stdout to fd3", + Effect.gen(function* () { + if (process.platform === "win32") return + + const handle = yield* js('process.stdout.write("hello from stdout")').pipe( + ChildProcess.pipeTo(js('process.stdout.write(require("node:fs").readFileSync(3, "utf8"))'), { to: "fd3" }), + ) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("hello from stdout") + }), + ) + }) + + describe("additional fds", () => { + fx.effect( + "reads data from output fd3", + Effect.gen(function* () { + const handle = yield* js('require("node:fs").writeSync(3, "hello from fd3\\n")', { + additionalFds: { fd3: { type: "output" } }, + }) + const out = yield* decodeByteStream(handle.getOutputFd(3)) + yield* handle.exitCode + expect(out).toBe("hello from fd3") + }), + ) + + fx.effect( + "writes data to input fd3", + Effect.gen(function* () { + if (process.platform === "win32") return + + const input = Stream.make(new TextEncoder().encode("data from parent")) + const handle = yield* js('process.stdout.write(require("node:fs").readFileSync(3, "utf8"))', { + additionalFds: { fd3: { type: "input", stream: input } }, + }) + const out = yield* decodeByteStream(handle.stdout) + yield* handle.exitCode + expect(out).toBe("data from parent") + }), + ) + + fx.effect( + "returns empty stream for unconfigured fd", + Effect.gen(function* () { + const handle = + process.platform === "win32" + ? yield* js('process.stdout.write("test")') + : yield* ChildProcess.make("echo", ["test"]) + const out = yield* decodeByteStream(handle.getOutputFd(3)) + yield* handle.exitCode + expect(out).toBe("") + }), + ) + + fx.effect( + "works alongside normal stdout and stderr", + Effect.gen(function* () { + const handle = yield* js( + 'require("node:fs").writeSync(3, "fd3\\n"); process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")', + { + additionalFds: { fd3: { type: "output" } }, + }, + ) + const stdout = yield* decodeByteStream(handle.stdout) + const stderr = yield* decodeByteStream(handle.stderr) + const fd3 = yield* decodeByteStream(handle.getOutputFd(3)) + yield* handle.exitCode + expect(stdout).toBe("stdout") + expect(stderr).toBe("stderr") + expect(fd3).toBe("fd3") + }), + ) + }) + + describe("large output", () => { + fx.effect( + "does not deadlock on large stdout", + Effect.gen(function* () { + const handle = yield* js("for (let i = 1; i <= 100000; i++) process.stdout.write(`${i}\\n`)") + const out = yield* handle.stdout.pipe( + Stream.decodeText(), + Stream.runFold( + () => "", + (acc, chunk) => acc + chunk, + ), + ) + yield* handle.exitCode + const lines = out.trim().split("\n") + expect(lines.length).toBe(100000) + expect(lines[0]).toBe("1") + expect(lines[99999]).toBe("100000") + }), + { timeout: 10_000 }, + ) + }) + + describe("Windows-specific", () => { + fx.effect( + "uses shell routing on Windows", + Effect.gen(function* () { + if (process.platform !== "win32") return + + const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.string( + ChildProcess.make("set", ["OPENCODE_TEST_SHELL"], { + shell: true, + extendEnv: true, + env: { OPENCODE_TEST_SHELL: "ok" }, + }), + ), + ) + expect(out).toContain("OPENCODE_TEST_SHELL=ok") + }), + ) + + fx.effect( + "runs cmd scripts with spaces on Windows without shell", + Effect.gen(function* () { + if (process.platform !== "win32") return + + const tmp = yield* Effect.acquireRelease( + Effect.promise(() => tmpdir()), + (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()), + ) + const dir = path.join(tmp.path, "with space") + const file = path.join(dir, "echo cmd.cmd") + + yield* Effect.promise(() => fs.mkdir(dir, { recursive: true })) + yield* Effect.promise(() => Bun.write(file, "@echo off\r\nif %~1==--stdio exit /b 0\r\nexit /b 7\r\n")) + + const code = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) => + svc.exitCode( + ChildProcess.make(file, ["--stdio"], { + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }), + ), + ) + expect(code).toBe(ChildProcessSpawner.ExitCode(0)) + }), + ) + }) +})