diff --git a/packages/toolkit/src/listenerMiddleware/index.ts b/packages/toolkit/src/listenerMiddleware/index.ts index cfefa17e09..17c9c6771d 100644 --- a/packages/toolkit/src/listenerMiddleware/index.ts +++ b/packages/toolkit/src/listenerMiddleware/index.ts @@ -262,10 +262,10 @@ const cancelActiveListeners = ( const createClearListenerMiddleware = ( listenerMap: Map, + executingListeners: Map, ) => { return () => { - listenerMap.forEach(cancelActiveListeners) - + executingListeners.keys().forEach(cancelActiveListeners) listenerMap.clear() } } @@ -339,6 +339,23 @@ export const createListenerMiddleware = < middlewareOptions: CreateListenerMiddlewareOptions = {}, ) => { const listenerMap = new Map() + + // Track listeners whose effect is currently executing so clearListeners can + // abort even listeners that have become unsubscribed while executing. + const executingListeners = new Map() + const trackExecutingListener = (entry: ListenerEntry) => { + const count = executingListeners.get(entry) ?? 0 + executingListeners.set(entry, count + 1) + } + const untrackExecutingListener = (entry: ListenerEntry) => { + const count = executingListeners.get(entry) ?? 1 + if (count === 1) { + executingListeners.delete(entry) + } else { + executingListeners.set(entry, count - 1) + } + } + const { extra, onError = defaultErrorHandler } = middlewareOptions assertFunction(onError, 'onError') @@ -401,6 +418,7 @@ export const createListenerMiddleware = < try { entry.pending.add(internalTaskController) + trackExecutingListener(entry) await Promise.resolve( entry.effect( action, @@ -452,11 +470,15 @@ export const createListenerMiddleware = < await Promise.all(autoJoinPromises) abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed + untrackExecutingListener(entry) entry.pending.delete(internalTaskController) } } - const clearListenerMiddleware = createClearListenerMiddleware(listenerMap) + const clearListenerMiddleware = createClearListenerMiddleware( + listenerMap, + executingListeners, + ) const middleware: ListenerMiddleware< StateType, diff --git a/packages/toolkit/src/listenerMiddleware/tests/effectScenarios.test.ts b/packages/toolkit/src/listenerMiddleware/tests/effectScenarios.test.ts index ce55ac8092..3eacdc08da 100644 --- a/packages/toolkit/src/listenerMiddleware/tests/effectScenarios.test.ts +++ b/packages/toolkit/src/listenerMiddleware/tests/effectScenarios.test.ts @@ -364,4 +364,135 @@ describe('Saga-style Effects Scenarios', () => { expect(canceledCheck).toBe(true) }) + + test('long-running listener with immediate unsubscribe is cancelable', async () => { + let runCount = 0 + let abortCount = 0 + + startListening({ + actionCreator: increment, + effect: async (action, listenerApi) => { + runCount++ + + // Stop listening for this action + listenerApi.unsubscribe() + + try { + // Wait indefinitely + await listenerApi.condition(() => false) + } catch (err) { + if (err instanceof TaskAbortError) { + abortCount++ + } + } + }, + }) + + // First action starts the listener, which unsubscribes + store.dispatch(increment()) + expect(runCount).toBe(1) + + // Verify that the first action unsubscribed the listener + store.dispatch(increment()) + expect(runCount).toBe(1) + + // Now call clearListeners, which should abort the running effect, even + // though the listener is no longer subscribed + listenerMiddleware.clearListeners() + await delay(0) + + expect(abortCount).toBe(1) + }) + + test('long-running listener with unsubscribe race is cancelable', async () => { + let runCount = 0 + let abortCount = 0 + + startListening({ + actionCreator: increment, + effect: async (action, listenerApi) => { + runCount++ + + if (runCount === 2) { + // On the second run, stop listening for this action + listenerApi.unsubscribe() + return + } + + try { + // Wait indefinitely + await listenerApi.condition(() => false) + } catch (err) { + if (err instanceof TaskAbortError) { + abortCount++ + } + } + }, + }) + + // First action starts the hanging effect + store.dispatch(increment()) + expect(runCount).toBe(1) + + // Second action starts the fast effect, which unsubscribes + store.dispatch(increment()) + expect(runCount).toBe(2) + + // Third action should be a noop + store.dispatch(increment()) + expect(runCount).toBe(2) + + // The hanging effect should still be hanging + expect(abortCount).toBe(0) + + // Now call clearListeners, which should abort the hanging effect, even + // though the listener is no longer subscribed + listenerMiddleware.clearListeners() + await delay(0) + + expect(abortCount).toBe(1) + }) + + test('long-running listener with immediate unsubscribe and forked child is cancelable', async () => { + let outerAborted = false + let innerAborted = false + + startListening({ + actionCreator: increment, + effect: async (action, listenerApi) => { + // Stop listening for this action + listenerApi.unsubscribe() + + const pollingTask = listenerApi.fork(async (forkApi) => { + try { + // Cancellation-aware indefinite pause + await forkApi.pause(new Promise(() => {})) + } catch (err) { + if (err instanceof TaskAbortError) { + innerAborted = true + } + } + }) + + try { + // Wait indefinitely + await listenerApi.condition(() => false) + pollingTask.cancel() + } catch (err) { + if (err instanceof TaskAbortError) { + outerAborted = true + } + } + }, + }) + + store.dispatch(increment()) + await delay(0) + + listenerMiddleware.clearListeners() + await delay(0) + + expect(outerAborted).toBe(true) + expect(innerAborted).toBe(true) + }) })