From dc719269b60f5721970ad0fcf3f5dc6c7c52262a Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 1 Apr 2026 23:31:36 -0400 Subject: [PATCH] refactor(sync): effectify sync event --- packages/opencode/src/sync/index.ts | 367 +++++++++++++++++----------- 1 file changed, 218 insertions(+), 149 deletions(-) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 270950fd4b..9b9dd27b4c 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,6 +1,8 @@ import z from "zod" import type { ZodObject } from "zod" import { EventEmitter } from "events" +import { Effect, Layer, ServiceMap } from "effect" +import { makeRuntime } from "@/effect/run-service" import { Database, eq } from "@/storage/db" import { Bus as ProjectBus } from "@/bus" import { BusEvent } from "@/bus/bus-event" @@ -31,37 +33,18 @@ export namespace SyncEvent { type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void - export const registry = new Map() - let projectors: Map | undefined - const versions = new Map() - let frozen = false - let convertEvent: (type: string, event: Event["data"]) => Promise> | Record - - const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>() - - export function reset() { - frozen = false - projectors = undefined - convertEvent = (_, data) => data + type State = { + projectors: Map | undefined + convert: (type: string, event: Event["data"]) => Promise> | Record + bus: EventEmitter<{ event: [{ def: Definition; event: Event }] }> } - export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) { - projectors = new Map(input.projectors) + export const registry = new Map() + const versions = new Map() + let frozen = false - // Install all the latest event defs to the bus. We only ever emit - // latest versions from code, and keep around old versions for - // replaying. Replaying does not go through the bus, and it - // simplifies the bus to only use unversioned latest events - for (let [type, version] of versions.entries()) { - let def = registry.get(versionedType(type, version))! - - BusEvent.define(def.type, def.properties || def.schema) - } - - // Freeze the system so it clearly errors if events are defined - // after `init` which would cause bugs - frozen = true - convertEvent = input.convertEvent || ((_, data) => data) + function noop(_: string, data: Event["data"]) { + return data } export function versionedType(type: A): A @@ -102,140 +85,226 @@ export namespace SyncEvent { return [def, func as ProjectorFunc] } - function process(def: Def, event: Event, options: { publish: boolean }) { - if (projectors == null) { - throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") - } - - const projector = projectors.get(def) - if (!projector) { - throw new Error(`Projector not found for event: ${def.type}`) - } - - // idempotent: need to ignore any events already logged - - Database.transaction((tx) => { - projector(tx, event.data) - - if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { - tx.insert(EventSequenceTable) - .values({ - aggregate_id: event.aggregateID, - seq: event.seq, - }) - .onConflictDoUpdate({ - target: EventSequenceTable.aggregate_id, - set: { seq: event.seq }, - }) - .run() - tx.insert(EventTable) - .values({ - id: event.id, - seq: event.seq, - aggregate_id: event.aggregateID, - type: versionedType(def.type, def.version), - data: event.data as Record, - }) - .run() - } - - Database.effect(() => { - Bus.emit("event", { - def, - event, - }) - - if (options?.publish) { - const result = convertEvent(def.type, event.data) - if (result instanceof Promise) { - result.then((data) => { - ProjectBus.publish({ type: def.type, properties: def.schema }, data) - }) - } else { - ProjectBus.publish({ type: def.type, properties: def.schema }, result) - } - } - }) - }) + export interface Interface { + readonly reset: () => Effect.Effect + readonly init: (input: { + projectors: Array<[Definition, ProjectorFunc]> + convertEvent?: State["convert"] + }) => Effect.Effect + readonly replay: (event: SerializedEvent, options?: { republish: boolean }) => Effect.Effect + readonly run: (def: Def, data: Event["data"]) => Effect.Effect + readonly remove: (aggregateID: string) => Effect.Effect + readonly subscribeAll: (handler: (event: { def: Definition; event: Event }) => void) => Effect.Effect<() => void> } - // TODO: - // - // * Support applying multiple events at one time. One transaction, - // and it validets all the sequence ids - // * when loading events from db, apply zod validation to ensure shape + export class Service extends ServiceMap.Service()("@opencode/SyncEvent") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const state: State = { + projectors: undefined, + convert: noop, + bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(), + } + + const process = Effect.fnUntraced(function* ( + def: Def, + event: Event, + options: { publish: boolean }, + ) { + if (state.projectors == null) { + throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") + } + + const projector = state.projectors.get(def) + if (!projector) { + throw new Error(`Projector not found for event: ${def.type}`) + } + + Database.transaction((tx) => { + projector(tx, event.data) + + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { + tx.insert(EventSequenceTable) + .values({ + aggregate_id: event.aggregateID, + seq: event.seq, + }) + .onConflictDoUpdate({ + target: EventSequenceTable.aggregate_id, + set: { seq: event.seq }, + }) + .run() + tx.insert(EventTable) + .values({ + id: event.id, + seq: event.seq, + aggregate_id: event.aggregateID, + type: versionedType(def.type, def.version), + data: event.data as Record, + }) + .run() + } + + Database.effect(() => { + state.bus.emit("event", { def, event }) + + if (!options.publish) return + + const result = state.convert(def.type, event.data) + if (result instanceof Promise) { + result.then((data) => { + ProjectBus.publish({ type: def.type, properties: def.schema }, data) + }) + return + } + + ProjectBus.publish({ type: def.type, properties: def.schema }, result) + }) + }) + }) + + const reset = Effect.fn("SyncEvent.reset")(() => + Effect.sync(() => { + frozen = false + state.projectors = undefined + state.convert = noop + }), + ) + + const init = Effect.fn("SyncEvent.init")( + (input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) => + Effect.sync(() => { + state.projectors = new Map(input.projectors) + + for (const [type, version] of versions.entries()) { + const def = registry.get(versionedType(type, version))! + BusEvent.define(def.type, def.properties || def.schema) + } + + frozen = true + state.convert = input.convertEvent || noop + }), + ) + + // TODO: + // + // * Support applying multiple events at one time. One transaction, + // and it validets all the sequence ids + // * when loading events from db, apply zod validation to ensure shape + + const replay = Effect.fn("SyncEvent.replay")(function* ( + event: SerializedEvent, + options?: { republish: boolean }, + ) { + const def = registry.get(event.type) + if (!def) { + throw new Error(`Unknown event type: ${event.type}`) + } + + const row = Database.use((db) => + db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) + .get(), + ) + + const latest = row?.seq ?? -1 + if (event.seq <= latest) { + return + } + + const expected = latest + 1 + if (event.seq !== expected) { + throw new Error( + `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`, + ) + } + + yield* process(def, event, { publish: !!options?.republish }) + }) + + const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* ( + def: Def, + data: Event["data"], + ) { + const agg = (data as Record)[def.aggregate] + if (agg == null) { + throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) + } + + if (def.version !== versions.get(def.type)) { + throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) + } + + Database.transaction( + (tx) => { + const id = EventID.ascending() + const row = tx + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, agg)) + .get() + const seq = row?.seq != null ? row.seq + 1 : 0 + + const event = { id, seq, aggregateID: agg, data } + Effect.runSync(process(def, event, { publish: true })) + }, + { + behavior: "immediate", + }, + ) + }) + + const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) => + Effect.sync(() => { + Database.transaction((tx) => { + tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() + tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() + }) + }), + ) + + const subscribeAll = Effect.fn("SyncEvent.subscribeAll")( + (handler: (event: { def: Definition; event: Event }) => void) => + Effect.sync(() => { + state.bus.on("event", handler) + return () => state.bus.off("event", handler) + }), + ) + + return Service.of({ reset, init, replay, run, remove, subscribeAll }) + }), + ) + + export const defaultLayer = layer + + const { runSync } = makeRuntime(Service, defaultLayer) + + export function reset() { + return runSync((svc) => svc.reset()) + } + + export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) { + return runSync((svc) => svc.init(input)) + } export function replay(event: SerializedEvent, options?: { republish: boolean }) { - const def = registry.get(event.type) - if (!def) { - throw new Error(`Unknown event type: ${event.type}`) - } - - const row = Database.use((db) => - db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) - .get(), - ) - - const latest = row?.seq ?? -1 - if (event.seq <= latest) { - return - } - - const expected = latest + 1 - if (event.seq !== expected) { - throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`) - } - - process(def, event, { publish: !!options?.republish }) + return runSync((svc) => svc.replay(event, options)) } export function run(def: Def, data: Event["data"]) { - const agg = (data as Record)[def.aggregate] - // This should never happen: we've enforced it via typescript in - // the definition - if (agg == null) { - throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) - } - - if (def.version !== versions.get(def.type)) { - throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) - } - - // Note that this is an "immediate" transaction which is critical. - // We need to make sure we can safely read and write with nothing - // else changing the data from under us - Database.transaction( - (tx) => { - const id = EventID.ascending() - const row = tx - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, agg)) - .get() - const seq = row?.seq != null ? row.seq + 1 : 0 - - const event = { id, seq, aggregateID: agg, data } - process(def, event, { publish: true }) - }, - { - behavior: "immediate", - }, - ) + return runSync((svc) => svc.run(def, data)) } export function remove(aggregateID: string) { - Database.transaction((tx) => { - tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() - tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() - }) + return runSync((svc) => svc.remove(aggregateID)) } export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) { - Bus.on("event", handler) - return () => Bus.off("event", handler) + return runSync((svc) => svc.subscribeAll(handler)) } export function payloads() {