fix(acp): serialize session updates to prevent out-of-order messages

All ACP sessionUpdate calls are now routed through a promise chain
(`sendUpdate`) that guarantees ordering. After each prompt completes,
a reconciliation step fetches the final assistant message and delivers
any text/reasoning content that was missed by SSE delta events — fixing
a race where streaming events arrive after end_turn.

Also adds handling for the `question.asked` event via the ACP permission
flow, and updates the question tool to use a two-step async pattern.
pull/21473/head
LydiaCai 2026-04-07 16:26:11 +08:00
parent ae614d919f
commit b2ea09a97d
4 changed files with 335 additions and 100 deletions

View File

@ -48,6 +48,7 @@ import { z } from "zod"
import { LoadAPIKeyError } from "ai"
import type { AssistantMessage, Event, OpencodeClient, SessionMessageResponse, ToolPart } from "@opencode-ai/sdk/v2"
import { applyPatch } from "diff"
import { Question } from "@/question"
type ModeOption = { id: string; name: string; description?: string }
type ModelOption = { modelId: string; name: string }
@ -77,7 +78,7 @@ export namespace ACP {
}
async function sendUsageUpdate(
connection: AgentSideConnection,
agent: Agent,
sdk: OpencodeClient,
sessionID: string,
directory: string,
@ -111,8 +112,7 @@ export namespace ACP {
const used = msg.tokens.input + (msg.tokens.cache?.read ?? 0)
const totalCost = assistantMessages.reduce((sum, m) => sum + m.info.cost, 0)
await connection
.sessionUpdate({
await agent.sendSessionUpdate({
sessionId: sessionID,
update: {
sessionUpdate: "usage_update",
@ -121,9 +121,6 @@ export namespace ACP {
cost: { amount: totalCost, currency: "USD" },
},
})
.catch((error) => {
log.error("failed to send usage update", { error })
})
}
export async function init({ sdk: _sdk }: { sdk: OpencodeClient }) {
@ -144,12 +141,90 @@ export namespace ACP {
private bashSnapshots = new Map<string, string>()
private toolStarts = new Set<string>()
private permissionQueues = new Map<string, Promise<void>>()
private questionQueues = new Map<string, Promise<void>>()
private permissionOptions: PermissionOption[] = [
{ optionId: "once", kind: "allow_once", name: "Allow once" },
{ optionId: "always", kind: "allow_always", name: "Always allow" },
{ optionId: "reject", kind: "reject_once", name: "Reject" },
]
// Promise chain to serialize all sessionUpdate calls, ensuring ordering.
private sendChain: Promise<void> = Promise.resolve()
// Track characters sent per partID via delta events, for post-prompt reconciliation.
private sentDeltaChars = new Map<string, number>()
// Parts that have been reconciled — delta handler must skip these to avoid duplicates.
private reconciledParts = new Set<string>()
private sendUpdate(
params: Parameters<AgentSideConnection["sessionUpdate"]>[0],
): Promise<void> {
const next = this.sendChain.then(() =>
this.connection.sessionUpdate(params).catch((error) => {
log.error("failed to send session update to ACP", { error })
}),
)
this.sendChain = next.catch(() => {})
return next
}
private drainUpdates(): Promise<void> {
return this.sendChain
}
/**
* After prompt() returns, fetch the complete assistant message and send any
* text/reasoning content that was not already delivered via delta events.
* This handles the race where SSE events haven't arrived before end_turn.
*/
private async reconcileAssistantMessage(sessionId: string, directory: string): Promise<void> {
const messages = await this.sdk.session
.messages({ sessionID: sessionId, directory }, { throwOnError: true })
.then((x) => x.data)
.catch((error) => {
log.error("failed to fetch messages for reconciliation", { error })
return undefined
})
if (!messages) return
const lastAssistant = messages.findLast((m) => m.info.role === "assistant")
if (!lastAssistant) return
for (const part of lastAssistant.parts) {
if (part.type === "text" && part.text) {
// Mark reconciled BEFORE reading sentDeltaChars, so any delta events
// arriving during the await below will be skipped.
this.reconciledParts.add(part.id)
const sent = this.sentDeltaChars.get(part.id) ?? 0
this.sentDeltaChars.delete(part.id)
if (sent < part.text.length) {
const remaining = part.text.substring(sent)
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: remaining },
},
})
}
} else if (part.type === "reasoning" && part.text) {
this.reconciledParts.add(part.id)
const sent = this.sentDeltaChars.get(part.id) ?? 0
this.sentDeltaChars.delete(part.id)
if (sent < part.text.length) {
const remaining = part.text.substring(sent)
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: remaining },
},
})
}
}
}
}
constructor(connection: AgentSideConnection, config: ACPConfig) {
this.connection = connection
this.config = config
@ -267,6 +342,200 @@ export namespace ACP {
return
}
case "question.asked": {
const question = event.properties
const session = this.sessionManager.tryGet(question.sessionID)
if (!session) return
const prev = this.questionQueues.get(question.sessionID) ?? Promise.resolve()
const next = prev
.then(async () => {
const directory = session.cwd
// Build flattened options for all questions: q{questionIndex}_opt{optionIndex}
const options: PermissionOption[] = []
for (let qIdx = 0; qIdx < question.questions.length; qIdx++) {
const q = question.questions[qIdx]
for (let optIdx = 0; optIdx < q.options.length; optIdx++) {
options.push({
optionId: `q${qIdx}_opt${optIdx}`,
kind: "allow_once" as const,
name: q.options[optIdx].label,
})
}
}
// Build title from first question (or combine headers)
const firstQ = question.questions[0]
const title = firstQ.header ?? firstQ.question
// Build rawInput with questions array (matching cc.json format)
const rawInput = {
questions: question.questions.map((q: Question.Info) => ({
question: q.question,
header: q.header,
options: q.options,
multiSelect: q.multiple ?? false,
})),
}
// Build _meta with full questions array (matching cc.json format)
const metaQuestions = question.questions.map((q: Question.Info) => ({
question: q.question,
header: q.header,
options: q.options,
multiSelect: q.multiple ?? false,
}))
const res = await this.connection
.requestPermission({
sessionId: question.sessionID,
toolCall: {
toolCallId: question.tool?.callID ?? question.id,
title,
rawInput,
},
options,
_meta: {
askUserQuestion: {
questions: metaQuestions,
},
},
})
.catch(async (error) => {
log.error("failed to request permission for question from ACP", {
error,
questionID: question.id,
sessionID: question.sessionID,
})
await this.sdk.question.reject({
requestID: question.id,
directory,
})
return undefined
})
if (!res) {
await this.sdk.question.reject({
requestID: question.id,
directory,
})
return
}
if (res.outcome.outcome == "cancelled") {
await this.sdk.question.reject({
requestID: question.id,
directory,
})
// Send tool_call_update to client
await this.sendUpdate({
sessionId: question.sessionID,
update: {
sessionUpdate: "tool_call_update",
toolCallId: question.tool?.callID ?? question.id,
status: "failed",
content: [
{
type: "content",
content: {
type: "text",
text: "```\nTool permission request failed: Error: Question cancelled\n```",
},
},
],
_meta: {
opencode: {
toolName: "AskUserQuestion",
},
},
},
})
return
}
if (res.outcome.outcome !== "selected") {
await this.sdk.question.reject({
requestID: question.id,
directory,
})
return
}
// Parse response to build answers array
const answers: string[][] = []
// Check for _meta.answers format: { "question text": ["answer1", "answer2"] }
const metaAnswers = (res._meta as Record<string, unknown> | undefined)?.answers as
| Record<string, string[]>
| undefined
if (metaAnswers && typeof metaAnswers === "object") {
for (const questionItem of question.questions) {
const key = questionItem.header
? `${questionItem.header}: ${questionItem.question}`
: questionItem.question
const answer = metaAnswers[key] ?? metaAnswers[questionItem.question] ?? []
answers.push(Array.isArray(answer) ? answer : [answer])
}
} else {
// Parse optionId(s) in format: q{qIdx}_opt{optIdx} or q{qIdx}_other
const optionId = res.outcome.optionId
// Initialize answers array with empty arrays for each question
for (let i = 0; i < question.questions.length; i++) {
answers.push([])
}
// Parse the optionId - could be single or multiple (comma-separated)
const selectedIds = optionId.split(",").map((s: string) => s.trim())
for (const selId of selectedIds) {
const match = selId.match(/^q(\d+)_(opt(\d+)|other(?::(.*))?)?$/)
if (match) {
const qIdx = parseInt(match[1], 10)
if (qIdx < question.questions.length) {
if (match[2]?.startsWith("opt")) {
const optIdx = parseInt(match[3], 10)
const q = question.questions[qIdx]
if (optIdx < q.options.length) {
answers[qIdx].push(q.options[optIdx].label)
}
} else if (match[2]?.startsWith("other")) {
// Custom "Other" answer: q{idx}_other:customText
const customText = match[4] ?? ""
answers[qIdx].push(customText)
}
}
}
}
}
// Send all answers at once
await this.sdk.question
.reply(
{
requestID: question.id,
answers,
directory,
},
{ throwOnError: true },
)
.catch((error) => {
log.error("failed to reply to question", { error, questionID: question.id })
})
})
.catch((error: unknown) => {
log.error("failed to handle question", { error, questionID: question.id })
})
.finally(() => {
if (this.questionQueues.get(question.sessionID) === next) {
this.questionQueues.delete(question.sessionID)
}
})
this.questionQueues.set(question.sessionID, next)
return
}
case "message.part.updated": {
log.info("message part updated", { event: event.properties })
const props = event.properties
@ -290,8 +559,7 @@ export namespace ACP {
const hash = Hash.fast(output)
if (part.tool === "bash") {
if (this.bashSnapshots.get(part.callID) === hash) {
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -303,9 +571,6 @@ export namespace ACP {
rawInput: part.state.input,
},
})
.catch((error) => {
log.error("failed to send tool in_progress to ACP", { error })
})
return
}
this.bashSnapshots.set(part.callID, hash)
@ -318,8 +583,7 @@ export namespace ACP {
},
})
}
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -332,9 +596,6 @@ export namespace ACP {
...(content.length > 0 && { content }),
},
})
.catch((error) => {
log.error("failed to send tool in_progress to ACP", { error })
})
return
case "completed": {
@ -372,8 +633,7 @@ export namespace ACP {
if (part.tool === "todowrite") {
const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
if (parsedTodos.success) {
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "plan",
@ -388,16 +648,12 @@ export namespace ACP {
}),
},
})
.catch((error) => {
log.error("failed to send session update for todo", { error })
})
} else {
log.error("failed to parse todo output", { error: parsedTodos.error })
}
}
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -413,16 +669,12 @@ export namespace ACP {
},
},
})
.catch((error) => {
log.error("failed to send tool completed to ACP", { error })
})
return
}
case "error":
this.toolStarts.delete(part.callID)
this.bashSnapshots.delete(part.callID)
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -446,9 +698,6 @@ export namespace ACP {
},
},
})
.catch((error) => {
log.error("failed to send tool error to ACP", { error })
})
return
}
}
@ -494,9 +743,12 @@ export namespace ACP {
const part = message.parts.find((p) => p.id === props.partID)
if (!part) return
// Skip deltas for parts already reconciled after prompt() returned.
if (this.reconciledParts.has(props.partID)) return
if (part.type === "text" && props.field === "text" && part.ignored !== true) {
await this.connection
.sessionUpdate({
this.sentDeltaChars.set(props.partID, (this.sentDeltaChars.get(props.partID) ?? 0) + props.delta.length)
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "agent_message_chunk",
@ -507,15 +759,12 @@ export namespace ACP {
},
},
})
.catch((error) => {
log.error("failed to send text delta to ACP", { error })
})
return
}
if (part.type === "reasoning" && props.field === "text") {
await this.connection
.sessionUpdate({
this.sentDeltaChars.set(props.partID, (this.sentDeltaChars.get(props.partID) ?? 0) + props.delta.length)
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "agent_thought_chunk",
@ -526,9 +775,6 @@ export namespace ACP {
},
},
})
.catch((error) => {
log.error("failed to send reasoning delta to ACP", { error })
})
}
return
}
@ -676,7 +922,7 @@ export namespace ACP {
await this.processMessage(msg)
}
await sendUsageUpdate(this.connection, this.sdk, sessionId, directory)
await sendUsageUpdate(this, this.sdk, sessionId, directory)
return result
} catch (e) {
@ -786,7 +1032,7 @@ export namespace ACP {
await this.processMessage(msg)
}
await sendUsageUpdate(this.connection, this.sdk, sessionId, directory)
await sendUsageUpdate(this, this.sdk, sessionId, directory)
return mode
} catch (e) {
@ -817,7 +1063,7 @@ export namespace ACP {
sessionId,
})
await sendUsageUpdate(this.connection, this.sdk, sessionId, directory)
await sendUsageUpdate(this, this.sdk, sessionId, directory)
return result
} catch (e) {
@ -855,8 +1101,7 @@ export namespace ACP {
},
})
}
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -869,9 +1114,6 @@ export namespace ACP {
...(runningContent.length > 0 && { content: runningContent }),
},
})
.catch((err) => {
log.error("failed to send tool in_progress to ACP", { error: err })
})
break
case "completed":
this.toolStarts.delete(part.callID)
@ -908,8 +1150,7 @@ export namespace ACP {
if (part.tool === "todowrite") {
const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output))
if (parsedTodos.success) {
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "plan",
@ -924,16 +1165,12 @@ export namespace ACP {
}),
},
})
.catch((err) => {
log.error("failed to send session update for todo", { error: err })
})
} else {
log.error("failed to parse todo output", { error: parsedTodos.error })
}
}
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -949,15 +1186,11 @@ export namespace ACP {
},
},
})
.catch((err) => {
log.error("failed to send tool completed to ACP", { error: err })
})
break
case "error":
this.toolStarts.delete(part.callID)
this.bashSnapshots.delete(part.callID)
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call_update",
@ -981,16 +1214,12 @@ export namespace ACP {
},
},
})
.catch((err) => {
log.error("failed to send tool error to ACP", { error: err })
})
break
}
} else if (part.type === "text") {
if (part.text) {
const audience: Role[] | undefined = part.synthetic ? ["assistant"] : part.ignored ? ["user"] : undefined
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: message.info.role === "user" ? "user_message_chunk" : "agent_message_chunk",
@ -1002,9 +1231,6 @@ export namespace ACP {
},
},
})
.catch((err) => {
log.error("failed to send text to ACP", { error: err })
})
}
} else if (part.type === "file") {
// Replay file attachments as appropriate ACP content blocks.
@ -1021,8 +1247,7 @@ export namespace ACP {
if (url.startsWith("file://")) {
// Local file reference - send as resource_link
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: messageChunk,
@ -1030,9 +1255,6 @@ export namespace ACP {
content: { type: "resource_link", uri: url, name: filename, mimeType: mime },
},
})
.catch((err) => {
log.error("failed to send resource_link to ACP", { error: err })
})
} else if (url.startsWith("data:")) {
// Embedded content - parse data URL and send as appropriate block type
const base64Match = url.match(/^data:([^;]+);base64,(.*)$/)
@ -1043,8 +1265,7 @@ export namespace ACP {
if (effectiveMime.startsWith("image/")) {
// Image - send as image block
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: messageChunk,
@ -1057,9 +1278,6 @@ export namespace ACP {
},
},
})
.catch((err) => {
log.error("failed to send image to ACP", { error: err })
})
} else {
// Non-image: text types get decoded, binary types stay as blob
const isText = effectiveMime.startsWith("text/") || effectiveMime === "application/json"
@ -1072,8 +1290,7 @@ export namespace ACP {
}
: { uri: fileUri, mimeType: effectiveMime, blob: base64Data }
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: messageChunk,
@ -1081,16 +1298,12 @@ export namespace ACP {
content: { type: "resource", resource },
},
})
.catch((err) => {
log.error("failed to send resource to ACP", { error: err })
})
}
}
// URLs that don't match file:// or data: are skipped (unsupported)
} else if (part.type === "reasoning") {
if (part.text) {
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "agent_thought_chunk",
@ -1101,9 +1314,6 @@ export namespace ACP {
},
},
})
.catch((err) => {
log.error("failed to send reasoning to ACP", { error: err })
})
}
}
}
@ -1120,8 +1330,7 @@ export namespace ACP {
private async toolStart(sessionId: string, part: ToolPart) {
if (this.toolStarts.has(part.callID)) return
this.toolStarts.add(part.callID)
await this.connection
.sessionUpdate({
await this.sendUpdate({
sessionId,
update: {
sessionUpdate: "tool_call",
@ -1133,9 +1342,6 @@ export namespace ACP {
rawInput: {},
},
})
.catch((error) => {
log.error("failed to send tool pending to ACP", { error })
})
}
private async loadAvailableModes(directory: string): Promise<ModeOption[]> {
@ -1259,7 +1465,7 @@ export namespace ACP {
)
setTimeout(() => {
this.connection.sessionUpdate({
this.sendUpdate({
sessionId,
update: {
sessionUpdate: "available_commands_update",
@ -1358,6 +1564,10 @@ export namespace ACP {
}
async prompt(params: PromptRequest) {
// Clear stale reconciliation state from previous prompt.
this.reconciledParts.clear()
this.sentDeltaChars.clear()
const sessionID = params.sessionId
const session = this.sessionManager.get(sessionID)
const directory = session.cwd
@ -1486,7 +1696,9 @@ export namespace ACP {
})
const msg = response.data?.info
await sendUsageUpdate(this.connection, this.sdk, sessionID, directory)
await this.drainUpdates()
await this.reconcileAssistantMessage(sessionID, directory)
await sendUsageUpdate(this, this.sdk, sessionID, directory)
return {
stopReason: "end_turn" as const,
@ -1509,7 +1721,9 @@ export namespace ACP {
})
const msg = response.data?.info
await sendUsageUpdate(this.connection, this.sdk, sessionID, directory)
await this.drainUpdates()
await this.reconcileAssistantMessage(sessionID, directory)
await sendUsageUpdate(this, this.sdk, sessionID, directory)
return {
stopReason: "end_turn" as const,
@ -1532,7 +1746,9 @@ export namespace ACP {
break
}
await sendUsageUpdate(this.connection, this.sdk, sessionID, directory)
await this.drainUpdates()
await this.reconcileAssistantMessage(sessionID, directory)
await sendUsageUpdate(this, this.sdk, sessionID, directory)
return {
stopReason: "end_turn" as const,
@ -1550,6 +1766,14 @@ export namespace ACP {
{ throwOnError: true },
)
}
hasSession(sessionID: string): boolean {
return this.sessionManager.tryGet(sessionID) !== undefined
}
async sendSessionUpdate(params: Parameters<AgentSideConnection["sessionUpdate"]>[0]) {
await this.sendUpdate(params)
}
}
function toToolKind(toolName: string): ToolKind {

View File

@ -484,7 +484,15 @@ export namespace SessionProcessor {
yield* abort()
}
if (ctx.needsCompaction) return "compact"
if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
if (ctx.blocked || ctx.assistantMessage.error || aborted) {
log.info("process stop", {
sessionID: ctx.sessionID,
blocked: ctx.blocked,
error: ctx.assistantMessage.error,
aborted,
})
return "stop"
}
return "continue"
}).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid)))
})

View File

@ -21,7 +21,7 @@ export const QuestionTool = Tool.defineEffect<typeof parameters, Metadata, Quest
description: DESCRIPTION,
parameters,
async execute(params: z.infer<typeof parameters>, ctx: Tool.Context<Metadata>) {
const answers = await question
const result = await question
.ask({
sessionID: ctx.sessionID,
questions: params.questions,
@ -29,6 +29,8 @@ export const QuestionTool = Tool.defineEffect<typeof parameters, Metadata, Quest
})
.pipe(Effect.runPromise)
const answers = result
const formatted = params.questions
.map((q, i) => `"${q.question}"="${answers[i]?.length ? answers[i].join(", ") : "Unanswered"}"`)
.join(", ")

View File

@ -17,6 +17,7 @@ const ctx = {
messages: [],
metadata: () => {},
ask: async () => {},
question: async () => [],
}
const it = testEffect(Layer.mergeAll(Question.defaultLayer, CrossSpawnSpawner.defaultLayer))