fix(sync): keep event application synchronous

pull/20597/head
Kit Langton 2026-04-01 23:49:22 -04:00
parent dc719269b6
commit 2e6d7bb517
1 changed files with 5 additions and 9 deletions

View File

@ -108,11 +108,7 @@ export namespace SyncEvent {
bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(),
}
const process = Effect.fnUntraced(function* <Def extends Definition>(
def: Def,
event: Event<Def>,
options: { publish: boolean },
) {
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (state.projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
@ -163,7 +159,7 @@ export namespace SyncEvent {
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
})
})
})
}
const reset = Effect.fn("SyncEvent.reset")(() =>
Effect.sync(() => {
@ -180,7 +176,7 @@ export namespace SyncEvent {
for (const [type, version] of versions.entries()) {
const def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
BusEvent.define(def.type, def.properties)
}
frozen = true
@ -223,7 +219,7 @@ export namespace SyncEvent {
)
}
yield* process(def, event, { publish: !!options?.republish })
yield* Effect.sync(() => process(def, event, { publish: !!options?.republish }))
})
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
@ -250,7 +246,7 @@ export namespace SyncEvent {
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
Effect.runSync(process(def, event, { publish: true }))
process(def, event, { publish: true })
},
{
behavior: "immediate",