From 9c501b1583a1e63f67afdf718b6eac4c03bc811b Mon Sep 17 00:00:00 2001 From: Adam <2363879+adamdotdevin@users.noreply.github.com> Date: Thu, 19 Feb 2026 16:35:29 -0600 Subject: [PATCH] wip(app): node-pty --- bun.lock | 8 +- packages/app/src/components/terminal.tsx | 74 ++++++++-- packages/opencode/package.json | 2 +- packages/opencode/src/pty/index.ts | 117 +++++++++++----- packages/opencode/src/server/routes/pty.ts | 21 ++- packages/opencode/src/server/server.ts | 2 +- packages/opencode/src/server/websocket.ts | 1 + .../test/pty/pty-output-isolation.test.ts | 132 ++++++++++++++++-- 8 files changed, 283 insertions(+), 74 deletions(-) create mode 100644 packages/opencode/src/server/websocket.ts diff --git a/bun.lock b/bun.lock index e87f700f06..fd44d944fd 100644 --- a/bun.lock +++ b/bun.lock @@ -315,7 +315,6 @@ "ai": "catalog:", "ai-gateway-provider": "2.3.1", "bonjour-service": "1.3.0", - "bun-pty": "0.4.8", "chokidar": "4.0.3", "clipboardy": "4.0.0", "decimal.js": "10.5.0", @@ -331,6 +330,7 @@ "jsonc-parser": "3.3.1", "mime-types": "3.0.2", "minimatch": "10.0.3", + "node-pty": "1.0.0", "open": "10.1.2", "opentui-spinner": "0.0.6", "partial-json": "0.1.7", @@ -2222,8 +2222,6 @@ "bun-ffi-structs": ["bun-ffi-structs@0.1.2", "", { "peerDependencies": { "typescript": "^5" } }, "sha512-Lh1oQAYHDcnesJauieA4UNkWGXY9hYck7OA5IaRwE3Bp6K2F2pJSNYqq+hIy7P3uOvo3km3oxS8304g5gDMl/w=="], - "bun-pty": ["bun-pty@0.4.8", "", {}, "sha512-rO70Mrbr13+jxHHHu2YBkk2pNqrJE5cJn29WE++PUr+GFA0hq/VgtQPZANJ8dJo6d7XImvBk37Innt8GM7O28w=="], - "bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="], "bun-webgpu": ["bun-webgpu@0.1.4", "", { "dependencies": { "@webgpu/types": "^0.1.60" }, "optionalDependencies": { "bun-webgpu-darwin-arm64": "^0.1.4", "bun-webgpu-darwin-x64": "^0.1.4", "bun-webgpu-linux-x64": "^0.1.4", "bun-webgpu-win32-x64": "^0.1.4" } }, "sha512-Kw+HoXl1PMWJTh9wvh63SSRofTA8vYBFCw0XEP1V1fFdQEDhI8Sgf73sdndE/oDpN/7CMx0Yv/q8FCvO39ROMQ=="], @@ -3266,6 +3264,8 @@ "named-placeholders": ["named-placeholders@1.1.6", "", { "dependencies": { "lru.min": "^1.1.0" } }, "sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w=="], + "nan": ["nan@2.25.0", "", {}, "sha512-0M90Ag7Xn5KMLLZ7zliPWP3rT90P6PN+IzVFS0VqmnPktBk3700xUVv8Ikm9EUaUE5SDWdp/BIxdENzVznpm1g=="], + "nanoevents": ["nanoevents@7.0.1", "", {}, "sha512-o6lpKiCxLeijK4hgsqfR6CNToPyRU3keKyyI6uwuHRvpRTbZ0wXw51WRgyldVugZqoJfkGFrjrIenYH3bfEO3Q=="], "nanoid": ["nanoid@3.3.11", "", { "bin": { "nanoid": "bin/nanoid.cjs" } }, "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w=="], @@ -3298,6 +3298,8 @@ "node-mock-http": ["node-mock-http@1.0.4", "", {}, "sha512-8DY+kFsDkNXy1sJglUfuODx1/opAGJGyrTuFqEoN90oRc2Vk0ZbD4K2qmKXBBEhZQzdKHIVfEJpDU8Ak2NJEvQ=="], + "node-pty": ["node-pty@1.0.0", "", { "dependencies": { "nan": "^2.17.0" } }, "sha512-wtBMWWS7dFZm/VgqElrTvtfMq4GzJ6+edFI0Y0zyzygUSZMgZdraDUMUhCIvkjhJjme15qWmbyJbtAx4ot4uZA=="], + "node-releases": ["node-releases@2.0.27", "", {}, "sha512-nmh3lCkYZ3grZvqcCH+fjmQ7X+H0OeZgP40OierEaAptX4XofMh5kwNbWh7lBduUzCcV/8kZ+NDLCwm2iorIlA=="], "nopt": ["nopt@7.2.1", "", { "dependencies": { "abbrev": "^2.0.0" }, "bin": { "nopt": "bin/nopt.js" } }, "sha512-taM24ViiimT/XntxbPyJQzCG+p4EKOpgD3mxFwW38mGjVUrfERQOeY4EDHjdnptttfHuHQXFx+lTP08Q+mLa/w=="], diff --git a/packages/app/src/components/terminal.tsx b/packages/app/src/components/terminal.tsx index bd7ab24475..da33d260dc 100644 --- a/packages/app/src/components/terminal.tsx +++ b/packages/app/src/components/terminal.tsx @@ -15,6 +15,40 @@ import { terminalWriter } from "@/utils/terminal-writer" const TOGGLE_TERMINAL_ID = "terminal.toggle" const DEFAULT_TOGGLE_TERMINAL_KEYBIND = "ctrl+`" +const FRAME_META = 0 +const FRAME_OUTPUT = 1 +const FRAME_INPUT = 2 +const encoder = new TextEncoder() + +const connection = () => { + if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") { + return crypto.randomUUID() + } + return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}` +} + +const frameInput = (id: string, data: string) => { + const channel = encoder.encode(id) + const body = encoder.encode(data) + const out = new Uint8Array(2 + channel.length + body.length) + out[0] = FRAME_INPUT + out[1] = channel.length + out.set(channel, 2) + out.set(body, 2 + channel.length) + return out +} + +const frameOutput = (bytes: Uint8Array, decoder: TextDecoder) => { + if (bytes[0] !== FRAME_OUTPUT) return + const size = bytes[1] + if (!Number.isSafeInteger(size) || size < 0) return + if (bytes.length < 2 + size) return + return { + connection: decoder.decode(bytes.subarray(2, 2 + size)), + data: decoder.decode(bytes.subarray(2 + size)), + } +} + export interface TerminalProps extends ComponentProps<"div"> { pty: LocalPTY onSubmit?: () => void @@ -396,8 +430,10 @@ export const Terminal = (props: TerminalProps) => { scheduleSize(size.cols, size.rows) }) cleanups.push(() => disposeIfDisposable(onResize)) + const connectionID = connection() const onData = t.onData((data) => { - if (ws?.readyState === WebSocket.OPEN) ws.send(data) + if (ws?.readyState !== WebSocket.OPEN) return + ws.send(frameInput(connectionID, data)) }) cleanups.push(() => disposeIfDisposable(onData)) const onKey = t.onKey((key) => { @@ -450,6 +486,7 @@ export const Terminal = (props: TerminalProps) => { const url = new URL(sdk.url + `/pty/${local.pty.id}/connect`) url.searchParams.set("directory", sdk.directory) url.searchParams.set("cursor", String(start !== undefined ? start : local.pty.buffer ? -1 : 0)) + url.searchParams.set("connection", connectionID) url.protocol = url.protocol === "https:" ? "wss:" : "ws:" url.username = server.current?.http.username ?? "" url.password = server.current?.http.password ?? "" @@ -471,24 +508,33 @@ export const Terminal = (props: TerminalProps) => { if (closing) return if (event.data instanceof ArrayBuffer) { const bytes = new Uint8Array(event.data) - if (bytes[0] !== 0) return - const json = decoder.decode(bytes.subarray(1)) - try { - const meta = JSON.parse(json) as { cursor?: unknown } - const next = meta?.cursor - if (typeof next === "number" && Number.isSafeInteger(next) && next >= 0) { - cursor = next + if (bytes[0] === FRAME_META) { + const json = decoder.decode(bytes.subarray(1)) + try { + const meta = JSON.parse(json) as { cursor?: unknown; connection?: unknown } + if (typeof meta?.connection === "string" && meta.connection !== connectionID) return + const next = meta?.cursor + if (typeof next === "number" && Number.isSafeInteger(next) && next >= 0) { + cursor = next + } + } catch (err) { + debugTerminal("invalid websocket control frame", err) } - } catch (err) { - debugTerminal("invalid websocket control frame", err) + return } + + const frame = frameOutput(bytes, decoder) + if (!frame) return + if (frame.connection !== connectionID) return + if (!frame.data) return + output?.push(frame.data) + cursor += frame.data.length return } - const data = typeof event.data === "string" ? event.data : "" - if (!data) return - output?.push(data) - cursor += data.length + if (typeof event.data === "string") { + debugTerminal("ignoring unframed websocket output") + } } socket.addEventListener("message", handleMessage) diff --git a/packages/opencode/package.json b/packages/opencode/package.json index 8281d4ff0c..6bf245559c 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -100,7 +100,7 @@ "ai": "catalog:", "ai-gateway-provider": "2.3.1", "bonjour-service": "1.3.0", - "bun-pty": "0.4.8", + "node-pty": "1.0.0", "chokidar": "4.0.3", "clipboardy": "4.0.0", "decimal.js": "10.5.0", diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 2dda403e14..96378a5c17 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -1,11 +1,10 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" -import { type IPty } from "bun-pty" +import { type IPty } from "node-pty" import z from "zod" import { Identifier } from "../id/id" import { Log } from "../util/log" import { Instance } from "../project/instance" -import { lazy } from "@opencode-ai/util/lazy" import { Shell } from "@/shell/shell" import { Plugin } from "@/plugin" @@ -15,22 +14,27 @@ export namespace Pty { const BUFFER_LIMIT = 1024 * 1024 * 2 const BUFFER_CHUNK = 64 * 1024 const encoder = new TextEncoder() + const decoder = new TextDecoder() + const FRAME_META = 0 + const FRAME_OUTPUT = 1 + const FRAME_INPUT = 2 + const MAX_CONNECTION = 200 type Socket = { readyState: number - data?: unknown send: (data: string | Uint8Array | ArrayBuffer) => void close: (code?: number, reason?: string) => void } type Subscriber = { id: number - token: unknown + connection: string } const sockets = new WeakMap() const owners = new WeakMap() let socketCounter = 0 + let connectionCounter = 0 const tagSocket = (ws: Socket) => { if (!ws || typeof ws !== "object") return @@ -39,33 +43,74 @@ export namespace Pty { return next } - const token = (ws: Socket) => { - const data = ws.data - if (!data || typeof data !== "object") return - - const events = (data as { events?: unknown }).events - if (events && typeof events === "object") return events - - const url = (data as { url?: unknown }).url - if (url && typeof url === "object") return url - - return data + const connection = () => { + connectionCounter = (connectionCounter + 1) % Number.MAX_SAFE_INTEGER + return `${Date.now().toString(36)}-${connectionCounter.toString(36)}` } - // WebSocket control frame: 0x00 + UTF-8 JSON. - const meta = (cursor: number) => { - const json = JSON.stringify({ cursor }) + const normalizeConnection = (value?: string) => { + const next = typeof value === "string" ? value.trim() : "" + if (!next) return connection() + if (next.length > MAX_CONNECTION) return connection() + if (encoder.encode(next).length > 255) return connection() + return next + } + + const output = (connection: string, data: string) => { + const channel = encoder.encode(connection) + const chunk = encoder.encode(data) + const out = new Uint8Array(2 + channel.length + chunk.length) + out[0] = FRAME_OUTPUT + out[1] = channel.length + out.set(channel, 2) + out.set(chunk, 2 + channel.length) + return out + } + + const input = (message: string | Uint8Array | ArrayBuffer) => { + if (typeof message === "string") { + return { data: message } + } + + const bytes = message instanceof Uint8Array ? message : new Uint8Array(message) + if (bytes[0] !== FRAME_INPUT) return + const size = bytes[1] + if (!Number.isSafeInteger(size) || size < 0) return + if (bytes.length < 2 + size) return + return { + connection: decoder.decode(bytes.subarray(2, 2 + size)), + data: decoder.decode(bytes.subarray(2 + size)), + } + } + + // WebSocket control frame: 0x00 + UTF-8 JSON ({ cursor, connection }). + const meta = (cursor: number, connection: string) => { + const json = JSON.stringify({ cursor, connection }) const bytes = encoder.encode(json) const out = new Uint8Array(bytes.length + 1) - out[0] = 0 + out[0] = FRAME_META out.set(bytes, 1) return out } - const pty = lazy(async () => { - const { spawn } = await import("bun-pty") - return spawn - }) + type Spawn = (file: string, args: string | string[], options: unknown) => IPty + let override: Spawn | undefined + let spawn: Spawn | undefined + + const pty = async (): Promise => { + if (override) return override + if (spawn) return spawn + const mod = await import("node-pty") + const next = mod.spawn as Spawn + spawn = next + return next + } + + export function setSpawn(input?: Spawn) { + override = input + if (input) return + spawn = undefined + } export const Info = z .object({ @@ -210,13 +255,8 @@ export namespace Pty { continue } - if (sub.token !== undefined && token(ws) !== sub.token) { - session.subscribers.delete(ws) - continue - } - try { - ws.send(chunk) + ws.send(output(sub.connection, chunk)) } catch { session.subscribers.delete(ws) } @@ -292,7 +332,7 @@ export namespace Pty { } } - export function connect(id: string, ws: Socket, cursor?: number) { + export function connect(id: string, ws: Socket, cursor?: number, connectionID?: string) { const session = state().get(id) if (!session) { ws.close() @@ -312,7 +352,11 @@ export namespace Pty { } owners.set(ws, id) - session.subscribers.set(ws, { id: socketId, token: token(ws) }) + const sub = { + id: socketId, + connection: normalizeConnection(connectionID), + } + session.subscribers.set(ws, sub) const cleanup = () => { session.subscribers.delete(ws) @@ -336,7 +380,7 @@ export namespace Pty { if (data) { try { for (let i = 0; i < data.length; i += BUFFER_CHUNK) { - ws.send(data.slice(i, i + BUFFER_CHUNK)) + ws.send(output(sub.connection, data.slice(i, i + BUFFER_CHUNK))) } } catch { cleanup() @@ -346,15 +390,18 @@ export namespace Pty { } try { - ws.send(meta(end)) + ws.send(meta(end, sub.connection)) } catch { cleanup() ws.close() return } return { - onMessage: (message: string | ArrayBuffer) => { - session.process.write(String(message)) + onMessage: (message: string | Uint8Array | ArrayBuffer) => { + const next = input(message) + if (!next?.data) return + if (next.connection && next.connection !== sub.connection) return + session.process.write(next.data) }, onClose: () => { log.info("client disconnected from session", { id }) diff --git a/packages/opencode/src/server/routes/pty.ts b/packages/opencode/src/server/routes/pty.ts index 368c9612bf..dd95ddbde3 100644 --- a/packages/opencode/src/server/routes/pty.ts +++ b/packages/opencode/src/server/routes/pty.ts @@ -1,11 +1,11 @@ import { Hono } from "hono" import { describeRoute, validator, resolver } from "hono-openapi" -import { upgradeWebSocket } from "hono/bun" import z from "zod" import { Pty } from "@/pty" import { NotFoundError } from "../../storage/db" import { errors } from "../error" import { lazy } from "../../util/lazy" +import { upgradeWebSocket } from "../websocket" export const PtyRoutes = lazy(() => new Hono() @@ -158,6 +158,12 @@ export const PtyRoutes = lazy(() => if (!Number.isSafeInteger(parsed) || parsed < -1) return return parsed })() + const connection = (() => { + const value = c.req.query("connection") + if (!value) return + if (value.length > 200) return + return value + })() let handler: ReturnType if (!Pty.get(id)) throw new Error("Session not found") @@ -177,15 +183,20 @@ export const PtyRoutes = lazy(() => return { onOpen(_event, ws) { - const socket = ws.raw - if (!isSocket(socket)) { + if (!isSocket(ws)) { ws.close() return } - handler = Pty.connect(id, socket, cursor) + handler = Pty.connect(id, ws, cursor, connection) }, onMessage(event) { - if (typeof event.data !== "string") return + if ( + typeof event.data !== "string" && + !(event.data instanceof ArrayBuffer) && + !(event.data instanceof Uint8Array) + ) { + return + } handler?.onMessage(event.data) }, onClose() { diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 9fba9c1fe1..dbfa84797b 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -33,7 +33,7 @@ import { lazy } from "../util/lazy" import { InstanceBootstrap } from "../project/bootstrap" import { NotFoundError } from "../storage/db" import type { ContentfulStatusCode } from "hono/utils/http-status" -import { websocket } from "hono/bun" +import { websocket } from "./websocket" import { HTTPException } from "hono/http-exception" import { errors } from "./error" import { QuestionRoutes } from "./routes/question" diff --git a/packages/opencode/src/server/websocket.ts b/packages/opencode/src/server/websocket.ts new file mode 100644 index 0000000000..5a65d5e639 --- /dev/null +++ b/packages/opencode/src/server/websocket.ts @@ -0,0 +1 @@ +export { createBunWebSocket, upgradeWebSocket, websocket } from "hono/bun" diff --git a/packages/opencode/test/pty/pty-output-isolation.test.ts b/packages/opencode/test/pty/pty-output-isolation.test.ts index 1b89a63742..be22c48aea 100644 --- a/packages/opencode/test/pty/pty-output-isolation.test.ts +++ b/packages/opencode/test/pty/pty-output-isolation.test.ts @@ -1,9 +1,73 @@ -import { describe, expect, test } from "bun:test" +import { afterEach, beforeEach, describe, expect, test } from "bun:test" import { Instance } from "../../src/project/instance" import { Pty } from "../../src/pty" import { tmpdir } from "../fixture/fixture" +const encoder = new TextEncoder() +const decoder = new TextDecoder() + +const input = (connection: string, data: string) => { + const channel = encoder.encode(connection) + const body = encoder.encode(data) + const out = new Uint8Array(2 + channel.length + body.length) + out[0] = 2 + out[1] = channel.length + out.set(channel, 2) + out.set(body, 2 + channel.length) + return out +} + +const output = (connection: string, data: unknown) => { + if (typeof data === "string") return data + if (!(data instanceof Uint8Array) && !(data instanceof ArrayBuffer)) return "" + const bytes = data instanceof Uint8Array ? data : new Uint8Array(data) + if (bytes[0] !== 1) return "" + const size = bytes[1] + if (!Number.isSafeInteger(size) || size < 0) return "" + if (bytes.length < 2 + size) return "" + const id = decoder.decode(bytes.subarray(2, 2 + size)) + if (id !== connection) return "" + return decoder.decode(bytes.subarray(2 + size)) +} + +const spawn = () => { + let pid = 1000 + return () => { + const data = new Set<(chunk: string) => void>() + const exit = new Set<(event: { exitCode: number }) => void>() + let closed = false + + return { + pid: ++pid, + onData: (cb: (chunk: string) => void) => { + data.add(cb) + }, + onExit: (cb: (event: { exitCode: number }) => void) => { + exit.add(cb) + }, + resize: () => {}, + write: (chunk: string) => { + if (closed) return + for (const cb of data) cb(chunk) + }, + kill: () => { + if (closed) return + closed = true + for (const cb of exit) cb({ exitCode: 0 }) + }, + } + } +} + describe("pty", () => { + beforeEach(() => { + Pty.setSpawn(spawn() as unknown as Parameters[0]) + }) + + afterEach(() => { + Pty.setSpawn() + }) + test("does not leak output when websocket objects are reused", async () => { await using dir = await tmpdir({ git: true }) @@ -18,9 +82,9 @@ describe("pty", () => { const ws = { readyState: 1, - data: { events: { connection: "a" } }, send: (data: unknown) => { - outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + const text = output("conn-a", data) + if (text) outA.push(text) }, close: () => { // no-op (simulate abrupt drop) @@ -28,14 +92,14 @@ describe("pty", () => { } // Connect "a" first with ws. - Pty.connect(a.id, ws as any) + Pty.connect(a.id, ws as any, undefined, "conn-a") // Now "reuse" the same ws object for another connection. - ws.data = { events: { connection: "b" } } ws.send = (data: unknown) => { - outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + const text = output("conn-b", data) + if (text) outB.push(text) } - Pty.connect(b.id, ws as any) + Pty.connect(b.id, ws as any, undefined, "conn-b") // Clear connect metadata writes. outA.length = 0 @@ -54,7 +118,7 @@ describe("pty", () => { }) }) - test("does not leak output when Bun recycles websocket objects before re-connect", async () => { + test("does not leak output when websocket objects are recycled before re-connect", async () => { await using dir = await tmpdir({ git: true }) await Instance.provide({ @@ -67,9 +131,9 @@ describe("pty", () => { const ws = { readyState: 1, - data: { events: { connection: "a" } }, send: (data: unknown) => { - outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + const text = output("conn-a", data) + if (text) outA.push(text) }, close: () => { // no-op (simulate abrupt drop) @@ -77,14 +141,14 @@ describe("pty", () => { } // Connect "a" first. - Pty.connect(a.id, ws as any) + Pty.connect(a.id, ws as any, undefined, "conn-a") outA.length = 0 - // Simulate Bun reusing the same websocket object for another - // connection before the next onOpen calls Pty.connect. - ws.data = { events: { connection: "b" } } + // Simulate websocket object reuse for another connection before + // the next onOpen calls Pty.connect. ws.send = (data: unknown) => { - outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + const text = output("conn-b", data) + if (text) outB.push(text) } Pty.write(a.id, "AAA\n") @@ -97,4 +161,42 @@ describe("pty", () => { }, }) }) + + test("drops input frames that carry a different connection id", async () => { + await using dir = await tmpdir({ git: true }) + + await Instance.provide({ + directory: dir.path, + fn: async () => { + const a = await Pty.create({ command: "cat", title: "a" }) + try { + const out: string[] = [] + + const ws = { + readyState: 1, + send: (data: unknown) => { + const text = output("conn-a", data) + if (text) out.push(text) + }, + close: () => { + // no-op + }, + } + + const handler = Pty.connect(a.id, ws as any, undefined, "conn-a") + out.length = 0 + + handler?.onMessage(input("conn-b", "BBB\n")) + await Bun.sleep(100) + expect(out.join("")).not.toContain("BBB") + + handler?.onMessage(input("conn-a", "AAA\n")) + await Bun.sleep(100) + expect(out.join("")).toContain("AAA") + } finally { + await Pty.remove(a.id) + } + }, + }) + }) })