refactor(storage): effectify Storage service (#20132)

pull/20163/head
Kit Langton 2026-03-30 21:16:02 -04:00 committed by GitHub
parent bf777298c8
commit a898c2ea3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 641 additions and 190 deletions

View File

@ -1,19 +1,17 @@
import { Log } from "../util/log" import { Log } from "../util/log"
import path from "path" import path from "path"
import fs from "fs/promises"
import { Global } from "../global" import { Global } from "../global"
import { Filesystem } from "../util/filesystem"
import { lazy } from "../util/lazy"
import { Lock } from "../util/lock"
import { NamedError } from "@opencode-ai/util/error" import { NamedError } from "@opencode-ai/util/error"
import z from "zod" import z from "zod"
import { Glob } from "../util/glob"
import { git } from "@/util/git" import { git } from "@/util/git"
import { AppFileSystem } from "@/filesystem"
import { makeRuntime } from "@/effect/run-service"
import { Effect, Exit, Layer, Option, RcMap, Schema, ServiceMap, TxReentrantLock } from "effect"
export namespace Storage { export namespace Storage {
const log = Log.create({ service: "storage" }) const log = Log.create({ service: "storage" })
type Migration = (dir: string) => Promise<void> type Migration = (dir: string, fs: AppFileSystem.Interface) => Effect.Effect<void, AppFileSystem.Error>
export const NotFoundError = NamedError.create( export const NotFoundError = NamedError.create(
"NotFoundError", "NotFoundError",
@ -22,36 +20,101 @@ export namespace Storage {
}), }),
) )
export type Error = AppFileSystem.Error | InstanceType<typeof NotFoundError>
const RootFile = Schema.Struct({
path: Schema.optional(
Schema.Struct({
root: Schema.optional(Schema.String),
}),
),
})
const SessionFile = Schema.Struct({
id: Schema.String,
})
const MessageFile = Schema.Struct({
id: Schema.String,
})
const DiffFile = Schema.Struct({
additions: Schema.Number,
deletions: Schema.Number,
})
const SummaryFile = Schema.Struct({
id: Schema.String,
projectID: Schema.String,
summary: Schema.Struct({ diffs: Schema.Array(DiffFile) }),
})
const decodeRoot = Schema.decodeUnknownOption(RootFile)
const decodeSession = Schema.decodeUnknownOption(SessionFile)
const decodeMessage = Schema.decodeUnknownOption(MessageFile)
const decodeSummary = Schema.decodeUnknownOption(SummaryFile)
export interface Interface {
readonly remove: (key: string[]) => Effect.Effect<void, AppFileSystem.Error>
readonly read: <T>(key: string[]) => Effect.Effect<T, Error>
readonly update: <T>(key: string[], fn: (draft: T) => void) => Effect.Effect<T, Error>
readonly write: <T>(key: string[], content: T) => Effect.Effect<void, AppFileSystem.Error>
readonly list: (prefix: string[]) => Effect.Effect<string[][], AppFileSystem.Error>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Storage") {}
function file(dir: string, key: string[]) {
return path.join(dir, ...key) + ".json"
}
function missing(err: unknown) {
if (!err || typeof err !== "object") return false
if ("code" in err && err.code === "ENOENT") return true
if ("reason" in err && err.reason && typeof err.reason === "object" && "_tag" in err.reason) {
return err.reason._tag === "NotFound"
}
return false
}
function parseMigration(text: string) {
const value = Number.parseInt(text, 10)
return Number.isNaN(value) ? 0 : value
}
const MIGRATIONS: Migration[] = [ const MIGRATIONS: Migration[] = [
async (dir) => { Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface) {
const project = path.resolve(dir, "../project") const project = path.resolve(dir, "../project")
if (!(await Filesystem.isDir(project))) return if (!(yield* fs.isDir(project))) return
const projectDirs = await Glob.scan("*", { const projectDirs = yield* fs.glob("*", {
cwd: project, cwd: project,
include: "all", include: "all",
}) })
for (const projectDir of projectDirs) { for (const projectDir of projectDirs) {
const fullPath = path.join(project, projectDir) const full = path.join(project, projectDir)
if (!(await Filesystem.isDir(fullPath))) continue if (!(yield* fs.isDir(full))) continue
log.info(`migrating project ${projectDir}`) log.info(`migrating project ${projectDir}`)
let projectID = projectDir let projectID = projectDir
const fullProjectDir = path.join(project, projectDir)
let worktree = "/" let worktree = "/"
if (projectID !== "global") { if (projectID !== "global") {
for (const msgFile of await Glob.scan("storage/session/message/*/*.json", { for (const msgFile of yield* fs.glob("storage/session/message/*/*.json", {
cwd: path.join(project, projectDir), cwd: full,
absolute: true, absolute: true,
})) { })) {
const json = await Filesystem.readJson<any>(msgFile) const json = decodeRoot(yield* fs.readJson(msgFile), { onExcessProperty: "preserve" })
worktree = json.path?.root const root = Option.isSome(json) ? json.value.path?.root : undefined
if (worktree) break if (!root) continue
worktree = root
break
} }
if (!worktree) continue if (!worktree) continue
if (!(await Filesystem.isDir(worktree))) continue if (!(yield* fs.isDir(worktree))) continue
const result = await git(["rev-list", "--max-parents=0", "--all"], { const result = yield* Effect.promise(() =>
cwd: worktree, git(["rev-list", "--max-parents=0", "--all"], {
}) cwd: worktree,
}),
)
const [id] = result const [id] = result
.text() .text()
.split("\n") .split("\n")
@ -61,157 +124,230 @@ export namespace Storage {
if (!id) continue if (!id) continue
projectID = id projectID = id
await Filesystem.writeJson(path.join(dir, "project", projectID + ".json"), { yield* fs.writeWithDirs(
id, path.join(dir, "project", projectID + ".json"),
vcs: "git", JSON.stringify(
worktree, {
time: { id,
created: Date.now(), vcs: "git",
initialized: Date.now(), worktree,
}, time: {
}) created: Date.now(),
initialized: Date.now(),
},
},
null,
2,
),
)
log.info(`migrating sessions for project ${projectID}`) log.info(`migrating sessions for project ${projectID}`)
for (const sessionFile of await Glob.scan("storage/session/info/*.json", { for (const sessionFile of yield* fs.glob("storage/session/info/*.json", {
cwd: fullProjectDir, cwd: full,
absolute: true, absolute: true,
})) { })) {
const dest = path.join(dir, "session", projectID, path.basename(sessionFile)) const dest = path.join(dir, "session", projectID, path.basename(sessionFile))
log.info("copying", { log.info("copying", { sessionFile, dest })
sessionFile, const session = yield* fs.readJson(sessionFile)
dest, const info = decodeSession(session, { onExcessProperty: "preserve" })
}) yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2))
const session = await Filesystem.readJson<any>(sessionFile) if (Option.isNone(info)) continue
await Filesystem.writeJson(dest, session) log.info(`migrating messages for session ${info.value.id}`)
log.info(`migrating messages for session ${session.id}`) for (const msgFile of yield* fs.glob(`storage/session/message/${info.value.id}/*.json`, {
for (const msgFile of await Glob.scan(`storage/session/message/${session.id}/*.json`, { cwd: full,
cwd: fullProjectDir,
absolute: true, absolute: true,
})) { })) {
const dest = path.join(dir, "message", session.id, path.basename(msgFile)) const next = path.join(dir, "message", info.value.id, path.basename(msgFile))
log.info("copying", { log.info("copying", {
msgFile, msgFile,
dest, dest: next,
}) })
const message = await Filesystem.readJson<any>(msgFile) const message = yield* fs.readJson(msgFile)
await Filesystem.writeJson(dest, message) const item = decodeMessage(message, { onExcessProperty: "preserve" })
yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2))
if (Option.isNone(item)) continue
log.info(`migrating parts for message ${message.id}`) log.info(`migrating parts for message ${item.value.id}`)
for (const partFile of await Glob.scan(`storage/session/part/${session.id}/${message.id}/*.json`, { for (const partFile of yield* fs.glob(`storage/session/part/${info.value.id}/${item.value.id}/*.json`, {
cwd: fullProjectDir, cwd: full,
absolute: true, absolute: true,
})) { })) {
const dest = path.join(dir, "part", message.id, path.basename(partFile)) const out = path.join(dir, "part", item.value.id, path.basename(partFile))
const part = await Filesystem.readJson(partFile) const part = yield* fs.readJson(partFile)
log.info("copying", { log.info("copying", {
partFile, partFile,
dest, dest: out,
}) })
await Filesystem.writeJson(dest, part) yield* fs.writeWithDirs(out, JSON.stringify(part, null, 2))
} }
} }
} }
} }
} }
}, }),
async (dir) => { Effect.fn("Storage.migration.2")(function* (dir: string, fs: AppFileSystem.Interface) {
for (const item of await Glob.scan("session/*/*.json", { for (const item of yield* fs.glob("session/*/*.json", {
cwd: dir, cwd: dir,
absolute: true, absolute: true,
})) { })) {
const session = await Filesystem.readJson<any>(item) const raw = yield* fs.readJson(item)
if (!session.projectID) continue const session = decodeSummary(raw, { onExcessProperty: "preserve" })
if (!session.summary?.diffs) continue if (Option.isNone(session)) continue
const { diffs } = session.summary const diffs = session.value.summary.diffs
await Filesystem.write(path.join(dir, "session_diff", session.id + ".json"), JSON.stringify(diffs)) yield* fs.writeWithDirs(
await Filesystem.writeJson(path.join(dir, "session", session.projectID, session.id + ".json"), { path.join(dir, "session_diff", session.value.id + ".json"),
...session, JSON.stringify(diffs, null, 2),
summary: { )
additions: diffs.reduce((sum: any, x: any) => sum + x.additions, 0), yield* fs.writeWithDirs(
deletions: diffs.reduce((sum: any, x: any) => sum + x.deletions, 0), path.join(dir, "session", session.value.projectID, session.value.id + ".json"),
}, JSON.stringify(
}) {
...(raw as Record<string, unknown>),
summary: {
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
},
},
null,
2,
),
)
} }
}, }),
] ]
const state = lazy(async () => { export const layer = Layer.effect(
const dir = path.join(Global.Path.data, "storage") Service,
const migration = await Filesystem.readJson<string>(path.join(dir, "migration")) Effect.gen(function* () {
.then((x) => parseInt(x)) const fs = yield* AppFileSystem.Service
.catch(() => 0) const locks = yield* RcMap.make({
for (let index = migration; index < MIGRATIONS.length; index++) { lookup: () => TxReentrantLock.make(),
log.info("running migration", { index }) idleTimeToLive: 0,
const migration = MIGRATIONS[index] })
await migration(dir).catch(() => log.error("failed to run migration", { index })) const state = yield* Effect.cached(
await Filesystem.write(path.join(dir, "migration"), (index + 1).toString()) Effect.gen(function* () {
} const dir = path.join(Global.Path.data, "storage")
return { const marker = path.join(dir, "migration")
dir, const migration = yield* fs.readFileString(marker).pipe(
} Effect.map(parseMigration),
}) Effect.catchIf(missing, () => Effect.succeed(0)),
Effect.orElseSucceed(() => 0),
)
for (let i = migration; i < MIGRATIONS.length; i++) {
log.info("running migration", { index: i })
const step = MIGRATIONS[i]!
const exit = yield* Effect.exit(step(dir, fs))
if (Exit.isFailure(exit)) {
log.error("failed to run migration", { index: i, cause: exit.cause })
break
}
yield* fs.writeWithDirs(marker, String(i + 1))
}
return { dir }
}),
)
const fail = (target: string): Effect.Effect<never, InstanceType<typeof NotFoundError>> =>
Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` }))
const wrap = <A>(target: string, body: Effect.Effect<A, AppFileSystem.Error>) =>
body.pipe(Effect.catchIf(missing, () => fail(target)))
const writeJson = Effect.fnUntraced(function* (target: string, content: unknown) {
yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2))
})
const withResolved = <A, E>(
key: string[],
fn: (target: string, rw: TxReentrantLock.TxReentrantLock) => Effect.Effect<A, E>,
): Effect.Effect<A, E | AppFileSystem.Error> =>
Effect.scoped(
Effect.gen(function* () {
const target = file((yield* state).dir, key)
return yield* fn(target, yield* RcMap.get(locks, target))
}),
)
const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) {
yield* withResolved(key, (target, rw) =>
TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))),
)
})
const read: Interface["read"] = <T>(key: string[]) =>
Effect.gen(function* () {
const value = yield* withResolved(key, (target, rw) =>
TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))),
)
return value as T
})
const update: Interface["update"] = <T>(key: string[], fn: (draft: T) => void) =>
Effect.gen(function* () {
const value = yield* withResolved(key, (target, rw) =>
TxReentrantLock.withWriteLock(
rw,
Effect.gen(function* () {
const content = yield* wrap(target, fs.readJson(target))
fn(content as T)
yield* writeJson(target, content)
return content
}),
),
)
return value as T
})
const write: Interface["write"] = (key: string[], content: unknown) =>
Effect.gen(function* () {
yield* withResolved(key, (target, rw) => TxReentrantLock.withWriteLock(rw, writeJson(target, content)))
})
const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) {
const dir = (yield* state).dir
const cwd = path.join(dir, ...prefix)
const result = yield* fs
.glob("**/*", {
cwd,
include: "file",
})
.pipe(Effect.catch(() => Effect.succeed<string[]>([])))
return result
.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])
.toSorted((a, b) => a.join("/").localeCompare(b.join("/")))
})
return Service.of({
remove,
read,
update,
write,
list,
})
}),
)
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer))
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function remove(key: string[]) { export async function remove(key: string[]) {
const dir = await state().then((x) => x.dir) return runPromise((svc) => svc.remove(key))
const target = path.join(dir, ...key) + ".json"
return withErrorHandling(async () => {
await fs.unlink(target).catch(() => {})
})
} }
export async function read<T>(key: string[]) { export async function read<T>(key: string[]) {
const dir = await state().then((x) => x.dir) return runPromise((svc) => svc.read<T>(key))
const target = path.join(dir, ...key) + ".json"
return withErrorHandling(async () => {
using _ = await Lock.read(target)
const result = await Filesystem.readJson<T>(target)
return result as T
})
} }
export async function update<T>(key: string[], fn: (draft: T) => void) { export async function update<T>(key: string[], fn: (draft: T) => void) {
const dir = await state().then((x) => x.dir) return runPromise((svc) => svc.update<T>(key, fn))
const target = path.join(dir, ...key) + ".json"
return withErrorHandling(async () => {
using _ = await Lock.write(target)
const content = await Filesystem.readJson<T>(target)
fn(content as T)
await Filesystem.writeJson(target, content)
return content
})
} }
export async function write<T>(key: string[], content: T) { export async function write<T>(key: string[], content: T) {
const dir = await state().then((x) => x.dir) return runPromise((svc) => svc.write(key, content))
const target = path.join(dir, ...key) + ".json"
return withErrorHandling(async () => {
using _ = await Lock.write(target)
await Filesystem.writeJson(target, content)
})
}
async function withErrorHandling<T>(body: () => Promise<T>) {
return body().catch((e) => {
if (!(e instanceof Error)) throw e
const errnoException = e as NodeJS.ErrnoException
if (errnoException.code === "ENOENT") {
throw new NotFoundError({ message: `Resource not found: ${errnoException.path}` })
}
throw e
})
} }
export async function list(prefix: string[]) { export async function list(prefix: string[]) {
const dir = await state().then((x) => x.dir) return runPromise((svc) => svc.list(prefix))
try {
const result = await Glob.scan("**/*", {
cwd: path.join(dir, ...prefix),
include: "file",
}).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)]))
result.sort()
return result
} catch {
return []
}
} }
} }

View File

@ -13,6 +13,18 @@ afterEach(async () => {
await Instance.disposeAll() await Instance.disposeAll()
}) })
async function withoutWatcher<T>(fn: () => Promise<T>) {
if (process.platform !== "win32") return fn()
const prev = process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = "true"
try {
return await fn()
} finally {
if (prev === undefined) delete process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
else process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = prev
}
}
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) { async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
const ids = [] as MessageID[] const ids = [] as MessageID[]
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
@ -42,86 +54,94 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D
describe("session messages endpoint", () => { describe("session messages endpoint", () => {
test("returns cursor headers for older pages", async () => { test("returns cursor headers for older pages", async () => {
await using tmp = await tmpdir({ git: true }) await using tmp = await tmpdir({ git: true })
await Instance.provide({ await withoutWatcher(() =>
directory: tmp.path, Instance.provide({
fn: async () => { directory: tmp.path,
const session = await Session.create({}) fn: async () => {
const ids = await fill(session.id, 5) const session = await Session.create({})
const app = Server.Default() const ids = await fill(session.id, 5)
const app = Server.Default()
const a = await app.request(`/session/${session.id}/message?limit=2`) const a = await app.request(`/session/${session.id}/message?limit=2`)
expect(a.status).toBe(200) expect(a.status).toBe(200)
const aBody = (await a.json()) as MessageV2.WithParts[] const aBody = (await a.json()) as MessageV2.WithParts[]
expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2)) expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2))
const cursor = a.headers.get("x-next-cursor") const cursor = a.headers.get("x-next-cursor")
expect(cursor).toBeTruthy() expect(cursor).toBeTruthy()
expect(a.headers.get("link")).toContain('rel="next"') expect(a.headers.get("link")).toContain('rel="next"')
const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`) const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`)
expect(b.status).toBe(200) expect(b.status).toBe(200)
const bBody = (await b.json()) as MessageV2.WithParts[] const bBody = (await b.json()) as MessageV2.WithParts[]
expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
await Session.remove(session.id) await Session.remove(session.id)
}, },
}) }),
)
}) })
test("keeps full-history responses when limit is omitted", async () => { test("keeps full-history responses when limit is omitted", async () => {
await using tmp = await tmpdir({ git: true }) await using tmp = await tmpdir({ git: true })
await Instance.provide({ await withoutWatcher(() =>
directory: tmp.path, Instance.provide({
fn: async () => { directory: tmp.path,
const session = await Session.create({}) fn: async () => {
const ids = await fill(session.id, 3) const session = await Session.create({})
const app = Server.Default() const ids = await fill(session.id, 3)
const app = Server.Default()
const res = await app.request(`/session/${session.id}/message`) const res = await app.request(`/session/${session.id}/message`)
expect(res.status).toBe(200) expect(res.status).toBe(200)
const body = (await res.json()) as MessageV2.WithParts[] const body = (await res.json()) as MessageV2.WithParts[]
expect(body.map((item) => item.info.id)).toEqual(ids) expect(body.map((item) => item.info.id)).toEqual(ids)
await Session.remove(session.id) await Session.remove(session.id)
}, },
}) }),
)
}) })
test("rejects invalid cursors and missing sessions", async () => { test("rejects invalid cursors and missing sessions", async () => {
await using tmp = await tmpdir({ git: true }) await using tmp = await tmpdir({ git: true })
await Instance.provide({ await withoutWatcher(() =>
directory: tmp.path, Instance.provide({
fn: async () => { directory: tmp.path,
const session = await Session.create({}) fn: async () => {
const app = Server.Default() const session = await Session.create({})
const app = Server.Default()
const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`) const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`)
expect(bad.status).toBe(400) expect(bad.status).toBe(400)
const miss = await app.request(`/session/ses_missing/message?limit=2`) const miss = await app.request(`/session/ses_missing/message?limit=2`)
expect(miss.status).toBe(404) expect(miss.status).toBe(404)
await Session.remove(session.id) await Session.remove(session.id)
}, },
}) }),
)
}) })
test("does not truncate large legacy limit requests", async () => { test("does not truncate large legacy limit requests", async () => {
await using tmp = await tmpdir({ git: true }) await using tmp = await tmpdir({ git: true })
await Instance.provide({ await withoutWatcher(() =>
directory: tmp.path, Instance.provide({
fn: async () => { directory: tmp.path,
const session = await Session.create({}) fn: async () => {
await fill(session.id, 520) const session = await Session.create({})
const app = Server.Default() await fill(session.id, 520)
const app = Server.Default()
const res = await app.request(`/session/${session.id}/message?limit=510`) const res = await app.request(`/session/${session.id}/message?limit=510`)
expect(res.status).toBe(200) expect(res.status).toBe(200)
const body = (await res.json()) as MessageV2.WithParts[] const body = (await res.json()) as MessageV2.WithParts[]
expect(body).toHaveLength(510) expect(body).toHaveLength(510)
await Session.remove(session.id) await Session.remove(session.id)
}, },
}) }),
)
}) })
}) })

View File

@ -0,0 +1,295 @@
import { describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
import { Effect, Layer, ManagedRuntime } from "effect"
import { AppFileSystem } from "../../src/filesystem"
import { Global } from "../../src/global"
import { Storage } from "../../src/storage/storage"
import { tmpdir } from "../fixture/fixture"
const dir = path.join(Global.Path.data, "storage")
async function withScope<T>(fn: (root: string[]) => Promise<T>) {
const root = ["storage_test", crypto.randomUUID()]
try {
return await fn(root)
} finally {
await fs.rm(path.join(dir, ...root), { recursive: true, force: true })
}
}
function map(root: string, file: string) {
if (file === Global.Path.data) return root
if (file.startsWith(Global.Path.data + path.sep)) return path.join(root, path.relative(Global.Path.data, file))
return file
}
function layer(root: string) {
return Layer.effect(
AppFileSystem.Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
return AppFileSystem.Service.of({
...fs,
isDir: (file) => fs.isDir(map(root, file)),
readJson: (file) => fs.readJson(map(root, file)),
writeWithDirs: (file, content, mode) => fs.writeWithDirs(map(root, file), content, mode),
readFileString: (file) => fs.readFileString(map(root, file)),
remove: (file) => fs.remove(map(root, file)),
glob: (pattern, options) =>
fs.glob(pattern, options?.cwd ? { ...options, cwd: map(root, options.cwd) } : options),
})
}),
).pipe(Layer.provide(AppFileSystem.defaultLayer))
}
async function withStorage<T>(
root: string,
fn: (run: <A, E>(body: Effect.Effect<A, E, Storage.Service>) => Promise<A>) => Promise<T>,
) {
const rt = ManagedRuntime.make(Storage.layer.pipe(Layer.provide(layer(root))))
try {
return await fn((body) => rt.runPromise(body))
} finally {
await rt.dispose()
}
}
async function write(file: string, value: unknown) {
await fs.mkdir(path.dirname(file), { recursive: true })
await Bun.write(file, JSON.stringify(value, null, 2))
}
async function text(file: string, value: string) {
await fs.mkdir(path.dirname(file), { recursive: true })
await Bun.write(file, value)
}
async function exists(file: string) {
return fs
.stat(file)
.then(() => true)
.catch(() => false)
}
describe("Storage", () => {
test("round-trips JSON content", async () => {
await withScope(async (root) => {
const key = [...root, "session_diff", "roundtrip"]
const value = [{ file: "a.ts", additions: 2, deletions: 1 }]
await Storage.write(key, value)
expect(await Storage.read<typeof value>(key)).toEqual(value)
})
})
test("maps missing reads to NotFoundError", async () => {
await withScope(async (root) => {
await expect(Storage.read([...root, "missing", "value"])).rejects.toMatchObject({ name: "NotFoundError" })
})
})
test("update on missing key throws NotFoundError", async () => {
await withScope(async (root) => {
await expect(
Storage.update<{ value: number }>([...root, "missing", "key"], (draft) => {
draft.value += 1
}),
).rejects.toMatchObject({ name: "NotFoundError" })
})
})
test("write overwrites existing value", async () => {
await withScope(async (root) => {
const key = [...root, "overwrite", "test"]
await Storage.write<{ v: number }>(key, { v: 1 })
await Storage.write<{ v: number }>(key, { v: 2 })
expect(await Storage.read<{ v: number }>(key)).toEqual({ v: 2 })
})
})
test("remove on missing key is a no-op", async () => {
await withScope(async (root) => {
await expect(Storage.remove([...root, "nonexistent", "key"])).resolves.toBeUndefined()
})
})
test("list on missing prefix returns empty", async () => {
await withScope(async (root) => {
expect(await Storage.list([...root, "nonexistent"])).toEqual([])
})
})
test("serializes concurrent updates for the same key", async () => {
await withScope(async (root) => {
const key = [...root, "counter", "shared"]
await Storage.write(key, { value: 0 })
await Promise.all(
Array.from({ length: 25 }, () =>
Storage.update<{ value: number }>(key, (draft) => {
draft.value += 1
}),
),
)
expect(await Storage.read<{ value: number }>(key)).toEqual({ value: 25 })
})
})
test("concurrent reads do not block each other", async () => {
await withScope(async (root) => {
const key = [...root, "concurrent", "reads"]
await Storage.write(key, { ok: true })
const results = await Promise.all(Array.from({ length: 10 }, () => Storage.read(key)))
expect(results).toHaveLength(10)
for (const r of results) expect(r).toEqual({ ok: true })
})
})
test("nested keys create deep paths", async () => {
await withScope(async (root) => {
const key = [...root, "a", "b", "c", "deep"]
await Storage.write<{ nested: boolean }>(key, { nested: true })
expect(await Storage.read<{ nested: boolean }>(key)).toEqual({ nested: true })
expect(await Storage.list([...root, "a"])).toEqual([key])
})
})
test("lists and removes stored entries", async () => {
await withScope(async (root) => {
const a = [...root, "list", "a"]
const b = [...root, "list", "b"]
const prefix = [...root, "list"]
await Storage.write(b, { value: 2 })
await Storage.write(a, { value: 1 })
expect(await Storage.list(prefix)).toEqual([a, b])
await Storage.remove(a)
expect(await Storage.list(prefix)).toEqual([b])
await expect(Storage.read(a)).rejects.toMatchObject({ name: "NotFoundError" })
})
})
test("migration 2 runs when marker contents are invalid", async () => {
await using tmp = await tmpdir()
const storage = path.join(tmp.path, "storage")
const diffs = [
{ additions: 2, deletions: 1 },
{ additions: 3, deletions: 4 },
]
await text(path.join(storage, "migration"), "wat")
await write(path.join(storage, "session", "proj_test", "ses_test.json"), {
id: "ses_test",
projectID: "proj_test",
title: "legacy",
summary: { diffs },
})
await withStorage(tmp.path, async (run) => {
expect(await run(Storage.Service.use((svc) => svc.list(["session_diff"])))).toEqual([
["session_diff", "ses_test"],
])
expect(await run(Storage.Service.use((svc) => svc.read<typeof diffs>(["session_diff", "ses_test"])))).toEqual(
diffs,
)
expect(
await run(
Storage.Service.use((svc) =>
svc.read<{
id: string
projectID: string
title: string
summary: {
additions: number
deletions: number
}
}>(["session", "proj_test", "ses_test"]),
),
),
).toEqual({
id: "ses_test",
projectID: "proj_test",
title: "legacy",
summary: {
additions: 5,
deletions: 5,
},
})
})
expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2")
})
test("migration 1 tolerates malformed legacy records", async () => {
await using tmp = await tmpdir({ git: true })
const storage = path.join(tmp.path, "storage")
const legacy = path.join(tmp.path, "project", "legacy")
await write(path.join(legacy, "storage", "session", "message", "probe", "0.json"), [])
await write(path.join(legacy, "storage", "session", "message", "probe", "1.json"), {
path: { root: tmp.path },
})
await write(path.join(legacy, "storage", "session", "info", "ses_legacy.json"), {
id: "ses_legacy",
title: "legacy",
})
await write(path.join(legacy, "storage", "session", "message", "ses_legacy", "msg_legacy.json"), {
role: "user",
text: "hello",
})
await withStorage(tmp.path, async (run) => {
const projects = await run(Storage.Service.use((svc) => svc.list(["project"])))
expect(projects).toHaveLength(1)
const project = projects[0]![1]
expect(await run(Storage.Service.use((svc) => svc.list(["session", project])))).toEqual([
["session", project, "ses_legacy"],
])
expect(
await run(
Storage.Service.use((svc) => svc.read<{ id: string; title: string }>(["session", project, "ses_legacy"])),
),
).toEqual({
id: "ses_legacy",
title: "legacy",
})
expect(
await run(
Storage.Service.use((svc) =>
svc.read<{ role: string; text: string }>(["message", "ses_legacy", "msg_legacy"]),
),
),
).toEqual({
role: "user",
text: "hello",
})
})
expect(await Bun.file(path.join(storage, "migration")).text()).toBe("2")
})
test("failed migrations do not advance the marker", async () => {
await using tmp = await tmpdir()
const storage = path.join(tmp.path, "storage")
const legacy = path.join(tmp.path, "project", "legacy")
await text(path.join(legacy, "storage", "session", "message", "probe", "0.json"), "{")
await withStorage(tmp.path, async (run) => {
expect(await run(Storage.Service.use((svc) => svc.list(["project"])))).toEqual([])
})
expect(await exists(path.join(storage, "migration"))).toBe(false)
})
})