From 6a3cb06c6ce8d2f45a154c21e68223aace67853b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 19 Mar 2026 21:18:15 -0400 Subject: [PATCH] effectify Pty service, add runSyncInstance helper --- packages/opencode/specs/effect-migration.md | 2 +- packages/opencode/src/effect/instances.ts | 3 + packages/opencode/src/effect/runtime.ts | 4 + packages/opencode/src/pty/index.ts | 456 +++++++++++--------- 4 files changed, 263 insertions(+), 202 deletions(-) diff --git a/packages/opencode/specs/effect-migration.md b/packages/opencode/specs/effect-migration.md index 4f195917fd..19c8a1c6f7 100644 --- a/packages/opencode/specs/effect-migration.md +++ b/packages/opencode/specs/effect-migration.md @@ -128,7 +128,7 @@ Still open and likely worth migrating: - [ ] `Plugin` - [ ] `ToolRegistry` -- [ ] `Pty` +- [x] `Pty` - [ ] `Worktree` - [ ] `Installation` - [ ] `Bus` diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts index c05458d5df..fa4972697e 100644 --- a/packages/opencode/src/effect/instances.ts +++ b/packages/opencode/src/effect/instances.ts @@ -4,6 +4,7 @@ import { FileTime } from "@/file/time" import { FileWatcher } from "@/file/watcher" import { Format } from "@/format" import { PermissionNext } from "@/permission" +import { Pty } from "@/pty" import { Instance } from "@/project/instance" import { Vcs } from "@/project/vcs" import { ProviderAuth } from "@/provider/auth" @@ -24,6 +25,7 @@ export type InstanceServices = | FileTime.Service | Format.Service | File.Service + | Pty.Service | Skill.Service | Snapshot.Service @@ -44,6 +46,7 @@ function lookup(_key: string) { Layer.fresh(FileTime.layer).pipe(Layer.orDie), Layer.fresh(Format.layer), Layer.fresh(File.layer), + Layer.fresh(Pty.layer), Layer.fresh(Skill.defaultLayer), Layer.fresh(Snapshot.defaultLayer), ).pipe(Layer.provide(ctx)) diff --git a/packages/opencode/src/effect/runtime.ts b/packages/opencode/src/effect/runtime.ts index f52203b222..039f570f43 100644 --- a/packages/opencode/src/effect/runtime.ts +++ b/packages/opencode/src/effect/runtime.ts @@ -18,6 +18,10 @@ export function runPromiseInstance(effect: Effect.Effect(effect: Effect.Effect) { + return runtime.runSync(effect.pipe(Effect.provide(Instances.get(Instance.directory)))) +} + export function disposeRuntime() { return runtime.dispose() } diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 7436abec9f..ab14948f9f 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -1,13 +1,15 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" +import { InstanceContext } from "@/effect/instance-context" +import { runPromiseInstance, runSyncInstance } from "@/effect/runtime" import { type IPty } from "bun-pty" import z from "zod" import { Log } from "../util/log" -import { Instance } from "../project/instance" import { lazy } from "@opencode-ai/util/lazy" import { Shell } from "@/shell/shell" import { Plugin } from "@/plugin" import { PtyID } from "./schema" +import { Effect, Layer, ServiceMap } from "effect" export namespace Pty { const log = Log.create({ service: "pty" }) @@ -90,232 +92,284 @@ export namespace Pty { subscribers: Map } - const state = Instance.state( - () => new Map(), - async (sessions) => { - for (const session of sessions.values()) { + export interface Interface { + readonly list: () => Effect.Effect + readonly get: (id: PtyID) => Effect.Effect + readonly create: (input: CreateInput) => Effect.Effect + readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect + readonly remove: (id: PtyID) => Effect.Effect + readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect + readonly write: (id: PtyID, data: string) => Effect.Effect + readonly connect: ( + id: PtyID, + ws: Socket, + cursor?: number, + ) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined> + } + + export class Service extends ServiceMap.Service()("@opencode/Pty") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const instance = yield* InstanceContext + const sessions = new Map() + + yield* Effect.addFinalizer(() => + Effect.sync(() => { + for (const session of sessions.values()) { + try { + session.process.kill() + } catch {} + for (const [key, ws] of session.subscribers.entries()) { + try { + if (ws.data === key) ws.close() + } catch {} + } + } + sessions.clear() + }), + ) + + const removeSession = (id: PtyID) => { + const session = sessions.get(id) + if (!session) return + sessions.delete(id) + log.info("removing session", { id }) try { session.process.kill() } catch {} for (const [key, ws] of session.subscribers.entries()) { try { if (ws.data === key) ws.close() + } catch {} + } + session.subscribers.clear() + Bus.publish(Event.Deleted, { id: session.info.id }) + } + + const list = Effect.fn("Pty.list")(function* () { + return Array.from(sessions.values()).map((s) => s.info) + }) + + const get = Effect.fn("Pty.get")(function* (id: PtyID) { + return sessions.get(id)?.info + }) + + const create = Effect.fn("Pty.create")(function* (input: CreateInput) { + return yield* Effect.promise(async () => { + const id = PtyID.ascending() + const command = input.command || Shell.preferred() + const args = input.args || [] + if (command.endsWith("sh")) { + args.push("-l") + } + + const cwd = input.cwd || instance.directory + const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} }) + const env = { + ...process.env, + ...input.env, + ...shellEnv.env, + TERM: "xterm-256color", + OPENCODE_TERMINAL: "1", + } as Record + + if (process.platform === "win32") { + env.LC_ALL = "C.UTF-8" + env.LC_CTYPE = "C.UTF-8" + env.LANG = "C.UTF-8" + } + log.info("creating session", { id, cmd: command, args, cwd }) + + const spawn = await pty() + const ptyProcess = spawn(command, args, { + name: "xterm-256color", + cwd, + env, + }) + + const info = { + id, + title: input.title || `Terminal ${id.slice(-4)}`, + command, + args, + cwd, + status: "running", + pid: ptyProcess.pid, + } as const + const session: ActiveSession = { + info, + process: ptyProcess, + buffer: "", + bufferCursor: 0, + cursor: 0, + subscribers: new Map(), + } + sessions.set(id, session) + ptyProcess.onData((chunk) => { + session.cursor += chunk.length + + for (const [key, ws] of session.subscribers.entries()) { + if (ws.readyState !== 1) { + session.subscribers.delete(key) + continue + } + if (ws.data !== key) { + session.subscribers.delete(key) + continue + } + try { + ws.send(chunk) + } catch { + session.subscribers.delete(key) + } + } + + session.buffer += chunk + if (session.buffer.length <= BUFFER_LIMIT) return + const excess = session.buffer.length - BUFFER_LIMIT + session.buffer = session.buffer.slice(excess) + session.bufferCursor += excess + }) + ptyProcess.onExit(({ exitCode }) => { + if (session.info.status === "exited") return + log.info("session exited", { id, exitCode }) + session.info.status = "exited" + Bus.publish(Event.Exited, { id, exitCode }) + removeSession(id) + }) + Bus.publish(Event.Created, { info }) + return info + }) + }) + + const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) { + const session = sessions.get(id) + if (!session) return + if (input.title) { + session.info.title = input.title + } + if (input.size) { + session.process.resize(input.size.cols, input.size.rows) + } + Bus.publish(Event.Updated, { info: session.info }) + return session.info + }) + + const remove = Effect.fn("Pty.remove")(function* (id: PtyID) { + removeSession(id) + }) + + const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) { + const session = sessions.get(id) + if (session && session.info.status === "running") { + session.process.resize(cols, rows) + } + }) + + const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) { + const session = sessions.get(id) + if (session && session.info.status === "running") { + session.process.write(data) + } + }) + + const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) { + const session = sessions.get(id) + if (!session) { + ws.close() + return + } + log.info("client connected to session", { id }) + + const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws + session.subscribers.delete(connectionKey) + session.subscribers.set(connectionKey, ws) + + const cleanup = () => { + session.subscribers.delete(connectionKey) + } + + const start = session.bufferCursor + const end = session.cursor + + const from = + cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 + + const data = (() => { + if (!session.buffer) return "" + if (from >= end) return "" + const offset = Math.max(0, from - start) + if (offset >= session.buffer.length) return "" + return session.buffer.slice(offset) + })() + + if (data) { + try { + for (let i = 0; i < data.length; i += BUFFER_CHUNK) { + ws.send(data.slice(i, i + BUFFER_CHUNK)) + } } catch { - // ignore + cleanup() + ws.close() + return } } - } - sessions.clear() - }, + + try { + ws.send(meta(end)) + } catch { + cleanup() + ws.close() + return + } + return { + onMessage: (message: string | ArrayBuffer) => { + session.process.write(String(message)) + }, + onClose: () => { + log.info("client disconnected from session", { id }) + cleanup() + }, + } + }) + + return Service.of({ list, get, create, update, remove, resize, write, connect }) + }), ) + // Sync facades export function list() { - return Array.from(state().values()).map((s) => s.info) + return runSyncInstance(Service.use((svc) => svc.list())) } export function get(id: PtyID) { - return state().get(id)?.info - } - - export async function create(input: CreateInput) { - const id = PtyID.ascending() - const command = input.command || Shell.preferred() - const args = input.args || [] - if (command.endsWith("sh")) { - args.push("-l") - } - - const cwd = input.cwd || Instance.directory - const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} }) - const env = { - ...process.env, - ...input.env, - ...shellEnv.env, - TERM: "xterm-256color", - OPENCODE_TERMINAL: "1", - } as Record - - if (process.platform === "win32") { - env.LC_ALL = "C.UTF-8" - env.LC_CTYPE = "C.UTF-8" - env.LANG = "C.UTF-8" - } - log.info("creating session", { id, cmd: command, args, cwd }) - - const spawn = await pty() - const ptyProcess = spawn(command, args, { - name: "xterm-256color", - cwd, - env, - }) - - const info = { - id, - title: input.title || `Terminal ${id.slice(-4)}`, - command, - args, - cwd, - status: "running", - pid: ptyProcess.pid, - } as const - const session: ActiveSession = { - info, - process: ptyProcess, - buffer: "", - bufferCursor: 0, - cursor: 0, - subscribers: new Map(), - } - state().set(id, session) - ptyProcess.onData( - Instance.bind((chunk) => { - session.cursor += chunk.length - - for (const [key, ws] of session.subscribers.entries()) { - if (ws.readyState !== 1) { - session.subscribers.delete(key) - continue - } - - if (ws.data !== key) { - session.subscribers.delete(key) - continue - } - - try { - ws.send(chunk) - } catch { - session.subscribers.delete(key) - } - } - - session.buffer += chunk - if (session.buffer.length <= BUFFER_LIMIT) return - const excess = session.buffer.length - BUFFER_LIMIT - session.buffer = session.buffer.slice(excess) - session.bufferCursor += excess - }), - ) - ptyProcess.onExit( - Instance.bind(({ exitCode }) => { - if (session.info.status === "exited") return - log.info("session exited", { id, exitCode }) - session.info.status = "exited" - Bus.publish(Event.Exited, { id, exitCode }) - remove(id) - }), - ) - Bus.publish(Event.Created, { info }) - return info - } - - export async function update(id: PtyID, input: UpdateInput) { - const session = state().get(id) - if (!session) return - if (input.title) { - session.info.title = input.title - } - if (input.size) { - session.process.resize(input.size.cols, input.size.rows) - } - Bus.publish(Event.Updated, { info: session.info }) - return session.info - } - - export async function remove(id: PtyID) { - const session = state().get(id) - if (!session) return - state().delete(id) - log.info("removing session", { id }) - try { - session.process.kill() - } catch {} - for (const [key, ws] of session.subscribers.entries()) { - try { - if (ws.data === key) ws.close() - } catch { - // ignore - } - } - session.subscribers.clear() - Bus.publish(Event.Deleted, { id: session.info.id }) + return runSyncInstance(Service.use((svc) => svc.get(id))) } export function resize(id: PtyID, cols: number, rows: number) { - const session = state().get(id) - if (session && session.info.status === "running") { - session.process.resize(cols, rows) - } + runSyncInstance(Service.use((svc) => svc.resize(id, cols, rows))) } export function write(id: PtyID, data: string) { - const session = state().get(id) - if (session && session.info.status === "running") { - session.process.write(data) - } + runSyncInstance(Service.use((svc) => svc.write(id, data))) } export function connect(id: PtyID, ws: Socket, cursor?: number) { - const session = state().get(id) - if (!session) { - ws.close() - return - } - log.info("client connected to session", { id }) + return runSyncInstance(Service.use((svc) => svc.connect(id, ws, cursor))) + } - // Use ws.data as the unique key for this connection lifecycle. - // If ws.data is undefined, fallback to ws object. - const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws + // Async facades + export async function create(input: CreateInput) { + return runPromiseInstance(Service.use((svc) => svc.create(input))) + } - // Optionally cleanup if the key somehow exists - session.subscribers.delete(connectionKey) - session.subscribers.set(connectionKey, ws) + export async function update(id: PtyID, input: UpdateInput) { + return runPromiseInstance(Service.use((svc) => svc.update(id, input))) + } - const cleanup = () => { - session.subscribers.delete(connectionKey) - } - - const start = session.bufferCursor - const end = session.cursor - - const from = - cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0 - - const data = (() => { - if (!session.buffer) return "" - if (from >= end) return "" - const offset = Math.max(0, from - start) - if (offset >= session.buffer.length) return "" - return session.buffer.slice(offset) - })() - - if (data) { - try { - for (let i = 0; i < data.length; i += BUFFER_CHUNK) { - ws.send(data.slice(i, i + BUFFER_CHUNK)) - } - } catch { - cleanup() - ws.close() - return - } - } - - try { - ws.send(meta(end)) - } catch { - cleanup() - ws.close() - return - } - return { - onMessage: (message: string | ArrayBuffer) => { - session.process.write(String(message)) - }, - onClose: () => { - log.info("client disconnected from session", { id }) - cleanup() - }, - } + export async function remove(id: PtyID) { + return runPromiseInstance(Service.use((svc) => svc.remove(id))) } }