From 38450443b18c4736acaa2310fbe480674f68369d Mon Sep 17 00:00:00 2001 From: James Long Date: Thu, 26 Mar 2026 12:30:26 -0400 Subject: [PATCH] feat(core): remove workspace server, WorkspaceContext, start work towards better routing (#19316) --- packages/opencode/src/cli/cmd/tui/worker.ts | 59 +++---- .../opencode/src/cli/cmd/workspace-serve.ts | 16 -- .../src/control-plane/adaptors/worktree.ts | 5 +- .../src/control-plane/workspace-context.ts | 24 --- .../workspace-router-middleware.ts | 44 +++-- .../control-plane/workspace-server/routes.ts | 33 ---- .../control-plane/workspace-server/server.ts | 65 ------- packages/opencode/src/index.ts | 9 +- packages/opencode/src/server/server.ts | 17 +- packages/opencode/src/session/index.ts | 5 +- .../session-proxy-middleware.test.ts | 159 ------------------ .../workspace-server-sse.test.ts | 70 -------- .../test/control-plane/workspace-sync.test.ts | 99 ----------- 13 files changed, 68 insertions(+), 537 deletions(-) delete mode 100644 packages/opencode/src/cli/cmd/workspace-serve.ts delete mode 100644 packages/opencode/src/control-plane/workspace-context.ts delete mode 100644 packages/opencode/src/control-plane/workspace-server/routes.ts delete mode 100644 packages/opencode/src/control-plane/workspace-server/server.ts delete mode 100644 packages/opencode/test/control-plane/session-proxy-middleware.test.ts delete mode 100644 packages/opencode/test/control-plane/workspace-server-sse.test.ts delete mode 100644 packages/opencode/test/control-plane/workspace-sync.test.ts diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index 3d8c00cc52..76f76fa58f 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -12,7 +12,6 @@ import type { Event } from "@opencode-ai/sdk/v2" import { Flag } from "@/flag/flag" import { setTimeout as sleep } from "node:timers/promises" import { writeHeapSnapshot } from "node:v8" -import { WorkspaceContext } from "@/control-plane/workspace-context" import { WorkspaceID } from "@/control-plane/schema" await Log.init({ @@ -53,45 +52,39 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) => eventStream.abort = abort const signal = abort.signal - const workspaceID = input.workspaceID ? WorkspaceID.make(input.workspaceID) : undefined - ;(async () => { while (!signal.aborted) { - const shouldReconnect = await WorkspaceContext.provide({ - workspaceID, + const shouldReconnect = await Instance.provide({ + directory: input.directory, + init: InstanceBootstrap, fn: () => - Instance.provide({ - directory: input.directory, - init: InstanceBootstrap, - fn: () => - new Promise((resolve) => { - Rpc.emit("event", { - type: "server.connected", - properties: {}, - } satisfies Event) + new Promise((resolve) => { + Rpc.emit("event", { + type: "server.connected", + properties: {}, + } satisfies Event) - let settled = false - const settle = (value: boolean) => { - if (settled) return - settled = true - signal.removeEventListener("abort", onAbort) - unsub() - resolve(value) - } + let settled = false + const settle = (value: boolean) => { + if (settled) return + settled = true + signal.removeEventListener("abort", onAbort) + unsub() + resolve(value) + } - const unsub = Bus.subscribeAll((event) => { - Rpc.emit("event", event as Event) - if (event.type === Bus.InstanceDisposed.type) { - settle(true) - } - }) + const unsub = Bus.subscribeAll((event) => { + Rpc.emit("event", event as Event) + if (event.type === Bus.InstanceDisposed.type) { + settle(true) + } + }) - const onAbort = () => { - settle(false) - } + const onAbort = () => { + settle(false) + } - signal.addEventListener("abort", onAbort, { once: true }) - }), + signal.addEventListener("abort", onAbort, { once: true }) }), }).catch((error) => { Log.Default.error("event stream subscribe error", { diff --git a/packages/opencode/src/cli/cmd/workspace-serve.ts b/packages/opencode/src/cli/cmd/workspace-serve.ts deleted file mode 100644 index cb5c304e4b..0000000000 --- a/packages/opencode/src/cli/cmd/workspace-serve.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { cmd } from "./cmd" -import { withNetworkOptions, resolveNetworkOptions } from "../network" -import { WorkspaceServer } from "../../control-plane/workspace-server/server" - -export const WorkspaceServeCommand = cmd({ - command: "workspace-serve", - builder: (yargs) => withNetworkOptions(yargs), - describe: "starts a remote workspace event server", - handler: async (args) => { - const opts = await resolveNetworkOptions(args) - const server = WorkspaceServer.Listen(opts) - console.log(`workspace event server listening on http://${server.hostname}:${server.port}/event`) - await new Promise(() => {}) - await server.stop() - }, -}) diff --git a/packages/opencode/src/control-plane/adaptors/worktree.ts b/packages/opencode/src/control-plane/adaptors/worktree.ts index ff2d92e199..2a96034d78 100644 --- a/packages/opencode/src/control-plane/adaptors/worktree.ts +++ b/packages/opencode/src/control-plane/adaptors/worktree.ts @@ -33,13 +33,14 @@ export const WorktreeAdaptor: Adaptor = { await Worktree.remove({ directory: config.directory }) }, async fetch(info, input: RequestInfo | URL, init?: RequestInit) { + const { Server } = await import("../../server/server") + const config = Config.parse(info) - const { WorkspaceServer } = await import("../workspace-server/server") const url = input instanceof Request || input instanceof URL ? input : new URL(input, "http://opencode.internal") const headers = new Headers(init?.headers ?? (input instanceof Request ? input.headers : undefined)) headers.set("x-opencode-directory", config.directory) const request = new Request(url, { ...init, headers }) - return WorkspaceServer.App().fetch(request) + return Server.Default().fetch(request) }, } diff --git a/packages/opencode/src/control-plane/workspace-context.ts b/packages/opencode/src/control-plane/workspace-context.ts deleted file mode 100644 index cdd975dc4f..0000000000 --- a/packages/opencode/src/control-plane/workspace-context.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Context } from "../util/context" -import type { WorkspaceID } from "./schema" - -interface Context { - workspaceID?: WorkspaceID -} - -const context = Context.create("workspace") - -export const WorkspaceContext = { - async provide(input: { workspaceID?: WorkspaceID; fn: () => R }): Promise { - return context.provide({ workspaceID: input.workspaceID }, async () => { - return input.fn() - }) - }, - - get workspaceID() { - try { - return context.use().workspaceID - } catch (e) { - return undefined - } - }, -} diff --git a/packages/opencode/src/control-plane/workspace-router-middleware.ts b/packages/opencode/src/control-plane/workspace-router-middleware.ts index 463a95ef2b..283350532b 100644 --- a/packages/opencode/src/control-plane/workspace-router-middleware.ts +++ b/packages/opencode/src/control-plane/workspace-router-middleware.ts @@ -1,23 +1,38 @@ import type { MiddlewareHandler } from "hono" import { Flag } from "../flag/flag" import { getAdaptor } from "./adaptors" +import { WorkspaceID } from "./schema" import { Workspace } from "./workspace" -import { WorkspaceContext } from "./workspace-context" -// This middleware forwards all non-GET requests if the workspace is a -// remote. The remote workspace needs to handle session mutations +type Rule = { method?: string; path: string; exact?: boolean; action: "local" | "forward" } + +const RULES: Array = [ + { path: "/session/status", action: "forward" }, + { method: "GET", path: "/session", action: "local" }, +] + +function local(method: string, path: string) { + for (const rule of RULES) { + if (rule.method && rule.method !== method) continue + const match = rule.exact ? path === rule.path : path === rule.path || path.startsWith(rule.path + "/") + if (match) return rule.action === "local" + } + return false +} + async function routeRequest(req: Request) { - // Right now, we need to forward all requests to the workspace - // because we don't have syncing. In the future all GET requests - // which don't mutate anything will be handled locally - // - // if (req.method === "GET") return + const url = new URL(req.url) + const raw = url.searchParams.get("workspace") || req.headers.get("x-opencode-workspace") - if (!WorkspaceContext.workspaceID) return + if (!raw) return - const workspace = await Workspace.get(WorkspaceContext.workspaceID) + if (local(req.method, url.pathname)) return + + const workspaceID = WorkspaceID.make(raw) + + const workspace = await Workspace.get(workspaceID) if (!workspace) { - return new Response(`Workspace not found: ${WorkspaceContext.workspaceID}`, { + return new Response(`Workspace not found: ${workspaceID}`, { status: 500, headers: { "content-type": "text/plain; charset=utf-8", @@ -27,11 +42,14 @@ async function routeRequest(req: Request) { const adaptor = await getAdaptor(workspace.type) - return adaptor.fetch(workspace, `${new URL(req.url).pathname}${new URL(req.url).search}`, { + const headers = new Headers(req.headers) + headers.delete("x-opencode-workspace") + + return adaptor.fetch(workspace, `${url.pathname}${url.search}`, { method: req.method, body: req.method === "GET" || req.method === "HEAD" ? undefined : await req.arrayBuffer(), signal: req.signal, - headers: req.headers, + headers, }) } diff --git a/packages/opencode/src/control-plane/workspace-server/routes.ts b/packages/opencode/src/control-plane/workspace-server/routes.ts deleted file mode 100644 index 353e5d50af..0000000000 --- a/packages/opencode/src/control-plane/workspace-server/routes.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { GlobalBus } from "../../bus/global" -import { Hono } from "hono" -import { streamSSE } from "hono/streaming" - -export function WorkspaceServerRoutes() { - return new Hono().get("/event", async (c) => { - c.header("X-Accel-Buffering", "no") - c.header("X-Content-Type-Options", "nosniff") - return streamSSE(c, async (stream) => { - const send = async (event: unknown) => { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - } - const handler = async (event: { directory?: string; payload: unknown }) => { - await send(event.payload) - } - GlobalBus.on("event", handler) - await send({ type: "server.connected", properties: {} }) - const heartbeat = setInterval(() => { - void send({ type: "server.heartbeat", properties: {} }) - }, 10_000) - - await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - GlobalBus.off("event", handler) - resolve() - }) - }) - }) - }) -} diff --git a/packages/opencode/src/control-plane/workspace-server/server.ts b/packages/opencode/src/control-plane/workspace-server/server.ts deleted file mode 100644 index b0744fe025..0000000000 --- a/packages/opencode/src/control-plane/workspace-server/server.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { Hono } from "hono" -import { Instance } from "../../project/instance" -import { InstanceBootstrap } from "../../project/bootstrap" -import { SessionRoutes } from "../../server/routes/session" -import { WorkspaceServerRoutes } from "./routes" -import { WorkspaceContext } from "../workspace-context" -import { WorkspaceID } from "../schema" - -export namespace WorkspaceServer { - export function App() { - const session = new Hono() - .use(async (c, next) => { - // Right now, we need handle all requests because we don't - // have syncing. In the future all GET requests will handled - // by the control plane - // - // if (c.req.method === "GET") return c.notFound() - await next() - }) - .route("/", SessionRoutes()) - - return new Hono() - .use(async (c, next) => { - const rawWorkspaceID = c.req.query("workspace") || c.req.header("x-opencode-workspace") - const raw = c.req.query("directory") || c.req.header("x-opencode-directory") - if (rawWorkspaceID == null) { - throw new Error("workspaceID parameter is required") - } - if (raw == null) { - throw new Error("directory parameter is required") - } - - const directory = (() => { - try { - return decodeURIComponent(raw) - } catch { - return raw - } - })() - - return WorkspaceContext.provide({ - workspaceID: WorkspaceID.make(rawWorkspaceID), - async fn() { - return Instance.provide({ - directory, - init: InstanceBootstrap, - async fn() { - return next() - }, - }) - }, - }) - }) - .route("/session", session) - .route("/", WorkspaceServerRoutes()) - } - - export function Listen(opts: { hostname: string; port: number }) { - return Bun.serve({ - hostname: opts.hostname, - port: opts.port, - fetch: App().fetch, - }) - } -} diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index b3d1db7eb0..e27471068f 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -14,7 +14,6 @@ import { Installation } from "./installation" import { NamedError } from "@opencode-ai/util/error" import { FormatError } from "./cli/error" import { ServeCommand } from "./cli/cmd/serve" -import { WorkspaceServeCommand } from "./cli/cmd/workspace-serve" import { Filesystem } from "./util/filesystem" import { DebugCommand } from "./cli/cmd/debug" import { StatsCommand } from "./cli/cmd/stats" @@ -47,7 +46,7 @@ process.on("uncaughtException", (e) => { }) }) -let cli = yargs(hideBin(process.argv)) +const cli = yargs(hideBin(process.argv)) .parserConfiguration({ "populate--": true }) .scriptName("opencode") .wrap(100) @@ -145,12 +144,6 @@ let cli = yargs(hideBin(process.argv)) .command(PrCommand) .command(SessionCommand) .command(DbCommand) - -if (Installation.isLocal()) { - cli = cli.command(WorkspaceServeCommand) -} - -cli = cli .fail((msg, err) => { if ( msg?.startsWith("Unknown argument") || diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 899dacee29..e3eeead2a1 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -19,7 +19,6 @@ import { Auth } from "../auth" import { Flag } from "../flag/flag" import { Command } from "../command" import { Global } from "../global" -import { WorkspaceContext } from "../control-plane/workspace-context" import { WorkspaceID } from "../control-plane/schema" import { ProviderID } from "../provider/schema" import { WorkspaceRouterMiddleware } from "../control-plane/workspace-router-middleware" @@ -204,7 +203,6 @@ export namespace Server { ) .use(async (c, next) => { if (c.req.path === "/log") return next() - const rawWorkspaceID = c.req.query("workspace") || c.req.header("x-opencode-workspace") const raw = c.req.query("directory") || c.req.header("x-opencode-directory") || process.cwd() const directory = Filesystem.resolve( (() => { @@ -216,20 +214,14 @@ export namespace Server { })(), ) - return WorkspaceContext.provide({ - workspaceID: rawWorkspaceID ? WorkspaceID.make(rawWorkspaceID) : undefined, + return Instance.provide({ + directory, + init: InstanceBootstrap, async fn() { - return Instance.provide({ - directory, - init: InstanceBootstrap, - async fn() { - return next() - }, - }) + return next() }, }) }) - .use(WorkspaceRouterMiddleware) .get( "/doc", openAPIRouteHandler(app, { @@ -252,6 +244,7 @@ export namespace Server { }), ), ) + .use(WorkspaceRouterMiddleware) .route("/project", ProjectRoutes()) .route("/pty", PtyRoutes()) .route("/config", ConfigRoutes()) diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index a379cd228c..6102b7b413 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -23,7 +23,6 @@ import { SessionPrompt } from "./prompt" import { fn } from "@/util/fn" import { Command } from "../command" import { Snapshot } from "@/snapshot" -import { WorkspaceContext } from "../control-plane/workspace-context" import { ProjectID } from "../project/schema" import { WorkspaceID } from "../control-plane/schema" import { SessionID, MessageID, PartID } from "./schema" @@ -494,8 +493,8 @@ export namespace Session { const project = Instance.project const conditions = [eq(SessionTable.project_id, project.id)] - if (WorkspaceContext.workspaceID) { - conditions.push(eq(SessionTable.workspace_id, WorkspaceContext.workspaceID)) + if (input?.workspaceID) { + conditions.push(eq(SessionTable.workspace_id, input.workspaceID)) } if (input?.directory) { conditions.push(eq(SessionTable.directory, input.directory)) diff --git a/packages/opencode/test/control-plane/session-proxy-middleware.test.ts b/packages/opencode/test/control-plane/session-proxy-middleware.test.ts deleted file mode 100644 index d4d152a1c6..0000000000 --- a/packages/opencode/test/control-plane/session-proxy-middleware.test.ts +++ /dev/null @@ -1,159 +0,0 @@ -import { afterEach, describe, expect, mock, test } from "bun:test" -import { WorkspaceID } from "../../src/control-plane/schema" -import { Hono } from "hono" -import { tmpdir } from "../fixture/fixture" -import { Project } from "../../src/project/project" -import { WorkspaceTable } from "../../src/control-plane/workspace.sql" -import { Instance } from "../../src/project/instance" -import { WorkspaceContext } from "../../src/control-plane/workspace-context" -import { Database } from "../../src/storage/db" -import { resetDatabase } from "../fixture/db" -import * as adaptors from "../../src/control-plane/adaptors" -import type { Adaptor } from "../../src/control-plane/types" -import { Flag } from "../../src/flag/flag" - -afterEach(async () => { - mock.restore() - await resetDatabase() -}) - -const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES -// @ts-expect-error don't do this normally, but it works -Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true - -afterEach(() => { - // @ts-expect-error don't do this normally, but it works - Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original -}) - -type State = { - workspace?: "first" | "second" - calls: Array<{ method: string; url: string; body?: string }> -} - -const remote = { type: "testing", name: "remote-a" } as unknown as typeof WorkspaceTable.$inferInsert - -async function setup(state: State) { - const TestAdaptor: Adaptor = { - configure(config) { - return config - }, - async create() { - throw new Error("not used") - }, - async remove() {}, - - async fetch(_config: unknown, input: RequestInfo | URL, init?: RequestInit) { - const url = - input instanceof Request || input instanceof URL - ? input.toString() - : new URL(input, "http://workspace.test").toString() - const request = new Request(url, init) - const body = request.method === "GET" || request.method === "HEAD" ? undefined : await request.text() - state.calls.push({ - method: request.method, - url: `${new URL(request.url).pathname}${new URL(request.url).search}`, - body, - }) - return new Response("proxied", { status: 202 }) - }, - } - - adaptors.installAdaptor("testing", TestAdaptor) - - await using tmp = await tmpdir({ git: true }) - const { project } = await Project.fromDirectory(tmp.path) - - const id1 = WorkspaceID.ascending() - const id2 = WorkspaceID.ascending() - - Database.use((db) => - db - .insert(WorkspaceTable) - .values([ - { - id: id1, - branch: "main", - project_id: project.id, - type: remote.type, - name: remote.name, - }, - { - id: id2, - branch: "main", - project_id: project.id, - type: "worktree", - directory: tmp.path, - name: "local", - }, - ]) - .run(), - ) - - const { WorkspaceRouterMiddleware } = await import("../../src/control-plane/workspace-router-middleware") - const app = new Hono().use(WorkspaceRouterMiddleware) - - return { - id1, - id2, - app, - async request(input: RequestInfo | URL, init?: RequestInit) { - return Instance.provide({ - directory: tmp.path, - fn: async () => - WorkspaceContext.provide({ - workspaceID: state.workspace === "first" ? id1 : id2, - fn: () => app.request(input, init), - }), - }) - }, - } -} - -describe("control-plane/session-proxy-middleware", () => { - test("forwards non-GET session requests for workspaces", async () => { - const state: State = { - workspace: "first", - calls: [], - } - - const ctx = await setup(state) - - ctx.app.post("/session/foo", (c) => c.text("local", 200)) - const response = await ctx.request("http://workspace.test/session/foo?x=1", { - method: "POST", - body: JSON.stringify({ hello: "world" }), - headers: { - "content-type": "application/json", - }, - }) - - expect(response.status).toBe(202) - expect(await response.text()).toBe("proxied") - expect(state.calls).toEqual([ - { - method: "POST", - url: "/session/foo?x=1", - body: '{"hello":"world"}', - }, - ]) - }) - - // It will behave this way when we have syncing - // - // test("does not forward GET requests", async () => { - // const state: State = { - // workspace: "first", - // calls: [], - // } - - // const ctx = await setup(state) - - // ctx.app.get("/session/foo", (c) => c.text("local", 200)) - // const response = await ctx.request("http://workspace.test/session/foo?x=1") - - // expect(response.status).toBe(200) - // expect(await response.text()).toBe("local") - // expect(state.calls).toEqual([]) - // }) -}) diff --git a/packages/opencode/test/control-plane/workspace-server-sse.test.ts b/packages/opencode/test/control-plane/workspace-server-sse.test.ts deleted file mode 100644 index 7e7cddb140..0000000000 --- a/packages/opencode/test/control-plane/workspace-server-sse.test.ts +++ /dev/null @@ -1,70 +0,0 @@ -import { afterEach, describe, expect, test } from "bun:test" -import { Log } from "../../src/util/log" -import { WorkspaceServer } from "../../src/control-plane/workspace-server/server" -import { parseSSE } from "../../src/control-plane/sse" -import { GlobalBus } from "../../src/bus/global" -import { resetDatabase } from "../fixture/db" -import { tmpdir } from "../fixture/fixture" - -afterEach(async () => { - await resetDatabase() -}) - -Log.init({ print: false }) - -describe("control-plane/workspace-server SSE", () => { - test("streams GlobalBus events and parseSSE reads them", async () => { - await using tmp = await tmpdir({ git: true }) - const app = WorkspaceServer.App() - const stop = new AbortController() - const seen: unknown[] = [] - try { - const response = await app.request("/event", { - signal: stop.signal, - headers: { - "x-opencode-workspace": "wrk_test_workspace", - "x-opencode-directory": tmp.path, - }, - }) - - expect(response.status).toBe(200) - expect(response.body).toBeDefined() - - const done = new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - reject(new Error("timed out waiting for workspace.test event")) - }, 3000) - - void parseSSE(response.body!, stop.signal, (event) => { - seen.push(event) - const next = event as { type?: string } - if (next.type === "server.connected") { - GlobalBus.emit("event", { - payload: { - type: "workspace.test", - properties: { ok: true }, - }, - }) - return - } - if (next.type !== "workspace.test") return - clearTimeout(timeout) - resolve() - }).catch((error) => { - clearTimeout(timeout) - reject(error) - }) - }) - - await done - - expect(seen.some((event) => (event as { type?: string }).type === "server.connected")).toBe(true) - expect(seen).toContainEqual({ - type: "workspace.test", - properties: { ok: true }, - }) - } finally { - stop.abort() - } - }) -}) diff --git a/packages/opencode/test/control-plane/workspace-sync.test.ts b/packages/opencode/test/control-plane/workspace-sync.test.ts deleted file mode 100644 index 0f8d608fb3..0000000000 --- a/packages/opencode/test/control-plane/workspace-sync.test.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { afterEach, describe, expect, mock, test } from "bun:test" -import { WorkspaceID } from "../../src/control-plane/schema" -import { Log } from "../../src/util/log" -import { tmpdir } from "../fixture/fixture" -import { Project } from "../../src/project/project" -import { Database } from "../../src/storage/db" -import { WorkspaceTable } from "../../src/control-plane/workspace.sql" -import { GlobalBus } from "../../src/bus/global" -import { resetDatabase } from "../fixture/db" -import * as adaptors from "../../src/control-plane/adaptors" -import type { Adaptor } from "../../src/control-plane/types" - -afterEach(async () => { - mock.restore() - await resetDatabase() -}) - -Log.init({ print: false }) - -const remote = { type: "testing", name: "remote-a" } as unknown as typeof WorkspaceTable.$inferInsert - -const TestAdaptor: Adaptor = { - configure(config) { - return config - }, - async create() { - throw new Error("not used") - }, - async remove() {}, - async fetch(_config: unknown, _input: RequestInfo | URL, _init?: RequestInit) { - const body = new ReadableStream({ - start(controller) { - const encoder = new TextEncoder() - controller.enqueue(encoder.encode('data: {"type":"remote.ready","properties":{}}\n\n')) - controller.close() - }, - }) - return new Response(body, { - status: 200, - headers: { - "content-type": "text/event-stream", - }, - }) - }, -} - -adaptors.installAdaptor("testing", TestAdaptor) - -describe("control-plane/workspace.startSyncing", () => { - test("syncs only remote workspaces and emits remote SSE events", async () => { - const { Workspace } = await import("../../src/control-plane/workspace") - await using tmp = await tmpdir({ git: true }) - const { project } = await Project.fromDirectory(tmp.path) - - const id1 = WorkspaceID.ascending() - const id2 = WorkspaceID.ascending() - - Database.use((db) => - db - .insert(WorkspaceTable) - .values([ - { - id: id1, - branch: "main", - project_id: project.id, - type: remote.type, - name: remote.name, - }, - { - id: id2, - branch: "main", - project_id: project.id, - type: "worktree", - directory: tmp.path, - name: "local", - }, - ]) - .run(), - ) - - const done = new Promise((resolve) => { - const listener = (event: { directory?: string; payload: { type: string } }) => { - if (event.directory !== id1) return - if (event.payload.type !== "remote.ready") return - GlobalBus.off("event", listener) - resolve() - } - GlobalBus.on("event", listener) - }) - - const sync = Workspace.startSyncing(project) - await Promise.race([ - done, - new Promise((_, reject) => setTimeout(() => reject(new Error("timed out waiting for sync event")), 2000)), - ]) - - await sync.stop() - }) -})