From 8864fdce2f21261909f169a369fa0c3ba44ca85c Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 25 Mar 2026 19:49:14 -0400 Subject: [PATCH] fix: Windows e2e stability (CrossSpawnSpawner, snapshot isolation, session race guards) (#19163) --- packages/app/playwright.config.ts | 3 +- packages/opencode/src/git/index.ts | 5 +- packages/opencode/src/session/compaction.ts | 7 +- packages/opencode/src/session/projectors.ts | 57 ++-- packages/opencode/src/snapshot/index.ts | 346 +++++++++++--------- 5 files changed, 246 insertions(+), 172 deletions(-) diff --git a/packages/app/playwright.config.ts b/packages/app/playwright.config.ts index 2667b89a1c..d70eb9a8eb 100644 --- a/packages/app/playwright.config.ts +++ b/packages/app/playwright.config.ts @@ -6,7 +6,8 @@ const serverHost = process.env.PLAYWRIGHT_SERVER_HOST ?? "127.0.0.1" const serverPort = process.env.PLAYWRIGHT_SERVER_PORT ?? "4096" const command = `bun run dev -- --host 0.0.0.0 --port ${port}` const reuse = !process.env.CI -const workers = Number(process.env.PLAYWRIGHT_WORKERS ?? (process.env.CI ? 5 : 0)) || undefined +const workers = + Number(process.env.PLAYWRIGHT_WORKERS ?? (process.env.CI ? (process.platform === "win32" ? 2 : 5) : 0)) || undefined export default defineConfig({ testDir: "./e2e", diff --git a/packages/opencode/src/git/index.ts b/packages/opencode/src/git/index.ts index 3a0af7b0d9..1442b8cb65 100644 --- a/packages/opencode/src/git/index.ts +++ b/packages/opencode/src/git/index.ts @@ -1,4 +1,5 @@ -import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node" +import { NodeFileSystem, NodePath } from "@effect/platform-node" +import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import { Effect, Layer, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import { makeRunPromise } from "@/effect/run-service" @@ -258,7 +259,7 @@ export namespace Git { ) export const defaultLayer = layer.pipe( - Layer.provide(NodeChildProcessSpawner.layer), + Layer.provide(CrossSpawnSpawner.layer), Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer), ) diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 072ea1d574..f6145b7a47 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -13,6 +13,7 @@ import { fn } from "@/util/fn" import { Agent } from "@/agent/agent" import { Plugin } from "@/plugin" import { Config } from "@/config/config" +import { NotFoundError } from "@/storage/db" import { ProviderTransform } from "@/provider/transform" import { ModelID, ProviderID } from "@/provider/schema" @@ -60,7 +61,11 @@ export namespace SessionCompaction { const config = await Config.get() if (config.compaction?.prune === false) return log.info("pruning") - const msgs = await Session.messages({ sessionID: input.sessionID }) + const msgs = await Session.messages({ sessionID: input.sessionID }).catch((err) => { + if (NotFoundError.isInstance(err)) return undefined + throw err + }) + if (!msgs) return let total = 0 let pruned = 0 const toPrune = [] diff --git a/packages/opencode/src/session/projectors.ts b/packages/opencode/src/session/projectors.ts index 61aa68d24a..81929cddca 100644 --- a/packages/opencode/src/session/projectors.ts +++ b/packages/opencode/src/session/projectors.ts @@ -4,6 +4,15 @@ import { Session } from "./index" import { MessageV2 } from "./message-v2" import { SessionTable, MessageTable, PartTable } from "./session.sql" import { ProjectTable } from "../project/project.sql" +import { Log } from "../util/log" + +const log = Log.create({ service: "session.projector" }) + +function foreign(err: unknown) { + if (typeof err !== "object" || err === null) return false + if ("code" in err && err.code === "SQLITE_CONSTRAINT_FOREIGNKEY") return true + return "message" in err && typeof err.message === "string" && err.message.includes("FOREIGN KEY constraint failed") +} export type DeepPartial = T extends object ? { [K in keyof T]?: DeepPartial | null } : T @@ -76,15 +85,20 @@ export default [ const time_created = data.info.time.created const { id, sessionID, ...rest } = data.info - db.insert(MessageTable) - .values({ - id, - session_id: sessionID, - time_created, - data: rest, - }) - .onConflictDoUpdate({ target: MessageTable.id, set: { data: rest } }) - .run() + try { + db.insert(MessageTable) + .values({ + id, + session_id: sessionID, + time_created, + data: rest, + }) + .onConflictDoUpdate({ target: MessageTable.id, set: { data: rest } }) + .run() + } catch (err) { + if (!foreign(err)) throw err + log.warn("ignored late message update", { messageID: id, sessionID }) + } }), SyncEvent.project(MessageV2.Event.Removed, (db, data) => { @@ -102,15 +116,20 @@ export default [ SyncEvent.project(MessageV2.Event.PartUpdated, (db, data) => { const { id, messageID, sessionID, ...rest } = data.part - db.insert(PartTable) - .values({ - id, - message_id: messageID, - session_id: sessionID, - time_created: data.time, - data: rest, - }) - .onConflictDoUpdate({ target: PartTable.id, set: { data: rest } }) - .run() + try { + db.insert(PartTable) + .values({ + id, + message_id: messageID, + session_id: sessionID, + time_created: data.time, + data: rest, + }) + .onConflictDoUpdate({ target: PartTable.id, set: { data: rest } }) + .run() + } catch (err) { + if (!foreign(err)) throw err + log.warn("ignored late part update", { partID: id, messageID, sessionID }) + } }), ] diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts index ec07173c83..98a9d322c5 100644 --- a/packages/opencode/src/snapshot/index.ts +++ b/packages/opencode/src/snapshot/index.ts @@ -1,5 +1,5 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node" -import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect" +import { Cause, Duration, Effect, Layer, Schedule, Semaphore, ServiceMap, Stream } from "effect" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import path from "path" import z from "zod" @@ -7,6 +7,7 @@ import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" import { InstanceState } from "@/effect/instance-state" import { makeRunPromise } from "@/effect/run-service" import { AppFileSystem } from "@/filesystem" +import { Hash } from "@/util/hash" import { Config } from "../config/config" import { Global } from "../global" import { Log } from "../util/log" @@ -38,7 +39,6 @@ export namespace Snapshot { const core = ["-c", "core.longpaths=true", "-c", "core.symlinks=true"] const cfg = ["-c", "core.autocrlf=false", ...core] const quote = [...cfg, "-c", "core.quotepath=false"] - interface GitResult { readonly code: ChildProcessSpawner.ExitCode readonly text: string @@ -66,12 +66,23 @@ export namespace Snapshot { Effect.gen(function* () { const fs = yield* AppFileSystem.Service const spawner = yield* ChildProcessSpawner.ChildProcessSpawner + const locks = new Map() + + const lock = (key: string) => { + const hit = locks.get(key) + if (hit) return hit + + const next = Semaphore.makeUnsafe(1) + locks.set(key, next) + return next + } + const state = yield* InstanceState.make( Effect.fn("Snapshot.state")(function* (ctx) { const state = { directory: ctx.directory, worktree: ctx.worktree, - gitdir: path.join(Global.Path.data, "snapshot", ctx.project.id), + gitdir: path.join(Global.Path.data, "snapshot", ctx.project.id, Hash.fast(ctx.worktree)), vcs: ctx.project.vcs, } @@ -108,6 +119,7 @@ export namespace Snapshot { const exists = (file: string) => fs.exists(file).pipe(Effect.orDie) const read = (file: string) => fs.readFileString(file).pipe(Effect.catch(() => Effect.succeed(""))) const remove = (file: string) => fs.remove(file).pipe(Effect.catch(() => Effect.void)) + const locked = (fx: Effect.Effect) => lock(state.gitdir).withPermits(1)(fx) const enabled = Effect.fnUntraced(function* () { if (state.vcs !== "git") return false @@ -190,175 +202,211 @@ export namespace Snapshot { }) const cleanup = Effect.fnUntraced(function* () { - if (!(yield* enabled())) return - if (!(yield* exists(state.gitdir))) return - const result = yield* git(args(["gc", `--prune=${prune}`]), { cwd: state.directory }) - if (result.code !== 0) { - log.warn("cleanup failed", { - exitCode: result.code, - stderr: result.stderr, - }) - return - } - log.info("cleanup", { prune }) + return yield* locked( + Effect.gen(function* () { + if (!(yield* enabled())) return + if (!(yield* exists(state.gitdir))) return + const result = yield* git(args(["gc", `--prune=${prune}`]), { cwd: state.directory }) + if (result.code !== 0) { + log.warn("cleanup failed", { + exitCode: result.code, + stderr: result.stderr, + }) + return + } + log.info("cleanup", { prune }) + }), + ) }) const track = Effect.fnUntraced(function* () { - if (!(yield* enabled())) return - const existed = yield* exists(state.gitdir) - yield* fs.ensureDir(state.gitdir).pipe(Effect.orDie) - if (!existed) { - yield* git(["init"], { - env: { GIT_DIR: state.gitdir, GIT_WORK_TREE: state.worktree }, - }) - yield* git(["--git-dir", state.gitdir, "config", "core.autocrlf", "false"]) - yield* git(["--git-dir", state.gitdir, "config", "core.longpaths", "true"]) - yield* git(["--git-dir", state.gitdir, "config", "core.symlinks", "true"]) - yield* git(["--git-dir", state.gitdir, "config", "core.fsmonitor", "false"]) - log.info("initialized") - } - yield* add() - const result = yield* git(args(["write-tree"]), { cwd: state.directory }) - const hash = result.text.trim() - log.info("tracking", { hash, cwd: state.directory, git: state.gitdir }) - return hash + return yield* locked( + Effect.gen(function* () { + if (!(yield* enabled())) return + const existed = yield* exists(state.gitdir) + yield* fs.ensureDir(state.gitdir).pipe(Effect.orDie) + if (!existed) { + yield* git(["init"], { + env: { GIT_DIR: state.gitdir, GIT_WORK_TREE: state.worktree }, + }) + yield* git(["--git-dir", state.gitdir, "config", "core.autocrlf", "false"]) + yield* git(["--git-dir", state.gitdir, "config", "core.longpaths", "true"]) + yield* git(["--git-dir", state.gitdir, "config", "core.symlinks", "true"]) + yield* git(["--git-dir", state.gitdir, "config", "core.fsmonitor", "false"]) + log.info("initialized") + } + yield* add() + const result = yield* git(args(["write-tree"]), { cwd: state.directory }) + const hash = result.text.trim() + log.info("tracking", { hash, cwd: state.directory, git: state.gitdir }) + return hash + }), + ) }) const patch = Effect.fnUntraced(function* (hash: string) { - yield* add() - const result = yield* git( - [...quote, ...args(["diff", "--cached", "--no-ext-diff", "--name-only", hash, "--", "."])], - { - cwd: state.directory, - }, + return yield* locked( + Effect.gen(function* () { + yield* add() + const result = yield* git( + [...quote, ...args(["diff", "--cached", "--no-ext-diff", "--name-only", hash, "--", "."])], + { + cwd: state.directory, + }, + ) + if (result.code !== 0) { + log.warn("failed to get diff", { hash, exitCode: result.code }) + return { hash, files: [] } + } + return { + hash, + files: result.text + .trim() + .split("\n") + .map((x) => x.trim()) + .filter(Boolean) + .map((x) => path.join(state.worktree, x).replaceAll("\\", "/")), + } + }), ) - if (result.code !== 0) { - log.warn("failed to get diff", { hash, exitCode: result.code }) - return { hash, files: [] } - } - return { - hash, - files: result.text - .trim() - .split("\n") - .map((x) => x.trim()) - .filter(Boolean) - .map((x) => path.join(state.worktree, x).replaceAll("\\", "/")), - } }) const restore = Effect.fnUntraced(function* (snapshot: string) { - log.info("restore", { commit: snapshot }) - const result = yield* git([...core, ...args(["read-tree", snapshot])], { cwd: state.worktree }) - if (result.code === 0) { - const checkout = yield* git([...core, ...args(["checkout-index", "-a", "-f"])], { cwd: state.worktree }) - if (checkout.code === 0) return - log.error("failed to restore snapshot", { - snapshot, - exitCode: checkout.code, - stderr: checkout.stderr, - }) - return - } - log.error("failed to restore snapshot", { - snapshot, - exitCode: result.code, - stderr: result.stderr, - }) + return yield* locked( + Effect.gen(function* () { + log.info("restore", { commit: snapshot }) + const result = yield* git([...core, ...args(["read-tree", snapshot])], { cwd: state.worktree }) + if (result.code === 0) { + const checkout = yield* git([...core, ...args(["checkout-index", "-a", "-f"])], { + cwd: state.worktree, + }) + if (checkout.code === 0) return + log.error("failed to restore snapshot", { + snapshot, + exitCode: checkout.code, + stderr: checkout.stderr, + }) + return + } + log.error("failed to restore snapshot", { + snapshot, + exitCode: result.code, + stderr: result.stderr, + }) + }), + ) }) const revert = Effect.fnUntraced(function* (patches: Snapshot.Patch[]) { - const seen = new Set() - for (const item of patches) { - for (const file of item.files) { - if (seen.has(file)) continue - seen.add(file) - log.info("reverting", { file, hash: item.hash }) - const result = yield* git([...core, ...args(["checkout", item.hash, "--", file])], { - cwd: state.worktree, - }) - if (result.code !== 0) { - const rel = path.relative(state.worktree, file) - const tree = yield* git([...core, ...args(["ls-tree", item.hash, "--", rel])], { - cwd: state.worktree, - }) - if (tree.code === 0 && tree.text.trim()) { - log.info("file existed in snapshot but checkout failed, keeping", { file }) - } else { - log.info("file did not exist in snapshot, deleting", { file }) - yield* remove(file) + return yield* locked( + Effect.gen(function* () { + const seen = new Set() + for (const item of patches) { + for (const file of item.files) { + if (seen.has(file)) continue + seen.add(file) + log.info("reverting", { file, hash: item.hash }) + const result = yield* git([...core, ...args(["checkout", item.hash, "--", file])], { + cwd: state.worktree, + }) + if (result.code !== 0) { + const rel = path.relative(state.worktree, file) + const tree = yield* git([...core, ...args(["ls-tree", item.hash, "--", rel])], { + cwd: state.worktree, + }) + if (tree.code === 0 && tree.text.trim()) { + log.info("file existed in snapshot but checkout failed, keeping", { file }) + } else { + log.info("file did not exist in snapshot, deleting", { file }) + yield* remove(file) + } + } } } - } - } + }), + ) }) const diff = Effect.fnUntraced(function* (hash: string) { - yield* add() - const result = yield* git([...quote, ...args(["diff", "--cached", "--no-ext-diff", hash, "--", "."])], { - cwd: state.worktree, - }) - if (result.code !== 0) { - log.warn("failed to get diff", { - hash, - exitCode: result.code, - stderr: result.stderr, - }) - return "" - } - return result.text.trim() + return yield* locked( + Effect.gen(function* () { + yield* add() + const result = yield* git( + [...quote, ...args(["diff", "--cached", "--no-ext-diff", hash, "--", "."])], + { + cwd: state.worktree, + }, + ) + if (result.code !== 0) { + log.warn("failed to get diff", { + hash, + exitCode: result.code, + stderr: result.stderr, + }) + return "" + } + return result.text.trim() + }), + ) }) const diffFull = Effect.fnUntraced(function* (from: string, to: string) { - const result: Snapshot.FileDiff[] = [] - const status = new Map() + return yield* locked( + Effect.gen(function* () { + const result: Snapshot.FileDiff[] = [] + const status = new Map() - const statuses = yield* git( - [...quote, ...args(["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."])], - { cwd: state.directory }, + const statuses = yield* git( + [ + ...quote, + ...args(["diff", "--no-ext-diff", "--name-status", "--no-renames", from, to, "--", "."]), + ], + { cwd: state.directory }, + ) + + for (const line of statuses.text.trim().split("\n")) { + if (!line) continue + const [code, file] = line.split("\t") + if (!code || !file) continue + status.set(file, code.startsWith("A") ? "added" : code.startsWith("D") ? "deleted" : "modified") + } + + const numstat = yield* git( + [...quote, ...args(["diff", "--no-ext-diff", "--no-renames", "--numstat", from, to, "--", "."])], + { + cwd: state.directory, + }, + ) + + for (const line of numstat.text.trim().split("\n")) { + if (!line) continue + const [adds, dels, file] = line.split("\t") + if (!file) continue + const binary = adds === "-" && dels === "-" + const [before, after] = binary + ? ["", ""] + : yield* Effect.all( + [ + git([...cfg, ...args(["show", `${from}:${file}`])]).pipe(Effect.map((item) => item.text)), + git([...cfg, ...args(["show", `${to}:${file}`])]).pipe(Effect.map((item) => item.text)), + ], + { concurrency: 2 }, + ) + const additions = binary ? 0 : parseInt(adds) + const deletions = binary ? 0 : parseInt(dels) + result.push({ + file, + before, + after, + additions: Number.isFinite(additions) ? additions : 0, + deletions: Number.isFinite(deletions) ? deletions : 0, + status: status.get(file) ?? "modified", + }) + } + + return result + }), ) - - for (const line of statuses.text.trim().split("\n")) { - if (!line) continue - const [code, file] = line.split("\t") - if (!code || !file) continue - status.set(file, code.startsWith("A") ? "added" : code.startsWith("D") ? "deleted" : "modified") - } - - const numstat = yield* git( - [...quote, ...args(["diff", "--no-ext-diff", "--no-renames", "--numstat", from, to, "--", "."])], - { - cwd: state.directory, - }, - ) - - for (const line of numstat.text.trim().split("\n")) { - if (!line) continue - const [adds, dels, file] = line.split("\t") - if (!file) continue - const binary = adds === "-" && dels === "-" - const [before, after] = binary - ? ["", ""] - : yield* Effect.all( - [ - git([...cfg, ...args(["show", `${from}:${file}`])]).pipe(Effect.map((item) => item.text)), - git([...cfg, ...args(["show", `${to}:${file}`])]).pipe(Effect.map((item) => item.text)), - ], - { concurrency: 2 }, - ) - const additions = binary ? 0 : parseInt(adds) - const deletions = binary ? 0 : parseInt(dels) - result.push({ - file, - before, - after, - additions: Number.isFinite(additions) ? additions : 0, - deletions: Number.isFinite(deletions) ? deletions : 0, - status: status.get(file) ?? "modified", - }) - } - - return result }) yield* cleanup().pipe(