From 498cbb2c269370ba8a8eeaad836e2eb50f599fe1 Mon Sep 17 00:00:00 2001 From: Dax Raad Date: Fri, 30 Jan 2026 11:13:16 -0500 Subject: [PATCH] core: split message part updates into delta events for smoother streaming Streaming text and reasoning content now uses incremental delta events instead of sending full message parts on each update. This reduces bandwidth and improves real-time response smoothness in the TUI. --- packages/app/src/context/global-sync.tsx | 17 ++++ packages/opencode/src/acp/agent.ts | 90 ++++++++++++------- .../opencode/src/cli/cmd/tui/context/sync.tsx | 18 ++++ packages/opencode/src/session/index.ts | 30 +++---- packages/opencode/src/session/message-v2.ts | 11 ++- packages/opencode/src/session/processor.ts | 27 ++++-- packages/sdk/js/src/v2/gen/types.gen.ts | 17 ++-- 7 files changed, 147 insertions(+), 63 deletions(-) diff --git a/packages/app/src/context/global-sync.tsx b/packages/app/src/context/global-sync.tsx index ad3d124b2c..12fe37acf3 100644 --- a/packages/app/src/context/global-sync.tsx +++ b/packages/app/src/context/global-sync.tsx @@ -839,6 +839,23 @@ function createGlobalSync() { ) break } + case "message.part.delta": { + const parts = store.part[event.properties.messageID] + if (!parts) break + const result = Binary.search(parts, event.properties.partID, (p) => p.id) + if (!result.found) break + setStore( + "part", + event.properties.messageID, + produce((draft) => { + const part = draft[result.index] + const field = event.properties.field as keyof typeof part + const existing = part[field] as string | undefined + ;(part[field] as string) = (existing ?? "") + event.properties.delta + }), + ) + break + } case "message.part.removed": { const messageID = event.properties.messageID const parts = store.part[messageID] diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index cc9a029a04..2af375b996 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -365,46 +365,68 @@ export namespace ACP { return } } + return + } - if (part.type === "text") { - const delta = props.delta - if (delta && part.ignored !== true) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, + case "message.part.delta": { + const props = event.properties + const session = this.sessionManager.tryGet(props.sessionID) + if (!session) return + const sessionId = session.id + + const message = await this.sdk.session + .message( + { + sessionID: props.sessionID, + messageID: props.messageID, + directory: session.cwd, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((error) => { + log.error("unexpected error when fetching message", { error }) + return undefined + }) + + if (!message || message.info.role !== "assistant") return + + const part = message.parts.find((p) => p.id === props.partID) + if (!part) return + + if (part.type === "text" && props.field === "text" && part.ignored !== true) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: props.delta, }, - }) - .catch((error) => { - log.error("failed to send text to ACP", { error }) - }) - } + }, + }) + .catch((error) => { + log.error("failed to send text delta to ACP", { error }) + }) return } - if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: delta, - }, + if (part.type === "reasoning" && props.field === "text") { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: props.delta, }, - }) - .catch((error) => { - log.error("failed to send reasoning to ACP", { error }) - }) - } + }, + }) + .catch((error) => { + log.error("failed to send reasoning delta to ACP", { error }) + }) } return } diff --git a/packages/opencode/src/cli/cmd/tui/context/sync.tsx b/packages/opencode/src/cli/cmd/tui/context/sync.tsx index eb8ed2d9bb..269ed7ae0b 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sync.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sync.tsx @@ -299,6 +299,24 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ break } + case "message.part.delta": { + const parts = store.part[event.properties.messageID] + if (!parts) break + const result = Binary.search(parts, event.properties.partID, (p) => p.id) + if (!result.found) break + setStore( + "part", + event.properties.messageID, + produce((draft) => { + const part = draft[result.index] + const field = event.properties.field as keyof typeof part + const existing = part[field] as string | undefined + ;(part[field] as string) = (existing ?? "") + event.properties.delta + }), + ) + break + } + case "message.part.removed": { const parts = store.part[event.properties.messageID] const result = Binary.search(parts, event.properties.partID, (p) => p.id) diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 2d506e2e96..e87e4e4e44 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -603,21 +603,9 @@ export namespace Session { }, ) - const UpdatePartInput = z.union([ - MessageV2.Part, - z.object({ - part: MessageV2.TextPart, - delta: z.string(), - }), - z.object({ - part: MessageV2.ReasoningPart, - delta: z.string(), - }), - ]) + const UpdatePartInput = MessageV2.Part - export const updatePart = fn(UpdatePartInput, async (input) => { - const part = "delta" in input ? input.part : input - const delta = "delta" in input ? input.delta : undefined + export const updatePart = fn(UpdatePartInput, async (part) => { const { id, messageID, sessionID, ...data } = part const time = Date.now() Database.use((db) => { @@ -634,13 +622,25 @@ export namespace Session { Database.effect(() => Bus.publish(MessageV2.Event.PartUpdated, { part, - delta, }), ) }) return part }) + export const updatePartDelta = fn( + z.object({ + sessionID: z.string(), + messageID: z.string(), + partID: z.string(), + field: z.string(), + delta: z.string(), + }), + async (input) => { + Bus.publish(MessageV2.Event.PartDelta, input) + }, + ) + export const getUsage = fn( z.object({ model: z.custom(), diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index f5a4d7abbd..1b6bf04a84 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -417,7 +417,16 @@ export namespace MessageV2 { "message.part.updated", z.object({ part: Part, - delta: z.string().optional(), + }), + ), + PartDelta: BusEvent.define( + "message.part.delta", + z.object({ + sessionID: z.string(), + messageID: z.string(), + partID: z.string(), + field: z.string(), + delta: z.string(), }), ), PartRemoved: BusEvent.define( diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 2707105618..826d0842cc 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -63,17 +63,19 @@ export namespace SessionProcessor { if (value.id in reasoningMap) { continue } - reasoningMap[value.id] = { + const reasoningPart = { id: Identifier.ascending("part"), messageID: input.assistantMessage.id, sessionID: input.assistantMessage.sessionID, - type: "reasoning", + type: "reasoning" as const, text: "", time: { start: Date.now(), }, metadata: value.providerMetadata, } + reasoningMap[value.id] = reasoningPart + await Session.updatePart(reasoningPart) break case "reasoning-delta": @@ -81,7 +83,13 @@ export namespace SessionProcessor { const part = reasoningMap[value.id] part.text += value.text if (value.providerMetadata) part.metadata = value.providerMetadata - if (part.text) await Session.updatePart({ part, delta: value.text }) + await Session.updatePartDelta({ + sessionID: part.sessionID, + messageID: part.messageID, + partID: part.id, + field: "text", + delta: value.text, + }) } break @@ -288,17 +296,20 @@ export namespace SessionProcessor { }, metadata: value.providerMetadata, } + await Session.updatePart(currentText) break case "text-delta": if (currentText) { currentText.text += value.text if (value.providerMetadata) currentText.metadata = value.providerMetadata - if (currentText.text) - await Session.updatePart({ - part: currentText, - delta: value.text, - }) + await Session.updatePartDelta({ + sessionID: currentText.sessionID, + messageID: currentText.messageID, + partID: currentText.id, + field: "text", + delta: value.text, + }) } break diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 8555e84384..20fc5920df 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -480,7 +480,17 @@ export type EventMessagePartUpdated = { type: "message.part.updated" properties: { part: Part - delta?: string + } +} + +export type EventMessagePartDelta = { + type: "message.part.delta" + properties: { + sessionID: string + messageID: string + partID: string + field: string + delta: string } } @@ -650,10 +660,6 @@ export type Todo = { * Priority level of the task: high, medium, low */ priority: string - /** - * Unique identifier for the todo item - */ - id: string } export type EventTodoUpdated = { @@ -896,6 +902,7 @@ export type Event = | EventMessageUpdated | EventMessageRemoved | EventMessagePartUpdated + | EventMessagePartDelta | EventMessagePartRemoved | EventPermissionAsked | EventPermissionReplied