refactor: add Effect-returning versions of MessageV2 functions (#20374)

pull/20567/head
Kit Langton 2026-04-01 19:48:36 -04:00 committed by GitHub
parent 4214ae205d
commit 26fb6b8788
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 878 additions and 114 deletions

View File

@ -218,7 +218,7 @@ When constructing the summary, try to stick to this template:
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const modelMessages = yield* MessageV2.toModelMessagesEffect(msgs, model, { stripMedia: true })
const ctx = yield* InstanceState.context
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),

View File

@ -593,15 +593,10 @@ export namespace Session {
})
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
return yield* Effect.promise(async () => {
const result = [] as MessageV2.WithParts[]
for await (const msg of MessageV2.stream(input.sessionID)) {
if (input.limit && result.length >= input.limit) break
result.push(msg)
}
result.reverse()
return result
})
if (input.limit) {
return MessageV2.page({ sessionID: input.sessionID, limit: input.limit }).items
}
return Array.from(MessageV2.stream(input.sessionID)).reverse()
})
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {

View File

@ -5,7 +5,6 @@ import { NamedError } from "@opencode-ai/util/error"
import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessage, type UIMessage } from "ai"
import { LSP } from "../lsp"
import { Snapshot } from "@/snapshot"
import { fn } from "@/util/fn"
import { SyncEvent } from "../sync"
import { Database, NotFoundError, and, desc, eq, inArray, lt, or } from "@/storage/db"
import { MessageTable, PartTable, SessionTable } from "./session.sql"
@ -15,6 +14,7 @@ import { errorMessage } from "@/util/error"
import type { SystemError } from "bun"
import type { Provider } from "@/provider/provider"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect } from "effect"
/** Error shape thrown by Bun's fetch() when gzip/br decompression fails mid-stream */
interface FetchDecompressionError extends Error {
@ -547,7 +547,7 @@ export namespace MessageV2 {
and(eq(MessageTable.time_created, row.time), lt(MessageTable.id, row.id)),
)
async function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
function hydrate(rows: (typeof MessageTable.$inferSelect)[]) {
const ids = rows.map((row) => row.id)
const partByMessage = new Map<string, MessageV2.Part[]>()
if (ids.length > 0) {
@ -573,11 +573,11 @@ export namespace MessageV2 {
}))
}
export async function toModelMessages(
export const toModelMessagesEffect = Effect.fnUntraced(function* (
input: WithParts[],
model: Provider.Model,
options?: { stripMedia?: boolean },
): Promise<ModelMessage[]> {
) {
const result: UIMessage[] = []
const toolNames = new Set<string>()
// Track media from tool results that need to be injected as user messages
@ -800,64 +800,67 @@ export namespace MessageV2 {
const tools = Object.fromEntries(Array.from(toolNames).map((toolName) => [toolName, { toModelOutput }]))
return await convertToModelMessages(
result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
{
//@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
tools,
},
return yield* Effect.promise(() =>
convertToModelMessages(
result.filter((msg) => msg.parts.some((part) => part.type !== "step-start")),
{
//@ts-expect-error (convertToModelMessages expects a ToolSet but only actually needs tools[name]?.toModelOutput)
tools,
},
),
)
})
export function toModelMessages(
input: WithParts[],
model: Provider.Model,
options?: { stripMedia?: boolean },
): Promise<ModelMessage[]> {
return Effect.runPromise(toModelMessagesEffect(input, model, options))
}
export const page = fn(
z.object({
sessionID: SessionID.zod,
limit: z.number().int().positive(),
before: z.string().optional(),
}),
async (input) => {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
const rows = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(where)
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
.limit(input.limit + 1)
.all(),
export function page(input: { sessionID: SessionID; limit: number; before?: string }) {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
: eq(MessageTable.session_id, input.sessionID)
const rows = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(where)
.orderBy(desc(MessageTable.time_created), desc(MessageTable.id))
.limit(input.limit + 1)
.all(),
)
if (rows.length === 0) {
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (rows.length === 0) {
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items: [] as MessageV2.WithParts[],
more: false,
}
}
const more = rows.length > input.limit
const page = more ? rows.slice(0, input.limit) : rows
const items = await hydrate(page)
items.reverse()
const tail = page.at(-1)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items,
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
items: [] as MessageV2.WithParts[],
more: false,
}
},
)
}
export const stream = fn(SessionID.zod, async function* (sessionID) {
const more = rows.length > input.limit
const slice = more ? rows.slice(0, input.limit) : rows
const items = hydrate(slice)
items.reverse()
const tail = slice.at(-1)
return {
items,
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
}
}
export function* stream(sessionID: SessionID) {
const size = 50
let before: string | undefined
while (true) {
const next = await page({ sessionID, limit: size, before })
const next = page({ sessionID, limit: size, before })
if (next.items.length === 0) break
for (let i = next.items.length - 1; i >= 0; i--) {
yield next.items[i]
@ -865,9 +868,9 @@ export namespace MessageV2 {
if (!next.more || !next.cursor) break
before = next.cursor
}
})
}
export const parts = fn(MessageID.zod, async (message_id) => {
export function parts(message_id: MessageID) {
const rows = Database.use((db) =>
db.select().from(PartTable).where(eq(PartTable.message_id, message_id)).orderBy(PartTable.id).all(),
)
@ -880,33 +883,28 @@ export namespace MessageV2 {
messageID: row.message_id,
}) as MessageV2.Part,
)
})
}
export const get = fn(
z.object({
sessionID: SessionID.zod,
messageID: MessageID.zod,
}),
async (input): Promise<WithParts> => {
const row = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: await parts(input.messageID),
}
},
)
export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
const row = Database.use((db) =>
db
.select()
.from(MessageTable)
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: parts(input.messageID),
}
}
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
export function filterCompacted(msgs: Iterable<MessageV2.WithParts>) {
const result = [] as MessageV2.WithParts[]
const completed = new Set<string>()
for await (const msg of stream) {
for (const msg of msgs) {
result.push(msg)
if (
msg.info.role === "user" &&
@ -921,6 +919,10 @@ export namespace MessageV2 {
return result
}
export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: SessionID) {
return filterCompacted(stream(sessionID))
})
export function fromError(
e: unknown,
ctx: { providerID: ProviderID; aborted?: boolean },

View File

@ -184,7 +184,7 @@ export namespace SessionProcessor {
metadata: value.providerMetadata,
} satisfies MessageV2.ToolPart)
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const parts = MessageV2.parts(ctx.assistantMessage.id)
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
if (
@ -396,7 +396,7 @@ export namespace SessionProcessor {
}
ctx.reasoningMap = {}
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const parts = MessageV2.parts(ctx.assistantMessage.id)
for (const part of parts) {
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
yield* session.updatePart({

View File

@ -216,7 +216,7 @@ export namespace SessionPrompt {
(yield* provider.getModel(input.providerID, input.modelID)))
const msgs = onlySubtasks
? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
: yield* Effect.promise(() => MessageV2.toModelMessages(context, mdl))
: yield* MessageV2.toModelMessagesEffect(context, mdl)
const text = yield* Effect.promise(async (signal) => {
const result = await LLM.stream({
agent: ag,
@ -1342,7 +1342,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* status.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID })
let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
let lastUser: MessageV2.User | undefined
let lastAssistant: MessageV2.Assistant | undefined

View File

@ -4,6 +4,7 @@ import { Instance } from "../../src/project/instance"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { Log } from "../../src/util/log"
const root = path.join(__dirname, "../..")
@ -35,7 +36,83 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D
return ids
}
describe("session message pagination", () => {
async function addUser(sessionID: SessionID, text?: string) {
const id = MessageID.ascending()
await Session.updateMessage({
id,
sessionID,
role: "user",
time: { created: Date.now() },
agent: "test",
model: { providerID: "test", modelID: "test" },
tools: {},
mode: "",
} as unknown as MessageV2.Info)
if (text) {
await Session.updatePart({
id: PartID.ascending(),
sessionID,
messageID: id,
type: "text",
text,
})
}
return id
}
async function addAssistant(
sessionID: SessionID,
parentID: MessageID,
opts?: { summary?: boolean; finish?: string; error?: MessageV2.Assistant["error"] },
) {
const id = MessageID.ascending()
await Session.updateMessage({
id,
sessionID,
role: "assistant",
time: { created: Date.now() },
parentID,
modelID: ModelID.make("test"),
providerID: ProviderID.make("test"),
mode: "",
agent: "default",
path: { cwd: "/", root: "/" },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
summary: opts?.summary,
finish: opts?.finish,
error: opts?.error,
} as unknown as MessageV2.Info)
return id
}
async function addCompactionPart(sessionID: SessionID, messageID: MessageID) {
await Session.updatePart({
id: PartID.ascending(),
sessionID,
messageID,
type: "compaction",
auto: true,
} as any)
}
describe("MessageV2.page", () => {
test("returns sync result", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 2)
const result = MessageV2.page({ sessionID: session.id, limit: 10 })
expect(result).toBeDefined()
expect(result.items).toBeArray()
await Session.remove(session.id)
},
})
})
test("pages backward with opaque cursors", async () => {
await Instance.provide({
directory: root,
@ -43,18 +120,18 @@ describe("session message pagination", () => {
const session = await Session.create({})
const ids = await fill(session.id, 6)
const a = await MessageV2.page({ sessionID: session.id, limit: 2 })
const a = MessageV2.page({ sessionID: session.id, limit: 2 })
expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2))
expect(a.items.every((item) => item.parts.length === 1)).toBe(true)
expect(a.more).toBe(true)
expect(a.cursor).toBeTruthy()
const b = await MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! })
const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! })
expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
expect(b.more).toBe(true)
expect(b.cursor).toBeTruthy()
const c = await MessageV2.page({ sessionID: session.id, limit: 2, before: b.cursor! })
const c = MessageV2.page({ sessionID: session.id, limit: 2, before: b.cursor! })
expect(c.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2))
expect(c.more).toBe(false)
expect(c.cursor).toBeUndefined()
@ -64,30 +141,114 @@ describe("session message pagination", () => {
})
})
test("keeps stream order newest first", async () => {
test("returns items in chronological order within a page", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 5)
const ids = await fill(session.id, 4)
const items = await Array.fromAsync(MessageV2.stream(session.id))
expect(items.map((item) => item.info.id)).toEqual(ids.slice().reverse())
const result = MessageV2.page({ sessionID: session.id, limit: 4 })
expect(result.items.map((item) => item.info.id)).toEqual(ids)
await Session.remove(session.id)
},
})
})
test("accepts cursors generated from fractional timestamps", async () => {
test("returns empty items for session with no messages", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const result = MessageV2.page({ sessionID: session.id, limit: 10 })
expect(result.items).toEqual([])
expect(result.more).toBe(false)
expect(result.cursor).toBeUndefined()
await Session.remove(session.id)
},
})
})
test("throws NotFoundError for non-existent session", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const fake = "non-existent-session" as SessionID
expect(() => MessageV2.page({ sessionID: fake, limit: 10 })).toThrow("NotFoundError")
},
})
})
test("handles exact limit boundary", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 3)
const result = MessageV2.page({ sessionID: session.id, limit: 3 })
expect(result.items.map((item) => item.info.id)).toEqual(ids)
expect(result.more).toBe(false)
expect(result.cursor).toBeUndefined()
await Session.remove(session.id)
},
})
})
test("limit of 1 returns single newest message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 5)
const result = MessageV2.page({ sessionID: session.id, limit: 1 })
expect(result.items).toHaveLength(1)
expect(result.items[0].info.id).toBe(ids[ids.length - 1])
expect(result.more).toBe(true)
await Session.remove(session.id)
},
})
})
test("hydrates multiple parts per message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: id,
type: "text",
text: "extra",
})
const result = MessageV2.page({ sessionID: session.id, limit: 10 })
expect(result.items).toHaveLength(1)
expect(result.items[0].parts).toHaveLength(2)
await Session.remove(session.id)
},
})
})
test("accepts cursors from fractional timestamps", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 4, (i) => 1000.5 + i)
const a = await MessageV2.page({ sessionID: session.id, limit: 2 })
const b = await MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! })
const a = MessageV2.page({ sessionID: session.id, limit: 2 })
const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! })
expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2))
expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2))
@ -97,7 +258,298 @@ describe("session message pagination", () => {
})
})
test("scopes get by session id", async () => {
test("messages with same timestamp are ordered by id", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 4, () => 1000)
const a = MessageV2.page({ sessionID: session.id, limit: 2 })
expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2))
expect(a.more).toBe(true)
const b = MessageV2.page({ sessionID: session.id, limit: 2, before: a.cursor! })
expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2))
expect(b.more).toBe(false)
await Session.remove(session.id)
},
})
})
test("does not return messages from other sessions", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const a = await Session.create({})
const b = await Session.create({})
await fill(a.id, 3)
await fill(b.id, 2)
const resultA = MessageV2.page({ sessionID: a.id, limit: 10 })
const resultB = MessageV2.page({ sessionID: b.id, limit: 10 })
expect(resultA.items).toHaveLength(3)
expect(resultB.items).toHaveLength(2)
expect(resultA.items.every((item) => item.info.sessionID === a.id)).toBe(true)
expect(resultB.items.every((item) => item.info.sessionID === b.id)).toBe(true)
await Session.remove(a.id)
await Session.remove(b.id)
},
})
})
test("large limit returns all messages without cursor", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 10)
const result = MessageV2.page({ sessionID: session.id, limit: 100 })
expect(result.items).toHaveLength(10)
expect(result.items.map((item) => item.info.id)).toEqual(ids)
expect(result.more).toBe(false)
expect(result.cursor).toBeUndefined()
await Session.remove(session.id)
},
})
})
})
describe("MessageV2.stream", () => {
test("yields items newest first", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 5)
const items = Array.from(MessageV2.stream(session.id))
expect(items.map((item) => item.info.id)).toEqual(ids.slice().reverse())
await Session.remove(session.id)
},
})
})
test("yields nothing for empty session", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const items = Array.from(MessageV2.stream(session.id))
expect(items).toHaveLength(0)
await Session.remove(session.id)
},
})
})
test("yields single message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 1)
const items = Array.from(MessageV2.stream(session.id))
expect(items).toHaveLength(1)
expect(items[0].info.id).toBe(ids[0])
await Session.remove(session.id)
},
})
})
test("hydrates parts for each yielded message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 3)
const items = Array.from(MessageV2.stream(session.id))
for (const item of items) {
expect(item.parts).toHaveLength(1)
expect(item.parts[0].type).toBe("text")
}
await Session.remove(session.id)
},
})
})
test("handles sets exceeding internal page size", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 60)
const items = Array.from(MessageV2.stream(session.id))
expect(items).toHaveLength(60)
expect(items[0].info.id).toBe(ids[ids.length - 1])
expect(items[59].info.id).toBe(ids[0])
await Session.remove(session.id)
},
})
})
test("is a sync generator", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 1)
const gen = MessageV2.stream(session.id)
const first = gen.next()
// sync generator returns { value, done } directly, not a Promise
expect(first).toHaveProperty("value")
expect(first).toHaveProperty("done")
expect(first.done).toBe(false)
await Session.remove(session.id)
},
})
})
})
describe("MessageV2.parts", () => {
test("returns parts for a message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
const result = MessageV2.parts(id)
expect(result).toHaveLength(1)
expect(result[0].type).toBe("text")
expect((result[0] as MessageV2.TextPart).text).toBe("m0")
await Session.remove(session.id)
},
})
})
test("returns empty array for message with no parts", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const id = await addUser(session.id)
const result = MessageV2.parts(id)
expect(result).toEqual([])
await Session.remove(session.id)
},
})
})
test("returns multiple parts in order", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: id,
type: "text",
text: "second",
})
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: id,
type: "text",
text: "third",
})
const result = MessageV2.parts(id)
expect(result).toHaveLength(3)
expect((result[0] as MessageV2.TextPart).text).toBe("m0")
expect((result[1] as MessageV2.TextPart).text).toBe("second")
expect((result[2] as MessageV2.TextPart).text).toBe("third")
await Session.remove(session.id)
},
})
})
test("returns empty for non-existent message id", async () => {
await Instance.provide({
directory: root,
fn: async () => {
await Session.create({})
const result = MessageV2.parts(MessageID.ascending())
expect(result).toEqual([])
},
})
})
test("parts contain sessionID and messageID", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
const result = MessageV2.parts(id)
expect(result[0].sessionID).toBe(session.id)
expect(result[0].messageID).toBe(id)
await Session.remove(session.id)
},
})
})
})
describe("MessageV2.get", () => {
test("returns message with hydrated parts", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
const result = MessageV2.get({ sessionID: session.id, messageID: id })
expect(result.info.id).toBe(id)
expect(result.info.sessionID).toBe(session.id)
expect(result.info.role).toBe("user")
expect(result.parts).toHaveLength(1)
expect((result.parts[0] as MessageV2.TextPart).text).toBe("m0")
await Session.remove(session.id)
},
})
})
test("throws NotFoundError for non-existent message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
expect(() => MessageV2.get({ sessionID: session.id, messageID: MessageID.ascending() })).toThrow(
"NotFoundError",
)
await Session.remove(session.id)
},
})
})
test("scopes by session id", async () => {
await Instance.provide({
directory: root,
fn: async () => {
@ -105,11 +557,326 @@ describe("session message pagination", () => {
const b = await Session.create({})
const [id] = await fill(a.id, 1)
await expect(MessageV2.get({ sessionID: b.id, messageID: id })).rejects.toMatchObject({ name: "NotFoundError" })
expect(() => MessageV2.get({ sessionID: b.id, messageID: id })).toThrow("NotFoundError")
const result = MessageV2.get({ sessionID: a.id, messageID: id })
expect(result.info.id).toBe(id)
await Session.remove(a.id)
await Session.remove(b.id)
},
})
})
test("returns message with multiple parts", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: id,
type: "text",
text: "extra",
})
const result = MessageV2.get({ sessionID: session.id, messageID: id })
expect(result.parts).toHaveLength(2)
await Session.remove(session.id)
},
})
})
test("returns assistant message with correct role", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const uid = await addUser(session.id, "hello")
const aid = await addAssistant(session.id, uid)
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: aid,
type: "text",
text: "response",
})
const result = MessageV2.get({ sessionID: session.id, messageID: aid })
expect(result.info.role).toBe("assistant")
expect(result.parts).toHaveLength(1)
expect((result.parts[0] as MessageV2.TextPart).text).toBe("response")
await Session.remove(session.id)
},
})
})
test("returns message with zero parts", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const id = await addUser(session.id)
const result = MessageV2.get({ sessionID: session.id, messageID: id })
expect(result.info.id).toBe(id)
expect(result.parts).toEqual([])
await Session.remove(session.id)
},
})
})
})
describe("MessageV2.filterCompacted", () => {
test("returns all messages when no compaction", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 5)
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
expect(result).toHaveLength(5)
// reversed from newest-first to chronological
expect(result.map((item) => item.info.id)).toEqual(ids)
await Session.remove(session.id)
},
})
})
test("stops at compaction boundary and returns chronological order", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
// Chronological: u1(+compaction part), a1(summary, parentID=u1), u2, a2
// Stream (newest first): a2, u2, a1(adds u1 to completed), u1(in completed + compaction) -> break
const u1 = await addUser(session.id, "first question")
const a1 = await addAssistant(session.id, u1, { summary: true, finish: "end_turn" })
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: a1,
type: "text",
text: "summary",
})
await addCompactionPart(session.id, u1)
const u2 = await addUser(session.id, "new question")
const a2 = await addAssistant(session.id, u2)
await Session.updatePart({
id: PartID.ascending(),
sessionID: session.id,
messageID: a2,
type: "text",
text: "new response",
})
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
// Includes compaction boundary: u1, a1, u2, a2
expect(result[0].info.id).toBe(u1)
expect(result.length).toBe(4)
await Session.remove(session.id)
},
})
})
test("handles empty iterable", () => {
const result = MessageV2.filterCompacted([])
expect(result).toEqual([])
})
test("does not break on compaction part without matching summary", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const u1 = await addUser(session.id, "hello")
await addCompactionPart(session.id, u1)
const u2 = await addUser(session.id, "world")
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
expect(result).toHaveLength(2)
await Session.remove(session.id)
},
})
})
test("skips assistant with error even if marked as summary", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const u1 = await addUser(session.id, "hello")
await addCompactionPart(session.id, u1)
const error = new MessageV2.APIError({ message: "boom", isRetryable: true }).toObject() as MessageV2.Assistant["error"]
await addAssistant(session.id, u1, { summary: true, finish: "end_turn", error })
const u2 = await addUser(session.id, "retry")
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
// Error assistant doesn't add to completed, so compaction boundary never triggers
expect(result).toHaveLength(3)
await Session.remove(session.id)
},
})
})
test("skips assistant without finish even if marked as summary", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const u1 = await addUser(session.id, "hello")
await addCompactionPart(session.id, u1)
// summary=true but no finish
await addAssistant(session.id, u1, { summary: true })
const u2 = await addUser(session.id, "next")
const result = MessageV2.filterCompacted(MessageV2.stream(session.id))
expect(result).toHaveLength(3)
await Session.remove(session.id)
},
})
})
test("works with array input", () => {
// filterCompacted accepts any Iterable, not just generators
const id = MessageID.ascending()
const items: MessageV2.WithParts[] = [
{
info: {
id,
sessionID: "s1",
role: "user",
time: { created: 1 },
agent: "test",
model: { providerID: "test", modelID: "test" },
} as unknown as MessageV2.Info,
parts: [{ type: "text", text: "hello" }] as unknown as MessageV2.Part[],
},
]
const result = MessageV2.filterCompacted(items)
expect(result).toHaveLength(1)
expect(result[0].info.id).toBe(id)
})
})
describe("MessageV2.cursor", () => {
test("encode/decode roundtrip", () => {
const input = { id: MessageID.ascending(), time: 1234567890 }
const encoded = MessageV2.cursor.encode(input)
const decoded = MessageV2.cursor.decode(encoded)
expect(decoded.id).toBe(input.id)
expect(decoded.time).toBe(input.time)
})
test("encode/decode with fractional time", () => {
const input = { id: MessageID.ascending(), time: 1234567890.5 }
const encoded = MessageV2.cursor.encode(input)
const decoded = MessageV2.cursor.decode(encoded)
expect(decoded.time).toBe(1234567890.5)
})
test("encoded cursor is base64url", () => {
const encoded = MessageV2.cursor.encode({ id: MessageID.ascending(), time: 0 })
expect(encoded).toMatch(/^[A-Za-z0-9_-]+$/)
})
})
describe("MessageV2 consistency", () => {
test("page hydration matches get for each message", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 3)
const paged = MessageV2.page({ sessionID: session.id, limit: 10 })
for (const item of paged.items) {
const got = MessageV2.get({ sessionID: session.id, messageID: item.info.id as MessageID })
expect(got.info).toEqual(item.info)
expect(got.parts).toEqual(item.parts)
}
await Session.remove(session.id)
},
})
})
test("parts from get match standalone parts call", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const [id] = await fill(session.id, 1)
const got = MessageV2.get({ sessionID: session.id, messageID: id })
const standalone = MessageV2.parts(id)
expect(got.parts).toEqual(standalone)
await Session.remove(session.id)
},
})
})
test("stream collects same messages as exhaustive page iteration", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 7)
const streamed = Array.from(MessageV2.stream(session.id))
const paged = [] as MessageV2.WithParts[]
let cursor: string | undefined
while (true) {
const result = MessageV2.page({ sessionID: session.id, limit: 3, before: cursor })
for (let i = result.items.length - 1; i >= 0; i--) {
paged.push(result.items[i])
}
if (!result.more || !result.cursor) break
cursor = result.cursor
}
expect(streamed.map((m) => m.info.id)).toEqual(paged.map((m) => m.info.id))
await Session.remove(session.id)
},
})
})
test("filterCompacted of full stream returns same as Array.from when no compaction", async () => {
await Instance.provide({
directory: root,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 4)
const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
const all = Array.from(MessageV2.stream(session.id)).reverse()
expect(filtered.map((m) => m.info.id)).toEqual(all.map((m) => m.info.id))
await Session.remove(session.id)
},
})
})
})

