From 26fb6b87882fa054b69463be75ffef1cdeda2629 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 1 Apr 2026 19:48:36 -0400 Subject: [PATCH] refactor: add Effect-returning versions of MessageV2 functions (#20374) --- packages/opencode/src/session/compaction.ts | 2 +- packages/opencode/src/session/index.ts | 13 +- packages/opencode/src/session/message-v2.ts | 156 ++-- packages/opencode/src/session/processor.ts | 4 +- packages/opencode/src/session/prompt.ts | 4 +- .../test/session/messages-pagination.test.ts | 793 +++++++++++++++++- .../test/session/processor-effect.test.ts | 16 +- .../test/session/prompt-effect.test.ts | 4 +- 8 files changed, 878 insertions(+), 114 deletions(-) diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 999a37b122..e48b1c7b08 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template: const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n") const msgs = structuredClone(messages) yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) - const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) + const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true }) const ctx = yield* InstanceState.context const msg: MessageV2.Assistant = { id: MessageID.ascending(), diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 5ed5acafaf..41fad1a9d4 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -593,15 +593,10 @@ export namespace Session { }) const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) { - return yield* Effect.promise(async () => { - const result = [] as MessageV2.WithParts[] - for await (const msg of MessageV2.stream(input.sessionID)) { - if (input.limit && result.length >= input.limit) break - result.push(msg) - } - result.reverse() - return result - }) + if (input.limit) { + return MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items + } + return Array.from(MessageV2.stream(input.sessionID)).reverse() }) const removeMessage = Effect.fn("Session.removeMessage")(function* (input: { diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 7260a8af2e..0bcdb7aebe 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -5,7 +5,6 @@ import { NamedError } from "@opencode-ai/util/error" import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessage, type UIMessage } from "ai" import { LSP } from "../lsp" import { Snapshot } from "@/snapshot" -import { fn } from "@/util/fn" import { SyncEvent } from "../sync" import { Database, NotFoundError, and, desc, eq, inArray, lt, or } from "@/storage/db" import { MessageTable, PartTable, SessionTable } from "./session.sql" @@ -15,6 +14,7 @@ import { errorMessage } from "@/util/error" import type { SystemError } from "bun" import type { Provider } from "@/provider/provider" import { ModelID, ProviderID } from "@/provider/schema" +import { Effect } from "effect" /** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */ interface FetchDecompressionError extends Error { @@ -547,7 +547,7 @@ export namespace MessageV2 { and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)), ) - async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) { + function hydrate(rows: (typeof MessageTable.$inferSelect)[]) { const ids = rows.map((row) => row.id) const partByMessage = new Map() if (ids.length > 0) { @@ -573,11 +573,11 @@ export namespace MessageV2 { })) } - export async function toModelMessages( + export const toModelMessagesEffect = Effect.fnUntraced(function* ( input: WithParts[], model: Provider.Model, options?: { stripMedia?: boolean }, - ): Promise { + ) { const result: UIMessage[] = [] const toolNames = new Set() // Track media from tool results that need to be injected as user messages @@ -800,64 +800,67 @@ export namespace MessageV2 { const tools = Object.fromEntries(Array.from(toolNames).map((toolName) => [toolName, { toModelOutput }])) - return await convertToModelMessages( - result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")), - { - //@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput) - tools, - }, + return yield* Effect.promise(() => + convertToModelMessages( + result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")), + { + //@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput) + tools, + }, + ), ) + }) + + export function toModelMessages( + input: WithParts[], + model: Provider.Model, + options?: { stripMedia?: boolean }, + ): Promise { + return Effect.runPromise(toModelMessagesEffect(input, model, options)) } - export const page = fn( - z.object({ - sessionID: SessionID.zod, - limit: z.number().int().positive(), - before: z.string().optional(), - }), - async (input) => { - const before = input.before ? cursor.decode(input.before) : undefined - const where = before - ? and(eq(MessageTable.session_id, input.sessionID), older(before)) - : eq(MessageTable.session_id, input.sessionID) - const rows = Database.use((db) => - db - .select() - .from(MessageTable) - .where(where) - .orderBy(desc(MessageTable.time_created), desc(MessageTable.id)) - .limit(input.limit + 1) - .all(), + export function page(input: { sessionID: SessionID; limit: number; before?: string }) { + const before = input.before ? cursor.decode(input.before) : undefined + const where = before + ? and(eq(MessageTable.session_id, input.sessionID), older(before)) + : eq(MessageTable.session_id, input.sessionID) + const rows = Database.use((db) => + db + .select() + .from(MessageTable) + .where(where) + .orderBy(desc(MessageTable.time_created), desc(MessageTable.id)) + .limit(input.limit + 1) + .all(), + ) + if (rows.length === 0) { + const row = Database.use((db) => + db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(), ) - if (rows.length === 0) { - const row = Database.use((db) => - db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(), - ) - if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` }) - return { - items: [] as MessageV2.WithParts[], - more: false, - } - } - - const more = rows.length > input.limit - const page = more ? rows.slice(0, input.limit) : rows - const items = await hydrate(page) - items.reverse() - const tail = page.at(-1) + if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` }) return { - items, - more, - cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined, + items: [] as MessageV2.WithParts[], + more: false, } - }, - ) + } - export const stream = fn(SessionID.zod, async function* (sessionID) { + const more = rows.length > input.limit + const slice = more ? rows.slice(0, input.limit) : rows + const items = hydrate(slice) + items.reverse() + const tail = slice.at(-1) + return { + items, + more, + cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined, + } + } + + export function* stream(sessionID: SessionID) { const size = 50 let before: string | undefined while (true) { - const next = await page({ sessionID, limit: size, before }) + const next = page({ sessionID, limit: size, before }) if (next.items.length === 0) break for (let i = next.items.length - 1; i >= 0; i--) { yield next.items[i] @@ -865,9 +868,9 @@ export namespace MessageV2 { if (!next.more || !next.cursor) break before = next.cursor } - }) + } - export const parts = fn(MessageID.zod, async (message_id) => { + export function parts(message_id: MessageID) { const rows = Database.use((db) => db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(), ) @@ -880,33 +883,28 @@ export namespace MessageV2 { messageID: row.message_id, }) as MessageV2.Part, ) - }) + } - export const get = fn( - z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - }), - async (input): Promise => { - const row = Database.use((db) => - db - .select() - .from(MessageTable) - .where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID))) - .get(), - ) - if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` }) - return { - info: info(row), - parts: await parts(input.messageID), - } - }, - ) + export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts { + const row = Database.use((db) => + db + .select() + .from(MessageTable) + .where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID))) + .get(), + ) + if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` }) + return { + info: info(row), + parts: parts(input.messageID), + } + } - export async function filterCompacted(stream: AsyncIterable) { + + export function filterCompacted(msgs: Iterable) { const result = [] as MessageV2.WithParts[] const completed = new Set() - for await (const msg of stream) { + for (const msg of msgs) { result.push(msg) if ( msg.info.role === "user" && @@ -921,6 +919,10 @@ export namespace MessageV2 { return result } + export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) { + return filterCompacted(stream(sessionID)) + }) + export function fromError( e: unknown, ctx: { providerID: ProviderID; aborted?: boolean }, diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index abc101f180..b1a1b8dbd3 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -184,7 +184,7 @@ export namespace SessionProcessor { metadata: value.providerMetadata, } satisfies MessageV2.ToolPart) - const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) + const parts = MessageV2.parts(ctx.assistantMessage.id) const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) if ( @@ -396,7 +396,7 @@ export namespace SessionProcessor { } ctx.reasoningMap = {} - const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) + const parts = MessageV2.parts(ctx.assistantMessage.id) for (const part of parts) { if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue yield* session.updatePart({ diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index fbce62e7dc..2c799b1100 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -216,7 +216,7 @@ export namespace SessionPrompt { (yield* provider.getModel(input.providerID, input.modelID))) const msgs = onlySubtasks ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] - : yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl)) + : yield* MessageV2.toModelMessagesEffect(context, mdl) const text = yield* Effect.promise(async (signal) => { const result = await LLM.stream({ agent: ag, @@ -1342,7 +1342,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the yield* status.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) - let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID))) + let msgs = yield* MessageV2.filterCompactedEffect(sessionID) let lastUser: MessageV2.User | undefined let lastAssistant: MessageV2.Assistant | undefined diff --git a/packages/opencode/test/session/messages-pagination.test.ts b/packages/opencode/test/session/messages-pagination.test.ts index 3614b17d08..ea7a0727c6 100644 --- a/packages/opencode/test/session/messages-pagination.test.ts +++ b/packages/opencode/test/session/messages-pagination.test.ts @@ -4,6 +4,7 @@ import { Instance } from "../../src/project/instance" import { Session } from "../../src/session" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, type SessionID } from "../../src/session/schema" +import { ModelID, ProviderID } from "../../src/provider/schema" import { Log } from "../../src/util/log" const root = path.join(__dirname, "../..") @@ -35,7 +36,83 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D return ids } -describe("session message pagination", () => { +async function addUser(sessionID: SessionID, text?: string) { + const id = MessageID.ascending() + await Session.updateMessage({ + id, + sessionID, + role: "user", + time: { created: Date.now() }, + agent: "test", + model: { providerID: "test", modelID: "test" }, + tools: {}, + mode: "", + } as unknown as MessageV2.Info) + if (text) { + await Session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID: id, + type: "text", + text, + }) + } + return id +} + +async function addAssistant( + sessionID: SessionID, + parentID: MessageID, + opts?: { summary?: boolean; finish?: string; error?: MessageV2.Assistant["error"] }, +) { + const id = MessageID.ascending() + await Session.updateMessage({ + id, + sessionID, + role: "assistant", + time: { created: Date.now() }, + parentID, + modelID: ModelID.make("test"), + providerID: ProviderID.make("test"), + mode: "", + agent: "default", + path: { cwd: "/", root: "/" }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + summary: opts?.summary, + finish: opts?.finish, + error: opts?.error, + } as unknown as MessageV2.Info) + return id +} + +async function addCompactionPart(sessionID: SessionID, messageID: MessageID) { + await Session.updatePart({ + id: PartID.ascending(), + sessionID, + messageID, + type: "compaction", + auto: true, + } as any) +} + +describe("MessageV2.page", () => { + test("returns sync result", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 2) + + const result = MessageV2.page({ sessionID: session.id, limit: 10 }) + expect(result).toBeDefined() + expect(result.items).toBeArray() + + await Session.remove(session.id) + }, + }) + }) + test("pages backward with opaque cursors", async () => { await Instance.provide({ directory: root, @@ -43,18 +120,18 @@ describe("session message pagination", () => { const session = await Session.create({}) const ids = await fill(session.id, 6) - const a = await MessageV2.page({ sessionID: session.id, limit: 2 }) + const a = MessageV2.page({ sessionID: session.id, limit: 2 }) expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) expect(a.items.every((item) => item.parts.length === 1)).toBe(true) expect(a.more).toBe(true) expect(a.cursor).toBeTruthy() - const b = await MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) + const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) expect(b.more).toBe(true) expect(b.cursor).toBeTruthy() - const c = await MessageV2.page({ sessionID: session.id, limit: 2, before: b.cursor! }) + const c = MessageV2.page({ sessionID: session.id, limit: 2, before: b.cursor! }) expect(c.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) expect(c.more).toBe(false) expect(c.cursor).toBeUndefined() @@ -64,30 +141,114 @@ describe("session message pagination", () => { }) }) - test("keeps stream order newest first", async () => { + test("returns items in chronological order within a page", async () => { await Instance.provide({ directory: root, fn: async () => { const session = await Session.create({}) - const ids = await fill(session.id, 5) + const ids = await fill(session.id, 4) - const items = await Array.fromAsync(MessageV2.stream(session.id)) - expect(items.map((item) => item.info.id)).toEqual(ids.slice().reverse()) + const result = MessageV2.page({ sessionID: session.id, limit: 4 }) + expect(result.items.map((item) => item.info.id)).toEqual(ids) await Session.remove(session.id) }, }) }) - test("accepts cursors generated from fractional timestamps", async () => { + test("returns empty items for session with no messages", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + const result = MessageV2.page({ sessionID: session.id, limit: 10 }) + expect(result.items).toEqual([]) + expect(result.more).toBe(false) + expect(result.cursor).toBeUndefined() + + await Session.remove(session.id) + }, + }) + }) + + test("throws NotFoundError for non-existent session", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const fake = "non-existent-session" as SessionID + expect(() => MessageV2.page({ sessionID: fake, limit: 10 })).toThrow("NotFoundError") + }, + }) + }) + + test("handles exact limit boundary", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 3) + + const result = MessageV2.page({ sessionID: session.id, limit: 3 }) + expect(result.items.map((item) => item.info.id)).toEqual(ids) + expect(result.more).toBe(false) + expect(result.cursor).toBeUndefined() + + await Session.remove(session.id) + }, + }) + }) + + test("limit of 1 returns single newest message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 5) + + const result = MessageV2.page({ sessionID: session.id, limit: 1 }) + expect(result.items).toHaveLength(1) + expect(result.items[0].info.id).toBe(ids[ids.length - 1]) + expect(result.more).toBe(true) + + await Session.remove(session.id) + }, + }) + }) + + test("hydrates multiple parts per message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: id, + type: "text", + text: "extra", + }) + + const result = MessageV2.page({ sessionID: session.id, limit: 10 }) + expect(result.items).toHaveLength(1) + expect(result.items[0].parts).toHaveLength(2) + + await Session.remove(session.id) + }, + }) + }) + + test("accepts cursors from fractional timestamps", async () => { await Instance.provide({ directory: root, fn: async () => { const session = await Session.create({}) const ids = await fill(session.id, 4, (i) => 1000.5 + i) - const a = await MessageV2.page({ sessionID: session.id, limit: 2 }) - const b = await MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) + const a = MessageV2.page({ sessionID: session.id, limit: 2 }) + const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) @@ -97,7 +258,298 @@ describe("session message pagination", () => { }) }) - test("scopes get by session id", async () => { + test("messages with same timestamp are ordered by id", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 4, () => 1000) + + const a = MessageV2.page({ sessionID: session.id, limit: 2 }) + expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) + expect(a.more).toBe(true) + + const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! }) + expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) + expect(b.more).toBe(false) + + await Session.remove(session.id) + }, + }) + }) + + test("does not return messages from other sessions", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const a = await Session.create({}) + const b = await Session.create({}) + await fill(a.id, 3) + await fill(b.id, 2) + + const resultA = MessageV2.page({ sessionID: a.id, limit: 10 }) + const resultB = MessageV2.page({ sessionID: b.id, limit: 10 }) + expect(resultA.items).toHaveLength(3) + expect(resultB.items).toHaveLength(2) + expect(resultA.items.every((item) => item.info.sessionID === a.id)).toBe(true) + expect(resultB.items.every((item) => item.info.sessionID === b.id)).toBe(true) + + await Session.remove(a.id) + await Session.remove(b.id) + }, + }) + }) + + test("large limit returns all messages without cursor", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 10) + + const result = MessageV2.page({ sessionID: session.id, limit: 100 }) + expect(result.items).toHaveLength(10) + expect(result.items.map((item) => item.info.id)).toEqual(ids) + expect(result.more).toBe(false) + expect(result.cursor).toBeUndefined() + + await Session.remove(session.id) + }, + }) + }) +}) + +describe("MessageV2.stream", () => { + test("yields items newest first", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 5) + + const items = Array.from(MessageV2.stream(session.id)) + expect(items.map((item) => item.info.id)).toEqual(ids.slice().reverse()) + + await Session.remove(session.id) + }, + }) + }) + + test("yields nothing for empty session", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + const items = Array.from(MessageV2.stream(session.id)) + expect(items).toHaveLength(0) + + await Session.remove(session.id) + }, + }) + }) + + test("yields single message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 1) + + const items = Array.from(MessageV2.stream(session.id)) + expect(items).toHaveLength(1) + expect(items[0].info.id).toBe(ids[0]) + + await Session.remove(session.id) + }, + }) + }) + + test("hydrates parts for each yielded message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 3) + + const items = Array.from(MessageV2.stream(session.id)) + for (const item of items) { + expect(item.parts).toHaveLength(1) + expect(item.parts[0].type).toBe("text") + } + + await Session.remove(session.id) + }, + }) + }) + + test("handles sets exceeding internal page size", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 60) + + const items = Array.from(MessageV2.stream(session.id)) + expect(items).toHaveLength(60) + expect(items[0].info.id).toBe(ids[ids.length - 1]) + expect(items[59].info.id).toBe(ids[0]) + + await Session.remove(session.id) + }, + }) + }) + + test("is a sync generator", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 1) + + const gen = MessageV2.stream(session.id) + const first = gen.next() + // sync generator returns { value, done } directly, not a Promise + expect(first).toHaveProperty("value") + expect(first).toHaveProperty("done") + expect(first.done).toBe(false) + + await Session.remove(session.id) + }, + }) + }) +}) + +describe("MessageV2.parts", () => { + test("returns parts for a message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + const result = MessageV2.parts(id) + expect(result).toHaveLength(1) + expect(result[0].type).toBe("text") + expect((result[0] as MessageV2.TextPart).text).toBe("m0") + + await Session.remove(session.id) + }, + }) + }) + + test("returns empty array for message with no parts", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const id = await addUser(session.id) + + const result = MessageV2.parts(id) + expect(result).toEqual([]) + + await Session.remove(session.id) + }, + }) + }) + + test("returns multiple parts in order", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: id, + type: "text", + text: "second", + }) + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: id, + type: "text", + text: "third", + }) + + const result = MessageV2.parts(id) + expect(result).toHaveLength(3) + expect((result[0] as MessageV2.TextPart).text).toBe("m0") + expect((result[1] as MessageV2.TextPart).text).toBe("second") + expect((result[2] as MessageV2.TextPart).text).toBe("third") + + await Session.remove(session.id) + }, + }) + }) + + test("returns empty for non-existent message id", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + await Session.create({}) + const result = MessageV2.parts(MessageID.ascending()) + expect(result).toEqual([]) + }, + }) + }) + + test("parts contain sessionID and messageID", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + const result = MessageV2.parts(id) + expect(result[0].sessionID).toBe(session.id) + expect(result[0].messageID).toBe(id) + + await Session.remove(session.id) + }, + }) + }) +}) + +describe("MessageV2.get", () => { + test("returns message with hydrated parts", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + const result = MessageV2.get({ sessionID: session.id, messageID: id }) + expect(result.info.id).toBe(id) + expect(result.info.sessionID).toBe(session.id) + expect(result.info.role).toBe("user") + expect(result.parts).toHaveLength(1) + expect((result.parts[0] as MessageV2.TextPart).text).toBe("m0") + + await Session.remove(session.id) + }, + }) + }) + + test("throws NotFoundError for non-existent message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + expect(() => MessageV2.get({ sessionID: session.id, messageID: MessageID.ascending() })).toThrow( + "NotFoundError", + ) + + await Session.remove(session.id) + }, + }) + }) + + test("scopes by session id", async () => { await Instance.provide({ directory: root, fn: async () => { @@ -105,11 +557,326 @@ describe("session message pagination", () => { const b = await Session.create({}) const [id] = await fill(a.id, 1) - await expect(MessageV2.get({ sessionID: b.id, messageID: id })).rejects.toMatchObject({ name: "NotFoundError" }) + expect(() => MessageV2.get({ sessionID: b.id, messageID: id })).toThrow("NotFoundError") + const result = MessageV2.get({ sessionID: a.id, messageID: id }) + expect(result.info.id).toBe(id) await Session.remove(a.id) await Session.remove(b.id) }, }) }) + + test("returns message with multiple parts", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: id, + type: "text", + text: "extra", + }) + + const result = MessageV2.get({ sessionID: session.id, messageID: id }) + expect(result.parts).toHaveLength(2) + + await Session.remove(session.id) + }, + }) + }) + + test("returns assistant message with correct role", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const uid = await addUser(session.id, "hello") + const aid = await addAssistant(session.id, uid) + + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: aid, + type: "text", + text: "response", + }) + + const result = MessageV2.get({ sessionID: session.id, messageID: aid }) + expect(result.info.role).toBe("assistant") + expect(result.parts).toHaveLength(1) + expect((result.parts[0] as MessageV2.TextPart).text).toBe("response") + + await Session.remove(session.id) + }, + }) + }) + + test("returns message with zero parts", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const id = await addUser(session.id) + + const result = MessageV2.get({ sessionID: session.id, messageID: id }) + expect(result.info.id).toBe(id) + expect(result.parts).toEqual([]) + + await Session.remove(session.id) + }, + }) + }) +}) + +describe("MessageV2.filterCompacted", () => { + test("returns all messages when no compaction", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 5) + + const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) + expect(result).toHaveLength(5) + // reversed from newest-first to chronological + expect(result.map((item) => item.info.id)).toEqual(ids) + + await Session.remove(session.id) + }, + }) + }) + + test("stops at compaction boundary and returns chronological order", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + // Chronological: u1(+compaction part), a1(summary, parentID=u1), u2, a2 + // Stream (newest first): a2, u2, a1(adds u1 to completed), u1(in completed + compaction) -> break + const u1 = await addUser(session.id, "first question") + const a1 = await addAssistant(session.id, u1, { summary: true, finish: "end_turn" }) + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: a1, + type: "text", + text: "summary", + }) + await addCompactionPart(session.id, u1) + + const u2 = await addUser(session.id, "new question") + const a2 = await addAssistant(session.id, u2) + await Session.updatePart({ + id: PartID.ascending(), + sessionID: session.id, + messageID: a2, + type: "text", + text: "new response", + }) + + const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) + // Includes compaction boundary: u1, a1, u2, a2 + expect(result[0].info.id).toBe(u1) + expect(result.length).toBe(4) + + await Session.remove(session.id) + }, + }) + }) + + test("handles empty iterable", () => { + const result = MessageV2.filterCompacted([]) + expect(result).toEqual([]) + }) + + test("does not break on compaction part without matching summary", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + const u1 = await addUser(session.id, "hello") + await addCompactionPart(session.id, u1) + const u2 = await addUser(session.id, "world") + + const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) + expect(result).toHaveLength(2) + + await Session.remove(session.id) + }, + }) + }) + + test("skips assistant with error even if marked as summary", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + const u1 = await addUser(session.id, "hello") + await addCompactionPart(session.id, u1) + + const error = new MessageV2.APIError({ message: "boom", isRetryable: true }).toObject() as MessageV2.Assistant["error"] + await addAssistant(session.id, u1, { summary: true, finish: "end_turn", error }) + const u2 = await addUser(session.id, "retry") + + const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) + // Error assistant doesn't add to completed, so compaction boundary never triggers + expect(result).toHaveLength(3) + + await Session.remove(session.id) + }, + }) + }) + + test("skips assistant without finish even if marked as summary", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + + const u1 = await addUser(session.id, "hello") + await addCompactionPart(session.id, u1) + + // summary=true but no finish + await addAssistant(session.id, u1, { summary: true }) + const u2 = await addUser(session.id, "next") + + const result = MessageV2.filterCompacted(MessageV2.stream(session.id)) + expect(result).toHaveLength(3) + + await Session.remove(session.id) + }, + }) + }) + + test("works with array input", () => { + // filterCompacted accepts any Iterable, not just generators + const id = MessageID.ascending() + const items: MessageV2.WithParts[] = [ + { + info: { + id, + sessionID: "s1", + role: "user", + time: { created: 1 }, + agent: "test", + model: { providerID: "test", modelID: "test" }, + } as unknown as MessageV2.Info, + parts: [{ type: "text", text: "hello" }] as unknown as MessageV2.Part[], + }, + ] + const result = MessageV2.filterCompacted(items) + expect(result).toHaveLength(1) + expect(result[0].info.id).toBe(id) + }) +}) + +describe("MessageV2.cursor", () => { + test("encode/decode roundtrip", () => { + const input = { id: MessageID.ascending(), time: 1234567890 } + const encoded = MessageV2.cursor.encode(input) + const decoded = MessageV2.cursor.decode(encoded) + expect(decoded.id).toBe(input.id) + expect(decoded.time).toBe(input.time) + }) + + test("encode/decode with fractional time", () => { + const input = { id: MessageID.ascending(), time: 1234567890.5 } + const encoded = MessageV2.cursor.encode(input) + const decoded = MessageV2.cursor.decode(encoded) + expect(decoded.time).toBe(1234567890.5) + }) + + test("encoded cursor is base64url", () => { + const encoded = MessageV2.cursor.encode({ id: MessageID.ascending(), time: 0 }) + expect(encoded).toMatch(/^[A-Za-z0-9_-]+$/) + }) +}) + +describe("MessageV2 consistency", () => { + test("page hydration matches get for each message", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 3) + + const paged = MessageV2.page({ sessionID: session.id, limit: 10 }) + for (const item of paged.items) { + const got = MessageV2.get({ sessionID: session.id, messageID: item.info.id as MessageID }) + expect(got.info).toEqual(item.info) + expect(got.parts).toEqual(item.parts) + } + + await Session.remove(session.id) + }, + }) + }) + + test("parts from get match standalone parts call", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const [id] = await fill(session.id, 1) + + const got = MessageV2.get({ sessionID: session.id, messageID: id }) + const standalone = MessageV2.parts(id) + expect(got.parts).toEqual(standalone) + + await Session.remove(session.id) + }, + }) + }) + + test("stream collects same messages as exhaustive page iteration", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + await fill(session.id, 7) + + const streamed = Array.from(MessageV2.stream(session.id)) + + const paged = [] as MessageV2.WithParts[] + let cursor: string | undefined + while (true) { + const result = MessageV2.page({ sessionID: session.id, limit: 3, before: cursor }) + for (let i = result.items.length - 1; i >= 0; i--) { + paged.push(result.items[i]) + } + if (!result.more || !result.cursor) break + cursor = result.cursor + } + + expect(streamed.map((m) => m.info.id)).toEqual(paged.map((m) => m.info.id)) + + await Session.remove(session.id) + }, + }) + }) + + test("filterCompacted of full stream returns same as Array.from when no compaction", async () => { + await Instance.provide({ + directory: root, + fn: async () => { + const session = await Session.create({}) + const ids = await fill(session.id, 4) + + const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id)) + const all = Array.from(MessageV2.stream(session.id)).reverse() + + expect(filtered.map((m) => m.info.id)).toEqual(all.map((m) => m.info.id)) + + await Session.remove(session.id) + }, + }) + }) }) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 1dd8b7edc9..a79e6967af 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () => } satisfies LLM.StreamInput const value = yield* handle.process(input) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = MessageV2.parts(msg.id) const calls = yield* llm.calls expect(value).toBe("continue") @@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = MessageV2.parts(msg.id) expect(value).toBe("compact") expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true) @@ -299,7 +299,7 @@ it.live("session.processor effect tests capture reasoning from http mock", () => tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = MessageV2.parts(msg.id) const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning") const text = parts.find((part): part is MessageV2.TextPart => part.type === "text") @@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", ( tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = MessageV2.parts(msg.id) const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning") expect(value).toBe("continue") @@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors" tools: {}, }) - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = MessageV2.parts(msg.id) expect(value).toBe("continue") expect(yield* llm.calls).toBe(2) @@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) { yield* handle.abort() } - const parts = yield* Effect.promise(() => MessageV2.parts(msg.id)) + const parts = MessageV2.parts(msg.id) const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool") expect(Exit.isFailure(exit)).toBe(true) @@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( yield* handle.abort() } yield* Effect.promise(() => seen.promise) - const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) + const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id }) const state = yield* sts.get(chat.id) off() @@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) - const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id })) + const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id }) const state = yield* sts.get(chat.id) expect(Exit.isFailure(exit)).toBe(true) diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index 0a6c8e02c8..c1c60b1b87 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -470,7 +470,7 @@ it.live("failed subtask preserves metadata on error tool state", () => expect(result.info.role).toBe("assistant") expect(yield* llm.calls).toBe(2) - const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const msgs = yield* MessageV2.filterCompactedEffect(chat.id) const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") expect(taskMsg?.info.role).toBe("assistant") if (!taskMsg || taskMsg.info.role !== "assistant") return @@ -629,7 +629,7 @@ it.live( const exit = yield* Fiber.await(fiber) expect(Exit.isSuccess(exit)).toBe(true) - const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const msgs = yield* MessageV2.filterCompactedEffect(chat.id) const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") expect(taskMsg?.info.role).toBe("assistant") if (!taskMsg || taskMsg.info.role !== "assistant") return