refactor: clarify queue tracking names

pull/20720/head
Kit Langton 2026-04-02 13:12:19 -04:00
parent 76b90e3fb1
commit 946d2eecbe
3 changed files with 9 additions and 9 deletions

View File

@ -33,7 +33,7 @@ export const EventRoutes = () =>
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>({ name: "sse:event" })
let done = false
let closed = false
q.push(
JSON.stringify({
@ -53,12 +53,12 @@ export const EventRoutes = () =>
}, 10_000)
const stop = () => {
if (done) return
done = true
if (closed) return
closed = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.done()
q.untrack()
log.info("event disconnected")
}

View File

@ -20,7 +20,7 @@ export const GlobalDisposedEvent = BusEvent.define("global.disposed", z.object({
async function streamEvents(c: Context, name: string, subscribe: (q: AsyncQueue<string | null>) => () => void) {
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>({ name })
let done = false
let closed = false
q.push(
JSON.stringify({
@ -44,12 +44,12 @@ async function streamEvents(c: Context, name: string, subscribe: (q: AsyncQueue<
}, 10_000)
const stop = () => {
if (done) return
done = true
if (closed) return
closed = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.done()
q.untrack()
log.info("global event disconnected")
}

View File

@ -63,7 +63,7 @@ export class AsyncQueue<T> implements AsyncIterable<T> {
})
}
done() {
untrack() {
if (this.id === undefined) return
all.delete(this.id)
}