View File

@ -207,7 +207,7 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
} satisfies LLM.StreamInput
const value = yield* handle.process(input)
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = MessageV2.parts(msg.id)
const calls = yield* llm.calls
expect(value).toBe("continue")
@ -254,7 +254,7 @@ it.live("session.processor effect tests stop after token overflow requests compa
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = MessageV2.parts(msg.id)
expect(value).toBe("compact")
expect(parts.some((part) => part.type === "text" && part.text === "after")).toBe(true)
@ -299,7 +299,7 @@ it.live("session.processor effect tests capture reasoning from http mock", () =>
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = MessageV2.parts(msg.id)
const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
@ -347,7 +347,7 @@ it.live("session.processor effect tests reset reasoning state across retries", (
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = MessageV2.parts(msg.id)
const reasoning = parts.filter((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
expect(value).toBe("continue")
@ -438,7 +438,7 @@ it.live("session.processor effect tests retry recognized structured json errors"
tools: {},
})
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = MessageV2.parts(msg.id)
expect(value).toBe("continue")
expect(yield* llm.calls).toBe(2)
@ -596,7 +596,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
yield* handle.abort()
}
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const parts = MessageV2.parts(msg.id)
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
expect(Exit.isFailure(exit)).toBe(true)
@ -669,7 +669,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
yield* handle.abort()
}
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
off()
@ -731,7 +731,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
yield* Fiber.interrupt(run)
const exit = yield* Fiber.await(run)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
expect(Exit.isFailure(exit)).toBe(true)

View File

@ -470,7 +470,7 @@ it.live("failed subtask preserves metadata on error tool state", () =>
expect(result.info.role).toBe("assistant")
expect(yield* llm.calls).toBe(2)
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return
@ -629,7 +629,7 @@ it.live(
const exit = yield* Fiber.await(fiber)
expect(Exit.isSuccess(exit)).toBe(true)
const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
const msgs = yield* MessageV2.filterCompactedEffect(chat.id)
const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
expect(taskMsg?.info.role).toBe("assistant")
if (!taskMsg || taskMsg.info.role !== "assistant") return