diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts index a78607cdfd..268b18f687 100644 --- a/packages/opencode/src/storage/storage.ts +++ b/packages/opencode/src/storage/storage.ts @@ -1,19 +1,17 @@ import { Log } from "../util/log" import path from "path" -import fs from "fs/promises" import { Global } from "../global" -import { Filesystem } from "../util/filesystem" -import { lazy } from "../util/lazy" -import { Lock } from "../util/lock" import { NamedError } from "@opencode-ai/util/error" import z from "zod" -import { Glob } from "../util/glob" import { git } from "@/util/git" +import { AppFileSystem } from "@/filesystem" +import { makeRuntime } from "@/effect/run-service" +import { Effect, Exit, Layer, Option, RcMap, Schema, ServiceMap, TxReentrantLock } from "effect" export namespace Storage { const log = Log.create({ service: "storage" }) - type Migration = (dir: string) => Promise + type Migration = (dir: string, fs: AppFileSystem.Interface) => Effect.Effect export const NotFoundError = NamedError.create( "NotFoundError", @@ -22,36 +20,101 @@ export namespace Storage { }), ) + export type Error = AppFileSystem.Error | InstanceType + + const RootFile = Schema.Struct({ + path: Schema.optional( + Schema.Struct({ + root: Schema.optional(Schema.String), + }), + ), + }) + + const SessionFile = Schema.Struct({ + id: Schema.String, + }) + + const MessageFile = Schema.Struct({ + id: Schema.String, + }) + + const DiffFile = Schema.Struct({ + additions: Schema.Number, + deletions: Schema.Number, + }) + + const SummaryFile = Schema.Struct({ + id: Schema.String, + projectID: Schema.String, + summary: Schema.Struct({ diffs: Schema.Array(DiffFile) }), + }) + + const decodeRoot = Schema.decodeUnknownOption(RootFile) + const decodeSession = Schema.decodeUnknownOption(SessionFile) + const decodeMessage = Schema.decodeUnknownOption(MessageFile) + const decodeSummary = Schema.decodeUnknownOption(SummaryFile) + + export interface Interface { + readonly remove: (key: string[]) => Effect.Effect + readonly read: (key: string[]) => Effect.Effect + readonly update: (key: string[], fn: (draft: T) => void) => Effect.Effect + readonly write: (key: string[], content: T) => Effect.Effect + readonly list: (prefix: string[]) => Effect.Effect + } + + export class Service extends ServiceMap.Service()("@opencode/Storage") {} + + function file(dir: string, key: string[]) { + return path.join(dir, ...key) + ".json" + } + + function missing(err: unknown) { + if (!err || typeof err !== "object") return false + if ("code" in err && err.code === "ENOENT") return true + if ("reason" in err && err.reason && typeof err.reason === "object" && "_tag" in err.reason) { + return err.reason._tag === "NotFound" + } + return false + } + + function parseMigration(text: string) { + const value = Number.parseInt(text, 10) + return Number.isNaN(value) ? 0 : value + } + const MIGRATIONS: Migration[] = [ - async (dir) => { + Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface) { const project = path.resolve(dir, "../project") - if (!(await Filesystem.isDir(project))) return - const projectDirs = await Glob.scan("*", { + if (!(yield* fs.isDir(project))) return + const projectDirs = yield* fs.glob("*", { cwd: project, include: "all", }) for (const projectDir of projectDirs) { - const fullPath = path.join(project, projectDir) - if (!(await Filesystem.isDir(fullPath))) continue + const full = path.join(project, projectDir) + if (!(yield* fs.isDir(full))) continue log.info(`migrating project ${projectDir}`) let projectID = projectDir - const fullProjectDir = path.join(project, projectDir) let worktree = "/" if (projectID !== "global") { - for (const msgFile of await Glob.scan("storage/session/message/*/*.json", { - cwd: path.join(project, projectDir), + for (const msgFile of yield* fs.glob("storage/session/message/*/*.json", { + cwd: full, absolute: true, })) { - const json = await Filesystem.readJson(msgFile) - worktree = json.path?.root - if (worktree) break + const json = decodeRoot(yield* fs.readJson(msgFile), { onExcessProperty: "preserve" }) + const root = Option.isSome(json) ? json.value.path?.root : undefined + if (!root) continue + worktree = root + break } if (!worktree) continue - if (!(await Filesystem.isDir(worktree))) continue - const result = await git(["rev-list", "--max-parents=0", "--all"], { - cwd: worktree, - }) + if (!(yield* fs.isDir(worktree))) continue + const result = yield* Effect.promise(() => + git(["rev-list", "--max-parents=0", "--all"], { + cwd: worktree, + }), + ) const [id] = result .text() .split("\n") @@ -61,157 +124,230 @@ export namespace Storage { if (!id) continue projectID = id - await Filesystem.writeJson(path.join(dir, "project", projectID + ".json"), { - id, - vcs: "git", - worktree, - time: { - created: Date.now(), - initialized: Date.now(), - }, - }) + yield* fs.writeWithDirs( + path.join(dir, "project", projectID + ".json"), + JSON.stringify( + { + id, + vcs: "git", + worktree, + time: { + created: Date.now(), + initialized: Date.now(), + }, + }, + null, + 2, + ), + ) log.info(`migrating sessions for project ${projectID}`) - for (const sessionFile of await Glob.scan("storage/session/info/*.json", { - cwd: fullProjectDir, + for (const sessionFile of yield* fs.glob("storage/session/info/*.json", { + cwd: full, absolute: true, })) { const dest = path.join(dir, "session", projectID, path.basename(sessionFile)) - log.info("copying", { - sessionFile, - dest, - }) - const session = await Filesystem.readJson(sessionFile) - await Filesystem.writeJson(dest, session) - log.info(`migrating messages for session ${session.id}`) - for (const msgFile of await Glob.scan(`storage/session/message/${session.id}/*.json`, { - cwd: fullProjectDir, + log.info("copying", { sessionFile, dest }) + const session = yield* fs.readJson(sessionFile) + const info = decodeSession(session, { onExcessProperty: "preserve" }) + yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2)) + if (Option.isNone(info)) continue + log.info(`migrating messages for session ${info.value.id}`) + for (const msgFile of yield* fs.glob(`storage/session/message/${info.value.id}/*.json`, { + cwd: full, absolute: true, })) { - const dest = path.join(dir, "message", session.id, path.basename(msgFile)) + const next = path.join(dir, "message", info.value.id, path.basename(msgFile)) log.info("copying", { msgFile, - dest, + dest: next, }) - const message = await Filesystem.readJson(msgFile) - await Filesystem.writeJson(dest, message) + const message = yield* fs.readJson(msgFile) + const item = decodeMessage(message, { onExcessProperty: "preserve" }) + yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2)) + if (Option.isNone(item)) continue - log.info(`migrating parts for message ${message.id}`) - for (const partFile of await Glob.scan(`storage/session/part/${session.id}/${message.id}/*.json`, { - cwd: fullProjectDir, + log.info(`migrating parts for message ${item.value.id}`) + for (const partFile of yield* fs.glob(`storage/session/part/${info.value.id}/${item.value.id}/*.json`, { + cwd: full, absolute: true, })) { - const dest = path.join(dir, "part", message.id, path.basename(partFile)) - const part = await Filesystem.readJson(partFile) + const out = path.join(dir, "part", item.value.id, path.basename(partFile)) + const part = yield* fs.readJson(partFile) log.info("copying", { partFile, - dest, + dest: out, }) - await Filesystem.writeJson(dest, part) + yield* fs.writeWithDirs(out, JSON.stringify(part, null, 2)) } } } } } - }, - async (dir) => { - for (const item of await Glob.scan("session/*/*.json", { + }), + Effect.fn("Storage.migration.2")(function* (dir: string, fs: AppFileSystem.Interface) { + for (const item of yield* fs.glob("session/*/*.json", { cwd: dir, absolute: true, })) { - const session = await Filesystem.readJson(item) - if (!session.projectID) continue - if (!session.summary?.diffs) continue - const { diffs } = session.summary - await Filesystem.write(path.join(dir, "session_diff", session.id + ".json"), JSON.stringify(diffs)) - await Filesystem.writeJson(path.join(dir, "session", session.projectID, session.id + ".json"), { - ...session, - summary: { - additions: diffs.reduce((sum: any, x: any) => sum + x.additions, 0), - deletions: diffs.reduce((sum: any, x: any) => sum + x.deletions, 0), - }, - }) + const raw = yield* fs.readJson(item) + const session = decodeSummary(raw, { onExcessProperty: "preserve" }) + if (Option.isNone(session)) continue + const diffs = session.value.summary.diffs + yield* fs.writeWithDirs( + path.join(dir, "session_diff", session.value.id + ".json"), + JSON.stringify(diffs, null, 2), + ) + yield* fs.writeWithDirs( + path.join(dir, "session", session.value.projectID, session.value.id + ".json"), + JSON.stringify( + { + ...(raw as Record), + summary: { + additions: diffs.reduce((sum, x) => sum + x.additions, 0), + deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), + }, + }, + null, + 2, + ), + ) } - }, + }), ] - const state = lazy(async () => { - const dir = path.join(Global.Path.data, "storage") - const migration = await Filesystem.readJson(path.join(dir, "migration")) - .then((x) => parseInt(x)) - .catch(() => 0) - for (let index = migration; index < MIGRATIONS.length; index++) { - log.info("running migration", { index }) - const migration = MIGRATIONS[index] - await migration(dir).catch(() => log.error("failed to run migration", { index })) - await Filesystem.write(path.join(dir, "migration"), (index + 1).toString()) - } - return { - dir, - } - }) + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const locks = yield* RcMap.make({ + lookup: () => TxReentrantLock.make(), + idleTimeToLive: 0, + }) + const state = yield* Effect.cached( + Effect.gen(function* () { + const dir = path.join(Global.Path.data, "storage") + const marker = path.join(dir, "migration") + const migration = yield* fs.readFileString(marker).pipe( + Effect.map(parseMigration), + Effect.catchIf(missing, () => Effect.succeed(0)), + Effect.orElseSucceed(() => 0), + ) + for (let i = migration; i < MIGRATIONS.length; i++) { + log.info("running migration", { index: i }) + const step = MIGRATIONS[i]! + const exit = yield* Effect.exit(step(dir, fs)) + if (Exit.isFailure(exit)) { + log.error("failed to run migration", { index: i, cause: exit.cause }) + break + } + yield* fs.writeWithDirs(marker, String(i + 1)) + } + return { dir } + }), + ) + + const fail = (target: string): Effect.Effect> => + Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` })) + + const wrap = (target: string, body: Effect.Effect) => + body.pipe(Effect.catchIf(missing, () => fail(target))) + + const writeJson = Effect.fnUntraced(function* (target: string, content: unknown) { + yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2)) + }) + + const withResolved = ( + key: string[], + fn: (target: string, rw: TxReentrantLock.TxReentrantLock) => Effect.Effect, + ): Effect.Effect => + Effect.scoped( + Effect.gen(function* () { + const target = file((yield* state).dir, key) + return yield* fn(target, yield* RcMap.get(locks, target)) + }), + ) + + const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) { + yield* withResolved(key, (target, rw) => + TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))), + ) + }) + + const read: Interface["read"] = (key: string[]) => + Effect.gen(function* () { + const value = yield* withResolved(key, (target, rw) => + TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))), + ) + return value as T + }) + + const update: Interface["update"] = (key: string[], fn: (draft: T) => void) => + Effect.gen(function* () { + const value = yield* withResolved(key, (target, rw) => + TxReentrantLock.withWriteLock( + rw, + Effect.gen(function* () { + const content = yield* wrap(target, fs.readJson(target)) + fn(content as T) + yield* writeJson(target, content) + return content + }), + ), + ) + return value as T + }) + + const write: Interface["write"] = (key: string[], content: unknown) => + Effect.gen(function* () { + yield* withResolved(key, (target, rw) => TxReentrantLock.withWriteLock(rw, writeJson(target, content))) + }) + + const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) { + const dir = (yield* state).dir + const cwd = path.join(dir, ...prefix) + const result = yield* fs + .glob("**/*", { + cwd, + include: "file", + }) + .pipe(Effect.catch(() => Effect.succeed([]))) + return result + .map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]) + .toSorted((a, b) => a.join("/").localeCompare(b.join("/"))) + }) + + return Service.of({ + remove, + read, + update, + write, + list, + }) + }), + ) + + export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer)) + + const { runPromise } = makeRuntime(Service, defaultLayer) export async function remove(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - await fs.unlink(target).catch(() => {}) - }) + return runPromise((svc) => svc.remove(key)) } export async function read(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.read(target) - const result = await Filesystem.readJson(target) - return result as T - }) + return runPromise((svc) => svc.read(key)) } export async function update(key: string[], fn: (draft: T) => void) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.write(target) - const content = await Filesystem.readJson(target) - fn(content as T) - await Filesystem.writeJson(target, content) - return content - }) + return runPromise((svc) => svc.update(key, fn)) } export async function write(key: string[], content: T) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - return withErrorHandling(async () => { - using _ = await Lock.write(target) - await Filesystem.writeJson(target, content) - }) - } - - async function withErrorHandling(body: () => Promise) { - return body().catch((e) => { - if (!(e instanceof Error)) throw e - const errnoException = e as NodeJS.ErrnoException - if (errnoException.code === "ENOENT") { - throw new NotFoundError({ message: `Resource not found: ${errnoException.path}` }) - } - throw e - }) + return runPromise((svc) => svc.write(key, content)) } export async function list(prefix: string[]) { - const dir = await state().then((x) => x.dir) - try { - const result = await Glob.scan("**/*", { - cwd: path.join(dir, ...prefix), - include: "file", - }).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])) - result.sort() - return result - } catch { - return [] - } + return runPromise((svc) => svc.list(prefix)) } } diff --git a/packages/opencode/test/server/session-messages.test.ts b/packages/opencode/test/server/session-messages.test.ts index d7e44cbecc..89e6fba5c5 100644 --- a/packages/opencode/test/server/session-messages.test.ts +++ b/packages/opencode/test/server/session-messages.test.ts @@ -13,6 +13,18 @@ afterEach(async () => { await Instance.disposeAll() }) +async function withoutWatcher(fn: () => Promise) { + if (process.platform !== "win32") return fn() + const prev = process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER + process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = "true" + try { + return await fn() + } finally { + if (prev === undefined) delete process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER + else process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = prev + } +} + async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) { const ids = [] as MessageID[] for (let i = 0; i < count; i++) { @@ -42,86 +54,94 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D describe("session messages endpoint", () => { test("returns cursor headers for older pages", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - const ids = await fill(session.id, 5) - const app = Server.Default() + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 5) + const app = Server.Default() - const a = await app.request(`/session/${session.id}/message?limit=2`) - expect(a.status).toBe(200) - const aBody = (await a.json()) as MessageV2.WithParts[] - expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2)) - const cursor = a.headers.get("x-next-cursor") - expect(cursor).toBeTruthy() - expect(a.headers.get("link")).toContain('rel="next"') + const a = await app.request(`/session/${session.id}/message?limit=2`) + expect(a.status).toBe(200) + const aBody = (await a.json()) as MessageV2.WithParts[] + expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2)) + const cursor = a.headers.get("x-next-cursor") + expect(cursor).toBeTruthy() + expect(a.headers.get("link")).toContain('rel="next"') - const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`) - expect(b.status).toBe(200) - const bBody = (await b.json()) as MessageV2.WithParts[] - expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) + const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`) + expect(b.status).toBe(200) + const bBody = (await b.json()) as MessageV2.WithParts[] + expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) - await Session.remove(session.id) - }, - }) + await Session.remove(session.id) + }, + }), + ) }) test("keeps full-history responses when limit is omitted", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - const ids = await fill(session.id, 3) - const app = Server.Default() + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 3) + const app = Server.Default() - const res = await app.request(`/session/${session.id}/message`) - expect(res.status).toBe(200) - const body = (await res.json()) as MessageV2.WithParts[] - expect(body.map((item) => item.info.id)).toEqual(ids) + const res = await app.request(`/session/${session.id}/message`) + expect(res.status).toBe(200) + const body = (await res.json()) as MessageV2.WithParts[] + expect(body.map((item) => item.info.id)).toEqual(ids) - await Session.remove(session.id) - }, - }) + await Session.remove(session.id) + }, + }), + ) }) test("rejects invalid cursors and missing sessions", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - const app = Server.Default() + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + const app = Server.Default() - const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`) - expect(bad.status).toBe(400) + const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`) + expect(bad.status).toBe(400) - const miss = await app.request(`/session/ses_missing/message?limit=2`) - expect(miss.status).toBe(404) + const miss = await app.request(`/session/ses_missing/message?limit=2`) + expect(miss.status).toBe(404) - await Session.remove(session.id) - }, - }) + await Session.remove(session.id) + }, + }), + ) }) test("does not truncate large legacy limit requests", async () => { await using tmp = await tmpdir({ git: true }) - await Instance.provide({ - directory: tmp.path, - fn: async () => { - const session = await Session.create({}) - await fill(session.id, 520) - const app = Server.Default() + await withoutWatcher(() => + Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 520) + const app = Server.Default() - const res = await app.request(`/session/${session.id}/message?limit=510`) - expect(res.status).toBe(200) - const body = (await res.json()) as MessageV2.WithParts[] - expect(body).toHaveLength(510) + const res = await app.request(`/session/${session.id}/message?limit=510`) + expect(res.status).toBe(200) + const body = (await res.json()) as MessageV2.WithParts[] + expect(body).toHaveLength(510) - await Session.remove(session.id) - }, - }) + await Session.remove(session.id) + }, + }), + ) }) }) diff --git a/packages/opencode/test/storage/storage.test.ts b/packages/opencode/test/storage/storage.test.ts new file mode 100644 index 0000000000..e5a04c082d --- /dev/null +++ b/packages/opencode/test/storage/storage.test.ts @@ -0,0 +1,295 @@ +import { describe, expect, test } from "bun:test" +import fs from "fs/promises" +import path from "path" +import { Effect, Layer, ManagedRuntime } from "effect" +import { AppFileSystem } from "../../src/filesystem" +import { Global } from "../../src/global" +import { Storage } from "../../src/storage/storage" +import { tmpdir } from "../fixture/fixture" + +const dir = path.join(Global.Path.data, "storage") + +async function withScope(fn: (root: string[]) => Promise) { + const root = ["storage_test", crypto.randomUUID()] + try { + return await fn(root) + } finally { + await fs.rm(path.join(dir, ...root), { recursive: true, force: true }) + } +} + +function map(root: string, file: string) { + if (file === Global.Path.data) return root + if (file.startsWith(Global.Path.data + path.sep)) return path.join(root, path.relative(Global.Path.data, file)) + return file +} + +function layer(root: string) { + return Layer.effect( + AppFileSystem.Service, + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + return AppFileSystem.Service.of({ + ...fs, + isDir: (file) => fs.isDir(map(root, file)), + readJson: (file) => fs.readJson(map(root, file)), + writeWithDirs: (file, content, mode) => fs.writeWithDirs(map(root, file), content, mode), + readFileString: (file) => fs.readFileString(map(root, file)), + remove: (file) => fs.remove(map(root, file)), + glob: (pattern, options) => + fs.glob(pattern, options?.cwd ? { ...options, cwd: map(root, options.cwd) } : options), + }) + }), + ).pipe(Layer.provide(AppFileSystem.defaultLayer)) +} + +async function withStorage( + root: string, + fn: (run: (body: Effect.Effect) => Promise) => Promise, +) { + const rt = ManagedRuntime.make(Storage.layer.pipe(Layer.provide(layer(root)))) + try { + return await fn((body) => rt.runPromise(body)) + } finally { + await rt.dispose() + } +} + +async function write(file: string, value: unknown) { + await fs.mkdir(path.dirname(file), { recursive: true }) + await Bun.write(file, JSON.stringify(value, null, 2)) +} + +async function text(file: string, value: string) { + await fs.mkdir(path.dirname(file), { recursive: true }) + await Bun.write(file, value) +} + +async function exists(file: string) { + return fs + .stat(file) + .then(() => true) + .catch(() => false) +} + +describe("Storage", () => { + test("round-trips JSON content", async () => { + await withScope(async (root) => { + const key = [...root, "session_diff", "roundtrip"] + const value = [{ file: "a.ts", additions: 2, deletions: 1 }] + + await Storage.write(key, value) + + expect(await Storage.read(key)).toEqual(value) + }) + }) + + test("maps missing reads to NotFoundError", async () => { + await withScope(async (root) => { + await expect(Storage.read([...root, "missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" }) + }) + }) + + test("update on missing key throws NotFoundError", async () => { + await withScope(async (root) => { + await expect( + Storage.update<{ value: number }>([...root, "missing", "key"], (draft) => { + draft.value += 1 + }), + ).rejects.toMatchObject({ name: "NotFoundError" }) + }) + }) + + test("write overwrites existing value", async () => { + await withScope(async (root) => { + const key = [...root, "overwrite", "test"] + await Storage.write<{ v: number }>(key, { v: 1 }) + await Storage.write<{ v: number }>(key, { v: 2 }) + + expect(await Storage.read<{ v: number }>(key)).toEqual({ v: 2 }) + }) + }) + + test("remove on missing key is a no-op", async () => { + await withScope(async (root) => { + await expect(Storage.remove([...root, "nonexistent", "key"])).resolves.toBeUndefined() + }) + }) + + test("list on missing prefix returns empty", async () => { + await withScope(async (root) => { + expect(await Storage.list([...root, "nonexistent"])).toEqual([]) + }) + }) + + test("serializes concurrent updates for the same key", async () => { + await withScope(async (root) => { + const key = [...root, "counter", "shared"] + await Storage.write(key, { value: 0 }) + + await Promise.all( + Array.from({ length: 25 }, () => + Storage.update<{ value: number }>(key, (draft) => { + draft.value += 1 + }), + ), + ) + + expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 }) + }) + }) + + test("concurrent reads do not block each other", async () => { + await withScope(async (root) => { + const key = [...root, "concurrent", "reads"] + await Storage.write(key, { ok: true }) + + const results = await Promise.all(Array.from({ length: 10 }, () => Storage.read(key))) + + expect(results).toHaveLength(10) + for (const r of results) expect(r).toEqual({ ok: true }) + }) + }) + + test("nested keys create deep paths", async () => { + await withScope(async (root) => { + const key = [...root, "a", "b", "c", "deep"] + await Storage.write<{ nested: boolean }>(key, { nested: true }) + + expect(await Storage.read<{ nested: boolean }>(key)).toEqual({ nested: true }) + expect(await Storage.list([...root, "a"])).toEqual([key]) + }) + }) + + test("lists and removes stored entries", async () => { + await withScope(async (root) => { + const a = [...root, "list", "a"] + const b = [...root, "list", "b"] + const prefix = [...root, "list"] + + await Storage.write(b, { value: 2 }) + await Storage.write(a, { value: 1 }) + + expect(await Storage.list(prefix)).toEqual([a, b]) + + await Storage.remove(a) + + expect(await Storage.list(prefix)).toEqual([b]) + await expect(Storage.read(a)).rejects.toMatchObject({ name: "NotFoundError" }) + }) + }) + + test("migration 2 runs when marker contents are invalid", async () => { + await using tmp = await tmpdir() + const storage = path.join(tmp.path, "storage") + const diffs = [ + { additions: 2, deletions: 1 }, + { additions: 3, deletions: 4 }, + ] + + await text(path.join(storage, "migration"), "wat") + await write(path.join(storage, "session", "proj_test", "ses_test.json"), { + id: "ses_test", + projectID: "proj_test", + title: "legacy", + summary: { diffs }, + }) + + await withStorage(tmp.path, async (run) => { + expect(await run(Storage.Service.use((svc) => svc.list(["session_diff"])))).toEqual([ + ["session_diff", "ses_test"], + ]) + expect(await run(Storage.Service.use((svc) => svc.read(["session_diff", "ses_test"])))).toEqual( + diffs, + ) + expect( + await run( + Storage.Service.use((svc) => + svc.read<{ + id: string + projectID: string + title: string + summary: { + additions: number + deletions: number + } + }>(["session", "proj_test", "ses_test"]), + ), + ), + ).toEqual({ + id: "ses_test", + projectID: "proj_test", + title: "legacy", + summary: { + additions: 5, + deletions: 5, + }, + }) + }) + + expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2") + }) + + test("migration 1 tolerates malformed legacy records", async () => { + await using tmp = await tmpdir({ git: true }) + const storage = path.join(tmp.path, "storage") + const legacy = path.join(tmp.path, "project", "legacy") + + await write(path.join(legacy, "storage", "session", "message", "probe", "0.json"), []) + await write(path.join(legacy, "storage", "session", "message", "probe", "1.json"), { + path: { root: tmp.path }, + }) + await write(path.join(legacy, "storage", "session", "info", "ses_legacy.json"), { + id: "ses_legacy", + title: "legacy", + }) + await write(path.join(legacy, "storage", "session", "message", "ses_legacy", "msg_legacy.json"), { + role: "user", + text: "hello", + }) + + await withStorage(tmp.path, async (run) => { + const projects = await run(Storage.Service.use((svc) => svc.list(["project"]))) + expect(projects).toHaveLength(1) + const project = projects[0]![1] + + expect(await run(Storage.Service.use((svc) => svc.list(["session", project])))).toEqual([ + ["session", project, "ses_legacy"], + ]) + expect( + await run( + Storage.Service.use((svc) => svc.read<{ id: string; title: string }>(["session", project, "ses_legacy"])), + ), + ).toEqual({ + id: "ses_legacy", + title: "legacy", + }) + expect( + await run( + Storage.Service.use((svc) => + svc.read<{ role: string; text: string }>(["message", "ses_legacy", "msg_legacy"]), + ), + ), + ).toEqual({ + role: "user", + text: "hello", + }) + }) + + expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2") + }) + + test("failed migrations do not advance the marker", async () => { + await using tmp = await tmpdir() + const storage = path.join(tmp.path, "storage") + const legacy = path.join(tmp.path, "project", "legacy") + + await text(path.join(legacy, "storage", "session", "message", "probe", "0.json"), "{") + + await withStorage(tmp.path, async (run) => { + expect(await run(Storage.Service.use((svc) => svc.list(["project"])))).toEqual([]) + }) + + expect(await exists(path.join(storage, "migration"))).toBe(false) + }) +})