
Commit e0c23d4

feat: buffer and flush parallelized window queries to always flush in-order data (#1481)
We should be able to send most chart series queries all at once and view the results as the data comes in. This also ensures the data arrives in order. So far it is only enabled for the histogram on DBSearchPage. Closes HDX-3051
1 parent 19b710f commit e0c23d4
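
The buffering strategy, as a minimal self-contained sketch (the Window type, fetchWindow parameter, and fetchWindowsInOrder name below are hypothetical stand-ins, not the actual client API): every window query is started immediately, each finished result is parked in a buffer slot matching its window index, and the buffer is flushed from the front only while the next in-order chunk has arrived.

type Window = { startTime: Date; endTime: Date };

// Kick off every window query at once, but yield chunks strictly in window order.
async function* fetchWindowsInOrder<T>(
  windows: Window[],
  fetchWindow: (w: Window) => Promise<T>, // hypothetical per-window fetcher
) {
  const tagged = windows.map((w, index) =>
    fetchWindow(w).then(result => ({ index, result })),
  );
  const remaining = [...tagged];
  const buffered = new Array<T | undefined>(windows.length);
  let flushed = 0;

  for (let i = 0; i < tagged.length; i++) {
    // Whichever query settles next wins the race, regardless of its window position.
    const { index, result } = await Promise.race(remaining);
    remaining.splice(remaining.indexOf(tagged[index]), 1);
    // Park the result at its slot relative to what has already been flushed.
    buffered[index - flushed] = result;
    // Drain from the front while the next expected chunk is present,
    // so the consumer always sees chunks in window order.
    while (buffered.length > 0 && buffered[0] !== undefined) {
      const chunk = buffered.shift();
      yield { chunk, isComplete: buffered.length === 0 };
      flushed += 1;
    }
  }
}

A window that finishes early but out of order simply waits in the buffer until every earlier window has been flushed; the isComplete flag tells the consumer when the final window has gone out.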

5 files changed: 215 additions & 16 deletions


.changeset/hip-fireants-argue.md

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+"@hyperdx/api": patch
+"@hyperdx/app": patch
+---
+
+feat: flush chunk data as it arrives if in order

packages/app/src/components/DBTimeChart.tsx

Lines changed: 5 additions & 2 deletions
@@ -256,8 +256,11 @@ function DBTimeChartComponent({
       queryKeyPrefix,
       queriedConfig,
       'chunked',
-      disableQueryChunking,
-      enableParallelQueries && me?.team?.parallelizeWhenPossible,
+      {
+        disableQueryChunking,
+        enableParallelQueries,
+        parallelizeWhenPossible: me?.team?.parallelizeWhenPossible,
+      },
     ],
     enabled: enabled && !isLoadingMe,
     enableQueryChunking: !disableQueryChunking,
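
The three flags now travel inside a single object in the query key. Assuming React Query's structural key hashing, flipping any flag still produces a new key and a refetch, the same as listing the flags individually; a rough sketch of that shape with made-up names (the hook and fetcher below are illustrations only):

import { useQuery } from '@tanstack/react-query';

// Hypothetical hook illustrating the query-key shape; the fetcher is a stub.
function useExampleChartData(opts: {
  disableQueryChunking: boolean;
  enableParallelQueries: boolean;
  parallelizeWhenPossible?: boolean;
}) {
  return useQuery({
    // The options object is part of the key, so changing any flag yields a
    // different key hash and a fresh fetch.
    queryKey: ['exampleChart', 'chunked', opts],
    queryFn: async () => ({ data: [] as unknown[] }),
  });
}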

packages/app/src/hooks/__tests__/useChartConfig.test.tsx

Lines changed: 169 additions & 0 deletions
@@ -1122,5 +1122,174 @@ describe('useChartConfig', () => {
       expect(result.current.isLoading).toBe(false);
       expect(result.current.isPending).toBe(false);
     });
+
+    const setupParallelQueries = () => {
+      const config = createMockChartConfig({
+        dateRange: [
+          new Date('2025-10-01 00:00:00Z'),
+          new Date('2025-10-02 00:00:00Z'),
+        ],
+        granularity: '3 hour',
+      });
+      const mockResponse1 = createMockQueryResponse([
+        {
+          'count()': '71',
+          __hdx_time_bucket: '2025-10-01T18:00:00Z',
+        },
+        {
+          'count()': '72',
+          __hdx_time_bucket: '2025-10-01T19:00:00Z',
+        },
+      ]);
+      const mockResponse2 = createMockQueryResponse([
+        {
+          'count()': '73',
+          __hdx_time_bucket: '2025-10-01T12:00:00Z',
+        },
+        {
+          'count()': '74',
+          __hdx_time_bucket: '2025-10-01T14:00:00Z',
+        },
+      ]);
+      const mockResponse3 = createMockQueryResponse([
+        {
+          'count()': '75',
+          __hdx_time_bucket: '2025-10-01T01:00:00Z',
+        },
+      ]);
+
+      return { config, mockResponse1, mockResponse2, mockResponse3 };
+    };
+
+    it('fetches data in parallel when enableParallelQueries is true', async () => {
+      const { config, mockResponse1, mockResponse2, mockResponse3 } =
+        setupParallelQueries();
+
+      // Create promises that resolve with different delays to simulate parallel execution
+      const promise1 = Promise.resolve(mockResponse1);
+      const promise2 = new Promise<typeof mockResponse2>(resolve =>
+        setTimeout(() => resolve(mockResponse2), 50),
+      );
+      const promise3 = new Promise<typeof mockResponse3>(resolve =>
+        setTimeout(() => resolve(mockResponse3), 100),
+      );
+
+      mockClickhouseClient.queryChartConfig
+        .mockReturnValueOnce(promise1)
+        .mockReturnValueOnce(promise2)
+        .mockReturnValueOnce(promise3);
+
+      const { result } = renderHook(
+        () =>
+          useQueriedChartConfig(config, {
+            enableQueryChunking: true,
+            enableParallelQueries: true,
+          }),
+        {
+          wrapper,
+        },
+      );
+
+      await waitFor(() => expect(result.current.isSuccess).toBe(true), {
+        timeout: 1000,
+      });
+      await waitFor(() => expect(result.current.isFetching).toBe(false), {
+        timeout: 1000,
+      });
+
+      expect(mockClickhouseClient.queryChartConfig).toHaveBeenCalledTimes(3);
+
+      // Data should be in order based on time window chunks (newest first)
+      expect(result.current.data).toEqual({
+        data: [
+          ...mockResponse3.data,
+          ...mockResponse2.data,
+          ...mockResponse1.data,
+        ],
+        meta: mockResponse1.meta,
+        rows: 5,
+        isComplete: true,
+      });
+      expect(result.current.isLoading).toBe(false);
+      expect(result.current.isPending).toBe(false);
+    });
+
+    it('streams parallel query results in order', async () => {
+      const { config, mockResponse1, mockResponse2, mockResponse3 } =
+        setupParallelQueries();
+
+      // Create promises with controlled resolution order - simulate last chunk finishing first
+      let resolvePromise3: (value: any) => void;
+      let resolvePromise2: (value: any) => void;
+      let resolvePromise1: (value: any) => void;
+
+      const promise1 = new Promise(resolve => {
+        resolvePromise1 = resolve;
+      });
+      const promise2 = new Promise(resolve => {
+        resolvePromise2 = resolve;
+      });
+      const promise3 = new Promise(resolve => {
+        resolvePromise3 = resolve;
+      });
+
+      mockClickhouseClient.queryChartConfig
+        .mockReturnValueOnce(promise1 as any)
+        .mockReturnValueOnce(promise2 as any)
+        .mockReturnValueOnce(promise3 as any);
+
+      const { result } = renderHook(
+        () =>
+          useQueriedChartConfig(config, {
+            enableQueryChunking: true,
+            enableParallelQueries: true,
+          }),
+        {
+          wrapper,
+        },
+      );
+
+      // Should be in loading state initially
+      expect(result.current.isLoading).toBe(true);
+      expect(result.current.data).toBeUndefined();
+
+      // Resolve the last chunk first (out of order)
+      resolvePromise3!(mockResponse3);
+
+      // Should still be loading since we need the first chunk
+      await new Promise(resolve => setTimeout(resolve, 10));
+      expect(result.current.isLoading).toBe(true);
+
+      // Resolve the first chunk
+      resolvePromise1!(mockResponse1);
+
+      await waitFor(() => expect(result.current.isLoading).toBe(false));
+
+      // Should have partial data from first chunk only (in chronological order)
+      expect(result.current.data).toEqual({
+        data: mockResponse1.data,
+        meta: mockResponse1.meta,
+        rows: 2,
+        isComplete: false,
+      });
+      expect(result.current.isFetching).toBe(true);
+
+      // Resolve the middle chunk
+      resolvePromise2!(mockResponse2);
+
+      await waitFor(() => expect(result.current.isFetching).toBe(false));
+
+      // Should now have all data in chronological order
+      expect(result.current.data).toEqual({
+        data: [
+          ...mockResponse3.data,
+          ...mockResponse2.data,
+          ...mockResponse1.data,
+        ],
+        meta: mockResponse1.meta,
+        rows: 5,
+        isComplete: true,
+      });
+    });
   });
 });
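
The second test controls when each mocked query resolves by keeping the resolve functions outside the promises. The same pattern as a standalone helper (a generic sketch for illustration, not part of the test file):

// Deferred helper: pairs a promise with its resolve function so a test can
// decide exactly when, and in what order, each mocked query completes.
function deferred<T>() {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>(res => {
    resolve = res;
  });
  return { promise, resolve };
}

// Example: resolve the "newest" chunk before the "oldest" one to simulate
// out-of-order completion, as the streaming test above does.
const oldest = deferred<string>();
const newest = deferred<string>();
newest.resolve('newest window data');
oldest.resolve('oldest window data');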

packages/app/src/hooks/useChartConfig.tsx

Lines changed: 30 additions & 12 deletions
@@ -153,23 +153,41 @@ async function* fetchDataInChunks({
 
   if (enableParallelQueries) {
     // fetch in parallel
-    const results = await Promise.all(
-      windows.map(w => {
-        const windowedConfig = {
-          ...config,
-          ...(w ?? {}),
-        };
-        return clickhouseClient.queryChartConfig({
+    const promises = windows.map(async (w, index) => {
+      const windowedConfig = {
+        ...config,
+        ...(w ?? {}),
+      };
+      return {
+        index,
+        queryResult: await clickhouseClient.queryChartConfig({
          config: windowedConfig,
          metadata: getMetadata(),
          opts: {
            abort_signal: signal,
          },
-        });
-      }),
-    );
-    for (let i = 0; i < results.length; i++) {
-      yield { chunk: results[i], isComplete: i === results.length - 1 };
+        }),
+      };
+    });
+    const remainingPromises = [...promises];
+    const bufferedChunks = new Array(windows.length);
+    let flushed = 0;
+    for (let i = 0; i < promises.length; i++) {
+      // receive any promise in the array that resolves
+      const { index, queryResult } = await Promise.race(remainingPromises);
+      // add to an ordered buffer array, keeping in mind the flushed count thus far
+      bufferedChunks[index - flushed] = queryResult;
+      // use promises array (doesn't change in size) to find the index in the ever-changing remainingPromises array
+      const resolvedPromiseIdx = remainingPromises.indexOf(promises[index]);
+      // use found index to remove entry from remainingPromises
+      remainingPromises.splice(resolvedPromiseIdx, 1);
+      // while bufferedChunks has in-ordered data, flush it
+      while (bufferedChunks.length > 0 && bufferedChunks[0] !== undefined) {
+        // remove data from front so that it always arrives in order
+        const chunk = bufferedChunks.shift();
+        yield { chunk, isComplete: bufferedChunks.length === 0 };
+        flushed += 1;
+      }
     }
     return;
   }
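
On the consuming side, a caller can drain this generator and render each chunk the moment it is flushed. A rough sketch of such a loop (the drainChunks name, appendToChart callback, and chunk shape are assumptions for illustration, not the hook's actual internals):

// Drain the generator, handing each in-order chunk to the UI as soon as it is yielded.
async function drainChunks<T>(
  chunks: AsyncGenerator<{ chunk: T; isComplete: boolean }>,
  appendToChart: (chunk: T) => void,
): Promise<void> {
  for await (const { chunk, isComplete } of chunks) {
    appendToChart(chunk);
    if (isComplete) {
      // The last window has been flushed; the series is final.
      break;
    }
  }
}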

packages/app/src/timeQuery.ts

Lines changed: 5 additions & 2 deletions
@@ -449,8 +449,11 @@ export function useNewTimeQuery({
     },
   );
 
-  const [searchedTimeRange, setSearchedTimeRange] =
-    useState<[Date, Date]>(initialTimeRange);
+  const [searchedTimeRange, setSearchedTimeRange] = useState<[Date, Date]>(
+    from != null && to != null
+      ? [new Date(from), new Date(to)]
+      : initialTimeRange,
+  );
 
   const onSearch = useCallback(
     (timeQuery: string) => {
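
The same initialization can also be written with a lazy useState initializer so the from/to check only runs on mount. A small sketch under the assumption that from and to are epoch milliseconds or parsable date strings (the extracted hook name is hypothetical):

import { useState } from 'react';

// Hypothetical extraction of the initialization above; the callback runs once on mount.
function useSearchedTimeRange(
  from: string | number | undefined,
  to: string | number | undefined,
  initialTimeRange: [Date, Date],
) {
  return useState<[Date, Date]>(() =>
    from != null && to != null
      ? [new Date(from), new Date(to)]
      : initialTimeRange,
  );
}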

0 commit comments
