Skip to content

Commit 471b72d

Browse files
committed
feat: save progress
1 parent e248fa7 commit 471b72d

File tree

8 files changed

+268
-17
lines changed

8 files changed

+268
-17
lines changed

src/containers/Tenant/Query/QueryEditor/QueryEditor.tsx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import {v4 as uuidv4} from 'uuid';
88

99
import {MonacoEditor} from '../../../../components/MonacoEditor/MonacoEditor';
1010
import SplitPane from '../../../../components/SplitPane';
11-
import {useTracingLevelOptionAvailable} from '../../../../store/reducers/capabilities/hooks';
11+
import {
12+
useStreamingAvailable,
13+
useTracingLevelOptionAvailable,
14+
} from '../../../../store/reducers/capabilities/hooks';
1215
import {
1316
goToNextQuery,
1417
goToPreviousQuery,
@@ -137,6 +140,8 @@ export default function QueryEditor(props: QueryEditorProps) {
137140
return historyQueries[historyQueries.length - 1].queryText;
138141
});
139142

143+
const isStreamingSupported = useStreamingAvailable();
144+
140145
const handleSendExecuteClick = useEventHandler((text?: string) => {
141146
const query = text ?? input;
142147

@@ -155,6 +160,7 @@ export default function QueryEditor(props: QueryEditorProps) {
155160
querySettings,
156161
enableTracingLevel,
157162
queryId,
163+
isStreaming: isStreamingSupported,
158164
});
159165

160166
dispatch(setShowPreview(false));

src/containers/Tenant/Query/QueryResult/QueryResultViewer.tsx

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ export function QueryResultViewer({
107107
});
108108
const [useShowPlanToSvg] = useSetting<boolean>(USE_SHOW_PLAN_SVG_KEY);
109109

110-
const {error, isLoading, queryId, data = {}} = result;
110+
const {error, isLoading, queryId, data = {}, streaming} = result;
111111
const {preparedPlan, simplifiedPlan, stats, resultSets, ast} = data;
112+
const isStreaming = streaming?.active;
112113

113114
React.useEffect(() => {
114115
if (resultType === 'execute' && !EXECUTE_SECTIONS.includes(activeSection)) {
@@ -217,9 +218,17 @@ export function QueryResultViewer({
217218
return <QueryResultError error={error} />;
218219
}
219220
if (activeSection === RESULT_OPTIONS_IDS.result) {
220-
return (
221+
return isStreaming ? (
221222
<ResultSetsViewer
222-
resultSets={resultSets}
223+
type="streaming"
224+
chunks={streaming?.chunks || []}
225+
selectedResultSet={selectedResultSet}
226+
setSelectedResultSet={setSelectedResultSet}
227+
/>
228+
) : (
229+
<ResultSetsViewer
230+
type="regular"
231+
resultSets={resultSets || []}
223232
selectedResultSet={selectedResultSet}
224233
setSelectedResultSet={setSelectedResultSet}
225234
/>
@@ -263,8 +272,8 @@ export function QueryResultViewer({
263272
const renderLeftControls = () => {
264273
return (
265274
<div className={b('controls-left')}>
266-
<QueryExecutionStatus error={error} loading={isLoading} />
267-
{!error && !isLoading && (
275+
<QueryExecutionStatus error={error} loading={isLoading || isStreaming} />
276+
{!error && !isLoading && !isStreaming && (
268277
<React.Fragment>
269278
{valueIsDefined(stats?.DurationUs) ? (
270279
<QueryDuration duration={Number(stats.DurationUs)} />

src/containers/Tenant/Query/QueryResult/components/ResultSetsViewer/ResultSetsViewer.tsx

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {Tabs, Text} from '@gravity-ui/uikit';
22

33
import {QueryResultTable} from '../../../../../../components/QueryResultTable';
4+
import type {ExecuteQueryResponse} from '../../../../../../types/api/query';
45
import type {ParsedResultSet} from '../../../../../../types/store/query';
56
import {getArray} from '../../../../../../utils';
67
import {cn} from '../../../../../../utils/cn';
@@ -10,25 +11,57 @@ import './ResultSetsViewer.scss';
1011

1112
const b = cn('ydb-query-result-sets-viewer');
1213

13-
interface ResultSetsViewerProps {
14-
resultSets?: ParsedResultSet[];
14+
interface BaseResultSetsViewerProps {
1515
selectedResultSet: number;
1616
setSelectedResultSet: (resultSet: number) => void;
1717
}
1818

19-
export function ResultSetsViewer({
20-
resultSets,
21-
selectedResultSet,
22-
setSelectedResultSet,
23-
}: ResultSetsViewerProps) {
24-
const resultsSetsCount = resultSets?.length || 0;
25-
const currentResult = resultSets?.[selectedResultSet];
19+
interface RegularResultSetsProps extends BaseResultSetsViewerProps {
20+
type: 'regular';
21+
resultSets: ParsedResultSet[];
22+
}
23+
24+
interface StreamingResultSetsProps extends BaseResultSetsViewerProps {
25+
type: 'streaming';
26+
chunks: ExecuteQueryResponse[];
27+
}
28+
29+
type ResultSetsViewerProps = RegularResultSetsProps | StreamingResultSetsProps;
30+
31+
export function ResultSetsViewer(props: ResultSetsViewerProps) {
32+
const {selectedResultSet, setSelectedResultSet} = props;
33+
34+
const resultsSetsCount =
35+
props.type === 'streaming' ? props.chunks.length : props.resultSets.length;
36+
37+
const currentResult =
38+
props.type === 'streaming'
39+
? {
40+
result:
41+
props.chunks[selectedResultSet]?.result?.[0]?.rows?.map((row) => {
42+
if (!Array.isArray(row)) {
43+
return row || {};
44+
}
45+
return row.reduce<Record<string, any>>((obj, val, idx) => {
46+
const columnName =
47+
props.chunks[0]?.result?.[0]?.columns?.[idx]?.name ||
48+
`column${idx}`;
49+
return {...obj, [columnName]: val};
50+
}, {});
51+
}) || [],
52+
columns: props.chunks[0]?.result?.[0]?.columns,
53+
truncated: false,
54+
}
55+
: props.resultSets[selectedResultSet];
2656

2757
const renderTabs = () => {
2858
if (resultsSetsCount > 1) {
2959
const tabsItems = getArray(resultsSetsCount).map((item) => ({
3060
id: String(item),
31-
title: `Result #${item + 1}${resultSets?.[item]?.truncated ? ' (T)' : ''}`,
61+
title:
62+
props.type === 'streaming'
63+
? `Chunk #${item + 1}`
64+
: `Result #${item + 1}${props.resultSets[item]?.truncated ? ' (T)' : ''}`,
3265
}));
3366

3467
return (

src/services/api/viewer.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import type {TUserToken} from '../../types/api/whoami';
3535
import type {QuerySyntax, TransactionMode} from '../../types/store/query';
3636
import {BINARY_DATA_IN_PLAIN_TEXT_DISPLAY} from '../../utils/constants';
3737
import type {Nullable} from '../../utils/typecheckers';
38+
import {multipartParser} from '../parsers/multipart';
3839
import {settingsManager} from '../settings';
3940

4041
import {BaseYdbAPI} from './base';
@@ -368,6 +369,61 @@ export class ViewerAPI extends BaseYdbAPI {
368369
);
369370
}
370371

372+
streamQuery<Action extends Actions>(
373+
params: {
374+
query?: string;
375+
database?: string;
376+
action?: Action;
377+
syntax?: QuerySyntax;
378+
stats?: Stats;
379+
tracingLevel?: TracingLevel;
380+
transaction_mode?: TransactionMode;
381+
timeout?: Timeout;
382+
limit_rows?: number;
383+
output_chunk_max_size?: number;
384+
},
385+
{concurrentId, signal, onChunk}: AxiosOptions & {onChunk?: (chunk: any) => void} = {},
386+
) {
387+
const base64 = !settingsManager.readUserSettingsValue(
388+
BINARY_DATA_IN_PLAIN_TEXT_DISPLAY,
389+
true,
390+
);
391+
392+
// Reset parser state for new request
393+
multipartParser.reset();
394+
395+
return this.get<string>(
396+
this.getPath('/viewer/query'),
397+
{
398+
...params,
399+
base64,
400+
schema: 'multipart',
401+
output_chunk_max_size: params.output_chunk_max_size || 1000,
402+
},
403+
{
404+
concurrentId,
405+
timeout: params.timeout,
406+
requestConfig: {
407+
signal,
408+
'axios-retry': {retries: 0}, // No retries for streaming
409+
responseType: 'text',
410+
},
411+
headers: {
412+
...(params.tracingLevel ? {'X-Trace-Verbosity': params.tracingLevel} : {}),
413+
Accept: 'multipart/x-mixed-replace',
414+
},
415+
onDownloadProgress: (progressEvent) => {
416+
console.log('progressEvent', progressEvent.event.target);
417+
const response = progressEvent.event.target as XMLHttpRequest;
418+
multipartParser.processNewData<QueryAPIResponse<Action>>(
419+
response.responseText,
420+
onChunk,
421+
);
422+
},
423+
},
424+
);
425+
}
426+
371427
getHotKeys(
372428
{path, database, enableSampling}: {path: string; database: string; enableSampling: boolean},
373429
{concurrentId, signal}: AxiosOptions = {},

src/services/parsers/multipart.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
interface MultipartChunk<T = any> {
2+
part_number: number;
3+
total_parts: number;
4+
content: T;
5+
}
6+
7+
export class MultipartParser {
8+
private boundary = 'boundary';
9+
private lastProcessedLength = 0;
10+
11+
parseChunk<T>(part: string): MultipartChunk<T> | null {
12+
try {
13+
// Normalize line endings and split headers and body
14+
const normalizedPart = part.replace(/\r\n/g, '\n');
15+
const sections = normalizedPart.split('\n\n');
16+
17+
if (sections.length < 2) {
18+
console.log('Invalid chunk format: no body found');
19+
return null;
20+
}
21+
22+
// Get the body (last section after headers)
23+
const body = sections[sections.length - 1].trim();
24+
if (!body) {
25+
console.log('Invalid chunk format: empty body');
26+
return null;
27+
}
28+
29+
// Parse the JSON content
30+
const content = JSON.parse(body);
31+
32+
return {
33+
part_number: content.Counter || 0,
34+
total_parts: 0, // We don't know the total in advance
35+
content: content as T,
36+
};
37+
} catch (error) {
38+
console.error('Failed to parse chunk:', error);
39+
return null;
40+
}
41+
}
42+
43+
processNewData<T>(responseText: string, onChunk?: (chunk: MultipartChunk<T>) => void): void {
44+
// Get only the new data
45+
const newData = responseText.slice(this.lastProcessedLength);
46+
47+
if (newData) {
48+
// Split on boundary with double dashes
49+
const boundaryStr = `--${this.boundary}`;
50+
const parts = newData.split(boundaryStr).filter(Boolean);
51+
52+
for (const part of parts) {
53+
// Skip the final boundary marker
54+
if (part.trim() === '--') {
55+
continue;
56+
}
57+
58+
const chunk = this.parseChunk<T>(part);
59+
if (chunk && onChunk) {
60+
onChunk(chunk);
61+
}
62+
}
63+
}
64+
65+
// Update the processed length
66+
this.lastProcessedLength = responseText.length;
67+
}
68+
69+
reset(): void {
70+
this.lastProcessedLength = 0;
71+
}
72+
}
73+
74+
export const multipartParser = new MultipartParser();

src/store/reducers/capabilities/hooks.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,7 @@ export const useFeatureFlagsAvailable = () => {
7272
export const useClusterDashboardAvailable = () => {
7373
return useGetFeatureVersion('/viewer/cluster') > 4;
7474
};
75+
76+
export const useStreamingAvailable = () => {
77+
return useGetFeatureVersion('/viewer/query') >= 7;
78+
};

0 commit comments

Comments
 (0)