Skip to content

Commit 4a0a78a

Browse files
razhayatTkDodo
andauthored
fix: streamed query consume aware signal (#9963)
* chore: move consume aware signal logic to utils * fix: add consume aware signal to streamedQuery * test: add a test for not a signal that is not consumed by streamFn * chore: add changeset * fix: made queryFn of streamedQuery not consume signal directly (so that only streamFn can consume it) * fix: made onCancelled get called only once * fix: memoized getSignal and made sure it is called only once * fix: no-unnecessary-condition * chore: size-limit --------- Co-authored-by: Dominik Dorfmeister <[email protected]>
1 parent 5810292 commit 4a0a78a

File tree

6 files changed

+115
-22
lines changed

6 files changed

+115
-22
lines changed

.changeset/dry-streets-exist.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@tanstack/query-core': patch
3+
---
4+
5+
Made context.signal consume aware with streamedQuery

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
{
9595
"name": "react full",
9696
"path": "packages/react-query/build/modern/index.js",
97-
"limit": "12.02 kB",
97+
"limit": "12.10 kB",
9898
"ignore": [
9999
"react",
100100
"react-dom"
@@ -103,7 +103,7 @@
103103
{
104104
"name": "react minimal",
105105
"path": "packages/react-query/build/modern/index.js",
106-
"limit": "9.08 kB",
106+
"limit": "9.11 kB",
107107
"import": "{ useQuery, QueryClient, QueryClientProvider }",
108108
"ignore": [
109109
"react",

packages/query-core/src/__tests__/streamedQuery.test.tsx

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,12 @@ describe('streamedQuery', () => {
329329
const observer = new QueryObserver(queryClient, {
330330
queryKey: key,
331331
queryFn: streamedQuery({
332-
streamFn: () => createAsyncNumberGenerator(3),
332+
streamFn: (context) => {
333+
// just consume the signal
334+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
335+
const numbers = context.signal ? 3 : 0
336+
return createAsyncNumberGenerator(numbers)
337+
},
333338
refetchMode: 'append',
334339
}),
335340
})
@@ -420,6 +425,42 @@ describe('streamedQuery', () => {
420425
})
421426
})
422427

428+
test('should not abort when signal not consumed', async () => {
429+
const key = queryKey()
430+
const observer = new QueryObserver(queryClient, {
431+
queryKey: key,
432+
queryFn: streamedQuery({
433+
streamFn: () => createAsyncNumberGenerator(3),
434+
}),
435+
})
436+
437+
const unsubscribe = observer.subscribe(vi.fn())
438+
439+
expect(queryClient.getQueryState(key)).toMatchObject({
440+
status: 'pending',
441+
fetchStatus: 'fetching',
442+
data: undefined,
443+
})
444+
445+
await vi.advanceTimersByTimeAsync(60)
446+
447+
expect(queryClient.getQueryState(key)).toMatchObject({
448+
status: 'success',
449+
fetchStatus: 'fetching',
450+
data: [0],
451+
})
452+
453+
unsubscribe()
454+
455+
await vi.advanceTimersByTimeAsync(50)
456+
457+
expect(queryClient.getQueryState(key)).toMatchObject({
458+
status: 'success',
459+
fetchStatus: 'fetching',
460+
data: [0, 1],
461+
})
462+
})
463+
423464
test('should support custom reducer', async () => {
424465
const key = queryKey()
425466

packages/query-core/src/infiniteQueryBehavior.ts

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { addToEnd, addToStart, ensureQueryFn } from './utils'
1+
import {
2+
addConsumeAwareSignal,
3+
addToEnd,
4+
addToStart,
5+
ensureQueryFn,
6+
} from './utils'
27
import type { QueryBehavior } from './query'
38
import type {
49
InfiniteData,
@@ -23,19 +28,11 @@ export function infiniteQueryBehavior<TQueryFnData, TError, TData, TPageParam>(
2328
const fetchFn = async () => {
2429
let cancelled = false
2530
const addSignalProperty = (object: unknown) => {
26-
Object.defineProperty(object, 'signal', {
27-
enumerable: true,
28-
get: () => {
29-
if (context.signal.aborted) {
30-
cancelled = true
31-
} else {
32-
context.signal.addEventListener('abort', () => {
33-
cancelled = true
34-
})
35-
}
36-
return context.signal
37-
},
38-
})
31+
addConsumeAwareSignal(
32+
object,
33+
() => context.signal,
34+
() => (cancelled = true),
35+
)
3936
}
4037

4138
const queryFn = ensureQueryFn(context.options, context.fetchOptions)

packages/query-core/src/streamedQuery.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
import { addToEnd } from './utils'
2-
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
1+
import { addConsumeAwareSignal, addToEnd } from './utils'
2+
import type {
3+
OmitKeyof,
4+
QueryFunction,
5+
QueryFunctionContext,
6+
QueryKey,
7+
} from './types'
38

49
type BaseStreamedQueryParams<TQueryFnData, TQueryKey extends QueryKey> = {
510
streamFn: (
@@ -73,10 +78,25 @@ export function streamedQuery<
7378

7479
let result = initialValue
7580

76-
const stream = await streamFn(context)
81+
let cancelled: boolean = false as boolean
82+
const streamFnContext = addConsumeAwareSignal<
83+
OmitKeyof<typeof context, 'signal'>
84+
>(
85+
{
86+
client: context.client,
87+
meta: context.meta,
88+
queryKey: context.queryKey,
89+
pageParam: context.pageParam,
90+
direction: context.direction,
91+
},
92+
() => context.signal,
93+
() => (cancelled = true),
94+
)
95+
96+
const stream = await streamFn(streamFnContext)
7797

7898
for await (const chunk of stream) {
79-
if (context.signal.aborted) {
99+
if (cancelled) {
80100
break
81101
}
82102

@@ -90,7 +110,7 @@ export function streamedQuery<
90110
}
91111

92112
// finalize result: replace-refetching needs to write to the cache
93-
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
113+
if (isRefetch && refetchMode === 'replace' && !cancelled) {
94114
context.client.setQueryData<TData>(context.queryKey, result)
95115
}
96116

packages/query-core/src/utils.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,3 +465,33 @@ export function shouldThrowError<T extends (...args: Array<any>) => boolean>(
465465

466466
return !!throwOnError
467467
}
468+
469+
export function addConsumeAwareSignal<T>(
470+
object: T,
471+
getSignal: () => AbortSignal,
472+
onCancelled: VoidFunction,
473+
): T & { signal: AbortSignal } {
474+
let consumed = false
475+
let signal: AbortSignal | undefined
476+
477+
Object.defineProperty(object, 'signal', {
478+
enumerable: true,
479+
get: () => {
480+
signal ??= getSignal()
481+
if (consumed) {
482+
return signal
483+
}
484+
485+
consumed = true
486+
if (signal.aborted) {
487+
onCancelled()
488+
} else {
489+
signal.addEventListener('abort', onCancelled, { once: true })
490+
}
491+
492+
return signal
493+
},
494+
})
495+
496+
return object as T & { signal: AbortSignal }
497+
}

0 commit comments

Comments
 (0)