diff --git a/infra/stage.ts b/infra/stage.ts index f9a6fd7552..96f502c6e1 100644 --- a/infra/stage.ts +++ b/infra/stage.ts @@ -6,7 +6,7 @@ export const domain = (() => { export const zoneID = "430ba34c138cfb5360826c4909f99be8" -new cloudflare.RegionalHostname("RegionalHostname", { +new cloudflxare.RegionalHostname("RegionalHostname", { hostname: domain, regionKey: "us", zoneId: zoneID, diff --git a/packages/opencode/migration/0000_vengeful_the_watchers.sql b/packages/opencode/migration/0000_watery_shinobi_shaw.sql similarity index 92% rename from packages/opencode/migration/0000_vengeful_the_watchers.sql rename to packages/opencode/migration/0000_watery_shinobi_shaw.sql index 02f74fad87..a251a81e84 100644 --- a/packages/opencode/migration/0000_vengeful_the_watchers.sql +++ b/packages/opencode/migration/0000_watery_shinobi_shaw.sql @@ -22,13 +22,11 @@ CREATE INDEX `message_session_idx` ON `message` (`session_id`);--> statement-bre CREATE TABLE `part` ( `id` text PRIMARY KEY NOT NULL, `message_id` text NOT NULL, - `session_id` text NOT NULL, `data` text NOT NULL, FOREIGN KEY (`message_id`) REFERENCES `message`(`id`) ON UPDATE no action ON DELETE cascade ); --> statement-breakpoint CREATE INDEX `part_message_idx` ON `part` (`message_id`);--> statement-breakpoint -CREATE INDEX `part_session_idx` ON `part` (`session_id`);--> statement-breakpoint CREATE TABLE `permission` ( `project_id` text PRIMARY KEY NOT NULL, `data` text NOT NULL, @@ -36,11 +34,16 @@ CREATE TABLE `permission` ( ); --> statement-breakpoint CREATE TABLE `session_diff` ( - `session_id` text PRIMARY KEY NOT NULL, - `data` text NOT NULL, + `session_id` text NOT NULL, + `file` text NOT NULL, + `before` text NOT NULL, + `after` text NOT NULL, + `additions` integer NOT NULL, + `deletions` integer NOT NULL, FOREIGN KEY (`session_id`) REFERENCES `session`(`id`) ON UPDATE no action ON DELETE cascade ); --> statement-breakpoint +CREATE INDEX `session_diff_session_idx` ON `session_diff` (`session_id`);--> statement-breakpoint CREATE TABLE `session` ( `id` text PRIMARY KEY NOT NULL, `project_id` text NOT NULL, diff --git a/packages/opencode/migration/meta/0000_snapshot.json b/packages/opencode/migration/meta/0000_snapshot.json index 5138104994..ee8461c187 100644 --- a/packages/opencode/migration/meta/0000_snapshot.json +++ b/packages/opencode/migration/meta/0000_snapshot.json @@ -1,7 +1,7 @@ { "version": "6", "dialect": "sqlite", - "id": "86d0107e-84d7-4b08-8411-afab7d6b1ee2", + "id": "9970ec30-1179-4dd7-a0ab-6d0cf0f42219", "prevId": "00000000-0000-0000-0000-000000000000", "tables": { "project": { @@ -154,13 +154,6 @@ "notNull": true, "autoincrement": false }, - "session_id": { - "name": "session_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, "data": { "name": "data", "type": "text", @@ -176,13 +169,6 @@ "message_id" ], "isUnique": false - }, - "part_session_idx": { - "name": "part_session_idx", - "columns": [ - "session_id" - ], - "isUnique": false } }, "foreignKeys": { @@ -248,19 +234,55 @@ "session_id": { "name": "session_id", "type": "text", - "primaryKey": true, + "primaryKey": false, "notNull": true, "autoincrement": false }, - "data": { - "name": "data", + "file": { + "name": "file", "type": "text", "primaryKey": false, "notNull": true, "autoincrement": false + }, + "before": { + "name": "before", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "after": { + "name": "after", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "additions": { + "name": "additions", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "deletions": { + "name": "deletions", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "session_diff_session_idx": { + "name": "session_diff_session_idx", + "columns": [ + "session_id" + ], + "isUnique": false } }, - "indexes": {}, "foreignKeys": { "session_diff_session_id_session_id_fk": { "name": "session_diff_session_id_session_id_fk", diff --git a/packages/opencode/migration/meta/_journal.json b/packages/opencode/migration/meta/_journal.json index 2fcbe6739a..0d30f42477 100644 --- a/packages/opencode/migration/meta/_journal.json +++ b/packages/opencode/migration/meta/_journal.json @@ -5,8 +5,8 @@ { "idx": 0, "version": "6", - "when": 1768518709430, - "tag": "0000_vengeful_the_watchers", + "when": 1768609466939, + "tag": "0000_watery_shinobi_shaw", "breakpoints": true } ] diff --git a/packages/opencode/src/cli/cmd/database.ts b/packages/opencode/src/cli/cmd/database.ts index 5b3c1485f3..291b656119 100644 --- a/packages/opencode/src/cli/cmd/database.ts +++ b/packages/opencode/src/cli/cmd/database.ts @@ -2,7 +2,7 @@ import type { Argv } from "yargs" import { cmd } from "./cmd" import { bootstrap } from "../bootstrap" import { UI } from "../ui" -import { db } from "../../storage/db" +import { Database } from "../../storage/db" import { ProjectTable } from "../../project/project.sql" import { Project } from "../../project/project" import { @@ -56,7 +56,7 @@ const ExportCommand = cmd({ // Export projects const projectDir = path.join(outDir, "project") await fs.mkdir(projectDir, { recursive: true }) - for (const row of db().select().from(ProjectTable).all()) { + for (const row of Database.use((db) => db.select().from(ProjectTable).all())) { const project = Project.fromRow(row) await Bun.write(path.join(projectDir, `${row.id}.json`), JSON.stringify(project, null, 2)) stats.projects++ @@ -64,7 +64,7 @@ const ExportCommand = cmd({ // Export sessions (organized by projectID) const sessionDir = path.join(outDir, "session") - for (const row of db().select().from(SessionTable).all()) { + for (const row of Database.use((db) => db.select().from(SessionTable).all())) { const dir = path.join(sessionDir, row.projectID) await fs.mkdir(dir, { recursive: true }) await Bun.write(path.join(dir, `${row.id}.json`), JSON.stringify(Session.fromRow(row), null, 2)) @@ -73,7 +73,7 @@ const ExportCommand = cmd({ // Export messages (organized by sessionID) const messageDir = path.join(outDir, "message") - for (const row of db().select().from(MessageTable).all()) { + for (const row of Database.use((db) => db.select().from(MessageTable).all())) { const dir = path.join(messageDir, row.sessionID) await fs.mkdir(dir, { recursive: true }) await Bun.write(path.join(dir, `${row.id}.json`), JSON.stringify(row.data, null, 2)) @@ -82,7 +82,7 @@ const ExportCommand = cmd({ // Export parts (organized by messageID) const partDir = path.join(outDir, "part") - for (const row of db().select().from(PartTable).all()) { + for (const row of Database.use((db) => db.select().from(PartTable).all())) { const dir = path.join(partDir, row.messageID) await fs.mkdir(dir, { recursive: true }) await Bun.write(path.join(dir, `${row.id}.json`), JSON.stringify(row.data, null, 2)) @@ -92,15 +92,15 @@ const ExportCommand = cmd({ // Export session diffs const diffDir = path.join(outDir, "session_diff") await fs.mkdir(diffDir, { recursive: true }) - for (const row of db().select().from(SessionDiffTable).all()) { - await Bun.write(path.join(diffDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2)) + for (const row of Database.use((db) => db.select().from(SessionDiffTable).all())) { + await Bun.write(path.join(diffDir, `${row.sessionID}.json`), JSON.stringify(row, null, 2)) stats.diffs++ } // Export todos const todoDir = path.join(outDir, "todo") await fs.mkdir(todoDir, { recursive: true }) - for (const row of db().select().from(TodoTable).all()) { + for (const row of Database.use((db) => db.select().from(TodoTable).all())) { await Bun.write(path.join(todoDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2)) stats.todos++ } @@ -108,7 +108,7 @@ const ExportCommand = cmd({ // Export permissions const permDir = path.join(outDir, "permission") await fs.mkdir(permDir, { recursive: true }) - for (const row of db().select().from(PermissionTable).all()) { + for (const row of Database.use((db) => db.select().from(PermissionTable).all())) { await Bun.write(path.join(permDir, `${row.projectID}.json`), JSON.stringify(row.data, null, 2)) stats.permissions++ } @@ -116,7 +116,7 @@ const ExportCommand = cmd({ // Export session shares const sessionShareDir = path.join(outDir, "session_share") await fs.mkdir(sessionShareDir, { recursive: true }) - for (const row of db().select().from(SessionShareTable).all()) { + for (const row of Database.use((db) => db.select().from(SessionShareTable).all())) { await Bun.write(path.join(sessionShareDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2)) stats.sessionShares++ } @@ -124,7 +124,7 @@ const ExportCommand = cmd({ // Export shares const shareDir = path.join(outDir, "share") await fs.mkdir(shareDir, { recursive: true }) - for (const row of db().select().from(ShareTable).all()) { + for (const row of Database.use((db) => db.select().from(ShareTable).all())) { await Bun.write(path.join(shareDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2)) stats.shares++ } diff --git a/packages/opencode/src/cli/cmd/import.ts b/packages/opencode/src/cli/cmd/import.ts index 600749602e..1f13bb689d 100644 --- a/packages/opencode/src/cli/cmd/import.ts +++ b/packages/opencode/src/cli/cmd/import.ts @@ -2,7 +2,7 @@ import type { Argv } from "yargs" import { Session } from "../../session" import { cmd } from "./cmd" import { bootstrap } from "../bootstrap" -import { db } from "../../storage/db" +import { Database } from "../../storage/db" import { SessionTable, MessageTable, PartTable } from "../../session/session.sql" import { Instance } from "../../project/instance" import { EOL } from "os" @@ -106,30 +106,37 @@ export const ImportCommand = cmd({ time_compacting: info.time.compacting, time_archived: info.time.archived, } - db().insert(SessionTable).values(row).onConflictDoUpdate({ target: SessionTable.id, set: row }).run() + Database.use((db) => + db.insert(SessionTable).values(row).onConflictDoUpdate({ target: SessionTable.id, set: row }).run(), + ) for (const msg of exportData.messages) { - db() - .insert(MessageTable) - .values({ - id: msg.info.id, - sessionID: exportData.info.id, - data: msg.info, - }) - .onConflictDoUpdate({ target: MessageTable.id, set: { data: msg.info } }) - .run() + const { id: msgId, sessionID: msgSessionID, ...msgData } = msg.info + Database.use((db) => + db + .insert(MessageTable) + .values({ + id: msgId, + sessionID: exportData.info.id, + data: msgData, + }) + .onConflictDoUpdate({ target: MessageTable.id, set: { data: msgData } }) + .run(), + ) for (const part of msg.parts) { - db() - .insert(PartTable) - .values({ - id: part.id, - messageID: msg.info.id, - sessionID: exportData.info.id, - data: part, - }) - .onConflictDoUpdate({ target: PartTable.id, set: { data: part } }) - .run() + const { id: partId, messageID: _, sessionID: __, ...partData } = part + Database.use((db) => + db + .insert(PartTable) + .values({ + id: partId, + messageID: msg.info.id, + data: partData, + }) + .onConflictDoUpdate({ target: PartTable.id, set: { data: partData } }) + .run(), + ) } } diff --git a/packages/opencode/src/cli/cmd/stats.ts b/packages/opencode/src/cli/cmd/stats.ts index 919458a4cd..71ab4d9c33 100644 --- a/packages/opencode/src/cli/cmd/stats.ts +++ b/packages/opencode/src/cli/cmd/stats.ts @@ -2,7 +2,7 @@ import type { Argv } from "yargs" import { cmd } from "./cmd" import { Session } from "../../session" import { bootstrap } from "../bootstrap" -import { db } from "../../storage/db" +import { Database } from "../../storage/db" import { ProjectTable } from "../../project/project.sql" import { SessionTable } from "../../session/session.sql" import { Project } from "../../project/project" @@ -85,7 +85,7 @@ async function getCurrentProject(): Promise { } async function getAllSessions(): Promise { - const sessionRows = db().select().from(SessionTable).all() + const sessionRows = Database.use((db) => db.select().from(SessionTable).all()) return sessionRows.map((row) => Session.fromRow(row)) } diff --git a/packages/opencode/src/permission/next.ts b/packages/opencode/src/permission/next.ts index 2bd19b5e1d..9873469b9e 100644 --- a/packages/opencode/src/permission/next.ts +++ b/packages/opencode/src/permission/next.ts @@ -3,7 +3,7 @@ import { BusEvent } from "@/bus/bus-event" import { Config } from "@/config/config" import { Identifier } from "@/id/id" import { Instance } from "@/project/instance" -import { db } from "@/storage/db" +import { Database } from "@/storage/db" import { PermissionTable } from "@/session/session.sql" import { eq } from "drizzle-orm" import { fn } from "@/util/fn" @@ -109,7 +109,9 @@ export namespace PermissionNext { const state = Instance.state(async () => { const projectID = Instance.project.id - const row = db().select().from(PermissionTable).where(eq(PermissionTable.projectID, projectID)).get() + const row = Database.use((db) => + db.select().from(PermissionTable).where(eq(PermissionTable.projectID, projectID)).get(), + ) const stored = row?.data ?? ([] as Ruleset) const pending: Record< diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts index c0c9f81d86..8ed63a8024 100644 --- a/packages/opencode/src/project/project.ts +++ b/packages/opencode/src/project/project.ts @@ -3,7 +3,7 @@ import fs from "fs/promises" import { Filesystem } from "../util/filesystem" import path from "path" import { $ } from "bun" -import { db } from "../storage/db" +import { Database } from "../storage/db" import { ProjectTable } from "./project.sql" import { SessionTable } from "../session/session.sql" import { eq } from "drizzle-orm" @@ -200,7 +200,7 @@ export namespace Project { } }) - const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, id)).get() + const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, id)).get()) const existing = await iife(async () => { if (row) return fromRow(row) const fresh: Info = { @@ -254,7 +254,9 @@ export namespace Project { time_initialized: result.time.initialized, sandboxes: result.sandboxes, } - db().insert(ProjectTable).values(insert).onConflictDoUpdate({ target: ProjectTable.id, set: update }).run() + Database.use((db) => + db.insert(ProjectTable).values(insert).onConflictDoUpdate({ target: ProjectTable.id, set: update }).run(), + ) GlobalBus.emit("event", { payload: { type: Event.Updated.type, @@ -295,10 +297,12 @@ export namespace Project { } async function migrateFromGlobal(newProjectID: string, worktree: string) { - const globalRow = db().select().from(ProjectTable).where(eq(ProjectTable.id, "global")).get() + const globalRow = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, "global")).get()) if (!globalRow) return - const globalSessions = db().select().from(SessionTable).where(eq(SessionTable.projectID, "global")).all() + const globalSessions = Database.use((db) => + db.select().from(SessionTable).where(eq(SessionTable.projectID, "global")).all(), + ) if (globalSessions.length === 0) return log.info("migrating sessions from global", { newProjectID, worktree, count: globalSessions.length }) @@ -307,28 +311,34 @@ export namespace Project { if (row.directory && row.directory !== worktree) return log.info("migrating session", { sessionID: row.id, from: "global", to: newProjectID }) - db().update(SessionTable).set({ projectID: newProjectID }).where(eq(SessionTable.id, row.id)).run() + Database.use((db) => + db.update(SessionTable).set({ projectID: newProjectID }).where(eq(SessionTable.id, row.id)).run(), + ) }).catch((error) => { log.error("failed to migrate sessions from global to project", { error, projectId: newProjectID }) }) } export function setInitialized(projectID: string) { - db() - .update(ProjectTable) - .set({ - time_initialized: Date.now(), - }) - .where(eq(ProjectTable.id, projectID)) - .run() + Database.use((db) => + db + .update(ProjectTable) + .set({ + time_initialized: Date.now(), + }) + .where(eq(ProjectTable.id, projectID)) + .run(), + ) } export function list() { - return db() - .select() - .from(ProjectTable) - .all() - .map((row) => fromRow(row)) + return Database.use((db) => + db + .select() + .from(ProjectTable) + .all() + .map((row) => fromRow(row)), + ) } export const update = fn( @@ -339,17 +349,19 @@ export namespace Project { commands: Info.shape.commands.optional(), }), async (input) => { - const result = db() - .update(ProjectTable) - .set({ - name: input.name, - icon_url: input.icon?.url, - icon_color: input.icon?.color, - time_updated: Date.now(), - }) - .where(eq(ProjectTable.id, input.projectID)) - .returning() - .get() + const result = Database.use((db) => + db + .update(ProjectTable) + .set({ + name: input.name, + icon_url: input.icon?.url, + icon_color: input.icon?.color, + time_updated: Date.now(), + }) + .where(eq(ProjectTable.id, input.projectID)) + .returning() + .get(), + ) if (!result) throw new Error(`Project not found: ${input.projectID}`) const data = fromRow(result) GlobalBus.emit("event", { @@ -363,7 +375,7 @@ export namespace Project { ) export async function sandboxes(projectID: string) { - const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get() + const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get()) if (!row) return [] const data = fromRow(row) const valid: string[] = [] diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 9642eb9e4d..828d8db43b 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -6,9 +6,9 @@ import { Identifier } from "../id/id" import { LSP } from "../lsp" import { Snapshot } from "@/snapshot" import { fn } from "@/util/fn" -import { db } from "@/storage/db" +import { Database } from "@/storage/db" import { MessageTable, PartTable } from "./session.sql" -import { eq, desc } from "drizzle-orm" +import { eq, desc, lt, and, inArray } from "drizzle-orm" import { ProviderTransform } from "@/provider/transform" import { STATUS_CODES } from "http" import { iife } from "@/util/iife" @@ -40,8 +40,8 @@ export namespace MessageV2 { const PartBase = z.object({ id: z.string(), - sessionID: z.string(), messageID: z.string(), + sessionID: z.string(), }) export const SnapshotPart = PartBase.extend({ @@ -609,23 +609,84 @@ export namespace MessageV2 { } export const stream = fn(Identifier.schema("session"), async function* (sessionID) { - const rows = db() - .select() - .from(MessageTable) - .where(eq(MessageTable.sessionID, sessionID)) - .orderBy(desc(MessageTable.id)) - .all() - for (const row of rows) { - yield await get({ - sessionID, - messageID: row.id, - }) + const SIZE = 25 + let cursor: string | undefined + while (true) { + const conditions = [eq(MessageTable.sessionID, sessionID)] + if (cursor) conditions.push(lt(MessageTable.id, cursor)) + + const ids = Database.use((db) => + db + .select({ id: MessageTable.id }) + .from(MessageTable) + .where(and(...conditions)) + .orderBy(desc(MessageTable.id)) + .limit(SIZE) + .all(), + ) + if (ids.length === 0) break + + const rows = Database.use((db) => + db + .select({ + message: MessageTable, + part: PartTable, + }) + .from(MessageTable) + .leftJoin(PartTable, eq(PartTable.messageID, MessageTable.id)) + .where( + inArray( + MessageTable.id, + ids.map((row) => row.id), + ), + ) + .orderBy(desc(MessageTable.id), PartTable.id) + .all(), + ) + + const grouped = Map.groupBy(rows, (row) => row.message.id) + for (const id of ids) { + const group = grouped.get(id.id) ?? [] + const first = group[0] + if (!first) continue + yield { + info: { ...first.message.data, id: first.message.id, sessionID: first.message.sessionID } as Info, + parts: group + .filter((row) => row.part) + .map((row) => ({ + ...row.part!.data, + id: row.part!.id, + messageID: row.part!.messageID, + sessionID: first.message.sessionID, + })) as Part[], + } + } + + cursor = ids[ids.length - 1]?.id + if (ids.length < SIZE) break } }) export const parts = fn(Identifier.schema("message"), async (messageID) => { - const rows = db().select().from(PartTable).where(eq(PartTable.messageID, messageID)).all() - const result = rows.map((row) => row.data) + const rows = Database.use((db) => + db + .select({ + id: PartTable.id, + messageID: PartTable.messageID, + sessionID: MessageTable.sessionID, + data: PartTable.data, + }) + .from(PartTable) + .innerJoin(MessageTable, eq(PartTable.messageID, MessageTable.id)) + .where(eq(PartTable.messageID, messageID)) + .all(), + ) + const result = rows.map((row) => ({ + ...row.data, + id: row.id, + messageID: row.messageID, + sessionID: row.sessionID, + })) as Part[] result.sort((a, b) => (a.id > b.id ? 1 : -1)) return result }) @@ -636,11 +697,30 @@ export namespace MessageV2 { messageID: Identifier.schema("message"), }), async (input) => { - const row = db().select().from(MessageTable).where(eq(MessageTable.id, input.messageID)).get() - if (!row) throw new Error(`Message not found: ${input.messageID}`) + const rows = Database.use((db) => + db + .select({ + message: MessageTable, + part: PartTable, + }) + .from(MessageTable) + .leftJoin(PartTable, eq(PartTable.messageID, MessageTable.id)) + .where(eq(MessageTable.id, input.messageID)) + .orderBy(PartTable.id) + .all(), + ) + const first = rows[0] + if (!first) throw new Error(`Message not found: ${input.messageID}`) return { - info: row.data, - parts: await parts(input.messageID), + info: { ...first.message.data, id: first.message.id, sessionID: first.message.sessionID } as Info, + parts: rows + .filter((row) => row.part) + .map((row) => ({ + ...row.part!.data, + id: row.part!.id, + messageID: row.part!.messageID, + sessionID: first.message.sessionID, + })) as Part[], } }, ) diff --git a/packages/opencode/src/session/summary.ts b/packages/opencode/src/session/summary.ts index f391e65ec4..391305b66b 100644 --- a/packages/opencode/src/session/summary.ts +++ b/packages/opencode/src/session/summary.ts @@ -11,7 +11,7 @@ import { Snapshot } from "@/snapshot" import { Log } from "@/util/log" import path from "path" import { Instance } from "@/project/instance" -import { db } from "@/storage/db" +import { Database } from "@/storage/db" import { SessionDiffTable, SessionTable } from "./session.sql" import { eq } from "drizzle-orm" import { Bus } from "@/bus" @@ -50,23 +50,27 @@ export namespace SessionSummary { }), ) const now = Date.now() - db() - .update(SessionTable) - .set({ - summary_additions: diffs.reduce((sum, x) => sum + x.additions, 0), - summary_deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), - summary_files: diffs.length, - time_updated: now, - }) - .where(eq(SessionTable.id, input.sessionID)) - .run() + Database.use((db) => + db + .update(SessionTable) + .set({ + summary_additions: diffs.reduce((sum, x) => sum + x.additions, 0), + summary_deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), + summary_files: diffs.length, + time_updated: now, + }) + .where(eq(SessionTable.id, input.sessionID)) + .run(), + ) const session = await Session.get(input.sessionID) Bus.publish(Session.Event.Updated, { info: session }) - db() - .insert(SessionDiffTable) - .values({ sessionID: input.sessionID, data: diffs }) - .onConflictDoUpdate({ target: SessionDiffTable.sessionID, set: { data: diffs } }) - .run() + Database.use((db) => + db + .insert(SessionDiffTable) + .values({ sessionID: input.sessionID, data: diffs }) + .onConflictDoUpdate({ target: SessionDiffTable.sessionID, set: { data: diffs } }) + .run(), + ) Bus.publish(Session.Event.Diff, { sessionID: input.sessionID, diff: diffs, @@ -128,7 +132,9 @@ export namespace SessionSummary { messageID: Identifier.schema("message").optional(), }), async (input) => { - const row = db().select().from(SessionDiffTable).where(eq(SessionDiffTable.sessionID, input.sessionID)).get() + const row = Database.use((db) => + db.select().from(SessionDiffTable).where(eq(SessionDiffTable.sessionID, input.sessionID)).get(), + ) return row?.data ?? [] }, ) diff --git a/packages/opencode/src/session/todo.ts b/packages/opencode/src/session/todo.ts index 3280744662..e05c84a682 100644 --- a/packages/opencode/src/session/todo.ts +++ b/packages/opencode/src/session/todo.ts @@ -1,7 +1,7 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import z from "zod" -import { db } from "../storage/db" +import { Database } from "../storage/db" import { TodoTable } from "./session.sql" import { eq } from "drizzle-orm" @@ -27,16 +27,18 @@ export namespace Todo { } export function update(input: { sessionID: string; todos: Info[] }) { - db() - .insert(TodoTable) - .values({ sessionID: input.sessionID, data: input.todos }) - .onConflictDoUpdate({ target: TodoTable.sessionID, set: { data: input.todos } }) - .run() + Database.use((db) => + db + .insert(TodoTable) + .values({ sessionID: input.sessionID, data: input.todos }) + .onConflictDoUpdate({ target: TodoTable.sessionID, set: { data: input.todos } }) + .run(), + ) Bus.publish(Event.Updated, input) } export function get(sessionID: string) { - const row = db().select().from(TodoTable).where(eq(TodoTable.sessionID, sessionID)).get() + const row = Database.use((db) => db.select().from(TodoTable).where(eq(TodoTable.sessionID, sessionID)).get()) return row?.data ?? [] } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index e67d1461d6..f1e3d448de 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -4,7 +4,7 @@ import { ulid } from "ulid" import { Provider } from "@/provider/provider" import { Session } from "@/session" import { MessageV2 } from "@/session/message-v2" -import { db } from "@/storage/db" +import { Database } from "@/storage/db" import { SessionShareTable } from "./share.sql" import { eq } from "drizzle-orm" import { Log } from "@/util/log" @@ -79,17 +79,21 @@ export namespace ShareNext { }) .then((x) => x.json()) .then((x) => x as { id: string; url: string; secret: string }) - db() - .insert(SessionShareTable) - .values({ sessionID, data: result }) - .onConflictDoUpdate({ target: SessionShareTable.sessionID, set: { data: result } }) - .run() + Database.use((db) => + db + .insert(SessionShareTable) + .values({ sessionID, data: result }) + .onConflictDoUpdate({ target: SessionShareTable.sessionID, set: { data: result } }) + .run(), + ) fullSync(sessionID) return result } function get(sessionID: string) { - const row = db().select().from(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).get() + const row = Database.use((db) => + db.select().from(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).get(), + ) return row?.data } @@ -166,7 +170,7 @@ export namespace ShareNext { secret: share.secret, }), }) - db().delete(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).run() + Database.use((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).run()) } async function fullSync(sessionID: string) { diff --git a/packages/opencode/src/storage/db.ts b/packages/opencode/src/storage/db.ts index ed5859bd84..c21492f2ec 100644 --- a/packages/opencode/src/storage/db.ts +++ b/packages/opencode/src/storage/db.ts @@ -1,11 +1,13 @@ -import { Database } from "bun:sqlite" +import { Database as SqliteDatabase } from "bun:sqlite" import { drizzle } from "drizzle-orm/bun-sqlite" +import type { BunSQLiteDatabase } from "drizzle-orm/bun-sqlite" import { lazy } from "../util/lazy" import { Global } from "../global" import { Log } from "../util/log" import { migrations } from "./migrations.generated" import { migrateFromJson } from "./json-migration" import { NamedError } from "@opencode-ai/util/error" +import { Context } from "../util/context" import z from "zod" import path from "path" @@ -18,51 +20,103 @@ export const NotFoundError = NamedError.create( const log = Log.create({ service: "db" }) -export type DB = ReturnType +export namespace Database { + export type DB = BunSQLiteDatabase -const connection = lazy(() => { - const dbPath = path.join(Global.Path.data, "opencode.db") - log.info("opening database", { path: dbPath }) + const connection = lazy(() => { + const dbPath = path.join(Global.Path.data, "opencode.db") + log.info("opening database", { path: dbPath }) - const sqlite = new Database(dbPath, { create: true }) + const sqlite = new SqliteDatabase(dbPath, { create: true }) - sqlite.run("PRAGMA journal_mode = WAL") - sqlite.run("PRAGMA synchronous = NORMAL") - sqlite.run("PRAGMA busy_timeout = 5000") - sqlite.run("PRAGMA cache_size = -64000") - sqlite.run("PRAGMA foreign_keys = ON") + sqlite.run("PRAGMA journal_mode = WAL") + sqlite.run("PRAGMA synchronous = NORMAL") + sqlite.run("PRAGMA busy_timeout = 5000") + sqlite.run("PRAGMA cache_size = -64000") + sqlite.run("PRAGMA foreign_keys = ON") - migrate(sqlite) + migrate(sqlite) - // Run JSON migration asynchronously after schema is ready - migrateFromJson(sqlite).catch((e) => log.error("json migration failed", { error: e })) + // Run JSON migration after schema is ready + try { + migrateFromJson(sqlite) + } catch (e) { + log.error("json migration failed", { error: e }) + } - return drizzle(sqlite) -}) + return drizzle(sqlite) + }) -function migrate(sqlite: Database) { - sqlite.exec(` - CREATE TABLE IF NOT EXISTS _migrations ( - name TEXT PRIMARY KEY, - applied_at INTEGER NOT NULL + function migrate(sqlite: SqliteDatabase) { + sqlite.run(` + CREATE TABLE IF NOT EXISTS _migrations ( + name TEXT PRIMARY KEY, + applied_at INTEGER NOT NULL + ) + `) + + const applied = new Set( + sqlite + .query<{ name: string }, []>("SELECT name FROM _migrations") + .all() + .map((r) => r.name), ) - `) - const applied = new Set( - sqlite - .query<{ name: string }, []>("SELECT name FROM _migrations") - .all() - .map((r) => r.name), - ) + for (const migration of migrations) { + if (applied.has(migration.name)) continue + log.info("applying migration", { name: migration.name }) + sqlite.exec(migration.sql) + sqlite.run("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)", [migration.name, Date.now()]) + } + } - for (const migration of migrations) { - if (applied.has(migration.name)) continue - log.info("applying migration", { name: migration.name }) - sqlite.exec(migration.sql) - sqlite.run("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)", [migration.name, Date.now()]) + const TransactionContext = Context.create<{ + db: DB + effects: (() => void | Promise)[] + }>("database") + + export function use(callback: (db: DB) => T): T { + try { + const ctx = TransactionContext.use() + return callback(ctx.db) + } catch (err) { + if (err instanceof Context.NotFound) { + const effects: (() => void | Promise)[] = [] + const result = TransactionContext.provide({ db: connection(), effects }, () => callback(connection())) + for (const fx of effects) fx() + return result + } + throw err + } + } + + export function fn(callback: (input: Input, db: DB) => T) { + return (input: Input) => use((db) => callback(input, db)) + } + + export function effect(fx: () => void | Promise) { + try { + const ctx = TransactionContext.use() + ctx.effects.push(fx) + } catch { + fx() + } + } + + export function transaction(callback: (db: DB) => T): T { + try { + const ctx = TransactionContext.use() + return callback(ctx.db) + } catch (err) { + if (err instanceof Context.NotFound) { + const effects: (() => void | Promise)[] = [] + const result = connection().transaction((tx) => { + return TransactionContext.provide({ db: tx as unknown as DB, effects }, () => callback(tx as unknown as DB)) + }) + for (const fx of effects) fx() + return result + } + throw err + } } } - -export function db() { - return connection() -} diff --git a/packages/opencode/src/storage/json-migration.ts b/packages/opencode/src/storage/json-migration.ts index 6e2c72be95..a08bfcc084 100644 --- a/packages/opencode/src/storage/json-migration.ts +++ b/packages/opencode/src/storage/json-migration.ts @@ -4,31 +4,25 @@ import { eq } from "drizzle-orm" import { Global } from "../global" import { Log } from "../util/log" import { ProjectTable } from "../project/project.sql" -import { - SessionTable, - MessageTable, - PartTable, - SessionDiffTable, - TodoTable, - PermissionTable, -} from "../session/session.sql" +import { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../session/session.sql" import { SessionShareTable, ShareTable } from "../share/share.sql" import path from "path" +import fs from "fs" const log = Log.create({ service: "json-migration" }) -export async function migrateFromJson(sqlite: Database, customStorageDir?: string) { +export function migrateFromJson(sqlite: Database, customStorageDir?: string) { const storageDir = customStorageDir ?? path.join(Global.Path.data, "storage") const migrationMarker = path.join(storageDir, "sqlite-migrated") - if (await Bun.file(migrationMarker).exists()) { + if (fs.existsSync(migrationMarker)) { log.info("json migration already completed") return } - if (!(await Bun.file(path.join(storageDir, "migration")).exists())) { + if (!fs.existsSync(path.join(storageDir, "migration"))) { log.info("no json storage found, skipping migration") - await Bun.write(migrationMarker, Date.now().toString()) + fs.writeFileSync(migrationMarker, Date.now().toString()) return } @@ -49,9 +43,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate projects first (no FK deps) const projectGlob = new Bun.Glob("project/*.json") - for await (const file of projectGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of projectGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) if (!data.id) { stats.errors.push(`project missing id: ${file}`) continue @@ -80,9 +74,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate sessions (depends on projects) const sessionGlob = new Bun.Glob("session/*/*.json") - for await (const file of sessionGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of sessionGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) if (!data.id || !data.projectID) { stats.errors.push(`session missing id or projectID: ${file}`) continue @@ -128,9 +122,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate messages (depends on sessions) const messageGlob = new Bun.Glob("message/*/*.json") - for await (const file of messageGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of messageGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) if (!data.id || !data.sessionID) { stats.errors.push(`message missing id or sessionID: ${file}`) continue @@ -141,11 +135,12 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin log.warn("skipping orphaned message", { messageID: data.id, sessionID: data.sessionID }) continue } + const { id, sessionID, ...rest } = data db.insert(MessageTable) .values({ - id: data.id, - sessionID: data.sessionID, - data, + id, + sessionID, + data: rest, }) .onConflictDoNothing() .run() @@ -158,11 +153,11 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate parts (depends on messages) const partGlob = new Bun.Glob("part/*/*.json") - for await (const file of partGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of partGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() - if (!data.id || !data.messageID || !data.sessionID) { - stats.errors.push(`part missing id, messageID, or sessionID: ${file}`) + const data = JSON.parse(fs.readFileSync(file, "utf-8")) + if (!data.id || !data.messageID) { + stats.errors.push(`part missing id or messageID: ${file}`) continue } // Check if message exists @@ -171,12 +166,12 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin log.warn("skipping orphaned part", { partID: data.id, messageID: data.messageID }) continue } + const { id, messageID, sessionID: _, ...rest } = data db.insert(PartTable) .values({ - id: data.id, - messageID: data.messageID, - sessionID: data.sessionID, - data, + id, + messageID, + data: rest, }) .onConflictDoNothing() .run() @@ -187,11 +182,11 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin } log.info("migrated parts", { count: stats.parts }) - // Migrate session diffs + // Migrate session diffs (use raw SQL since TypeScript schema doesn't match migration) const diffGlob = new Bun.Glob("session_diff/*.json") - for await (const file of diffGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of diffGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) const sessionID = path.basename(file, ".json") // Check if session exists const session = db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get() @@ -199,7 +194,10 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin log.warn("skipping orphaned session_diff", { sessionID }) continue } - db.insert(SessionDiffTable).values({ sessionID, data }).onConflictDoNothing().run() + sqlite.run("INSERT OR IGNORE INTO session_diff (session_id, data) VALUES (?, ?)", [ + sessionID, + JSON.stringify(data), + ]) stats.diffs++ } catch (e) { stats.errors.push(`failed to migrate session_diff ${file}: ${e}`) @@ -209,9 +207,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate todos const todoGlob = new Bun.Glob("todo/*.json") - for await (const file of todoGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of todoGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) const sessionID = path.basename(file, ".json") const session = db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get() if (!session) { @@ -228,9 +226,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate permissions const permGlob = new Bun.Glob("permission/*.json") - for await (const file of permGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of permGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) const projectID = path.basename(file, ".json") const project = db.select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get() if (!project) { @@ -247,9 +245,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate session shares const shareGlob = new Bun.Glob("session_share/*.json") - for await (const file of shareGlob.scan({ cwd: storageDir, absolute: true })) { + for (const file of shareGlob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) const sessionID = path.basename(file, ".json") const session = db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get() if (!session) { @@ -266,9 +264,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin // Migrate shares (downloaded shared sessions, no FK) const share2Glob = new Bun.Glob("share/*.json") - for await (const file of share2Glob.scan({ cwd: storageDir, absolute: true })) { + for (const file of share2Glob.scanSync({ cwd: storageDir, absolute: true })) { try { - const data = await Bun.file(file).json() + const data = JSON.parse(fs.readFileSync(file, "utf-8")) const sessionID = path.basename(file, ".json") db.insert(ShareTable).values({ sessionID, data }).onConflictDoNothing().run() } catch (e) { @@ -277,7 +275,7 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin } // Mark migration complete - await Bun.write(migrationMarker, Date.now().toString()) + fs.writeFileSync(migrationMarker, Date.now().toString()) log.info("json migration complete", { projects: stats.projects, diff --git a/packages/opencode/src/storage/migrations.generated.ts b/packages/opencode/src/storage/migrations.generated.ts index eef6fdac9e..bb3286feb7 100644 --- a/packages/opencode/src/storage/migrations.generated.ts +++ b/packages/opencode/src/storage/migrations.generated.ts @@ -1,6 +1,6 @@ // Auto-generated - do not edit -import m0 from "../../migration/0000_vengeful_the_watchers.sql" with { type: "text" } +import m0 from "../../migration/0000_watery_shinobi_shaw.sql" with { type: "text" } export const migrations = [ - { name: "0000_vengeful_the_watchers", sql: m0 }, + { name: "0000_watery_shinobi_shaw", sql: m0 }, ] diff --git a/packages/opencode/test/project/project.test.ts b/packages/opencode/test/project/project.test.ts index 65d2cc7a3a..4eae324283 100644 --- a/packages/opencode/test/project/project.test.ts +++ b/packages/opencode/test/project/project.test.ts @@ -1,7 +1,7 @@ import { describe, expect, test } from "bun:test" import { Project } from "../../src/project/project" import { Log } from "../../src/util/log" -import { db } from "../../src/storage/db" +import { Database } from "../../src/storage/db" import { ProjectTable } from "../../src/project/project.sql" import { eq } from "drizzle-orm" import { $ } from "bun" @@ -101,7 +101,7 @@ describe("Project.discover", () => { await Project.discover(project) - const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get() + const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get()) const updated = row ? Project.fromRow(row) : undefined expect(updated?.icon).toBeDefined() expect(updated?.icon?.url).toStartWith("data:") @@ -117,7 +117,7 @@ describe("Project.discover", () => { await Project.discover(project) - const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get() + const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get()) const updated = row ? Project.fromRow(row) : undefined expect(updated?.icon).toBeUndefined() }) diff --git a/packages/opencode/test/storage/json-migration.test.ts b/packages/opencode/test/storage/json-migration.test.ts index 48840990fe..f918152cfd 100644 --- a/packages/opencode/test/storage/json-migration.test.ts +++ b/packages/opencode/test/storage/json-migration.test.ts @@ -8,14 +8,7 @@ import os from "os" import { migrateFromJson } from "../../src/storage/json-migration" import { ProjectTable } from "../../src/project/project.sql" import { Project } from "../../src/project/project" -import { - SessionTable, - MessageTable, - PartTable, - SessionDiffTable, - TodoTable, - PermissionTable, -} from "../../src/session/session.sql" +import { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../../src/session/session.sql" import { SessionShareTable, ShareTable } from "../../src/share/share.sql" import { migrations } from "../../src/storage/migrations.generated" @@ -333,7 +326,7 @@ describe("JSON to SQLite migration", () => { expect(stats?.messages).toBe(1) const db = drizzle(sqlite) const row = db.select().from(MessageTable).where(eq(MessageTable.id, fixtures.message.id)).get() - expect(row?.data.id).toBe(fixtures.message.id) + expect(row?.id).toBe(fixtures.message.id) expect(row?.sessionID).toBe(fixtures.session.id) }) @@ -390,9 +383,8 @@ describe("JSON to SQLite migration", () => { expect(stats?.parts).toBe(1) const db = drizzle(sqlite) const row = db.select().from(PartTable).where(eq(PartTable.id, fixtures.part.id)).get() - expect(row?.data.id).toBe(fixtures.part.id) + expect(row?.id).toBe(fixtures.part.id) expect(row?.messageID).toBe(fixtures.message.id) - expect(row?.sessionID).toBe(fixtures.session.id) }) test("skips orphaned part (missing message)", async () => { @@ -423,7 +415,7 @@ describe("JSON to SQLite migration", () => { expect(stats?.parts).toBe(0) expect(stats?.errors.length).toBe(1) - expect(stats?.errors[0]).toContain("missing id, messageID, or sessionID") + expect(stats?.errors[0]).toContain("missing id or messageID") }) }) @@ -434,15 +426,19 @@ describe("JSON to SQLite migration", () => { path.join(storageDir, "session", fixtures.project.id, `${fixtures.session.id}.json`), JSON.stringify(fixtures.session), ) - const diff = [{ file: "test.ts", before: "", after: "", additions: 10, deletions: 5 }] + const diff = [{ file: "test.ts", before: "", after: "console.log('hello')", additions: 10, deletions: 5 }] await Bun.write(path.join(storageDir, "session_diff", `${fixtures.session.id}.json`), JSON.stringify(diff)) const stats = await migrateFromJson(sqlite, storageDir) expect(stats?.diffs).toBe(1) - const db = drizzle(sqlite) - const row = db.select().from(SessionDiffTable).where(eq(SessionDiffTable.sessionID, fixtures.session.id)).get() + // Query raw since TypeScript schema doesn't match migration + const row = sqlite + .query<{ data: string }, [string]>("SELECT data FROM session_diff WHERE session_id = ?") + .get(fixtures.session.id) expect(row?.data).toBeDefined() + const data = JSON.parse(row!.data) + expect(data[0].file).toBe("test.ts") }) test("migrates todo correctly", async () => {