From 2e6d7bb5175d3d04536d6e00f61c312378655480 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 1 Apr 2026 23:49:22 -0400 Subject: [PATCH] fix(sync): keep event application synchronous --- packages/opencode/src/sync/index.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 9b9dd27b4c..1c3798dea0 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -108,11 +108,7 @@ export namespace SyncEvent { bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(), } - const process = Effect.fnUntraced(function* ( - def: Def, - event: Event, - options: { publish: boolean }, - ) { + function process(def: Def, event: Event, options: { publish: boolean }) { if (state.projectors == null) { throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") } @@ -163,7 +159,7 @@ export namespace SyncEvent { ProjectBus.publish({ type: def.type, properties: def.schema }, result) }) }) - }) + } const reset = Effect.fn("SyncEvent.reset")(() => Effect.sync(() => { @@ -180,7 +176,7 @@ export namespace SyncEvent { for (const [type, version] of versions.entries()) { const def = registry.get(versionedType(type, version))! - BusEvent.define(def.type, def.properties || def.schema) + BusEvent.define(def.type, def.properties) } frozen = true @@ -223,7 +219,7 @@ export namespace SyncEvent { ) } - yield* process(def, event, { publish: !!options?.republish }) + yield* Effect.sync(() => process(def, event, { publish: !!options?.republish })) }) const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* ( @@ -250,7 +246,7 @@ export namespace SyncEvent { const seq = row?.seq != null ? row.seq + 1 : 0 const event = { id, seq, aggregateID: agg, data } - Effect.runSync(process(def, event, { publish: true })) + process(def, event, { publish: true }) }, { behavior: "immediate",