refactor(session): effectify SessionCompaction service (#19459)
parent
f3997d8082
commit
2145d97f18
|
|
@ -16,6 +16,8 @@ import { Config } from "@/config/config"
|
|||
import { NotFoundError } from "@/storage/db"
|
||||
import { ProviderTransform } from "@/provider/transform"
|
||||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Effect, Layer, ServiceMap } from "effect"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
|
||||
export namespace SessionCompaction {
|
||||
const log = Log.create({ service: "session.compaction" })
|
||||
|
|
@ -30,10 +32,54 @@ export namespace SessionCompaction {
|
|||
}
|
||||
|
||||
const COMPACTION_BUFFER = 20_000
|
||||
export const PRUNE_MINIMUM = 20_000
|
||||
export const PRUNE_PROTECT = 40_000
|
||||
const PRUNE_PROTECTED_TOOLS = ["skill"]
|
||||
|
||||
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
|
||||
const config = await Config.get()
|
||||
if (config.compaction?.auto === false) return false
|
||||
export interface Interface {
|
||||
readonly isOverflow: (input: {
|
||||
tokens: MessageV2.Assistant["tokens"]
|
||||
model: Provider.Model
|
||||
}) => Effect.Effect<boolean>
|
||||
readonly prune: (input: { sessionID: SessionID }) => Effect.Effect<void>
|
||||
readonly process: (input: {
|
||||
parentID: MessageID
|
||||
messages: MessageV2.WithParts[]
|
||||
sessionID: SessionID
|
||||
abort: AbortSignal
|
||||
auto: boolean
|
||||
overflow?: boolean
|
||||
}) => Effect.Effect<"continue" | "stop">
|
||||
readonly create: (input: {
|
||||
sessionID: SessionID
|
||||
agent: string
|
||||
model: { providerID: ProviderID; modelID: ModelID }
|
||||
auto: boolean
|
||||
overflow?: boolean
|
||||
}) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionCompaction") {}
|
||||
|
||||
export const layer: Layer.Layer<
|
||||
Service,
|
||||
never,
|
||||
Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service
|
||||
> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const config = yield* Config.Service
|
||||
const session = yield* Session.Service
|
||||
const agents = yield* Agent.Service
|
||||
const plugin = yield* Plugin.Service
|
||||
|
||||
const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: {
|
||||
tokens: MessageV2.Assistant["tokens"]
|
||||
model: Provider.Model
|
||||
}) {
|
||||
const cfg = yield* config.get()
|
||||
if (cfg.compaction?.auto === false) return false
|
||||
const context = input.model.limit.context
|
||||
if (context === 0) return false
|
||||
|
||||
|
|
@ -42,33 +88,28 @@ export namespace SessionCompaction {
|
|||
input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write
|
||||
|
||||
const reserved =
|
||||
config.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
|
||||
cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
|
||||
const usable = input.model.limit.input
|
||||
? input.model.limit.input - reserved
|
||||
: context - ProviderTransform.maxOutputTokens(input.model)
|
||||
return count >= usable
|
||||
}
|
||||
|
||||
export const PRUNE_MINIMUM = 20_000
|
||||
export const PRUNE_PROTECT = 40_000
|
||||
|
||||
const PRUNE_PROTECTED_TOOLS = ["skill"]
|
||||
|
||||
// goes backwards through parts until there are 40_000 tokens worth of tool
|
||||
// calls. then erases output of previous tool calls. idea is to throw away old
|
||||
// tool calls that are no longer relevant.
|
||||
export async function prune(input: { sessionID: SessionID }) {
|
||||
const config = await Config.get()
|
||||
if (config.compaction?.prune === false) return
|
||||
log.info("pruning")
|
||||
const msgs = await Session.messages({ sessionID: input.sessionID }).catch((err) => {
|
||||
if (NotFoundError.isInstance(err)) return undefined
|
||||
throw err
|
||||
})
|
||||
|
||||
// goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool
|
||||
// calls, then erases output of older tool calls to free context space
|
||||
const prune = Effect.fn("SessionCompaction.prune")(function* (input: { sessionID: SessionID }) {
|
||||
const cfg = yield* config.get()
|
||||
if (cfg.compaction?.prune === false) return
|
||||
log.info("pruning")
|
||||
|
||||
const msgs = yield* session
|
||||
.messages({ sessionID: input.sessionID })
|
||||
.pipe(Effect.catchIf(NotFoundError.isInstance, () => Effect.succeed(undefined)))
|
||||
if (!msgs) return
|
||||
|
||||
let total = 0
|
||||
let pruned = 0
|
||||
const toPrune = []
|
||||
const toPrune: MessageV2.ToolPart[] = []
|
||||
let turns = 0
|
||||
|
||||
loop: for (let msgIndex = msgs.length - 1; msgIndex >= 0; msgIndex--) {
|
||||
|
|
@ -81,7 +122,6 @@ export namespace SessionCompaction {
|
|||
if (part.type === "tool")
|
||||
if (part.state.status === "completed") {
|
||||
if (PRUNE_PROTECTED_TOOLS.includes(part.tool)) continue
|
||||
|
||||
if (part.state.time.compacted) break loop
|
||||
const estimate = Token.estimate(part.state.output)
|
||||
total += estimate
|
||||
|
|
@ -92,19 +132,20 @@ export namespace SessionCompaction {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("found", { pruned, total })
|
||||
if (pruned > PRUNE_MINIMUM) {
|
||||
for (const part of toPrune) {
|
||||
if (part.state.status === "completed") {
|
||||
part.state.time.compacted = Date.now()
|
||||
await Session.updatePart(part)
|
||||
yield* session.updatePart(part)
|
||||
}
|
||||
}
|
||||
log.info("pruned", { count: toPrune.length })
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
export async function process(input: {
|
||||
const processCompaction = Effect.fn("SessionCompaction.process")(function* (input: {
|
||||
parentID: MessageID
|
||||
messages: MessageV2.WithParts[]
|
||||
sessionID: SessionID
|
||||
|
|
@ -134,11 +175,13 @@ export namespace SessionCompaction {
|
|||
}
|
||||
}
|
||||
|
||||
const agent = await Agent.get("compaction")
|
||||
const model = agent.model
|
||||
? await Provider.getModel(agent.model.providerID, agent.model.modelID)
|
||||
: await Provider.getModel(userMessage.model.providerID, userMessage.model.modelID)
|
||||
const msg = (await Session.updateMessage({
|
||||
const agent = yield* agents.get("compaction")
|
||||
const model = yield* Effect.promise(() =>
|
||||
agent.model
|
||||
? Provider.getModel(agent.model.providerID, agent.model.modelID)
|
||||
: Provider.getModel(userMessage.model.providerID, userMessage.model.modelID),
|
||||
)
|
||||
const msg = (yield* session.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
parentID: input.parentID,
|
||||
|
|
@ -170,8 +213,8 @@ export namespace SessionCompaction {
|
|||
model,
|
||||
abort: input.abort,
|
||||
})
|
||||
// Allow plugins to inject context or replace compaction prompt
|
||||
const compacting = await Plugin.trigger(
|
||||
// Allow plugins to inject context or replace compaction prompt.
|
||||
const compacting = yield* plugin.trigger(
|
||||
"experimental.session.compacting",
|
||||
{ sessionID: input.sessionID },
|
||||
{ context: [], prompt: undefined },
|
||||
|
|
@ -204,10 +247,12 @@ When constructing the summary, try to stick to this template:
|
|||
[Construct a structured list of relevant files that have been read, edited, or created that pertain to the task at hand. If all the files in a directory are relevant, include the path to the directory.]
|
||||
---`
|
||||
|
||||
const promptText = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
|
||||
const prompt = compacting.prompt ?? [defaultPrompt, ...compacting.context].join("\n\n")
|
||||
const msgs = structuredClone(messages)
|
||||
await Plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const result = await processor.process({
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
|
||||
const result = yield* Effect.promise(() =>
|
||||
processor.process({
|
||||
user: userMessage,
|
||||
agent,
|
||||
abort: input.abort,
|
||||
|
|
@ -215,19 +260,15 @@ When constructing the summary, try to stick to this template:
|
|||
tools: {},
|
||||
system: [],
|
||||
messages: [
|
||||
...(await MessageV2.toModelMessages(msgs, model, { stripMedia: true })),
|
||||
...modelMessages,
|
||||
{
|
||||
role: "user",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: promptText,
|
||||
},
|
||||
],
|
||||
content: [{ type: "text", text: prompt }],
|
||||
},
|
||||
],
|
||||
model,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
if (result === "compact") {
|
||||
processor.message.error = new MessageV2.ContextOverflowError({
|
||||
|
|
@ -236,14 +277,14 @@ When constructing the summary, try to stick to this template:
|
|||
: "Session too large to compact - context exceeds model limit even after stripping media",
|
||||
}).toObject()
|
||||
processor.message.finish = "error"
|
||||
await Session.updateMessage(processor.message)
|
||||
yield* session.updateMessage(processor.message)
|
||||
return "stop"
|
||||
}
|
||||
|
||||
if (result === "continue" && input.auto) {
|
||||
if (replay) {
|
||||
const original = replay.info as MessageV2.User
|
||||
const replayMsg = await Session.updateMessage({
|
||||
const replayMsg = yield* session.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "user",
|
||||
sessionID: input.sessionID,
|
||||
|
|
@ -261,15 +302,17 @@ When constructing the summary, try to stick to this template:
|
|||
part.type === "file" && MessageV2.isMedia(part.mime)
|
||||
? { type: "text" as const, text: `[Attached ${part.mime}: ${part.filename ?? "file"}]` }
|
||||
: part
|
||||
await Session.updatePart({
|
||||
yield* session.updatePart({
|
||||
...replayPart,
|
||||
id: PartID.ascending(),
|
||||
messageID: replayMsg.id,
|
||||
sessionID: input.sessionID,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
const continueMsg = await Session.updateMessage({
|
||||
}
|
||||
|
||||
if (!replay) {
|
||||
const continueMsg = yield* session.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "user",
|
||||
sessionID: input.sessionID,
|
||||
|
|
@ -282,7 +325,7 @@ When constructing the summary, try to stick to this template:
|
|||
? "The previous request exceeded the provider's size limit due to large media attachments. The conversation was compacted and media files were removed from context. If the user was asking about attached images or files, explain that the attachments were too large to process and suggest they try again with smaller or fewer files.\n\n"
|
||||
: "") +
|
||||
"Continue if you have next steps, or stop and ask for clarification if you are unsure how to proceed."
|
||||
await Session.updatePart({
|
||||
yield* session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: continueMsg.id,
|
||||
sessionID: input.sessionID,
|
||||
|
|
@ -296,34 +339,28 @@ When constructing the summary, try to stick to this template:
|
|||
})
|
||||
}
|
||||
}
|
||||
if (processor.message.error) return "stop"
|
||||
Bus.publish(Event.Compacted, { sessionID: input.sessionID })
|
||||
return "continue"
|
||||
}
|
||||
|
||||
export const create = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
agent: z.string(),
|
||||
model: z.object({
|
||||
providerID: ProviderID.zod,
|
||||
modelID: ModelID.zod,
|
||||
}),
|
||||
auto: z.boolean(),
|
||||
overflow: z.boolean().optional(),
|
||||
}),
|
||||
async (input) => {
|
||||
const msg = await Session.updateMessage({
|
||||
if (processor.message.error) return "stop"
|
||||
if (result === "continue") yield* bus.publish(Event.Compacted, { sessionID: input.sessionID })
|
||||
return result
|
||||
})
|
||||
|
||||
const create = Effect.fn("SessionCompaction.create")(function* (input: {
|
||||
sessionID: SessionID
|
||||
agent: string
|
||||
model: { providerID: ProviderID; modelID: ModelID }
|
||||
auto: boolean
|
||||
overflow?: boolean
|
||||
}) {
|
||||
const msg = yield* session.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "user",
|
||||
model: input.model,
|
||||
sessionID: input.sessionID,
|
||||
agent: input.agent,
|
||||
time: {
|
||||
created: Date.now(),
|
||||
},
|
||||
time: { created: Date.now() },
|
||||
})
|
||||
await Session.updatePart({
|
||||
yield* session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: msg.id,
|
||||
sessionID: msg.sessionID,
|
||||
|
|
@ -331,6 +368,58 @@ When constructing the summary, try to stick to this template:
|
|||
auto: input.auto,
|
||||
overflow: input.overflow,
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
isOverflow,
|
||||
prune,
|
||||
process: processCompaction,
|
||||
create,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = Layer.unwrap(
|
||||
Effect.sync(() =>
|
||||
layer.pipe(
|
||||
Layer.provide(Session.defaultLayer),
|
||||
Layer.provide(Agent.defaultLayer),
|
||||
Layer.provide(Plugin.defaultLayer),
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
|
||||
return runPromise((svc) => svc.isOverflow(input))
|
||||
}
|
||||
|
||||
export async function prune(input: { sessionID: SessionID }) {
|
||||
return runPromise((svc) => svc.prune(input))
|
||||
}
|
||||
|
||||
export async function process(input: {
|
||||
parentID: MessageID
|
||||
messages: MessageV2.WithParts[]
|
||||
sessionID: SessionID
|
||||
abort: AbortSignal
|
||||
auto: boolean
|
||||
overflow?: boolean
|
||||
}) {
|
||||
return runPromise((svc) => svc.process(input))
|
||||
}
|
||||
|
||||
export const create = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
agent: z.string(),
|
||||
model: z.object({ providerID: ProviderID.zod, modelID: ModelID.zod }),
|
||||
auto: z.boolean(),
|
||||
overflow: z.boolean().optional(),
|
||||
}),
|
||||
(input) => runPromise((svc) => svc.create(input)),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +1,30 @@
|
|||
import { describe, expect, test } from "bun:test"
|
||||
import { afterEach, describe, expect, mock, spyOn, test } from "bun:test"
|
||||
import path from "path"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { SessionCompaction } from "../../src/session/compaction"
|
||||
import { Token } from "../../src/util/token"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { Log } from "../../src/util/log"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { Session } from "../../src/session"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, SessionID } from "../../src/session/schema"
|
||||
import { ModelID, ProviderID } from "../../src/provider/schema"
|
||||
import type { Provider } from "../../src/provider/provider"
|
||||
import * as ProviderModule from "../../src/provider/provider"
|
||||
import * as SessionProcessorModule from "../../src/session/processor"
|
||||
|
||||
Log.init({ print: false })
|
||||
|
||||
const ref = {
|
||||
providerID: ProviderID.make("test"),
|
||||
modelID: ModelID.make("test-model"),
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
mock.restore()
|
||||
})
|
||||
|
||||
function createModel(opts: {
|
||||
context: number
|
||||
output: number
|
||||
|
|
@ -40,6 +55,105 @@ function createModel(opts: {
|
|||
} as Provider.Model
|
||||
}
|
||||
|
||||
async function user(sessionID: SessionID, text: string) {
|
||||
const msg = await Session.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "user",
|
||||
sessionID,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
time: { created: Date.now() },
|
||||
})
|
||||
await Session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: msg.id,
|
||||
sessionID,
|
||||
type: "text",
|
||||
text,
|
||||
})
|
||||
return msg
|
||||
}
|
||||
|
||||
async function assistant(sessionID: SessionID, parentID: MessageID, root: string) {
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
sessionID,
|
||||
mode: "build",
|
||||
agent: "build",
|
||||
path: { cwd: root, root },
|
||||
cost: 0,
|
||||
tokens: {
|
||||
output: 0,
|
||||
input: 0,
|
||||
reasoning: 0,
|
||||
cache: { read: 0, write: 0 },
|
||||
},
|
||||
modelID: ref.modelID,
|
||||
providerID: ref.providerID,
|
||||
parentID,
|
||||
time: { created: Date.now() },
|
||||
finish: "end_turn",
|
||||
}
|
||||
await Session.updateMessage(msg)
|
||||
return msg
|
||||
}
|
||||
|
||||
async function tool(sessionID: SessionID, messageID: MessageID, tool: string, output: string) {
|
||||
return Session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID,
|
||||
sessionID,
|
||||
type: "tool",
|
||||
callID: crypto.randomUUID(),
|
||||
tool,
|
||||
state: {
|
||||
status: "completed",
|
||||
input: {},
|
||||
output,
|
||||
title: "done",
|
||||
metadata: {},
|
||||
time: { start: Date.now(), end: Date.now() },
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
function fake(
|
||||
input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0],
|
||||
result: "continue" | "compact",
|
||||
): ReturnType<(typeof SessionProcessorModule.SessionProcessor)["create"]> {
|
||||
const msg = input.assistantMessage
|
||||
return {
|
||||
get message() {
|
||||
return msg
|
||||
},
|
||||
partFromToolCall() {
|
||||
return {
|
||||
id: PartID.ascending(),
|
||||
messageID: msg.id,
|
||||
sessionID: msg.sessionID,
|
||||
type: "tool",
|
||||
callID: "fake",
|
||||
tool: "fake",
|
||||
state: { status: "pending", input: {}, raw: "" },
|
||||
}
|
||||
},
|
||||
process: async () => result,
|
||||
}
|
||||
}
|
||||
|
||||
function wait(ms = 50) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms))
|
||||
}
|
||||
|
||||
function defer() {
|
||||
let resolve!: () => void
|
||||
const promise = new Promise<void>((done) => {
|
||||
resolve = done
|
||||
})
|
||||
return { promise, resolve }
|
||||
}
|
||||
|
||||
describe("session.compaction.isOverflow", () => {
|
||||
test("returns true when token count exceeds usable context", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
|
@ -227,6 +341,272 @@ describe("session.compaction.isOverflow", () => {
|
|||
})
|
||||
})
|
||||
|
||||
describe("session.compaction.create", () => {
|
||||
test("creates a compaction user message and part", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await Session.create({})
|
||||
|
||||
await SessionCompaction.create({
|
||||
sessionID: session.id,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
auto: true,
|
||||
overflow: true,
|
||||
})
|
||||
|
||||
const msgs = await Session.messages({ sessionID: session.id })
|
||||
expect(msgs).toHaveLength(1)
|
||||
expect(msgs[0].info.role).toBe("user")
|
||||
expect(msgs[0].parts).toHaveLength(1)
|
||||
expect(msgs[0].parts[0]).toMatchObject({
|
||||
type: "compaction",
|
||||
auto: true,
|
||||
overflow: true,
|
||||
})
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("session.compaction.prune", () => {
|
||||
test("compacts old completed tool output", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await Session.create({})
|
||||
const a = await user(session.id, "first")
|
||||
const b = await assistant(session.id, a.id, tmp.path)
|
||||
await tool(session.id, b.id, "bash", "x".repeat(200_000))
|
||||
await user(session.id, "second")
|
||||
await user(session.id, "third")
|
||||
|
||||
await SessionCompaction.prune({ sessionID: session.id })
|
||||
|
||||
const msgs = await Session.messages({ sessionID: session.id })
|
||||
const part = msgs.flatMap((msg) => msg.parts).find((part) => part.type === "tool")
|
||||
expect(part?.type).toBe("tool")
|
||||
expect(part?.state.status).toBe("completed")
|
||||
if (part?.type === "tool" && part.state.status === "completed") {
|
||||
expect(part.state.time.compacted).toBeNumber()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("skips protected skill tool output", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await Session.create({})
|
||||
const a = await user(session.id, "first")
|
||||
const b = await assistant(session.id, a.id, tmp.path)
|
||||
await tool(session.id, b.id, "skill", "x".repeat(200_000))
|
||||
await user(session.id, "second")
|
||||
await user(session.id, "third")
|
||||
|
||||
await SessionCompaction.prune({ sessionID: session.id })
|
||||
|
||||
const msgs = await Session.messages({ sessionID: session.id })
|
||||
const part = msgs.flatMap((msg) => msg.parts).find((part) => part.type === "tool")
|
||||
expect(part?.type).toBe("tool")
|
||||
if (part?.type === "tool" && part.state.status === "completed") {
|
||||
expect(part.state.time.compacted).toBeUndefined()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("session.compaction.process", () => {
|
||||
test("publishes compacted event on continue", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
|
||||
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
|
||||
|
||||
const session = await Session.create({})
|
||||
const msg = await user(session.id, "hello")
|
||||
const msgs = await Session.messages({ sessionID: session.id })
|
||||
const done = defer()
|
||||
let seen = false
|
||||
const unsub = Bus.subscribe(SessionCompaction.Event.Compacted, (evt) => {
|
||||
if (evt.properties.sessionID !== session.id) return
|
||||
seen = true
|
||||
done.resolve()
|
||||
})
|
||||
|
||||
const result = await SessionCompaction.process({
|
||||
parentID: msg.id,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
abort: new AbortController().signal,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
await Promise.race([
|
||||
done.promise,
|
||||
wait(500).then(() => {
|
||||
throw new Error("timed out waiting for compacted event")
|
||||
}),
|
||||
])
|
||||
unsub()
|
||||
|
||||
expect(result).toBe("continue")
|
||||
expect(seen).toBe(true)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("marks summary message as errored on compact result", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
|
||||
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "compact"))
|
||||
|
||||
const session = await Session.create({})
|
||||
const msg = await user(session.id, "hello")
|
||||
const result = await SessionCompaction.process({
|
||||
parentID: msg.id,
|
||||
messages: await Session.messages({ sessionID: session.id }),
|
||||
sessionID: session.id,
|
||||
abort: new AbortController().signal,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const summary = (await Session.messages({ sessionID: session.id })).find(
|
||||
(msg) => msg.info.role === "assistant" && msg.info.summary,
|
||||
)
|
||||
|
||||
expect(result).toBe("stop")
|
||||
expect(summary?.info.role).toBe("assistant")
|
||||
if (summary?.info.role === "assistant") {
|
||||
expect(summary.info.finish).toBe("error")
|
||||
expect(JSON.stringify(summary.info.error)).toContain("Session too large to compact")
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("adds synthetic continue prompt when auto is enabled", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
|
||||
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
|
||||
|
||||
const session = await Session.create({})
|
||||
const msg = await user(session.id, "hello")
|
||||
|
||||
const result = await SessionCompaction.process({
|
||||
parentID: msg.id,
|
||||
messages: await Session.messages({ sessionID: session.id }),
|
||||
sessionID: session.id,
|
||||
abort: new AbortController().signal,
|
||||
auto: true,
|
||||
})
|
||||
|
||||
const msgs = await Session.messages({ sessionID: session.id })
|
||||
const last = msgs.at(-1)
|
||||
|
||||
expect(result).toBe("continue")
|
||||
expect(last?.info.role).toBe("user")
|
||||
expect(last?.parts[0]).toMatchObject({
|
||||
type: "text",
|
||||
synthetic: true,
|
||||
})
|
||||
if (last?.parts[0]?.type === "text") {
|
||||
expect(last.parts[0].text).toContain("Continue if you have next steps")
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("replays the prior user turn on overflow when earlier context exists", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
|
||||
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
|
||||
|
||||
const session = await Session.create({})
|
||||
await user(session.id, "root")
|
||||
const replay = await user(session.id, "image")
|
||||
await Session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: replay.id,
|
||||
sessionID: session.id,
|
||||
type: "file",
|
||||
mime: "image/png",
|
||||
filename: "cat.png",
|
||||
url: "https://example.com/cat.png",
|
||||
})
|
||||
const msg = await user(session.id, "current")
|
||||
|
||||
const result = await SessionCompaction.process({
|
||||
parentID: msg.id,
|
||||
messages: await Session.messages({ sessionID: session.id }),
|
||||
sessionID: session.id,
|
||||
abort: new AbortController().signal,
|
||||
auto: true,
|
||||
overflow: true,
|
||||
})
|
||||
|
||||
const last = (await Session.messages({ sessionID: session.id })).at(-1)
|
||||
|
||||
expect(result).toBe("continue")
|
||||
expect(last?.info.role).toBe("user")
|
||||
expect(last?.parts.some((part) => part.type === "file")).toBe(false)
|
||||
expect(
|
||||
last?.parts.some((part) => part.type === "text" && part.text.includes("Attached image/png: cat.png")),
|
||||
).toBe(true)
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("falls back to overflow guidance when no replayable turn exists", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
|
||||
spyOn(SessionProcessorModule.SessionProcessor, "create").mockImplementation((input) => fake(input, "continue"))
|
||||
|
||||
const session = await Session.create({})
|
||||
await user(session.id, "earlier")
|
||||
const msg = await user(session.id, "current")
|
||||
|
||||
const result = await SessionCompaction.process({
|
||||
parentID: msg.id,
|
||||
messages: await Session.messages({ sessionID: session.id }),
|
||||
sessionID: session.id,
|
||||
abort: new AbortController().signal,
|
||||
auto: true,
|
||||
overflow: true,
|
||||
})
|
||||
|
||||
const last = (await Session.messages({ sessionID: session.id })).at(-1)
|
||||
|
||||
expect(result).toBe("continue")
|
||||
expect(last?.info.role).toBe("user")
|
||||
if (last?.parts[0]?.type === "text") {
|
||||
expect(last.parts[0].text).toContain("previous request exceeded the provider's size limit")
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("util.token.estimate", () => {
|
||||
test("estimates tokens from text (4 chars per token)", () => {
|
||||
const text = "x".repeat(4000)
|
||||
|
|
|
|||
Loading…
Reference in New Issue