core: split message part updates into delta events for smoother streaming

Streaming text and reasoning content now uses incremental delta events instead of sending full message parts on each update. This reduces bandwidth and improves real-time response smoothness in the TUI.
feature/workspace-domain
Dax Raad 2026-01-30 11:13:16 -05:00
parent d6fbd255b6
commit 498cbb2c26
7 changed files with 147 additions and 63 deletions

View File

@ -839,6 +839,23 @@ function createGlobalSync() {
)
break
}
case "message.part.delta": {
const parts = store.part[event.properties.messageID]
if (!parts) break
const result = Binary.search(parts, event.properties.partID, (p) => p.id)
if (!result.found) break
setStore(
"part",
event.properties.messageID,
produce((draft) => {
const part = draft[result.index]
const field = event.properties.field as keyof typeof part
const existing = part[field] as string | undefined
;(part[field] as string) = (existing ?? "") + event.properties.delta
}),
)
break
}
case "message.part.removed": {
const messageID = event.properties.messageID
const parts = store.part[messageID]

View File

@ -365,46 +365,68 @@ export namespace ACP {
return
}
}
return
}
if (part.type === "text") {
const delta = props.delta
if (delta && part.ignored !== true) {
await this.connection
.sessionUpdate({
sessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: delta,
},
case "message.part.delta": {
const props = event.properties
const session = this.sessionManager.tryGet(props.sessionID)
if (!session) return
const sessionId = session.id
const message = await this.sdk.session
.message(
{
sessionID: props.sessionID,
messageID: props.messageID,
directory: session.cwd,
},
{ throwOnError: true },
)
.then((x) => x.data)
.catch((error) => {
log.error("unexpected error when fetching message", { error })
return undefined
})
if (!message || message.info.role !== "assistant") return
const part = message.parts.find((p) => p.id === props.partID)
if (!part) return
if (part.type === "text" && props.field === "text" && part.ignored !== true) {
await this.connection
.sessionUpdate({
sessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: props.delta,
},
})
.catch((error) => {
log.error("failed to send text to ACP", { error })
})
}
},
})
.catch((error) => {
log.error("failed to send text delta to ACP", { error })
})
return
}
if (part.type === "reasoning") {
const delta = props.delta
if (delta) {
await this.connection
.sessionUpdate({
sessionId,
update: {
sessionUpdate: "agent_thought_chunk",
content: {
type: "text",
text: delta,
},
if (part.type === "reasoning" && props.field === "text") {
await this.connection
.sessionUpdate({
sessionId,
update: {
sessionUpdate: "agent_thought_chunk",
content: {
type: "text",
text: props.delta,
},
})
.catch((error) => {
log.error("failed to send reasoning to ACP", { error })
})
}
},
})
.catch((error) => {
log.error("failed to send reasoning delta to ACP", { error })
})
}
return
}

View File

@ -299,6 +299,24 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
break
}
case "message.part.delta": {
const parts = store.part[event.properties.messageID]
if (!parts) break
const result = Binary.search(parts, event.properties.partID, (p) => p.id)
if (!result.found) break
setStore(
"part",
event.properties.messageID,
produce((draft) => {
const part = draft[result.index]
const field = event.properties.field as keyof typeof part
const existing = part[field] as string | undefined
;(part[field] as string) = (existing ?? "") + event.properties.delta
}),
)
break
}
case "message.part.removed": {
const parts = store.part[event.properties.messageID]
const result = Binary.search(parts, event.properties.partID, (p) => p.id)

View File

@ -603,21 +603,9 @@ export namespace Session {
},
)
const UpdatePartInput = z.union([
MessageV2.Part,
z.object({
part: MessageV2.TextPart,
delta: z.string(),
}),
z.object({
part: MessageV2.ReasoningPart,
delta: z.string(),
}),
])
const UpdatePartInput = MessageV2.Part
export const updatePart = fn(UpdatePartInput, async (input) => {
const part = "delta" in input ? input.part : input
const delta = "delta" in input ? input.delta : undefined
export const updatePart = fn(UpdatePartInput, async (part) => {
const { id, messageID, sessionID, ...data } = part
const time = Date.now()
Database.use((db) => {
@ -634,13 +622,25 @@ export namespace Session {
Database.effect(() =>
Bus.publish(MessageV2.Event.PartUpdated, {
part,
delta,
}),
)
})
return part
})
export const updatePartDelta = fn(
z.object({
sessionID: z.string(),
messageID: z.string(),
partID: z.string(),
field: z.string(),
delta: z.string(),
}),
async (input) => {
Bus.publish(MessageV2.Event.PartDelta, input)
},
)
export const getUsage = fn(
z.object({
model: z.custom<Provider.Model>(),

View File

@ -417,7 +417,16 @@ export namespace MessageV2 {
"message.part.updated",
z.object({
part: Part,
delta: z.string().optional(),
}),
),
PartDelta: BusEvent.define(
"message.part.delta",
z.object({
sessionID: z.string(),
messageID: z.string(),
partID: z.string(),
field: z.string(),
delta: z.string(),
}),
),
PartRemoved: BusEvent.define(

View File

@ -63,17 +63,19 @@ export namespace SessionProcessor {
if (value.id in reasoningMap) {
continue
}
reasoningMap[value.id] = {
const reasoningPart = {
id: Identifier.ascending("part"),
messageID: input.assistantMessage.id,
sessionID: input.assistantMessage.sessionID,
type: "reasoning",
type: "reasoning" as const,
text: "",
time: {
start: Date.now(),
},
metadata: value.providerMetadata,
}
reasoningMap[value.id] = reasoningPart
await Session.updatePart(reasoningPart)
break
case "reasoning-delta":
@ -81,7 +83,13 @@ export namespace SessionProcessor {
const part = reasoningMap[value.id]
part.text += value.text
if (value.providerMetadata) part.metadata = value.providerMetadata
if (part.text) await Session.updatePart({ part, delta: value.text })
await Session.updatePartDelta({
sessionID: part.sessionID,
messageID: part.messageID,
partID: part.id,
field: "text",
delta: value.text,
})
}
break
@ -288,17 +296,20 @@ export namespace SessionProcessor {
},
metadata: value.providerMetadata,
}
await Session.updatePart(currentText)
break
case "text-delta":
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
if (currentText.text)
await Session.updatePart({
part: currentText,
delta: value.text,
})
await Session.updatePartDelta({
sessionID: currentText.sessionID,
messageID: currentText.messageID,
partID: currentText.id,
field: "text",
delta: value.text,
})
}
break

View File

@ -480,7 +480,17 @@ export type EventMessagePartUpdated = {
type: "message.part.updated"
properties: {
part: Part
delta?: string
}
}
export type EventMessagePartDelta = {
type: "message.part.delta"
properties: {
sessionID: string
messageID: string
partID: string
field: string
delta: string
}
}
@ -650,10 +660,6 @@ export type Todo = {
* Priority level of the task: high, medium, low
*/
priority: string
/**
* Unique identifier for the todo item
*/
id: string
}
export type EventTodoUpdated = {
@ -896,6 +902,7 @@ export type Event =
| EventMessageUpdated
| EventMessageRemoved
| EventMessagePartUpdated
| EventMessagePartDelta
| EventMessagePartRemoved
| EventPermissionAsked
| EventPermissionReplied