pull/8586/head
Dax Raad 2026-01-16 23:25:25 -05:00
parent 38f735bfc6
commit c2e234ec4d
18 changed files with 429 additions and 243 deletions

View File

@ -6,7 +6,7 @@ export const domain = (() => {
export const zoneID = "430ba34c138cfb5360826c4909f99be8"
new cloudflare.RegionalHostname("RegionalHostname", {
new cloudflxare.RegionalHostname("RegionalHostname", {
hostname: domain,
regionKey: "us",
zoneId: zoneID,

View File

@ -22,13 +22,11 @@ CREATE INDEX `message_session_idx` ON `message` (`session_id`);--> statement-bre
CREATE TABLE `part` (
`id` text PRIMARY KEY NOT NULL,
`message_id` text NOT NULL,
`session_id` text NOT NULL,
`data` text NOT NULL,
FOREIGN KEY (`message_id`) REFERENCES `message`(`id`) ON UPDATE no action ON DELETE cascade
);
--> statement-breakpoint
CREATE INDEX `part_message_idx` ON `part` (`message_id`);--> statement-breakpoint
CREATE INDEX `part_session_idx` ON `part` (`session_id`);--> statement-breakpoint
CREATE TABLE `permission` (
`project_id` text PRIMARY KEY NOT NULL,
`data` text NOT NULL,
@ -36,11 +34,16 @@ CREATE TABLE `permission` (
);
--> statement-breakpoint
CREATE TABLE `session_diff` (
`session_id` text PRIMARY KEY NOT NULL,
`data` text NOT NULL,
`session_id` text NOT NULL,
`file` text NOT NULL,
`before` text NOT NULL,
`after` text NOT NULL,
`additions` integer NOT NULL,
`deletions` integer NOT NULL,
FOREIGN KEY (`session_id`) REFERENCES `session`(`id`) ON UPDATE no action ON DELETE cascade
);
--> statement-breakpoint
CREATE INDEX `session_diff_session_idx` ON `session_diff` (`session_id`);--> statement-breakpoint
CREATE TABLE `session` (
`id` text PRIMARY KEY NOT NULL,
`project_id` text NOT NULL,

View File

@ -1,7 +1,7 @@
{
"version": "6",
"dialect": "sqlite",
"id": "86d0107e-84d7-4b08-8411-afab7d6b1ee2",
"id": "9970ec30-1179-4dd7-a0ab-6d0cf0f42219",
"prevId": "00000000-0000-0000-0000-000000000000",
"tables": {
"project": {
@ -154,13 +154,6 @@
"notNull": true,
"autoincrement": false
},
"session_id": {
"name": "session_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"data": {
"name": "data",
"type": "text",
@ -176,13 +169,6 @@
"message_id"
],
"isUnique": false
},
"part_session_idx": {
"name": "part_session_idx",
"columns": [
"session_id"
],
"isUnique": false
}
},
"foreignKeys": {
@ -248,19 +234,55 @@
"session_id": {
"name": "session_id",
"type": "text",
"primaryKey": true,
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"data": {
"name": "data",
"file": {
"name": "file",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"before": {
"name": "before",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"after": {
"name": "after",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"additions": {
"name": "additions",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"deletions": {
"name": "deletions",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {
"session_diff_session_idx": {
"name": "session_diff_session_idx",
"columns": [
"session_id"
],
"isUnique": false
}
},
"indexes": {},
"foreignKeys": {
"session_diff_session_id_session_id_fk": {
"name": "session_diff_session_id_session_id_fk",

View File

@ -5,8 +5,8 @@
{
"idx": 0,
"version": "6",
"when": 1768518709430,
"tag": "0000_vengeful_the_watchers",
"when": 1768609466939,
"tag": "0000_watery_shinobi_shaw",
"breakpoints": true
}
]

View File

@ -2,7 +2,7 @@ import type { Argv } from "yargs"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { UI } from "../ui"
import { db } from "../../storage/db"
import { Database } from "../../storage/db"
import { ProjectTable } from "../../project/project.sql"
import { Project } from "../../project/project"
import {
@ -56,7 +56,7 @@ const ExportCommand = cmd({
// Export projects
const projectDir = path.join(outDir, "project")
await fs.mkdir(projectDir, { recursive: true })
for (const row of db().select().from(ProjectTable).all()) {
for (const row of Database.use((db) => db.select().from(ProjectTable).all())) {
const project = Project.fromRow(row)
await Bun.write(path.join(projectDir, `${row.id}.json`), JSON.stringify(project, null, 2))
stats.projects++
@ -64,7 +64,7 @@ const ExportCommand = cmd({
// Export sessions (organized by projectID)
const sessionDir = path.join(outDir, "session")
for (const row of db().select().from(SessionTable).all()) {
for (const row of Database.use((db) => db.select().from(SessionTable).all())) {
const dir = path.join(sessionDir, row.projectID)
await fs.mkdir(dir, { recursive: true })
await Bun.write(path.join(dir, `${row.id}.json`), JSON.stringify(Session.fromRow(row), null, 2))
@ -73,7 +73,7 @@ const ExportCommand = cmd({
// Export messages (organized by sessionID)
const messageDir = path.join(outDir, "message")
for (const row of db().select().from(MessageTable).all()) {
for (const row of Database.use((db) => db.select().from(MessageTable).all())) {
const dir = path.join(messageDir, row.sessionID)
await fs.mkdir(dir, { recursive: true })
await Bun.write(path.join(dir, `${row.id}.json`), JSON.stringify(row.data, null, 2))
@ -82,7 +82,7 @@ const ExportCommand = cmd({
// Export parts (organized by messageID)
const partDir = path.join(outDir, "part")
for (const row of db().select().from(PartTable).all()) {
for (const row of Database.use((db) => db.select().from(PartTable).all())) {
const dir = path.join(partDir, row.messageID)
await fs.mkdir(dir, { recursive: true })
await Bun.write(path.join(dir, `${row.id}.json`), JSON.stringify(row.data, null, 2))
@ -92,15 +92,15 @@ const ExportCommand = cmd({
// Export session diffs
const diffDir = path.join(outDir, "session_diff")
await fs.mkdir(diffDir, { recursive: true })
for (const row of db().select().from(SessionDiffTable).all()) {
await Bun.write(path.join(diffDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2))
for (const row of Database.use((db) => db.select().from(SessionDiffTable).all())) {
await Bun.write(path.join(diffDir, `${row.sessionID}.json`), JSON.stringify(row, null, 2))
stats.diffs++
}
// Export todos
const todoDir = path.join(outDir, "todo")
await fs.mkdir(todoDir, { recursive: true })
for (const row of db().select().from(TodoTable).all()) {
for (const row of Database.use((db) => db.select().from(TodoTable).all())) {
await Bun.write(path.join(todoDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2))
stats.todos++
}
@ -108,7 +108,7 @@ const ExportCommand = cmd({
// Export permissions
const permDir = path.join(outDir, "permission")
await fs.mkdir(permDir, { recursive: true })
for (const row of db().select().from(PermissionTable).all()) {
for (const row of Database.use((db) => db.select().from(PermissionTable).all())) {
await Bun.write(path.join(permDir, `${row.projectID}.json`), JSON.stringify(row.data, null, 2))
stats.permissions++
}
@ -116,7 +116,7 @@ const ExportCommand = cmd({
// Export session shares
const sessionShareDir = path.join(outDir, "session_share")
await fs.mkdir(sessionShareDir, { recursive: true })
for (const row of db().select().from(SessionShareTable).all()) {
for (const row of Database.use((db) => db.select().from(SessionShareTable).all())) {
await Bun.write(path.join(sessionShareDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2))
stats.sessionShares++
}
@ -124,7 +124,7 @@ const ExportCommand = cmd({
// Export shares
const shareDir = path.join(outDir, "share")
await fs.mkdir(shareDir, { recursive: true })
for (const row of db().select().from(ShareTable).all()) {
for (const row of Database.use((db) => db.select().from(ShareTable).all())) {
await Bun.write(path.join(shareDir, `${row.sessionID}.json`), JSON.stringify(row.data, null, 2))
stats.shares++
}

View File

@ -2,7 +2,7 @@ import type { Argv } from "yargs"
import { Session } from "../../session"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { db } from "../../storage/db"
import { Database } from "../../storage/db"
import { SessionTable, MessageTable, PartTable } from "../../session/session.sql"
import { Instance } from "../../project/instance"
import { EOL } from "os"
@ -106,30 +106,37 @@ export const ImportCommand = cmd({
time_compacting: info.time.compacting,
time_archived: info.time.archived,
}
db().insert(SessionTable).values(row).onConflictDoUpdate({ target: SessionTable.id, set: row }).run()
Database.use((db) =>
db.insert(SessionTable).values(row).onConflictDoUpdate({ target: SessionTable.id, set: row }).run(),
)
for (const msg of exportData.messages) {
db()
.insert(MessageTable)
.values({
id: msg.info.id,
sessionID: exportData.info.id,
data: msg.info,
})
.onConflictDoUpdate({ target: MessageTable.id, set: { data: msg.info } })
.run()
const { id: msgId, sessionID: msgSessionID, ...msgData } = msg.info
Database.use((db) =>
db
.insert(MessageTable)
.values({
id: msgId,
sessionID: exportData.info.id,
data: msgData,
})
.onConflictDoUpdate({ target: MessageTable.id, set: { data: msgData } })
.run(),
)
for (const part of msg.parts) {
db()
.insert(PartTable)
.values({
id: part.id,
messageID: msg.info.id,
sessionID: exportData.info.id,
data: part,
})
.onConflictDoUpdate({ target: PartTable.id, set: { data: part } })
.run()
const { id: partId, messageID: _, sessionID: __, ...partData } = part
Database.use((db) =>
db
.insert(PartTable)
.values({
id: partId,
messageID: msg.info.id,
data: partData,
})
.onConflictDoUpdate({ target: PartTable.id, set: { data: partData } })
.run(),
)
}
}

View File

@ -2,7 +2,7 @@ import type { Argv } from "yargs"
import { cmd } from "./cmd"
import { Session } from "../../session"
import { bootstrap } from "../bootstrap"
import { db } from "../../storage/db"
import { Database } from "../../storage/db"
import { ProjectTable } from "../../project/project.sql"
import { SessionTable } from "../../session/session.sql"
import { Project } from "../../project/project"
@ -85,7 +85,7 @@ async function getCurrentProject(): Promise<Project.Info> {
}
async function getAllSessions(): Promise<Session.Info[]> {
const sessionRows = db().select().from(SessionTable).all()
const sessionRows = Database.use((db) => db.select().from(SessionTable).all())
return sessionRows.map((row) => Session.fromRow(row))
}

View File

@ -3,7 +3,7 @@ import { BusEvent } from "@/bus/bus-event"
import { Config } from "@/config/config"
import { Identifier } from "@/id/id"
import { Instance } from "@/project/instance"
import { db } from "@/storage/db"
import { Database } from "@/storage/db"
import { PermissionTable } from "@/session/session.sql"
import { eq } from "drizzle-orm"
import { fn } from "@/util/fn"
@ -109,7 +109,9 @@ export namespace PermissionNext {
const state = Instance.state(async () => {
const projectID = Instance.project.id
const row = db().select().from(PermissionTable).where(eq(PermissionTable.projectID, projectID)).get()
const row = Database.use((db) =>
db.select().from(PermissionTable).where(eq(PermissionTable.projectID, projectID)).get(),
)
const stored = row?.data ?? ([] as Ruleset)
const pending: Record<

View File

@ -3,7 +3,7 @@ import fs from "fs/promises"
import { Filesystem } from "../util/filesystem"
import path from "path"
import { $ } from "bun"
import { db } from "../storage/db"
import { Database } from "../storage/db"
import { ProjectTable } from "./project.sql"
import { SessionTable } from "../session/session.sql"
import { eq } from "drizzle-orm"
@ -200,7 +200,7 @@ export namespace Project {
}
})
const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, id)).get()
const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, id)).get())
const existing = await iife(async () => {
if (row) return fromRow(row)
const fresh: Info = {
@ -254,7 +254,9 @@ export namespace Project {
time_initialized: result.time.initialized,
sandboxes: result.sandboxes,
}
db().insert(ProjectTable).values(insert).onConflictDoUpdate({ target: ProjectTable.id, set: update }).run()
Database.use((db) =>
db.insert(ProjectTable).values(insert).onConflictDoUpdate({ target: ProjectTable.id, set: update }).run(),
)
GlobalBus.emit("event", {
payload: {
type: Event.Updated.type,
@ -295,10 +297,12 @@ export namespace Project {
}
async function migrateFromGlobal(newProjectID: string, worktree: string) {
const globalRow = db().select().from(ProjectTable).where(eq(ProjectTable.id, "global")).get()
const globalRow = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, "global")).get())
if (!globalRow) return
const globalSessions = db().select().from(SessionTable).where(eq(SessionTable.projectID, "global")).all()
const globalSessions = Database.use((db) =>
db.select().from(SessionTable).where(eq(SessionTable.projectID, "global")).all(),
)
if (globalSessions.length === 0) return
log.info("migrating sessions from global", { newProjectID, worktree, count: globalSessions.length })
@ -307,28 +311,34 @@ export namespace Project {
if (row.directory && row.directory !== worktree) return
log.info("migrating session", { sessionID: row.id, from: "global", to: newProjectID })
db().update(SessionTable).set({ projectID: newProjectID }).where(eq(SessionTable.id, row.id)).run()
Database.use((db) =>
db.update(SessionTable).set({ projectID: newProjectID }).where(eq(SessionTable.id, row.id)).run(),
)
}).catch((error) => {
log.error("failed to migrate sessions from global to project", { error, projectId: newProjectID })
})
}
export function setInitialized(projectID: string) {
db()
.update(ProjectTable)
.set({
time_initialized: Date.now(),
})
.where(eq(ProjectTable.id, projectID))
.run()
Database.use((db) =>
db
.update(ProjectTable)
.set({
time_initialized: Date.now(),
})
.where(eq(ProjectTable.id, projectID))
.run(),
)
}
export function list() {
return db()
.select()
.from(ProjectTable)
.all()
.map((row) => fromRow(row))
return Database.use((db) =>
db
.select()
.from(ProjectTable)
.all()
.map((row) => fromRow(row)),
)
}
export const update = fn(
@ -339,17 +349,19 @@ export namespace Project {
commands: Info.shape.commands.optional(),
}),
async (input) => {
const result = db()
.update(ProjectTable)
.set({
name: input.name,
icon_url: input.icon?.url,
icon_color: input.icon?.color,
time_updated: Date.now(),
})
.where(eq(ProjectTable.id, input.projectID))
.returning()
.get()
const result = Database.use((db) =>
db
.update(ProjectTable)
.set({
name: input.name,
icon_url: input.icon?.url,
icon_color: input.icon?.color,
time_updated: Date.now(),
})
.where(eq(ProjectTable.id, input.projectID))
.returning()
.get(),
)
if (!result) throw new Error(`Project not found: ${input.projectID}`)
const data = fromRow(result)
GlobalBus.emit("event", {
@ -363,7 +375,7 @@ export namespace Project {
)
export async function sandboxes(projectID: string) {
const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get()
const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get())
if (!row) return []
const data = fromRow(row)
const valid: string[] = []

View File

@ -6,9 +6,9 @@ import { Identifier } from "../id/id"
import { LSP } from "../lsp"
import { Snapshot } from "@/snapshot"
import { fn } from "@/util/fn"
import { db } from "@/storage/db"
import { Database } from "@/storage/db"
import { MessageTable, PartTable } from "./session.sql"
import { eq, desc } from "drizzle-orm"
import { eq, desc, lt, and, inArray } from "drizzle-orm"
import { ProviderTransform } from "@/provider/transform"
import { STATUS_CODES } from "http"
import { iife } from "@/util/iife"
@ -40,8 +40,8 @@ export namespace MessageV2 {
const PartBase = z.object({
id: z.string(),
sessionID: z.string(),
messageID: z.string(),
sessionID: z.string(),
})
export const SnapshotPart = PartBase.extend({
@ -609,23 +609,84 @@ export namespace MessageV2 {
}
export const stream = fn(Identifier.schema("session"), async function* (sessionID) {
const rows = db()
.select()
.from(MessageTable)
.where(eq(MessageTable.sessionID, sessionID))
.orderBy(desc(MessageTable.id))
.all()
for (const row of rows) {
yield await get({
sessionID,
messageID: row.id,
})
const SIZE = 25
let cursor: string | undefined
while (true) {
const conditions = [eq(MessageTable.sessionID, sessionID)]
if (cursor) conditions.push(lt(MessageTable.id, cursor))
const ids = Database.use((db) =>
db
.select({ id: MessageTable.id })
.from(MessageTable)
.where(and(...conditions))
.orderBy(desc(MessageTable.id))
.limit(SIZE)
.all(),
)
if (ids.length === 0) break
const rows = Database.use((db) =>
db
.select({
message: MessageTable,
part: PartTable,
})
.from(MessageTable)
.leftJoin(PartTable, eq(PartTable.messageID, MessageTable.id))
.where(
inArray(
MessageTable.id,
ids.map((row) => row.id),
),
)
.orderBy(desc(MessageTable.id), PartTable.id)
.all(),
)
const grouped = Map.groupBy(rows, (row) => row.message.id)
for (const id of ids) {
const group = grouped.get(id.id) ?? []
const first = group[0]
if (!first) continue
yield {
info: { ...first.message.data, id: first.message.id, sessionID: first.message.sessionID } as Info,
parts: group
.filter((row) => row.part)
.map((row) => ({
...row.part!.data,
id: row.part!.id,
messageID: row.part!.messageID,
sessionID: first.message.sessionID,
})) as Part[],
}
}
cursor = ids[ids.length - 1]?.id
if (ids.length < SIZE) break
}
})
export const parts = fn(Identifier.schema("message"), async (messageID) => {
const rows = db().select().from(PartTable).where(eq(PartTable.messageID, messageID)).all()
const result = rows.map((row) => row.data)
const rows = Database.use((db) =>
db
.select({
id: PartTable.id,
messageID: PartTable.messageID,
sessionID: MessageTable.sessionID,
data: PartTable.data,
})
.from(PartTable)
.innerJoin(MessageTable, eq(PartTable.messageID, MessageTable.id))
.where(eq(PartTable.messageID, messageID))
.all(),
)
const result = rows.map((row) => ({
...row.data,
id: row.id,
messageID: row.messageID,
sessionID: row.sessionID,
})) as Part[]
result.sort((a, b) => (a.id > b.id ? 1 : -1))
return result
})
@ -636,11 +697,30 @@ export namespace MessageV2 {
messageID: Identifier.schema("message"),
}),
async (input) => {
const row = db().select().from(MessageTable).where(eq(MessageTable.id, input.messageID)).get()
if (!row) throw new Error(`Message not found: ${input.messageID}`)
const rows = Database.use((db) =>
db
.select({
message: MessageTable,
part: PartTable,
})
.from(MessageTable)
.leftJoin(PartTable, eq(PartTable.messageID, MessageTable.id))
.where(eq(MessageTable.id, input.messageID))
.orderBy(PartTable.id)
.all(),
)
const first = rows[0]
if (!first) throw new Error(`Message not found: ${input.messageID}`)
return {
info: row.data,
parts: await parts(input.messageID),
info: { ...first.message.data, id: first.message.id, sessionID: first.message.sessionID } as Info,
parts: rows
.filter((row) => row.part)
.map((row) => ({
...row.part!.data,
id: row.part!.id,
messageID: row.part!.messageID,
sessionID: first.message.sessionID,
})) as Part[],
}
},
)

View File

@ -11,7 +11,7 @@ import { Snapshot } from "@/snapshot"
import { Log } from "@/util/log"
import path from "path"
import { Instance } from "@/project/instance"
import { db } from "@/storage/db"
import { Database } from "@/storage/db"
import { SessionDiffTable, SessionTable } from "./session.sql"
import { eq } from "drizzle-orm"
import { Bus } from "@/bus"
@ -50,23 +50,27 @@ export namespace SessionSummary {
}),
)
const now = Date.now()
db()
.update(SessionTable)
.set({
summary_additions: diffs.reduce((sum, x) => sum + x.additions, 0),
summary_deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
summary_files: diffs.length,
time_updated: now,
})
.where(eq(SessionTable.id, input.sessionID))
.run()
Database.use((db) =>
db
.update(SessionTable)
.set({
summary_additions: diffs.reduce((sum, x) => sum + x.additions, 0),
summary_deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
summary_files: diffs.length,
time_updated: now,
})
.where(eq(SessionTable.id, input.sessionID))
.run(),
)
const session = await Session.get(input.sessionID)
Bus.publish(Session.Event.Updated, { info: session })
db()
.insert(SessionDiffTable)
.values({ sessionID: input.sessionID, data: diffs })
.onConflictDoUpdate({ target: SessionDiffTable.sessionID, set: { data: diffs } })
.run()
Database.use((db) =>
db
.insert(SessionDiffTable)
.values({ sessionID: input.sessionID, data: diffs })
.onConflictDoUpdate({ target: SessionDiffTable.sessionID, set: { data: diffs } })
.run(),
)
Bus.publish(Session.Event.Diff, {
sessionID: input.sessionID,
diff: diffs,
@ -128,7 +132,9 @@ export namespace SessionSummary {
messageID: Identifier.schema("message").optional(),
}),
async (input) => {
const row = db().select().from(SessionDiffTable).where(eq(SessionDiffTable.sessionID, input.sessionID)).get()
const row = Database.use((db) =>
db.select().from(SessionDiffTable).where(eq(SessionDiffTable.sessionID, input.sessionID)).get(),
)
return row?.data ?? []
},
)

View File

@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import z from "zod"
import { db } from "../storage/db"
import { Database } from "../storage/db"
import { TodoTable } from "./session.sql"
import { eq } from "drizzle-orm"
@ -27,16 +27,18 @@ export namespace Todo {
}
export function update(input: { sessionID: string; todos: Info[] }) {
db()
.insert(TodoTable)
.values({ sessionID: input.sessionID, data: input.todos })
.onConflictDoUpdate({ target: TodoTable.sessionID, set: { data: input.todos } })
.run()
Database.use((db) =>
db
.insert(TodoTable)
.values({ sessionID: input.sessionID, data: input.todos })
.onConflictDoUpdate({ target: TodoTable.sessionID, set: { data: input.todos } })
.run(),
)
Bus.publish(Event.Updated, input)
}
export function get(sessionID: string) {
const row = db().select().from(TodoTable).where(eq(TodoTable.sessionID, sessionID)).get()
const row = Database.use((db) => db.select().from(TodoTable).where(eq(TodoTable.sessionID, sessionID)).get())
return row?.data ?? []
}
}

View File

@ -4,7 +4,7 @@ import { ulid } from "ulid"
import { Provider } from "@/provider/provider"
import { Session } from "@/session"
import { MessageV2 } from "@/session/message-v2"
import { db } from "@/storage/db"
import { Database } from "@/storage/db"
import { SessionShareTable } from "./share.sql"
import { eq } from "drizzle-orm"
import { Log } from "@/util/log"
@ -79,17 +79,21 @@ export namespace ShareNext {
})
.then((x) => x.json())
.then((x) => x as { id: string; url: string; secret: string })
db()
.insert(SessionShareTable)
.values({ sessionID, data: result })
.onConflictDoUpdate({ target: SessionShareTable.sessionID, set: { data: result } })
.run()
Database.use((db) =>
db
.insert(SessionShareTable)
.values({ sessionID, data: result })
.onConflictDoUpdate({ target: SessionShareTable.sessionID, set: { data: result } })
.run(),
)
fullSync(sessionID)
return result
}
function get(sessionID: string) {
const row = db().select().from(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).get()
const row = Database.use((db) =>
db.select().from(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).get(),
)
return row?.data
}
@ -166,7 +170,7 @@ export namespace ShareNext {
secret: share.secret,
}),
})
db().delete(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).run()
Database.use((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.sessionID, sessionID)).run())
}
async function fullSync(sessionID: string) {

View File

@ -1,11 +1,13 @@
import { Database } from "bun:sqlite"
import { Database as SqliteDatabase } from "bun:sqlite"
import { drizzle } from "drizzle-orm/bun-sqlite"
import type { BunSQLiteDatabase } from "drizzle-orm/bun-sqlite"
import { lazy } from "../util/lazy"
import { Global } from "../global"
import { Log } from "../util/log"
import { migrations } from "./migrations.generated"
import { migrateFromJson } from "./json-migration"
import { NamedError } from "@opencode-ai/util/error"
import { Context } from "../util/context"
import z from "zod"
import path from "path"
@ -18,51 +20,103 @@ export const NotFoundError = NamedError.create(
const log = Log.create({ service: "db" })
export type DB = ReturnType<typeof drizzle>
export namespace Database {
export type DB = BunSQLiteDatabase
const connection = lazy(() => {
const dbPath = path.join(Global.Path.data, "opencode.db")
log.info("opening database", { path: dbPath })
const connection = lazy(() => {
const dbPath = path.join(Global.Path.data, "opencode.db")
log.info("opening database", { path: dbPath })
const sqlite = new Database(dbPath, { create: true })
const sqlite = new SqliteDatabase(dbPath, { create: true })
sqlite.run("PRAGMA journal_mode = WAL")
sqlite.run("PRAGMA synchronous = NORMAL")
sqlite.run("PRAGMA busy_timeout = 5000")
sqlite.run("PRAGMA cache_size = -64000")
sqlite.run("PRAGMA foreign_keys = ON")
sqlite.run("PRAGMA journal_mode = WAL")
sqlite.run("PRAGMA synchronous = NORMAL")
sqlite.run("PRAGMA busy_timeout = 5000")
sqlite.run("PRAGMA cache_size = -64000")
sqlite.run("PRAGMA foreign_keys = ON")
migrate(sqlite)
migrate(sqlite)
// Run JSON migration asynchronously after schema is ready
migrateFromJson(sqlite).catch((e) => log.error("json migration failed", { error: e }))
// Run JSON migration after schema is ready
try {
migrateFromJson(sqlite)
} catch (e) {
log.error("json migration failed", { error: e })
}
return drizzle(sqlite)
})
return drizzle(sqlite)
})
function migrate(sqlite: Database) {
sqlite.exec(`
CREATE TABLE IF NOT EXISTS _migrations (
name TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
function migrate(sqlite: SqliteDatabase) {
sqlite.run(`
CREATE TABLE IF NOT EXISTS _migrations (
name TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
)
`)
const applied = new Set(
sqlite
.query<{ name: string }, []>("SELECT name FROM _migrations")
.all()
.map((r) => r.name),
)
`)
const applied = new Set(
sqlite
.query<{ name: string }, []>("SELECT name FROM _migrations")
.all()
.map((r) => r.name),
)
for (const migration of migrations) {
if (applied.has(migration.name)) continue
log.info("applying migration", { name: migration.name })
sqlite.exec(migration.sql)
sqlite.run("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)", [migration.name, Date.now()])
}
}
for (const migration of migrations) {
if (applied.has(migration.name)) continue
log.info("applying migration", { name: migration.name })
sqlite.exec(migration.sql)
sqlite.run("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)", [migration.name, Date.now()])
const TransactionContext = Context.create<{
db: DB
effects: (() => void | Promise<void>)[]
}>("database")
export function use<T>(callback: (db: DB) => T): T {
try {
const ctx = TransactionContext.use()
return callback(ctx.db)
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const result = TransactionContext.provide({ db: connection(), effects }, () => callback(connection()))
for (const fx of effects) fx()
return result
}
throw err
}
}
export function fn<Input, T>(callback: (input: Input, db: DB) => T) {
return (input: Input) => use((db) => callback(input, db))
}
export function effect(fx: () => void | Promise<void>) {
try {
const ctx = TransactionContext.use()
ctx.effects.push(fx)
} catch {
fx()
}
}
export function transaction<T>(callback: (db: DB) => T): T {
try {
const ctx = TransactionContext.use()
return callback(ctx.db)
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const result = connection().transaction((tx) => {
return TransactionContext.provide({ db: tx as unknown as DB, effects }, () => callback(tx as unknown as DB))
})
for (const fx of effects) fx()
return result
}
throw err
}
}
}
export function db() {
return connection()
}

View File

@ -4,31 +4,25 @@ import { eq } from "drizzle-orm"
import { Global } from "../global"
import { Log } from "../util/log"
import { ProjectTable } from "../project/project.sql"
import {
SessionTable,
MessageTable,
PartTable,
SessionDiffTable,
TodoTable,
PermissionTable,
} from "../session/session.sql"
import { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../session/session.sql"
import { SessionShareTable, ShareTable } from "../share/share.sql"
import path from "path"
import fs from "fs"
const log = Log.create({ service: "json-migration" })
export async function migrateFromJson(sqlite: Database, customStorageDir?: string) {
export function migrateFromJson(sqlite: Database, customStorageDir?: string) {
const storageDir = customStorageDir ?? path.join(Global.Path.data, "storage")
const migrationMarker = path.join(storageDir, "sqlite-migrated")
if (await Bun.file(migrationMarker).exists()) {
if (fs.existsSync(migrationMarker)) {
log.info("json migration already completed")
return
}
if (!(await Bun.file(path.join(storageDir, "migration")).exists())) {
if (!fs.existsSync(path.join(storageDir, "migration"))) {
log.info("no json storage found, skipping migration")
await Bun.write(migrationMarker, Date.now().toString())
fs.writeFileSync(migrationMarker, Date.now().toString())
return
}
@ -49,9 +43,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate projects first (no FK deps)
const projectGlob = new Bun.Glob("project/*.json")
for await (const file of projectGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of projectGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
if (!data.id) {
stats.errors.push(`project missing id: ${file}`)
continue
@ -80,9 +74,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate sessions (depends on projects)
const sessionGlob = new Bun.Glob("session/*/*.json")
for await (const file of sessionGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of sessionGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
if (!data.id || !data.projectID) {
stats.errors.push(`session missing id or projectID: ${file}`)
continue
@ -128,9 +122,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate messages (depends on sessions)
const messageGlob = new Bun.Glob("message/*/*.json")
for await (const file of messageGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of messageGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
if (!data.id || !data.sessionID) {
stats.errors.push(`message missing id or sessionID: ${file}`)
continue
@ -141,11 +135,12 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
log.warn("skipping orphaned message", { messageID: data.id, sessionID: data.sessionID })
continue
}
const { id, sessionID, ...rest } = data
db.insert(MessageTable)
.values({
id: data.id,
sessionID: data.sessionID,
data,
id,
sessionID,
data: rest,
})
.onConflictDoNothing()
.run()
@ -158,11 +153,11 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate parts (depends on messages)
const partGlob = new Bun.Glob("part/*/*.json")
for await (const file of partGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of partGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
if (!data.id || !data.messageID || !data.sessionID) {
stats.errors.push(`part missing id, messageID, or sessionID: ${file}`)
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
if (!data.id || !data.messageID) {
stats.errors.push(`part missing id or messageID: ${file}`)
continue
}
// Check if message exists
@ -171,12 +166,12 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
log.warn("skipping orphaned part", { partID: data.id, messageID: data.messageID })
continue
}
const { id, messageID, sessionID: _, ...rest } = data
db.insert(PartTable)
.values({
id: data.id,
messageID: data.messageID,
sessionID: data.sessionID,
data,
id,
messageID,
data: rest,
})
.onConflictDoNothing()
.run()
@ -187,11 +182,11 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
}
log.info("migrated parts", { count: stats.parts })
// Migrate session diffs
// Migrate session diffs (use raw SQL since TypeScript schema doesn't match migration)
const diffGlob = new Bun.Glob("session_diff/*.json")
for await (const file of diffGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of diffGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
const sessionID = path.basename(file, ".json")
// Check if session exists
const session = db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get()
@ -199,7 +194,10 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
log.warn("skipping orphaned session_diff", { sessionID })
continue
}
db.insert(SessionDiffTable).values({ sessionID, data }).onConflictDoNothing().run()
sqlite.run("INSERT OR IGNORE INTO session_diff (session_id, data) VALUES (?, ?)", [
sessionID,
JSON.stringify(data),
])
stats.diffs++
} catch (e) {
stats.errors.push(`failed to migrate session_diff ${file}: ${e}`)
@ -209,9 +207,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate todos
const todoGlob = new Bun.Glob("todo/*.json")
for await (const file of todoGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of todoGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
const sessionID = path.basename(file, ".json")
const session = db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get()
if (!session) {
@ -228,9 +226,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate permissions
const permGlob = new Bun.Glob("permission/*.json")
for await (const file of permGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of permGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
const projectID = path.basename(file, ".json")
const project = db.select().from(ProjectTable).where(eq(ProjectTable.id, projectID)).get()
if (!project) {
@ -247,9 +245,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate session shares
const shareGlob = new Bun.Glob("session_share/*.json")
for await (const file of shareGlob.scan({ cwd: storageDir, absolute: true })) {
for (const file of shareGlob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
const sessionID = path.basename(file, ".json")
const session = db.select().from(SessionTable).where(eq(SessionTable.id, sessionID)).get()
if (!session) {
@ -266,9 +264,9 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
// Migrate shares (downloaded shared sessions, no FK)
const share2Glob = new Bun.Glob("share/*.json")
for await (const file of share2Glob.scan({ cwd: storageDir, absolute: true })) {
for (const file of share2Glob.scanSync({ cwd: storageDir, absolute: true })) {
try {
const data = await Bun.file(file).json()
const data = JSON.parse(fs.readFileSync(file, "utf-8"))
const sessionID = path.basename(file, ".json")
db.insert(ShareTable).values({ sessionID, data }).onConflictDoNothing().run()
} catch (e) {
@ -277,7 +275,7 @@ export async function migrateFromJson(sqlite: Database, customStorageDir?: strin
}
// Mark migration complete
await Bun.write(migrationMarker, Date.now().toString())
fs.writeFileSync(migrationMarker, Date.now().toString())
log.info("json migration complete", {
projects: stats.projects,

View File

@ -1,6 +1,6 @@
// Auto-generated - do not edit
import m0 from "../../migration/0000_vengeful_the_watchers.sql" with { type: "text" }
import m0 from "../../migration/0000_watery_shinobi_shaw.sql" with { type: "text" }
export const migrations = [
{ name: "0000_vengeful_the_watchers", sql: m0 },
{ name: "0000_watery_shinobi_shaw", sql: m0 },
]

View File

@ -1,7 +1,7 @@
import { describe, expect, test } from "bun:test"
import { Project } from "../../src/project/project"
import { Log } from "../../src/util/log"
import { db } from "../../src/storage/db"
import { Database } from "../../src/storage/db"
import { ProjectTable } from "../../src/project/project.sql"
import { eq } from "drizzle-orm"
import { $ } from "bun"
@ -101,7 +101,7 @@ describe("Project.discover", () => {
await Project.discover(project)
const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get()
const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get())
const updated = row ? Project.fromRow(row) : undefined
expect(updated?.icon).toBeDefined()
expect(updated?.icon?.url).toStartWith("data:")
@ -117,7 +117,7 @@ describe("Project.discover", () => {
await Project.discover(project)
const row = db().select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get()
const row = Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, project.id)).get())
const updated = row ? Project.fromRow(row) : undefined
expect(updated?.icon).toBeUndefined()
})

View File

@ -8,14 +8,7 @@ import os from "os"
import { migrateFromJson } from "../../src/storage/json-migration"
import { ProjectTable } from "../../src/project/project.sql"
import { Project } from "../../src/project/project"
import {
SessionTable,
MessageTable,
PartTable,
SessionDiffTable,
TodoTable,
PermissionTable,
} from "../../src/session/session.sql"
import { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../../src/session/session.sql"
import { SessionShareTable, ShareTable } from "../../src/share/share.sql"
import { migrations } from "../../src/storage/migrations.generated"
@ -333,7 +326,7 @@ describe("JSON to SQLite migration", () => {
expect(stats?.messages).toBe(1)
const db = drizzle(sqlite)
const row = db.select().from(MessageTable).where(eq(MessageTable.id, fixtures.message.id)).get()
expect(row?.data.id).toBe(fixtures.message.id)
expect(row?.id).toBe(fixtures.message.id)
expect(row?.sessionID).toBe(fixtures.session.id)
})
@ -390,9 +383,8 @@ describe("JSON to SQLite migration", () => {
expect(stats?.parts).toBe(1)
const db = drizzle(sqlite)
const row = db.select().from(PartTable).where(eq(PartTable.id, fixtures.part.id)).get()
expect(row?.data.id).toBe(fixtures.part.id)
expect(row?.id).toBe(fixtures.part.id)
expect(row?.messageID).toBe(fixtures.message.id)
expect(row?.sessionID).toBe(fixtures.session.id)
})
test("skips orphaned part (missing message)", async () => {
@ -423,7 +415,7 @@ describe("JSON to SQLite migration", () => {
expect(stats?.parts).toBe(0)
expect(stats?.errors.length).toBe(1)
expect(stats?.errors[0]).toContain("missing id, messageID, or sessionID")
expect(stats?.errors[0]).toContain("missing id or messageID")
})
})
@ -434,15 +426,19 @@ describe("JSON to SQLite migration", () => {
path.join(storageDir, "session", fixtures.project.id, `${fixtures.session.id}.json`),
JSON.stringify(fixtures.session),
)
const diff = [{ file: "test.ts", before: "", after: "", additions: 10, deletions: 5 }]
const diff = [{ file: "test.ts", before: "", after: "console.log('hello')", additions: 10, deletions: 5 }]
await Bun.write(path.join(storageDir, "session_diff", `${fixtures.session.id}.json`), JSON.stringify(diff))
const stats = await migrateFromJson(sqlite, storageDir)
expect(stats?.diffs).toBe(1)
const db = drizzle(sqlite)
const row = db.select().from(SessionDiffTable).where(eq(SessionDiffTable.sessionID, fixtures.session.id)).get()
// Query raw since TypeScript schema doesn't match migration
const row = sqlite
.query<{ data: string }, [string]>("SELECT data FROM session_diff WHERE session_id = ?")
.get(fixtures.session.id)
expect(row?.data).toBeDefined()
const data = JSON.parse(row!.data)
expect(data[0].file).toBe("test.ts")
})
test("migrates todo correctly", async () => {