Description
Describe the bug
There is a race condition in PGliteWorker leader election when the main thread is under heavy load (e.g., during React hydration or complex app initialization).
The leader-now message sent by the worker immediately after ready is lost because the main thread is still blocked or has not yet attached the message event listener to the worker instance.
This results in isLeader remaining false indefinitely, even though the worker successfully acquired the lock and thinks it is the leader.
To Reproduce
The issue occurs when the main thread yields (e.g., while awaiting a promise) or is blocked by heavy computation between the time the worker is created and the time the event listener for `leader-now` is attached.
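The underlying behaviour can be reproduced without PGlite. The sketch below is a hypothetical simulation, not PGlite code. A plain `EventTarget` stands in for the `Worker` (which inherits `addEventListener`/`dispatchEvent` from `EventTarget`), a `setTimeout` stands in for the worker posting `leader-now`, and a 10 ms sleep stands in for the `await acquireLock(...)` gap. It assumes, as this report describes, that a `message` event fired while no listener is attached is discarded rather than queued. It should run in Node 18+ or a browser.

```typescript
// Minimal stand-in for a worker `message` event carrying `data`.
class WorkerMessageEvent extends Event {
  readonly data: { type: string }
  constructor(data: { type: string }) {
    super('message')
    this.data = data
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms))

// Hypothetical simulation of the lost `leader-now` message.
async function simulateLostMessage(): Promise<string[]> {
  // Stand-in for the Worker object.
  const fakeWorker = new EventTarget()
  const received: string[] = []

  // The "worker" posts leader-now; it is dispatched as its own task.
  setTimeout(() => {
    fakeWorker.dispatchEvent(new WorkerMessageEvent({ type: 'leader-now' }))
  }, 0)

  // The main thread is suspended here (stand-in for `await acquireLock(...)`),
  // so the dispatch above runs while no 'message' listener exists.
  await sleep(10)

  // The listener is attached only after the event was already dispatched.
  fakeWorker.addEventListener('message', (event) => {
    received.push((event as WorkerMessageEvent).data.type)
  })

  // Waiting again changes nothing: EventTarget does not replay past events.
  await sleep(10)
  return received
}

simulateLostMessage().then((received) => console.log(received)) // logs an empty array
```

The listener never sees `leader-now`: what matters is that the dispatch task ran before the listener existed, not how long the gap was.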
In `PGliteWorker.create` (simplified flow):
- Worker is created.
- `worker.postMessage({ type: 'init' })` is sent.
- `await acquireLock(...)` yields execution on the main thread.
- Worker receives `init`, acquires the lock, sends `ready`, then immediately sends `leader-now`.
- Main thread resumes from `acquireLock` and receives `ready` (resolving the init promise).
- CRITICAL GAP: the `leader-now` message is in the task queue. If the main thread processes tasks (like message events) before PGlite attaches its internal listener, the message is fired on the worker object but ignored.
- PGlite attaches its listener after the message has already been processed and lost.
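By contrast, if a listener for `leader-now` were registered before the main thread yields, the same timeline would not lose the message. The sketch below shows that ordering with the same simulated setup. `waitForMessage` and `simulateListenerFirst` are hypothetical names for illustration, not PGlite's internals.

```typescript
// Minimal stand-in for a worker `message` event carrying `data`.
class WorkerMessageEvent extends Event {
  readonly data: { type: string }
  constructor(data: { type: string }) {
    super('message')
    this.data = data
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms))

// Hypothetical helper: resolves with the data of the first 'message' event whose
// `data.type` matches. The listener is attached as soon as this is called.
function waitForMessage(target: EventTarget, type: string): Promise<{ type: string }> {
  return new Promise<{ type: string }>((resolve) => {
    const onMessage = (event: Event) => {
      const data = (event as WorkerMessageEvent).data
      if (data?.type === type) {
        target.removeEventListener('message', onMessage)
        resolve(data)
      }
    }
    target.addEventListener('message', onMessage)
  })
}

async function simulateListenerFirst(): Promise<string> {
  const fakeWorker = new EventTarget()

  // The "worker" posts leader-now as its own task, exactly as before.
  setTimeout(() => {
    fakeWorker.dispatchEvent(new WorkerMessageEvent({ type: 'leader-now' }))
  }, 0)

  // Register interest BEFORE yielding...
  const leaderNow = waitForMessage(fakeWorker, 'leader-now')

  // ...then yield (stand-in for `await acquireLock(...)`).
  await sleep(10)

  // The event was captured while suspended; reading it now is safe.
  const data = await leaderNow
  return data.type
}

simulateListenerFirst().then((type) => console.log(type)) // logs leader-now
```

The promise, and therefore its listener, exists before the `await`, so the event is captured while the main thread is suspended and consumed afterwards.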
Workaround / Fix
I was able to confirm and fix this by proxying the worker and delaying the leader-now message.
Here is the full code (TypeScript) to wrap the worker (with debug logs):
```typescript
import { PGliteWorker } from '@electric-sql/pglite/worker'

// How to use:
// 1. Create your worker
const worker = new Worker(new URL('./my-worker.js', import.meta.url), {
  type: 'module'
})
// 2. Apply the proxy to fix the race condition
proxyWorkerMessages(worker, {
  delayLeaderNowEvent: true, // Enables the fix (50ms delay)
  debug: true // Enables verbose logging to confirm the issue
})
// 3. Pass the proxied worker to PGlite
const pg = await PGliteWorker.create(worker, {
  dataDir: 'idb://my-db'
  // ... other options
})

/**
 * Proxy worker messages to debug race conditions or delay events
 */
function proxyWorkerMessages(
  worker: Worker,
  { debug = false, delayLeaderNowEvent = false }: { debug?: boolean; delayLeaderNowEvent?: boolean }
) {
  const originalAddEventListener = worker.addEventListener.bind(worker)
  const originalRemoveEventListener = worker.removeEventListener.bind(worker)
  const originalPostMessage = worker.postMessage.bind(worker)
  const messageListeners: Array<{
    listener: EventListenerOrEventListenerObject
    options?: boolean | AddEventListenerOptions
  }> = []

  // Keep 'message' listeners in our own list instead of on the worker.
  worker.addEventListener = (
    type: string,
    listener: EventListenerOrEventListenerObject,
    options?: boolean | AddEventListenerOptions
  ) => {
    if (type === 'message') {
      if (debug) {
        console.log(
          `[RaceCheck] 💚 PGlite adding 'message' listener at ${performance.now().toFixed(2)}ms`
        )
      }
      messageListeners.push({ listener, options })
      return
    }
    originalAddEventListener(type, listener, options)
  }

  worker.removeEventListener = (
    type: string,
    listener: EventListenerOrEventListenerObject,
    options?: boolean | EventListenerOptions
  ) => {
    if (type === 'message') {
      if (debug) {
        console.log(
          `[RaceCheck] 🔴 PGlite removing 'message' listener at ${performance.now().toFixed(2)}ms`
        )
      }
      const index = messageListeners.findIndex((l) => l.listener === listener)
      if (index !== -1) {
        messageListeners.splice(index, 1)
      }
      return
    }
    originalRemoveEventListener(type, listener, options)
  }

  // Single real listener on the worker: inspects each message, then forwards it
  // to the registered listeners (immediately, or after a delay for leader-now).
  originalAddEventListener('message', (event: Event) => {
    const e = event as MessageEvent
    const data = e.data
    if (debug) {
      console.log(`[RaceCheck] Worker -> Main: ${data?.type} at ${performance.now().toFixed(2)}ms`)
    }
    let shouldDelay = false
    if (data?.type === 'leader-now') {
      if (debug) {
        console.log(
          `[RaceCheck] 🔴 'leader-now' received. Listeners registered: ${messageListeners.length}`
        )
        if (messageListeners.length === 0) {
          console.error(
            `[RaceCheck] ❌ RACE CONDITION CONFIRMED: Message arrived before listener was registered!`
          )
        } else {
          console.log(`[RaceCheck] ✅ No race condition: Listener was ready.`)
        }
      }
      if (delayLeaderNowEvent) {
        shouldDelay = true
        if (debug) {
          console.log('⚠️ Intercepted leader-now. Delaying to fix race condition...')
        }
      }
    }
    const dispatch = () => {
      ;[...messageListeners].forEach((item) => {
        const { listener, options } = item
        try {
          if (typeof listener === 'function') {
            listener.call(worker, e)
          } else {
            listener.handleEvent(e)
          }
        } catch (err) {
          console.error('Error in worker listener:', err)
        }
        // Emulate { once: true } for listeners we manage ourselves.
        if (typeof options === 'object' && options?.once) {
          const index = messageListeners.indexOf(item)
          if (index !== -1) {
            messageListeners.splice(index, 1)
          }
        }
      })
    }
    if (shouldDelay) {
      setTimeout(() => {
        dispatch()
        if (debug) {
          console.log(
            `[RaceCheck] ✅ Delayed leader-now dispatch at ${performance.now().toFixed(2)}ms`
          )
        }
      }, 50)
    } else {
      dispatch()
    }
  })

  if (debug) {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    worker.postMessage = (message: any, ...args: any[]) => {
      console.log(
        `[RaceCheck] Main -> Worker: ${message?.type} at ${performance.now().toFixed(2)}ms`,
        message
      )
      return originalPostMessage(message, ...args)
    }
  }
}
```
Logs
These logs from my application clearly show the sequence of events causing the race condition, and how the delay fixes it:
```
[RaceCheck] 💚 PGlite adding 'message' listener at 627.30ms
// Listener 1: Likely `this.#workerHerePromise` waiting for "here"
[RaceCheck] 💚 PGlite adding 'message' listener at 627.50ms
// Listener 2: Likely `this.#workerReadyPromise` waiting for "ready"
[RaceCheck] Worker -> Main: here at 756.60ms
// "here" received. Listener 1 probably resolves and finishes.
[RaceCheck] Main -> Worker: init at 756.70ms {type: 'init', options: {…}}
// Main thread sends init config.
[RaceCheck] Worker -> Main: ready at 757.60ms
// "ready" received. Listener 2 catches this.
[RaceCheck] 🔴 PGlite removing 'message' listener at 757.70ms
// PGlite removes the "ready" listener (Listener 2) because init promise resolved.
// CRITICAL: Main thread now hits `await acquireLock(...)` and yields execution.
// There are currently NO relevant listeners on the worker.
[RaceCheck] Worker -> Main: leader-now at 763.60ms
// ~6ms later, the worker sends "leader-now".
// Main thread processes this message from the queue immediately.
[RaceCheck] 🔴 'leader-now' received. Listeners registered: 0
[RaceCheck] ❌ RACE CONDITION CONFIRMED: Message arrived before listener was registered!
// The message is dropped because PGlite hasn't resumed from `acquireLock` yet.
⚠️ Intercepted leader-now. Delaying to fix race condition...
// Our proxy intercepts and holds the message.
[RaceCheck] 💚 PGlite adding 'message' listener at 764.00ms
// 0.4ms AFTER the message arrived: PGlite finally resumes and adds the runtime event listener.
// Without the delay, this would have been too late.
[RaceCheck] ✅ Delayed leader-now dispatch at 814.40ms
// 50ms later, we release the message, and now the listener is ready to receive it.
```
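The fixed 50 ms timer works in the run above, but it relies on PGlite attaching its listener within that window. A variant of the same proxy idea could instead hold an unheard `leader-now` until the next listener subscribes, with no timer at all. The sketch below models only that policy, using a hypothetical `HoldUntilHeard` class and plain callbacks instead of `Worker` events; it is not wired into `proxyWorkerMessages` and is not part of PGlite.

```typescript
// Shape of the messages in this model (only `type` matters here).
type WorkerMessage = { type: string }
type Handler = (message: WorkerMessage) => void

// Hypothetical model of a proxy policy: messages whose type is in `holdTypes`
// that arrive while no handler is subscribed are kept and handed to the next
// subscriber, instead of being released after a fixed timeout.
class HoldUntilHeard {
  private readonly handlers = new Set<Handler>()
  private readonly holdTypes: ReadonlySet<string>
  private held: WorkerMessage[] = []

  constructor(holdTypes: Iterable<string>) {
    this.holdTypes = new Set(holdTypes)
  }

  // Called for every message coming from the worker.
  deliver(message: WorkerMessage): void {
    if (this.handlers.size === 0) {
      // Nobody is listening: keep held types, drop everything else
      // (dropping mirrors what happens to an unheard Worker message).
      if (this.holdTypes.has(message.type)) this.held.push(message)
      return
    }
    // Copy first so handlers may unsubscribe while being called.
    for (const handler of Array.from(this.handlers)) handler(message)
  }

  subscribe(handler: Handler): void {
    this.handlers.add(handler)
    // Hand over anything that arrived while nobody was subscribed.
    const pending = this.held
    this.held = []
    for (const message of pending) handler(message)
  }

  unsubscribe(handler: Handler): void {
    this.handlers.delete(handler)
  }
}

// Replaying the timeline from the logs above:
const channel = new HoldUntilHeard(['leader-now'])
const onReady: Handler = (m) => console.log('ready handler got', m.type)
channel.subscribe(onReady)
channel.deliver({ type: 'ready' }) // delivered to onReady
channel.unsubscribe(onReady) // like "PGlite removing 'message' listener"
channel.deliver({ type: 'leader-now' }) // no subscribers: held
channel.subscribe((m) => console.log('runtime handler got', m.type)) // receives leader-now
```

Flushing synchronously inside `subscribe` means the new handler runs before `subscribe` returns; a real proxy might defer the flush (e.g. with `queueMicrotask`) so that registering a listener has no immediate side effects.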
Details
- PGlite version: @electric-sql/…, @electric-sql/…, @electric-sql/…
- OS version: macOS 15.0 (Sequoia)
- Browser: Arc version 1.121.0
Additional context
This seems to happen in my applications because the main thread is busy during initialization, widening the gap between await acquireLock and listener registration.
Note that the gap between the `leader-now` message arriving on the main thread and the listener being attached is always small in my logs (only 0.4ms in the run above: 763.60ms vs 764.00ms), but it is enough to miss the event.