From 8e9e79d2769f0f944fa9da9add032e46348eb850 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 2 Apr 2026 20:56:56 -0400 Subject: [PATCH] refactor(share): effectify share next (#20596) --- packages/opencode/src/share/share-next.ts | 534 ++++++++++-------- .../opencode/test/share/share-next.test.ts | 387 ++++++++++--- 2 files changed, 630 insertions(+), 291 deletions(-) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 2a11094f80..2eb9887ea4 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -1,152 +1,47 @@ -import { Bus } from "@/bus" -import { Account } from "@/account" -import { Config } from "@/config/config" -import { Provider } from "@/provider/provider" -import { ProviderID, ModelID } from "@/provider/schema" -import { Session } from "@/session" -import type { SessionID } from "@/session/schema" -import { MessageV2 } from "@/session/message-v2" -import { Database, eq } from "@/storage/db" -import { SessionShareTable } from "./share.sql" -import { Log } from "@/util/log" import type * as SDK from "@opencode-ai/sdk/v2" +import { Effect, Exit, Layer, Option, Schema, Scope, ServiceMap, Stream } from "effect" +import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" +import { Account } from "@/account" +import { Bus } from "@/bus" +import { InstanceState } from "@/effect/instance-state" +import { makeRuntime } from "@/effect/run-service" +import { Provider } from "@/provider/provider" +import { ModelID, ProviderID } from "@/provider/schema" +import { Session } from "@/session" +import { MessageV2 } from "@/session/message-v2" +import type { SessionID } from "@/session/schema" +import { Database, eq } from "@/storage/db" +import { Config } from "@/config/config" +import { Log } from "@/util/log" +import { SessionShareTable } from "./share.sql" export namespace ShareNext { const log = Log.create({ service: "share-next" }) - - type ApiEndpoints = { - create: string - sync: (shareId: string) => string - remove: (shareId: string) => string - data: (shareId: string) => string - } - - function apiEndpoints(resource: string): ApiEndpoints { - return { - create: `/api/${resource}`, - sync: (shareId) => `/api/${resource}/${shareId}/sync`, - remove: (shareId) => `/api/${resource}/${shareId}`, - data: (shareId) => `/api/${resource}/${shareId}/data`, - } - } - - const legacyApi = apiEndpoints("share") - const consoleApi = apiEndpoints("shares") - - export async function url() { - const req = await request() - return req.baseUrl - } - - export async function request(): Promise<{ - headers: Record - api: ApiEndpoints - baseUrl: string - }> { - const headers: Record = {} - - const active = await Account.active() - if (!active?.active_org_id) { - const baseUrl = await Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai") - return { headers, api: legacyApi, baseUrl } - } - - const token = await Account.token(active.id) - if (!token) { - throw new Error("No active account token available for sharing") - } - - headers["authorization"] = `Bearer ${token}` - headers["x-org-id"] = active.active_org_id - return { headers, api: consoleApi, baseUrl: active.url } - } - const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1" - export async function init() { - if (disabled) return - Bus.subscribe(Session.Event.Updated, async (evt) => { - const session = await Session.get(evt.properties.sessionID) - - await sync(session.id, [ - { - type: "session", - data: session, - }, - ]) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - const info = evt.properties.info - await sync(info.sessionID, [ - { - type: "message", - data: evt.properties.info, - }, - ]) - if (info.role === "user") { - await sync(info.sessionID, [ - { - type: "model", - data: [await Provider.getModel(info.model.providerID, info.model.modelID).then((m) => m)], - }, - ]) - } - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync(evt.properties.part.sessionID, [ - { - type: "part", - data: evt.properties.part, - }, - ]) - }) - Bus.subscribe(Session.Event.Diff, async (evt) => { - await sync(evt.properties.sessionID, [ - { - type: "session_diff", - data: evt.properties.diff, - }, - ]) - }) + export type Api = { + create: string + sync: (shareID: string) => string + remove: (shareID: string) => string + data: (shareID: string) => string } - export async function create(sessionID: SessionID) { - if (disabled) return { id: "", url: "", secret: "" } - log.info("creating share", { sessionID }) - const req = await request() - const response = await fetch(`${req.baseUrl}${req.api.create}`, { - method: "POST", - headers: { ...req.headers, "Content-Type": "application/json" }, - body: JSON.stringify({ sessionID: sessionID }), - }) - - if (!response.ok) { - const message = await response.text().catch(() => response.statusText) - throw new Error(`Failed to create share (${response.status}): ${message || response.statusText}`) - } - - const result = (await response.json()) as { id: string; url: string; secret: string } - - Database.use((db) => - db - .insert(SessionShareTable) - .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url }) - .onConflictDoUpdate({ - target: SessionShareTable.session_id, - set: { id: result.id, secret: result.secret, url: result.url }, - }) - .run(), - ) - fullSync(sessionID) - return result + export type Req = { + headers: Record + api: Api + baseUrl: string } - function get(sessionID: SessionID) { - const row = Database.use((db) => - db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(), - ) - if (!row) return - return { id: row.id, secret: row.secret, url: row.url } + const ShareSchema = Schema.Struct({ + id: Schema.String, + url: Schema.String, + secret: Schema.String, + }) + export type Share = typeof ShareSchema.Type + + type State = { + queue: Map }> + scope: Scope.Closeable } type Data = @@ -171,6 +66,31 @@ export namespace ShareNext { data: SDK.Model[] } + export interface Interface { + readonly init: () => Effect.Effect + readonly url: () => Effect.Effect + readonly request: () => Effect.Effect + readonly create: (sessionID: SessionID) => Effect.Effect + readonly remove: (sessionID: SessionID) => Effect.Effect + } + + export class Service extends ServiceMap.Service()("@opencode/ShareNext") {} + + const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => + Effect.sync(() => Database.use(fn)) + + function api(resource: string): Api { + return { + create: `/api/${resource}`, + sync: (shareID) => `/api/${resource}/${shareID}/sync`, + remove: (shareID) => `/api/${resource}/${shareID}`, + data: (shareID) => `/api/${resource}/${shareID}/data`, + } + } + + const legacyApi = api("share") + const consoleApi = api("shares") + function key(item: Data) { switch (item.type) { case "session": @@ -186,102 +106,264 @@ export namespace ShareNext { } } - const queue = new Map }>() - async function sync(sessionID: SessionID, data: Data[]) { - if (disabled) return - const existing = queue.get(sessionID) - if (existing) { - for (const item of data) { - existing.data.set(key(item), item) + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const account = yield* Account.Service + const bus = yield* Bus.Service + const cfg = yield* Config.Service + const http = yield* HttpClient.HttpClient + const httpOk = HttpClient.filterStatusOk(http) + const provider = yield* Provider.Service + const session = yield* Session.Service + + function sync(sessionID: SessionID, data: Data[]): Effect.Effect { + return Effect.gen(function* () { + if (disabled) return + const s = yield* InstanceState.get(state) + const existing = s.queue.get(sessionID) + if (existing) { + for (const item of data) { + existing.data.set(key(item), item) + } + return + } + + const next = new Map(data.map((item) => [key(item), item])) + s.queue.set(sessionID, { data: next }) + yield* flush(sessionID).pipe( + Effect.delay(1000), + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("share flush failed", { sessionID, cause }) + }), + ), + Effect.forkIn(s.scope), + ) + }) } - return - } - const dataMap = new Map() - for (const item of data) { - dataMap.set(key(item), item) - } + const state: InstanceState = yield* InstanceState.make( + Effect.fn("ShareNext.state")(function* (_ctx) { + const cache: State = { queue: new Map(), scope: yield* Scope.make() } - const timeout = setTimeout(async () => { - const queued = queue.get(sessionID) - if (!queued) return - queue.delete(sessionID) - const share = get(sessionID) - if (!share) return + yield* Effect.addFinalizer(() => + Scope.close(cache.scope, Exit.void).pipe( + Effect.andThen( + Effect.sync(() => { + cache.queue.clear() + }), + ), + ), + ) - const req = await request() - const response = await fetch(`${req.baseUrl}${req.api.sync(share.id)}`, { - method: "POST", - headers: { ...req.headers, "Content-Type": "application/json" }, - body: JSON.stringify({ - secret: share.secret, - data: Array.from(queued.data.values()), + if (disabled) return cache + + const watch = (def: D, fn: (evt: { properties: any }) => Effect.Effect) => + bus.subscribe(def as never).pipe( + Stream.runForEach((evt) => + fn(evt).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("share subscriber failed", { type: def.type, cause }) + }), + ), + ), + ), + Effect.forkScoped, + ) + + yield* watch(Session.Event.Updated, (evt) => + Effect.gen(function* () { + const info = yield* session.get(evt.properties.sessionID) + yield* sync(info.id, [{ type: "session", data: info }]) + }), + ) + yield* watch(MessageV2.Event.Updated, (evt) => + Effect.gen(function* () { + const info = evt.properties.info + yield* sync(info.sessionID, [{ type: "message", data: info }]) + if (info.role !== "user") return + const model = yield* provider.getModel(info.model.providerID, info.model.modelID) + yield* sync(info.sessionID, [{ type: "model", data: [model] }]) + }), + ) + yield* watch(MessageV2.Event.PartUpdated, (evt) => + sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]), + ) + yield* watch(Session.Event.Diff, (evt) => + sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]), + ) + + return cache }), + ) + + const request = Effect.fn("ShareNext.request")(function* () { + const headers: Record = {} + const active = yield* account.active() + if (Option.isNone(active) || !active.value.active_org_id) { + const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai" + return { headers, api: legacyApi, baseUrl } satisfies Req + } + + const token = yield* account.token(active.value.id) + if (Option.isNone(token)) { + throw new Error("No active account token available for sharing") + } + + headers.authorization = `Bearer ${token.value}` + headers["x-org-id"] = active.value.active_org_id + return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req }) - if (!response.ok) { - log.warn("failed to sync share", { sessionID, shareID: share.id, status: response.status }) - } - }, 1000) - queue.set(sessionID, { timeout, data: dataMap }) + const get = Effect.fnUntraced(function* (sessionID: SessionID) { + const row = yield* db((db) => + db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(), + ) + if (!row) return + return { id: row.id, secret: row.secret, url: row.url } satisfies Share + }) + + const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) { + if (disabled) return + const s = yield* InstanceState.get(state) + const queued = s.queue.get(sessionID) + if (!queued) return + + s.queue.delete(sessionID) + + const share = yield* get(sessionID) + if (!share) return + + const req = yield* request() + const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe( + HttpClientRequest.setHeaders(req.headers), + HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }), + Effect.flatMap((r) => http.execute(r)), + ) + + if (res.status >= 400) { + log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status }) + } + }) + + const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) { + log.info("full sync", { sessionID }) + const info = yield* session.get(sessionID) + const diffs = yield* session.diff(sessionID) + const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID))) + const models = yield* Effect.forEach( + Array.from( + new Map( + messages + .filter((msg) => msg.info.role === "user") + .map((msg) => (msg.info as SDK.UserMessage).model) + .map((item) => [`${item.providerID}/${item.modelID}`, item] as const), + ).values(), + ), + (item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)), + { concurrency: 8 }, + ) + + yield* sync(sessionID, [ + { type: "session", data: info }, + ...messages.map((item) => ({ type: "message" as const, data: item.info })), + ...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))), + { type: "session_diff", data: diffs }, + { type: "model", data: models }, + ]) + }) + + const init = Effect.fn("ShareNext.init")(function* () { + if (disabled) return + yield* InstanceState.get(state) + }) + + const url = Effect.fn("ShareNext.url")(function* () { + return (yield* request()).baseUrl + }) + + const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) { + if (disabled) return { id: "", url: "", secret: "" } + log.info("creating share", { sessionID }) + const req = yield* request() + const result = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.create}`).pipe( + HttpClientRequest.setHeaders(req.headers), + HttpClientRequest.bodyJson({ sessionID }), + Effect.flatMap((r) => httpOk.execute(r)), + Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)), + ) + yield* db((db) => + db + .insert(SessionShareTable) + .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url }) + .onConflictDoUpdate({ + target: SessionShareTable.session_id, + set: { id: result.id, secret: result.secret, url: result.url }, + }) + .run(), + ) + const s = yield* InstanceState.get(state) + yield* full(sessionID).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("share full sync failed", { sessionID, cause }) + }), + ), + Effect.forkIn(s.scope), + ) + return result + }) + + const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) { + if (disabled) return + log.info("removing share", { sessionID }) + const share = yield* get(sessionID) + if (!share) return + + const req = yield* request() + yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe( + HttpClientRequest.setHeaders(req.headers), + HttpClientRequest.bodyJson({ secret: share.secret }), + Effect.flatMap((r) => httpOk.execute(r)), + ) + + yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run()) + }) + + return Service.of({ init, url, request, create, remove }) + }), + ) + + export const defaultLayer = layer.pipe( + Layer.provide(Bus.layer), + Layer.provide(Account.defaultLayer), + Layer.provide(Config.defaultLayer), + Layer.provide(FetchHttpClient.layer), + Layer.provide(Provider.defaultLayer), + Layer.provide(Session.defaultLayer), + ) + + const { runPromise } = makeRuntime(Service, defaultLayer) + + export async function init() { + return runPromise((svc) => svc.init()) + } + + export async function url() { + return runPromise((svc) => svc.url()) + } + + export async function request(): Promise { + return runPromise((svc) => svc.request()) + } + + export async function create(sessionID: SessionID) { + return runPromise((svc) => svc.create(sessionID)) } export async function remove(sessionID: SessionID) { - if (disabled) return - log.info("removing share", { sessionID }) - const share = get(sessionID) - if (!share) return - - const req = await request() - const response = await fetch(`${req.baseUrl}${req.api.remove(share.id)}`, { - method: "DELETE", - headers: { ...req.headers, "Content-Type": "application/json" }, - body: JSON.stringify({ - secret: share.secret, - }), - }) - - if (!response.ok) { - const message = await response.text().catch(() => response.statusText) - throw new Error(`Failed to remove share (${response.status}): ${message || response.statusText}`) - } - - Database.use((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run()) - } - - async function fullSync(sessionID: SessionID) { - log.info("full sync", { sessionID }) - const session = await Session.get(sessionID) - const diffs = await Session.diff(sessionID) - const messages = await Array.fromAsync(MessageV2.stream(sessionID)) - const models = await Promise.all( - Array.from( - new Map( - messages - .filter((m) => m.info.role === "user") - .map((m) => (m.info as SDK.UserMessage).model) - .map((m) => [`${m.providerID}/${m.modelID}`, m] as const), - ).values(), - ).map((m) => Provider.getModel(ProviderID.make(m.providerID), ModelID.make(m.modelID)).then((item) => item)), - ) - await sync(sessionID, [ - { - type: "session", - data: session, - }, - ...messages.map((x) => ({ - type: "message" as const, - data: x.info, - })), - ...messages.flatMap((x) => x.parts.map((y) => ({ type: "part" as const, data: y }))), - { - type: "session_diff", - data: diffs, - }, - { - type: "model", - data: models, - }, - ]) + return runPromise((svc) => svc.remove(sessionID)) } } diff --git a/packages/opencode/test/share/share-next.test.ts b/packages/opencode/test/share/share-next.test.ts index fc8d511509..12d71f19a0 100644 --- a/packages/opencode/test/share/share-next.test.ts +++ b/packages/opencode/test/share/share-next.test.ts @@ -1,76 +1,333 @@ -import { test, expect, mock } from "bun:test" -import { ShareNext } from "../../src/share/share-next" -import { AccessToken, Account, AccountID, OrgID } from "../../src/account" +import { NodeFileSystem } from "@effect/platform-node" +import { beforeEach, describe, expect } from "bun:test" +import { Effect, Exit, Layer, Option } from "effect" +import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" + +import { AccessToken, AccountID, OrgID, RefreshToken } from "../../src/account" +import { Account } from "../../src/account" +import { AccountRepo } from "../../src/account/repo" +import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" +import { Bus } from "../../src/bus" import { Config } from "../../src/config/config" +import { Provider } from "../../src/provider/provider" +import { Session } from "../../src/session" +import type { SessionID } from "../../src/session/schema" +import { ShareNext } from "../../src/share/share-next" +import { SessionShareTable } from "../../src/share/share.sql" +import { Database, eq } from "../../src/storage/db" +import { provideTmpdirInstance } from "../fixture/fixture" +import { resetDatabase } from "../fixture/db" +import { testEffect } from "../lib/effect" -test("ShareNext.request uses legacy share API without active org account", async () => { - const originalActive = Account.active - const originalConfigGet = Config.get +const env = Layer.mergeAll( + Session.defaultLayer, + AccountRepo.layer, + NodeFileSystem.layer, + CrossSpawnSpawner.defaultLayer, +) +const it = testEffect(env) - Account.active = mock(async () => undefined) - Config.get = mock(async () => ({ enterprise: { url: "https://legacy-share.example.com" } })) +const json = (req: Parameters[0], body: unknown, status = 200) => + HttpClientResponse.fromWeb( + req, + new Response(JSON.stringify(body), { + status, + headers: { "content-type": "application/json" }, + }), + ) - try { - const req = await ShareNext.request() +const none = HttpClient.make(() => Effect.die("unexpected http call")) - expect(req.api.create).toBe("/api/share") - expect(req.api.sync("shr_123")).toBe("/api/share/shr_123/sync") - expect(req.api.remove("shr_123")).toBe("/api/share/shr_123") - expect(req.api.data("shr_123")).toBe("/api/share/shr_123/data") - expect(req.baseUrl).toBe("https://legacy-share.example.com") - expect(req.headers).toEqual({}) - } finally { - Account.active = originalActive - Config.get = originalConfigGet - } +function live(client: HttpClient.HttpClient) { + const http = Layer.succeed(HttpClient.HttpClient, client) + return ShareNext.layer.pipe( + Layer.provide(Bus.layer), + Layer.provide(Account.layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(http))), + Layer.provide(Config.defaultLayer), + Layer.provide(http), + Layer.provide(Provider.defaultLayer), + Layer.provide(Session.defaultLayer), + ) +} + +function wired(client: HttpClient.HttpClient) { + const http = Layer.succeed(HttpClient.HttpClient, client) + return Layer.mergeAll( + Bus.layer, + ShareNext.layer, + Session.layer, + AccountRepo.layer, + NodeFileSystem.layer, + CrossSpawnSpawner.defaultLayer, + ).pipe( + Layer.provide(Bus.layer), + Layer.provide(Account.layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(http))), + Layer.provide(Config.defaultLayer), + Layer.provide(http), + Layer.provide(Provider.defaultLayer), + ) +} + +const share = (id: SessionID) => + Database.use((db) => db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, id)).get()) + +const seed = (url: string, org?: string) => + AccountRepo.use((repo) => + repo.persistAccount({ + id: AccountID.make("account-1"), + email: "user@example.com", + url, + accessToken: AccessToken.make("st_test_token"), + refreshToken: RefreshToken.make("rt_test_token"), + expiry: Date.now() + 10 * 60_000, + orgID: org ? Option.some(OrgID.make(org)) : Option.none(), + }), + ) + +beforeEach(async () => { + await resetDatabase() }) -test("ShareNext.request uses org share API with auth headers when account is active", async () => { - const originalActive = Account.active - const originalToken = Account.token +describe("ShareNext", () => { + it.live("request uses legacy share API without active org account", () => + provideTmpdirInstance( + () => + ShareNext.Service.use((svc) => + Effect.gen(function* () { + const req = yield* svc.request() - Account.active = mock(async () => ({ - id: AccountID.make("account-1"), - email: "user@example.com", - url: "https://control.example.com", - active_org_id: OrgID.make("org-1"), - })) - Account.token = mock(async () => AccessToken.make("st_test_token")) + expect(req.api.create).toBe("/api/share") + expect(req.api.sync("shr_123")).toBe("/api/share/shr_123/sync") + expect(req.api.remove("shr_123")).toBe("/api/share/shr_123") + expect(req.api.data("shr_123")).toBe("/api/share/shr_123/data") + expect(req.baseUrl).toBe("https://legacy-share.example.com") + expect(req.headers).toEqual({}) + }), + ).pipe(Effect.provide(live(none))), + { config: { enterprise: { url: "https://legacy-share.example.com" } } }, + ), + ) - try { - const req = await ShareNext.request() + it.live("request uses default URL when no enterprise config", () => + provideTmpdirInstance(() => + ShareNext.Service.use((svc) => + Effect.gen(function* () { + const req = yield* svc.request() - expect(req.api.create).toBe("/api/shares") - expect(req.api.sync("shr_123")).toBe("/api/shares/shr_123/sync") - expect(req.api.remove("shr_123")).toBe("/api/shares/shr_123") - expect(req.api.data("shr_123")).toBe("/api/shares/shr_123/data") - expect(req.baseUrl).toBe("https://control.example.com") - expect(req.headers).toEqual({ - authorization: "Bearer st_test_token", - "x-org-id": "org-1", - }) - } finally { - Account.active = originalActive - Account.token = originalToken - } -}) - -test("ShareNext.request fails when org account has no token", async () => { - const originalActive = Account.active - const originalToken = Account.token - - Account.active = mock(async () => ({ - id: AccountID.make("account-1"), - email: "user@example.com", - url: "https://control.example.com", - active_org_id: OrgID.make("org-1"), - })) - Account.token = mock(async () => undefined) - - try { - await expect(ShareNext.request()).rejects.toThrow("No active account token available for sharing") - } finally { - Account.active = originalActive - Account.token = originalToken - } + expect(req.baseUrl).toBe("https://opncd.ai") + expect(req.api.create).toBe("/api/share") + expect(req.headers).toEqual({}) + }), + ).pipe(Effect.provide(live(none))), + ), + ) + + it.live("request uses org share API with auth headers when account is active", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + yield* seed("https://control.example.com", "org-1") + + const req = yield* ShareNext.Service.use((svc) => svc.request()).pipe(Effect.provide(live(none))) + + expect(req.api.create).toBe("/api/shares") + expect(req.api.sync("shr_123")).toBe("/api/shares/shr_123/sync") + expect(req.api.remove("shr_123")).toBe("/api/shares/shr_123") + expect(req.api.data("shr_123")).toBe("/api/shares/shr_123/data") + expect(req.baseUrl).toBe("https://control.example.com") + expect(req.headers).toEqual({ + authorization: "Bearer st_test_token", + "x-org-id": "org-1", + }) + }), + ), + ) + + it.live("create posts share, persists it, and returns the result", () => + provideTmpdirInstance( + () => + Effect.gen(function* () { + const session = yield* Session.Service.use((svc) => svc.create({ title: "test" })) + const seen: HttpClientRequest.HttpClientRequest[] = [] + const client = HttpClient.make((req) => { + seen.push(req) + if (req.url.endsWith("/api/share")) { + return Effect.succeed( + json(req, { + id: "shr_abc", + url: "https://legacy-share.example.com/share/abc", + secret: "sec_123", + }), + ) + } + return Effect.succeed(json(req, { ok: true })) + }) + + const result = yield* ShareNext.Service.use((svc) => svc.create(session.id)).pipe( + Effect.provide(live(client)), + ) + + expect(result.id).toBe("shr_abc") + expect(result.url).toBe("https://legacy-share.example.com/share/abc") + expect(result.secret).toBe("sec_123") + + const row = share(session.id) + expect(row?.id).toBe("shr_abc") + expect(row?.url).toBe("https://legacy-share.example.com/share/abc") + expect(row?.secret).toBe("sec_123") + + expect(seen).toHaveLength(1) + expect(seen[0].method).toBe("POST") + expect(seen[0].url).toBe("https://legacy-share.example.com/api/share") + }), + { config: { enterprise: { url: "https://legacy-share.example.com" } } }, + ), + ) + + it.live("remove deletes the persisted share and calls the delete endpoint", () => + provideTmpdirInstance( + () => + Effect.gen(function* () { + const session = yield* Session.Service.use((svc) => svc.create({ title: "test" })) + const seen: HttpClientRequest.HttpClientRequest[] = [] + const client = HttpClient.make((req) => { + seen.push(req) + if (req.method === "POST") { + return Effect.succeed( + json(req, { + id: "shr_abc", + url: "https://legacy-share.example.com/share/abc", + secret: "sec_123", + }), + ) + } + return Effect.succeed(HttpClientResponse.fromWeb(req, new Response(null, { status: 200 }))) + }) + + yield* Effect.gen(function* () { + yield* ShareNext.Service.use((svc) => svc.create(session.id)) + yield* ShareNext.Service.use((svc) => svc.remove(session.id)) + }).pipe(Effect.provide(live(client))) + + expect(share(session.id)).toBeUndefined() + expect(seen.map((req) => [req.method, req.url])).toEqual([ + ["POST", "https://legacy-share.example.com/api/share"], + ["DELETE", "https://legacy-share.example.com/api/share/shr_abc"], + ]) + }), + { config: { enterprise: { url: "https://legacy-share.example.com" } } }, + ), + ) + + it.live("create fails on a non-ok response and does not persist a share", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const session = yield* Session.Service.use((svc) => svc.create({ title: "test" })) + const client = HttpClient.make((req) => Effect.succeed(json(req, { error: "bad" }, 500))) + + const exit = yield* ShareNext.Service.use((svc) => Effect.exit(svc.create(session.id))).pipe( + Effect.provide(live(client)), + ) + + expect(Exit.isFailure(exit)).toBe(true) + expect(share(session.id)).toBeUndefined() + }), + ), + ) + + it.live("ShareNext coalesces rapid diff events into one delayed sync with latest data", () => + provideTmpdirInstance( + () => { + const seen: Array<{ url: string; body: string }> = [] + const client = HttpClient.make((req) => { + if (req.url.endsWith("/sync") && req.body._tag === "Uint8Array") { + seen.push({ url: req.url, body: new TextDecoder().decode(req.body.body) }) + } + return Effect.succeed(json(req, { ok: true })) + }) + + return Effect.gen(function* () { + const bus = yield* Bus.Service + const share = yield* ShareNext.Service + const session = yield* Session.Service + + const info = yield* session.create({ title: "first" }) + yield* share.init() + yield* Effect.sleep(50) + yield* Effect.sync(() => + Database.use((db) => + db + .insert(SessionShareTable) + .values({ + session_id: info.id, + id: "shr_abc", + url: "https://legacy-share.example.com/share/abc", + secret: "sec_123", + }) + .run(), + ), + ) + + yield* bus.publish(Session.Event.Diff, { + sessionID: info.id, + diff: [ + { + file: "a.ts", + before: "one", + after: "two", + additions: 1, + deletions: 1, + status: "modified", + }, + ], + }) + yield* bus.publish(Session.Event.Diff, { + sessionID: info.id, + diff: [ + { + file: "b.ts", + before: "old", + after: "new", + additions: 2, + deletions: 0, + status: "modified", + }, + ], + }) + yield* Effect.sleep(1_250) + + expect(seen).toHaveLength(1) + expect(seen[0].url).toBe("https://legacy-share.example.com/api/share/shr_abc/sync") + + const body = JSON.parse(seen[0].body) as { + secret: string + data: Array<{ + type: string + data: Array<{ + file: string + before: string + after: string + additions: number + deletions: number + status?: string + }> + }> + } + expect(body.secret).toBe("sec_123") + expect(body.data).toHaveLength(1) + expect(body.data[0].type).toBe("session_diff") + expect(body.data[0].data).toEqual([ + { + file: "b.ts", + before: "old", + after: "new", + additions: 2, + deletions: 0, + status: "modified", + }, + ]) + }).pipe(Effect.provide(wired(client))) + }, + { config: { enterprise: { url: "https://legacy-share.example.com" } } }, + ), + ) })