feat(core): remove workspace server, WorkspaceContext, start work towards better routing (#19316)
parent
da1d37274f
commit
38450443b1
|
|
@ -12,7 +12,6 @@ import type { Event } from "@opencode-ai/sdk/v2"
|
|||
import { Flag } from "@/flag/flag"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
import { writeHeapSnapshot } from "node:v8"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import { WorkspaceID } from "@/control-plane/schema"
|
||||
|
||||
await Log.init({
|
||||
|
|
@ -53,14 +52,9 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) =>
|
|||
eventStream.abort = abort
|
||||
const signal = abort.signal
|
||||
|
||||
const workspaceID = input.workspaceID ? WorkspaceID.make(input.workspaceID) : undefined
|
||||
|
||||
;(async () => {
|
||||
while (!signal.aborted) {
|
||||
const shouldReconnect = await WorkspaceContext.provide({
|
||||
workspaceID,
|
||||
fn: () =>
|
||||
Instance.provide({
|
||||
const shouldReconnect = await Instance.provide({
|
||||
directory: input.directory,
|
||||
init: InstanceBootstrap,
|
||||
fn: () =>
|
||||
|
|
@ -92,7 +86,6 @@ const startEventStream = (input: { directory: string; workspaceID?: string }) =>
|
|||
|
||||
signal.addEventListener("abort", onAbort, { once: true })
|
||||
}),
|
||||
}),
|
||||
}).catch((error) => {
|
||||
Log.Default.error("event stream subscribe error", {
|
||||
error: error instanceof Error ? error.message : error,
|
||||
|
|
|
|||
|
|
@ -1,16 +0,0 @@
|
|||
import { cmd } from "./cmd"
|
||||
import { withNetworkOptions, resolveNetworkOptions } from "../network"
|
||||
import { WorkspaceServer } from "../../control-plane/workspace-server/server"
|
||||
|
||||
export const WorkspaceServeCommand = cmd({
|
||||
command: "workspace-serve",
|
||||
builder: (yargs) => withNetworkOptions(yargs),
|
||||
describe: "starts a remote workspace event server",
|
||||
handler: async (args) => {
|
||||
const opts = await resolveNetworkOptions(args)
|
||||
const server = WorkspaceServer.Listen(opts)
|
||||
console.log(`workspace event server listening on http://${server.hostname}:${server.port}/event`)
|
||||
await new Promise(() => {})
|
||||
await server.stop()
|
||||
},
|
||||
})
|
||||
|
|
@ -33,13 +33,14 @@ export const WorktreeAdaptor: Adaptor = {
|
|||
await Worktree.remove({ directory: config.directory })
|
||||
},
|
||||
async fetch(info, input: RequestInfo | URL, init?: RequestInit) {
|
||||
const { Server } = await import("../../server/server")
|
||||
|
||||
const config = Config.parse(info)
|
||||
const { WorkspaceServer } = await import("../workspace-server/server")
|
||||
const url = input instanceof Request || input instanceof URL ? input : new URL(input, "http://opencode.internal")
|
||||
const headers = new Headers(init?.headers ?? (input instanceof Request ? input.headers : undefined))
|
||||
headers.set("x-opencode-directory", config.directory)
|
||||
|
||||
const request = new Request(url, { ...init, headers })
|
||||
return WorkspaceServer.App().fetch(request)
|
||||
return Server.Default().fetch(request)
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,24 +0,0 @@
|
|||
import { Context } from "../util/context"
|
||||
import type { WorkspaceID } from "./schema"
|
||||
|
||||
interface Context {
|
||||
workspaceID?: WorkspaceID
|
||||
}
|
||||
|
||||
const context = Context.create<Context>("workspace")
|
||||
|
||||
export const WorkspaceContext = {
|
||||
async provide<R>(input: { workspaceID?: WorkspaceID; fn: () => R }): Promise<R> {
|
||||
return context.provide({ workspaceID: input.workspaceID }, async () => {
|
||||
return input.fn()
|
||||
})
|
||||
},
|
||||
|
||||
get workspaceID() {
|
||||
try {
|
||||
return context.use().workspaceID
|
||||
} catch (e) {
|
||||
return undefined
|
||||
}
|
||||
},
|
||||
}
|
||||
|
|
@ -1,23 +1,38 @@
|
|||
import type { MiddlewareHandler } from "hono"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { getAdaptor } from "./adaptors"
|
||||
import { WorkspaceID } from "./schema"
|
||||
import { Workspace } from "./workspace"
|
||||
import { WorkspaceContext } from "./workspace-context"
|
||||
|
||||
// This middleware forwards all non-GET requests if the workspace is a
|
||||
// remote. The remote workspace needs to handle session mutations
|
||||
type Rule = { method?: string; path: string; exact?: boolean; action: "local" | "forward" }
|
||||
|
||||
const RULES: Array<Rule> = [
|
||||
{ path: "/session/status", action: "forward" },
|
||||
{ method: "GET", path: "/session", action: "local" },
|
||||
]
|
||||
|
||||
function local(method: string, path: string) {
|
||||
for (const rule of RULES) {
|
||||
if (rule.method && rule.method !== method) continue
|
||||
const match = rule.exact ? path === rule.path : path === rule.path || path.startsWith(rule.path + "/")
|
||||
if (match) return rule.action === "local"
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
async function routeRequest(req: Request) {
|
||||
// Right now, we need to forward all requests to the workspace
|
||||
// because we don't have syncing. In the future all GET requests
|
||||
// which don't mutate anything will be handled locally
|
||||
//
|
||||
// if (req.method === "GET") return
|
||||
const url = new URL(req.url)
|
||||
const raw = url.searchParams.get("workspace") || req.headers.get("x-opencode-workspace")
|
||||
|
||||
if (!WorkspaceContext.workspaceID) return
|
||||
if (!raw) return
|
||||
|
||||
const workspace = await Workspace.get(WorkspaceContext.workspaceID)
|
||||
if (local(req.method, url.pathname)) return
|
||||
|
||||
const workspaceID = WorkspaceID.make(raw)
|
||||
|
||||
const workspace = await Workspace.get(workspaceID)
|
||||
if (!workspace) {
|
||||
return new Response(`Workspace not found: ${WorkspaceContext.workspaceID}`, {
|
||||
return new Response(`Workspace not found: ${workspaceID}`, {
|
||||
status: 500,
|
||||
headers: {
|
||||
"content-type": "text/plain; charset=utf-8",
|
||||
|
|
@ -27,11 +42,14 @@ async function routeRequest(req: Request) {
|
|||
|
||||
const adaptor = await getAdaptor(workspace.type)
|
||||
|
||||
return adaptor.fetch(workspace, `${new URL(req.url).pathname}${new URL(req.url).search}`, {
|
||||
const headers = new Headers(req.headers)
|
||||
headers.delete("x-opencode-workspace")
|
||||
|
||||
return adaptor.fetch(workspace, `${url.pathname}${url.search}`, {
|
||||
method: req.method,
|
||||
body: req.method === "GET" || req.method === "HEAD" ? undefined : await req.arrayBuffer(),
|
||||
signal: req.signal,
|
||||
headers: req.headers,
|
||||
headers,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,33 +0,0 @@
|
|||
import { GlobalBus } from "../../bus/global"
|
||||
import { Hono } from "hono"
|
||||
import { streamSSE } from "hono/streaming"
|
||||
|
||||
export function WorkspaceServerRoutes() {
|
||||
return new Hono().get("/event", async (c) => {
|
||||
c.header("X-Accel-Buffering", "no")
|
||||
c.header("X-Content-Type-Options", "nosniff")
|
||||
return streamSSE(c, async (stream) => {
|
||||
const send = async (event: unknown) => {
|
||||
await stream.writeSSE({
|
||||
data: JSON.stringify(event),
|
||||
})
|
||||
}
|
||||
const handler = async (event: { directory?: string; payload: unknown }) => {
|
||||
await send(event.payload)
|
||||
}
|
||||
GlobalBus.on("event", handler)
|
||||
await send({ type: "server.connected", properties: {} })
|
||||
const heartbeat = setInterval(() => {
|
||||
void send({ type: "server.heartbeat", properties: {} })
|
||||
}, 10_000)
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.onAbort(() => {
|
||||
clearInterval(heartbeat)
|
||||
GlobalBus.off("event", handler)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
@ -1,65 +0,0 @@
|
|||
import { Hono } from "hono"
|
||||
import { Instance } from "../../project/instance"
|
||||
import { InstanceBootstrap } from "../../project/bootstrap"
|
||||
import { SessionRoutes } from "../../server/routes/session"
|
||||
import { WorkspaceServerRoutes } from "./routes"
|
||||
import { WorkspaceContext } from "../workspace-context"
|
||||
import { WorkspaceID } from "../schema"
|
||||
|
||||
export namespace WorkspaceServer {
|
||||
export function App() {
|
||||
const session = new Hono()
|
||||
.use(async (c, next) => {
|
||||
// Right now, we need handle all requests because we don't
|
||||
// have syncing. In the future all GET requests will handled
|
||||
// by the control plane
|
||||
//
|
||||
// if (c.req.method === "GET") return c.notFound()
|
||||
await next()
|
||||
})
|
||||
.route("/", SessionRoutes())
|
||||
|
||||
return new Hono()
|
||||
.use(async (c, next) => {
|
||||
const rawWorkspaceID = c.req.query("workspace") || c.req.header("x-opencode-workspace")
|
||||
const raw = c.req.query("directory") || c.req.header("x-opencode-directory")
|
||||
if (rawWorkspaceID == null) {
|
||||
throw new Error("workspaceID parameter is required")
|
||||
}
|
||||
if (raw == null) {
|
||||
throw new Error("directory parameter is required")
|
||||
}
|
||||
|
||||
const directory = (() => {
|
||||
try {
|
||||
return decodeURIComponent(raw)
|
||||
} catch {
|
||||
return raw
|
||||
}
|
||||
})()
|
||||
|
||||
return WorkspaceContext.provide({
|
||||
workspaceID: WorkspaceID.make(rawWorkspaceID),
|
||||
async fn() {
|
||||
return Instance.provide({
|
||||
directory,
|
||||
init: InstanceBootstrap,
|
||||
async fn() {
|
||||
return next()
|
||||
},
|
||||
})
|
||||
},
|
||||
})
|
||||
})
|
||||
.route("/session", session)
|
||||
.route("/", WorkspaceServerRoutes())
|
||||
}
|
||||
|
||||
export function Listen(opts: { hostname: string; port: number }) {
|
||||
return Bun.serve({
|
||||
hostname: opts.hostname,
|
||||
port: opts.port,
|
||||
fetch: App().fetch,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -14,7 +14,6 @@ import { Installation } from "./installation"
|
|||
import { NamedError } from "@opencode-ai/util/error"
|
||||
import { FormatError } from "./cli/error"
|
||||
import { ServeCommand } from "./cli/cmd/serve"
|
||||
import { WorkspaceServeCommand } from "./cli/cmd/workspace-serve"
|
||||
import { Filesystem } from "./util/filesystem"
|
||||
import { DebugCommand } from "./cli/cmd/debug"
|
||||
import { StatsCommand } from "./cli/cmd/stats"
|
||||
|
|
@ -47,7 +46,7 @@ process.on("uncaughtException", (e) => {
|
|||
})
|
||||
})
|
||||
|
||||
let cli = yargs(hideBin(process.argv))
|
||||
const cli = yargs(hideBin(process.argv))
|
||||
.parserConfiguration({ "populate--": true })
|
||||
.scriptName("opencode")
|
||||
.wrap(100)
|
||||
|
|
@ -145,12 +144,6 @@ let cli = yargs(hideBin(process.argv))
|
|||
.command(PrCommand)
|
||||
.command(SessionCommand)
|
||||
.command(DbCommand)
|
||||
|
||||
if (Installation.isLocal()) {
|
||||
cli = cli.command(WorkspaceServeCommand)
|
||||
}
|
||||
|
||||
cli = cli
|
||||
.fail((msg, err) => {
|
||||
if (
|
||||
msg?.startsWith("Unknown argument") ||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import { Auth } from "../auth"
|
|||
import { Flag } from "../flag/flag"
|
||||
import { Command } from "../command"
|
||||
import { Global } from "../global"
|
||||
import { WorkspaceContext } from "../control-plane/workspace-context"
|
||||
import { WorkspaceID } from "../control-plane/schema"
|
||||
import { ProviderID } from "../provider/schema"
|
||||
import { WorkspaceRouterMiddleware } from "../control-plane/workspace-router-middleware"
|
||||
|
|
@ -204,7 +203,6 @@ export namespace Server {
|
|||
)
|
||||
.use(async (c, next) => {
|
||||
if (c.req.path === "/log") return next()
|
||||
const rawWorkspaceID = c.req.query("workspace") || c.req.header("x-opencode-workspace")
|
||||
const raw = c.req.query("directory") || c.req.header("x-opencode-directory") || process.cwd()
|
||||
const directory = Filesystem.resolve(
|
||||
(() => {
|
||||
|
|
@ -216,9 +214,6 @@ export namespace Server {
|
|||
})(),
|
||||
)
|
||||
|
||||
return WorkspaceContext.provide({
|
||||
workspaceID: rawWorkspaceID ? WorkspaceID.make(rawWorkspaceID) : undefined,
|
||||
async fn() {
|
||||
return Instance.provide({
|
||||
directory,
|
||||
init: InstanceBootstrap,
|
||||
|
|
@ -226,10 +221,7 @@ export namespace Server {
|
|||
return next()
|
||||
},
|
||||
})
|
||||
},
|
||||
})
|
||||
})
|
||||
.use(WorkspaceRouterMiddleware)
|
||||
.get(
|
||||
"/doc",
|
||||
openAPIRouteHandler(app, {
|
||||
|
|
@ -252,6 +244,7 @@ export namespace Server {
|
|||
}),
|
||||
),
|
||||
)
|
||||
.use(WorkspaceRouterMiddleware)
|
||||
.route("/project", ProjectRoutes())
|
||||
.route("/pty", PtyRoutes())
|
||||
.route("/config", ConfigRoutes())
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ import { SessionPrompt } from "./prompt"
|
|||
import { fn } from "@/util/fn"
|
||||
import { Command } from "../command"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
import { WorkspaceContext } from "../control-plane/workspace-context"
|
||||
import { ProjectID } from "../project/schema"
|
||||
import { WorkspaceID } from "../control-plane/schema"
|
||||
import { SessionID, MessageID, PartID } from "./schema"
|
||||
|
|
@ -494,8 +493,8 @@ export namespace Session {
|
|||
const project = Instance.project
|
||||
const conditions = [eq(SessionTable.project_id, project.id)]
|
||||
|
||||
if (WorkspaceContext.workspaceID) {
|
||||
conditions.push(eq(SessionTable.workspace_id, WorkspaceContext.workspaceID))
|
||||
if (input?.workspaceID) {
|
||||
conditions.push(eq(SessionTable.workspace_id, input.workspaceID))
|
||||
}
|
||||
if (input?.directory) {
|
||||
conditions.push(eq(SessionTable.directory, input.directory))
|
||||
|
|
|
|||
|
|
@ -1,159 +0,0 @@
|
|||
import { afterEach, describe, expect, mock, test } from "bun:test"
|
||||
import { WorkspaceID } from "../../src/control-plane/schema"
|
||||
import { Hono } from "hono"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { Project } from "../../src/project/project"
|
||||
import { WorkspaceTable } from "../../src/control-plane/workspace.sql"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { WorkspaceContext } from "../../src/control-plane/workspace-context"
|
||||
import { Database } from "../../src/storage/db"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import * as adaptors from "../../src/control-plane/adaptors"
|
||||
import type { Adaptor } from "../../src/control-plane/types"
|
||||
import { Flag } from "../../src/flag/flag"
|
||||
|
||||
afterEach(async () => {
|
||||
mock.restore()
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
|
||||
// @ts-expect-error don't do this normally, but it works
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
|
||||
|
||||
afterEach(() => {
|
||||
// @ts-expect-error don't do this normally, but it works
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
|
||||
})
|
||||
|
||||
type State = {
|
||||
workspace?: "first" | "second"
|
||||
calls: Array<{ method: string; url: string; body?: string }>
|
||||
}
|
||||
|
||||
const remote = { type: "testing", name: "remote-a" } as unknown as typeof WorkspaceTable.$inferInsert
|
||||
|
||||
async function setup(state: State) {
|
||||
const TestAdaptor: Adaptor = {
|
||||
configure(config) {
|
||||
return config
|
||||
},
|
||||
async create() {
|
||||
throw new Error("not used")
|
||||
},
|
||||
async remove() {},
|
||||
|
||||
async fetch(_config: unknown, input: RequestInfo | URL, init?: RequestInit) {
|
||||
const url =
|
||||
input instanceof Request || input instanceof URL
|
||||
? input.toString()
|
||||
: new URL(input, "http://workspace.test").toString()
|
||||
const request = new Request(url, init)
|
||||
const body = request.method === "GET" || request.method === "HEAD" ? undefined : await request.text()
|
||||
state.calls.push({
|
||||
method: request.method,
|
||||
url: `${new URL(request.url).pathname}${new URL(request.url).search}`,
|
||||
body,
|
||||
})
|
||||
return new Response("proxied", { status: 202 })
|
||||
},
|
||||
}
|
||||
|
||||
adaptors.installAdaptor("testing", TestAdaptor)
|
||||
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const { project } = await Project.fromDirectory(tmp.path)
|
||||
|
||||
const id1 = WorkspaceID.ascending()
|
||||
const id2 = WorkspaceID.ascending()
|
||||
|
||||
Database.use((db) =>
|
||||
db
|
||||
.insert(WorkspaceTable)
|
||||
.values([
|
||||
{
|
||||
id: id1,
|
||||
branch: "main",
|
||||
project_id: project.id,
|
||||
type: remote.type,
|
||||
name: remote.name,
|
||||
},
|
||||
{
|
||||
id: id2,
|
||||
branch: "main",
|
||||
project_id: project.id,
|
||||
type: "worktree",
|
||||
directory: tmp.path,
|
||||
name: "local",
|
||||
},
|
||||
])
|
||||
.run(),
|
||||
)
|
||||
|
||||
const { WorkspaceRouterMiddleware } = await import("../../src/control-plane/workspace-router-middleware")
|
||||
const app = new Hono().use(WorkspaceRouterMiddleware)
|
||||
|
||||
return {
|
||||
id1,
|
||||
id2,
|
||||
app,
|
||||
async request(input: RequestInfo | URL, init?: RequestInit) {
|
||||
return Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () =>
|
||||
WorkspaceContext.provide({
|
||||
workspaceID: state.workspace === "first" ? id1 : id2,
|
||||
fn: () => app.request(input, init),
|
||||
}),
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
describe("control-plane/session-proxy-middleware", () => {
|
||||
test("forwards non-GET session requests for workspaces", async () => {
|
||||
const state: State = {
|
||||
workspace: "first",
|
||||
calls: [],
|
||||
}
|
||||
|
||||
const ctx = await setup(state)
|
||||
|
||||
ctx.app.post("/session/foo", (c) => c.text("local", 200))
|
||||
const response = await ctx.request("http://workspace.test/session/foo?x=1", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ hello: "world" }),
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
},
|
||||
})
|
||||
|
||||
expect(response.status).toBe(202)
|
||||
expect(await response.text()).toBe("proxied")
|
||||
expect(state.calls).toEqual([
|
||||
{
|
||||
method: "POST",
|
||||
url: "/session/foo?x=1",
|
||||
body: '{"hello":"world"}',
|
||||
},
|
||||
])
|
||||
})
|
||||
|
||||
// It will behave this way when we have syncing
|
||||
//
|
||||
// test("does not forward GET requests", async () => {
|
||||
// const state: State = {
|
||||
// workspace: "first",
|
||||
// calls: [],
|
||||
// }
|
||||
|
||||
// const ctx = await setup(state)
|
||||
|
||||
// ctx.app.get("/session/foo", (c) => c.text("local", 200))
|
||||
// const response = await ctx.request("http://workspace.test/session/foo?x=1")
|
||||
|
||||
// expect(response.status).toBe(200)
|
||||
// expect(await response.text()).toBe("local")
|
||||
// expect(state.calls).toEqual([])
|
||||
// })
|
||||
})
|
||||
|
|
@ -1,70 +0,0 @@
|
|||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { Log } from "../../src/util/log"
|
||||
import { WorkspaceServer } from "../../src/control-plane/workspace-server/server"
|
||||
import { parseSSE } from "../../src/control-plane/sse"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
afterEach(async () => {
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
Log.init({ print: false })
|
||||
|
||||
describe("control-plane/workspace-server SSE", () => {
|
||||
test("streams GlobalBus events and parseSSE reads them", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const app = WorkspaceServer.App()
|
||||
const stop = new AbortController()
|
||||
const seen: unknown[] = []
|
||||
try {
|
||||
const response = await app.request("/event", {
|
||||
signal: stop.signal,
|
||||
headers: {
|
||||
"x-opencode-workspace": "wrk_test_workspace",
|
||||
"x-opencode-directory": tmp.path,
|
||||
},
|
||||
})
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(response.body).toBeDefined()
|
||||
|
||||
const done = new Promise<void>((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
reject(new Error("timed out waiting for workspace.test event"))
|
||||
}, 3000)
|
||||
|
||||
void parseSSE(response.body!, stop.signal, (event) => {
|
||||
seen.push(event)
|
||||
const next = event as { type?: string }
|
||||
if (next.type === "server.connected") {
|
||||
GlobalBus.emit("event", {
|
||||
payload: {
|
||||
type: "workspace.test",
|
||||
properties: { ok: true },
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
if (next.type !== "workspace.test") return
|
||||
clearTimeout(timeout)
|
||||
resolve()
|
||||
}).catch((error) => {
|
||||
clearTimeout(timeout)
|
||||
reject(error)
|
||||
})
|
||||
})
|
||||
|
||||
await done
|
||||
|
||||
expect(seen.some((event) => (event as { type?: string }).type === "server.connected")).toBe(true)
|
||||
expect(seen).toContainEqual({
|
||||
type: "workspace.test",
|
||||
properties: { ok: true },
|
||||
})
|
||||
} finally {
|
||||
stop.abort()
|
||||
}
|
||||
})
|
||||
})
|
||||
|
|
@ -1,99 +0,0 @@
|
|||
import { afterEach, describe, expect, mock, test } from "bun:test"
|
||||
import { WorkspaceID } from "../../src/control-plane/schema"
|
||||
import { Log } from "../../src/util/log"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { Project } from "../../src/project/project"
|
||||
import { Database } from "../../src/storage/db"
|
||||
import { WorkspaceTable } from "../../src/control-plane/workspace.sql"
|
||||
import { GlobalBus } from "../../src/bus/global"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import * as adaptors from "../../src/control-plane/adaptors"
|
||||
import type { Adaptor } from "../../src/control-plane/types"
|
||||
|
||||
afterEach(async () => {
|
||||
mock.restore()
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
Log.init({ print: false })
|
||||
|
||||
const remote = { type: "testing", name: "remote-a" } as unknown as typeof WorkspaceTable.$inferInsert
|
||||
|
||||
const TestAdaptor: Adaptor = {
|
||||
configure(config) {
|
||||
return config
|
||||
},
|
||||
async create() {
|
||||
throw new Error("not used")
|
||||
},
|
||||
async remove() {},
|
||||
async fetch(_config: unknown, _input: RequestInfo | URL, _init?: RequestInit) {
|
||||
const body = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
const encoder = new TextEncoder()
|
||||
controller.enqueue(encoder.encode('data: {"type":"remote.ready","properties":{}}\n\n'))
|
||||
controller.close()
|
||||
},
|
||||
})
|
||||
return new Response(body, {
|
||||
status: 200,
|
||||
headers: {
|
||||
"content-type": "text/event-stream",
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
adaptors.installAdaptor("testing", TestAdaptor)
|
||||
|
||||
describe("control-plane/workspace.startSyncing", () => {
|
||||
test("syncs only remote workspaces and emits remote SSE events", async () => {
|
||||
const { Workspace } = await import("../../src/control-plane/workspace")
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
const { project } = await Project.fromDirectory(tmp.path)
|
||||
|
||||
const id1 = WorkspaceID.ascending()
|
||||
const id2 = WorkspaceID.ascending()
|
||||
|
||||
Database.use((db) =>
|
||||
db
|
||||
.insert(WorkspaceTable)
|
||||
.values([
|
||||
{
|
||||
id: id1,
|
||||
branch: "main",
|
||||
project_id: project.id,
|
||||
type: remote.type,
|
||||
name: remote.name,
|
||||
},
|
||||
{
|
||||
id: id2,
|
||||
branch: "main",
|
||||
project_id: project.id,
|
||||
type: "worktree",
|
||||
directory: tmp.path,
|
||||
name: "local",
|
||||
},
|
||||
])
|
||||
.run(),
|
||||
)
|
||||
|
||||
const done = new Promise<void>((resolve) => {
|
||||
const listener = (event: { directory?: string; payload: { type: string } }) => {
|
||||
if (event.directory !== id1) return
|
||||
if (event.payload.type !== "remote.ready") return
|
||||
GlobalBus.off("event", listener)
|
||||
resolve()
|
||||
}
|
||||
GlobalBus.on("event", listener)
|
||||
})
|
||||
|
||||
const sync = Workspace.startSyncing(project)
|
||||
await Promise.race([
|
||||
done,
|
||||
new Promise((_, reject) => setTimeout(() => reject(new Error("timed out waiting for sync event")), 2000)),
|
||||
])
|
||||
|
||||
await sync.stop()
|
||||
})
|
||||
})
|
||||
Loading…
Reference in New Issue