diff --git a/src/data/CHDatasource.test.ts b/src/data/CHDatasource.test.ts
index 72af16b4..6fabe635 100644
--- a/src/data/CHDatasource.test.ts
+++ b/src/data/CHDatasource.test.ts
@@ -1,8 +1,10 @@
 import {
   arrayToDataFrame,
   CoreApp,
+  dateTime,
   DataQueryRequest,
   DataQueryResponse,
+  FieldType,
   SupplementaryQueryType,
   TimeRange,
   toDataFrame,
@@ -12,7 +14,7 @@ import { DataSourceWithBackend } from '@grafana/runtime';
 import { DataQuery } from '@grafana/schema';
 import { mockDatasource } from '__mocks__/datasource';
 import { cloneDeep } from 'lodash';
-import { Observable, of } from 'rxjs';
+import { firstValueFrom, Observable, of } from 'rxjs';
 import { BuilderMode, ColumnHint, QueryBuilderOptions, QueryType } from 'types/queryBuilder';
 import { CHBuilderQuery, CHQuery, CHSqlQuery, EditorType } from 'types/sql';
 import { AdHocFilter } from './adHocFilter';
@@ -730,6 +732,10 @@ describe('ClickHouseDatasource', () => {
   });
 
   describe('query', () => {
+    afterEach(() => {
+      jest.restoreAllMocks();
+    });
+
     it('attaches timezone metadata to targets', async () => {
       const instance = cloneDeep(mockDatasource);
       const spy = jest
@@ -748,6 +754,328 @@ describe('ClickHouseDatasource', () => {
         timezone: 'UTC',
       });
     });
+
+    it('splits builder timeseries queries into aligned time chunks and merges the frames', async () => {
+      const instance = cloneDeep(mockDatasource);
+      const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
+        const frame = arrayToDataFrame([{ time: request.range.from.valueOf(), value: request.range.to.valueOf() }]);
+        frame.refId = request.targets[0].refId;
+        frame.fields[0].type = FieldType.time;
+        return of({ data: [frame] });
+      });
+
+      const result = await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(0),
+            to: dateTime(24 * 60 * 60 * 1000),
+            raw: { from: 'now-1d', to: 'now' },
+          },
+          requestId: 'request-1',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      expect(spy).toHaveBeenCalledTimes(3);
+      expect(spy.mock.calls.map(([request]) => request.requestId)).toEqual([
+        'request-1-chunk-0',
+        'request-1-chunk-1',
+        'request-1-chunk-2',
+      ]);
+      expect(spy.mock.calls.map(([request]) => request.targets[0].refId)).toEqual([
+        'A__chunk__0',
+        'A__chunk__1',
+        'A__chunk__2',
+      ]);
+      expect(spy.mock.calls.map(([request]) => [request.range.from.valueOf(), request.range.to.valueOf()])).toEqual([
+        [0, 43200000],
+        [43200000, 85500000],
+        [85500000, 86400000],
+      ]);
+      expect(result.data).toHaveLength(1);
+      expect(result.data[0].refId).toBe('A');
+      expect(result.data[0].fields[0].values).toEqual([0, 43200000, 85500000]);
+      expect(result.data[0].fields[1].values).toEqual([43200000, 85500000, 86400000]);
+    });
+
+    it('does not split non-timeseries targets when chunking the request', async () => {
+      const instance = cloneDeep(mockDatasource);
+      const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
+        const frame = arrayToDataFrame([{ ref: request.targets[0].refId }]);
+        frame.refId = request.targets[0].refId;
+        return of({ data: [frame] });
+      });
+
+      const result = await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(0),
+            to: dateTime(24 * 60 * 60 * 1000),
+            raw: { from: 'now-1d', to: 'now' },
+          },
+          requestId: 'request-2',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+            {
+              refId: 'B',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT *',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.Table,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      expect(spy).toHaveBeenCalledTimes(4);
+      expect(spy.mock.calls[0][0].targets.map((target: CHQuery) => target.refId)).toEqual(['B']);
+      expect(spy.mock.calls.slice(1).map(([request]) => request.targets[0].refId)).toEqual([
+        'A__chunk__0',
+        'A__chunk__1',
+        'A__chunk__2',
+      ]);
+      expect(result.data.map((frame) => frame.refId)).toEqual(['A', 'B']);
+    });
+
+    it('does not chunk when time range is smaller than the first chunk duration', async () => {
+      const instance = cloneDeep(mockDatasource);
+      const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
+        const frame = arrayToDataFrame([{ time: 0, value: 10 }]);
+        frame.refId = request.targets[0].refId;
+        frame.fields[0].type = FieldType.time;
+        return of({ data: [frame] });
+      });
+
+      await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(0),
+            to: dateTime(5 * 60 * 1000), // 5 minutes — less than the 15-min first chunk
+            raw: { from: 'now-5m', to: 'now' },
+          },
+          requestId: 'request-small',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      expect(spy).toHaveBeenCalledTimes(1);
+      expect(spy.mock.calls[0][0].targets[0].refId).toBe('A');
+    });
+
+    it('caps the number of chunks to MAX_CHUNKS for very long ranges', async () => {
+      const instance = cloneDeep(mockDatasource);
+      const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
+        const frame = arrayToDataFrame([{ time: 0, value: 10 }]);
+        frame.refId = request.targets[0].refId;
+        frame.fields[0].type = FieldType.time;
+        return of({ data: [frame] });
+      });
+
+      await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(0),
+            to: dateTime(90 * 24 * 60 * 60 * 1000), // 90 days
+            raw: { from: 'now-90d', to: 'now' },
+          },
+          requestId: 'request-large',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      // Should never exceed MAX_CHUNKS (20) regardless of range duration
+      expect(spy.mock.calls.length).toBeLessThanOrEqual(20);
+      expect(spy.mock.calls.length).toBeGreaterThan(1);
+
+      // First chunk should start at fromMs and last chunk should end at toMs
+      const firstCall = spy.mock.calls[0][0];
+      const lastCall = spy.mock.calls[spy.mock.calls.length - 1][0];
+      expect(firstCall.range.from.valueOf()).toBe(0);
+      expect(lastCall.range.to.valueOf()).toBe(90 * 24 * 60 * 60 * 1000);
+    });
+
+    it('returns a single chunk when from equals to', async () => {
+      const instance = cloneDeep(mockDatasource);
+      const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
+        const frame = arrayToDataFrame([{ time: 0, value: 10 }]);
+        frame.refId = request.targets[0].refId;
+        frame.fields[0].type = FieldType.time;
+        return of({ data: [frame] });
+      });
+
+      const ts = 1000000;
+      await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(ts),
+            to: dateTime(ts),
+            raw: { from: 'now', to: 'now' },
+          },
+          requestId: 'request-zero',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      // from >= to means no chunking, single request
+      expect(spy).toHaveBeenCalledTimes(1);
+      expect(spy.mock.calls[0][0].targets[0].refId).toBe('A');
+    });
+
+    it('deduplicates exact boundary rows when merging overlapping chunk responses', async () => {
+      const instance = cloneDeep(mockDatasource);
+      let callIndex = 0;
+      jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation(() => {
+        const frames = [
+          arrayToDataFrame([{ time: 0, value: 10 }, { time: 43200000, value: 20 }]),
+          arrayToDataFrame([{ time: 43200000, value: 20 }, { time: 85500000, value: 30 }]),
+          arrayToDataFrame([{ time: 85500000, value: 30 }, { time: 86400000, value: 40 }]),
+        ];
+        const frame = frames[callIndex++];
+        frame.refId = `A__chunk__${callIndex - 1}`;
+        frame.fields[0].type = FieldType.time;
+        return of({ data: [frame] });
+      });
+
+      const result = await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(0),
+            to: dateTime(24 * 60 * 60 * 1000),
+            raw: { from: 'now-1d', to: 'now' },
+          },
+          requestId: 'request-3',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      expect(result.data[0].fields[0].values).toEqual([0, 43200000, 85500000, 86400000]);
+      expect(result.data[0].fields[1].values).toEqual([10, 20, 30, 40]);
+    });
+
+    it('propagates errors from all chunk responses', async () => {
+      const instance = cloneDeep(mockDatasource);
+      let callIndex = 0;
+      jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
+        const frame = arrayToDataFrame([{ time: callIndex * 1000, value: callIndex * 10 }]);
+        frame.refId = request.targets[0].refId;
+        frame.fields[0].type = FieldType.time;
+        const response: DataQueryResponse = { data: [frame] };
+        if (callIndex === 1) {
+          response.errors = [{ message: 'chunk error from middle chunk' }];
+        }
+        callIndex++;
+        return of(response);
+      });
+
+      const result = await firstValueFrom(
+        instance.query({
+          range: {
+            from: dateTime(0),
+            to: dateTime(24 * 60 * 60 * 1000),
+            raw: { from: 'now-1d', to: 'now' },
+          },
+          requestId: 'request-err',
+          targets: [
+            {
+              refId: 'A',
+              pluginVersion: '',
+              editorType: EditorType.Builder,
+              rawSql: 'SELECT count()',
+              builderOptions: {
+                database: 'default',
+                table: 'events',
+                queryType: QueryType.TimeSeries,
+              },
+            },
+          ],
+          timezone: 'UTC',
+        } as any)
+      );
+
+      expect(result.errors).toBeDefined();
+      expect(result.errors!.length).toBe(1);
+      expect(result.errors![0].message).toBe('chunk error from middle chunk');
+    });
   });
 
   describe('SupplementaryQueriesSupport', () => {
diff --git a/src/data/CHDatasource.ts b/src/data/CHDatasource.ts
index b9bb087e..c9034045 100644
--- a/src/data/CHDatasource.ts
+++ b/src/data/CHDatasource.ts
@@ -7,6 +7,7 @@ import {
   DataSourceInstanceSettings,
   DataSourceWithLogsContextSupport,
   DataSourceWithSupplementaryQueriesSupport,
+  dateTime,
   Field,
   getTimeZone,
   getTimeZoneInfo,
@@ -25,7 +26,7 @@ import LogsContextPanel from 'components/LogsContextPanel';
 import { cloneDeep, isEmpty, isString } from 'lodash';
 import otel from 'otel';
 import { createElement as createReactElement, ReactNode } from 'react';
-import { firstValueFrom, map, Observable } from 'rxjs';
+import { firstValueFrom, from, lastValueFrom, map, Observable } from 'rxjs';
 import { CHConfig } from 'types/config';
 import {
   AggregateColumn,
@@ -58,8 +59,20 @@ import { labelsFieldName, transformQueryResponseWithTraceAndLogLinks } from './utils';
 export class Datasource
   extends DataSourceWithBackend<CHQuery, CHConfig>
-  implements DataSourceWithSupplementaryQueriesSupport<CHQuery>, DataSourceWithLogsContextSupport
-{
+  implements DataSourceWithSupplementaryQueriesSupport<CHQuery>, DataSourceWithLogsContextSupport {
+  private static readonly chunkRefIdSeparator = '__chunk__';
+
+  // Chunk durations grow from recent (15m) to historical (24h repeating).
+  // The small first window keeps most cached chunks stable across dashboard refreshes;
+  // only the leading 15m chunk is re-queried as "now" advances.
+  private static readonly timeRangeChunkDurationsMs = [
+    15 * 60 * 1000, // 15 minutes
+    6 * 60 * 60 * 1000, // 6 hours
+    6 * 60 * 60 * 1000, // 6 hours
+    12 * 60 * 60 * 1000, // 12 hours
+    24 * 60 * 60 * 1000, // 24 hours
+  ];
+
   // This enables default annotation support for 7.2+
   annotations = {};
   settings: DataSourceInstanceSettings<CHConfig>;
@@ -826,12 +839,270 @@ export class Datasource
       };
     });
 
-    return super
-      .query({
-        ...request,
-        targets,
-      })
-      .pipe(map((res: DataQueryResponse) => transformQueryResponseWithTraceAndLogLinks(this, request, res)));
+    const chunkRanges = this.getTimeRangeChunks(request.range);
+    const hasChunkedTargets = chunkRanges.length > 1 && targets.some((target) => this.shouldChunkTimeRangeTarget(target));
+    const preparedRequest = {
+      ...request,
+      targets,
+    };
+
+    if (!hasChunkedTargets) {
+      return super
+        .query(preparedRequest)
+        .pipe(map((res: DataQueryResponse) => transformQueryResponseWithTraceAndLogLinks(this, request, res)));
+    }
+
+    return from(this.queryChunked(preparedRequest, chunkRanges)).pipe(
+      map((res: DataQueryResponse) => transformQueryResponseWithTraceAndLogLinks(this, request, res))
+    );
+  }
+
+  // Chunking is limited to Builder+TimeSeries to avoid breaking SQL editor queries
+  // or non-time-series panels. Supplementary logs volume queries (also Builder+TimeSeries)
+  // are chunked too, which is intentional — they benefit from the same cache alignment.
+  private shouldChunkTimeRangeTarget(target: CHQuery): boolean {
+    return target.editorType === EditorType.Builder && target.builderOptions.queryType === QueryType.TimeSeries && !target.hide;
+  }
+
+  // Cap concurrent chunk requests per panel. With Promise.all, all chunks fire simultaneously.
+  // 20 is a reasonable limit that balances cache reuse with server load. For shared clusters
+  // with strict max_concurrent_queries, consider lowering this or adding a concurrency limiter.
+  private static readonly MAX_CHUNKS = 20;
+
+  private getTimeRangeChunks(range?: DataQueryRequest<CHQuery>['range']): Array<DataQueryRequest<CHQuery>['range']> {
+    if (!range?.from || !range?.to) {
+      return range ? [range] : [];
+    }
+
+    const fromMs = range.from.valueOf();
+    const toMs = range.to.valueOf();
+
+    if (!Number.isFinite(fromMs) || !Number.isFinite(toMs) || fromMs >= toMs) {
+      return [range];
+    }
+
+    const chunks: Array<DataQueryRequest<CHQuery>['range']> = [];
+    let chunkEndMs = toMs;
+    let durationIndex = 0;
+
+    while (chunkEndMs > fromMs) {
+      // When the chunk limit is reached, let the final chunk span the remaining range
+      // so no data is lost. This prevents excessive chunk counts for very long ranges.
+      if (chunks.length >= Datasource.MAX_CHUNKS - 1) {
+        chunks.unshift({
+          ...range,
+          from: dateTime(fromMs),
+          to: dateTime(chunkEndMs),
+        });
+        break;
+      }
+
+      const durationMs =
+        Datasource.timeRangeChunkDurationsMs[
+          Math.min(durationIndex, Datasource.timeRangeChunkDurationsMs.length - 1)
+        ];
+      // When a chunk ends exactly on an alignment boundary, shift the chunk start back by one full duration so
+      // it expands farther into history instead of repeating the same aligned start.
+      const alignedStartMs =
+        Math.floor((chunkEndMs - durationMs) / durationMs) * durationMs -
+        (durationIndex > 0 && chunkEndMs % durationMs === 0 ? durationMs : 0);
+      const chunkStartMs = Math.max(fromMs, alignedStartMs);
+
+      chunks.unshift({
+        ...range,
+        from: dateTime(chunkStartMs),
+        to: dateTime(chunkEndMs),
+      });
+
+      chunkEndMs = chunkStartMs;
+      durationIndex++;
+    }
+
+    return chunks;
+  }
+
+  private async queryChunked(
+    request: DataQueryRequest<CHQuery>,
+    chunkRanges: Array<NonNullable<DataQueryRequest<CHQuery>['range']>>
+  ): Promise<DataQueryResponse> {
+    const chunkedTargets = request.targets.filter((target) => this.shouldChunkTimeRangeTarget(target));
+    const passthroughTargets = request.targets.filter((target) => !this.shouldChunkTimeRangeTarget(target));
+    const responses: DataQueryResponse[] = [];
+
+    if (passthroughTargets.length > 0) {
+      responses.push(await lastValueFrom(super.query({ ...request, targets: passthroughTargets })));
+    }
+
+    // Promise.all preserves input order, so chunkResponses[i] corresponds to chunkRanges[i].
+    // The merge logic relies on this temporal ordering for boundary-row deduplication.
+    const chunkResponses = await Promise.all(
+      chunkRanges.map((range, chunkIndex) =>
+        lastValueFrom(
+          super.query({
+            ...request,
+            requestId: request.requestId ? `${request.requestId}-chunk-${chunkIndex}` : `chunk-${chunkIndex}`,
+            range,
+            targets: chunkedTargets.map((target) => ({
+              ...target,
+              refId: this.toChunkRefId(target.refId, chunkIndex),
+            })),
+          })
+        )
+      )
+    );
+
+    if (chunkResponses.length > 0) {
+      const chunkedFrames = chunkResponses.flatMap((r) => r.data);
+      const chunkErrors = chunkResponses.flatMap((r) => r.errors ?? []);
+      responses.push({
+        ...chunkResponses[chunkResponses.length - 1],
+        data: this.mergeChunkedFrames(chunkedFrames),
+        ...(chunkErrors.length > 0 ? { errors: chunkErrors } : {}),
+      });
+    }
+
+    return this.combineQueryResponses(request.targets, responses);
+  }
+
+  private toChunkRefId(refId: string, chunkIndex: number): string {
+    return `${refId}${Datasource.chunkRefIdSeparator}${chunkIndex}`;
+  }
+
+  private getOriginalRefId(refId?: string): string | undefined {
+    if (!refId) {
+      return refId;
+    }
+    const separatorIndex = refId.indexOf(Datasource.chunkRefIdSeparator);
+    return separatorIndex === -1 ? refId : refId.slice(0, separatorIndex);
+  }
+
+  private mergeChunkedFrames(frames: DataFrame[]): DataFrame[] {
+    const groupedFrames = new Map<string, DataFrame[]>();
+
+    for (const frame of frames) {
+      const originalRefId = this.getOriginalRefId(frame.refId);
+      const signature = JSON.stringify({
+        refId: originalRefId,
+        name: frame.name,
+        fields: frame.fields.map((field) => ({
+          name: field.name,
+          type: field.type,
+          labels: this.stableSerialize(field.labels),
+        })),
+      });
+      const currentFrames = groupedFrames.get(signature) || [];
+      currentFrames.push(frame);
+      groupedFrames.set(signature, currentFrames);
+    }
+
+    return Array.from(groupedFrames.values()).map((group) => this.mergeFrameGroup(group));
+  }
+
+  /**
+   * Merges temporally ordered chunk frames into a single frame, deduplicating boundary rows.
+   * Assumes: (1) chunks are in chronological order, (2) overlap is at most one boundary row.
+   */
+  private mergeFrameGroup(frames: DataFrame[]): DataFrame {
+    const firstFrame = frames[0];
+    const mergedFields = firstFrame.fields.map((field) => ({
+      name: field.name,
+      type: field.type,
+      config: field.config,
+      labels: field.labels,
+      values: [] as unknown[],
+    }));
+
+    for (const frame of frames) {
+      const frameLength = frame.length ?? frame.fields[0]?.values.length ?? 0;
+      for (let rowIndex = 0; rowIndex < frameLength; rowIndex++) {
+        const rowValues = frame.fields.map((field) => field.values[rowIndex]);
+        const lastMergedIndex = mergedFields[0].values.length - 1;
+        const isDuplicateBoundaryRow =
+          lastMergedIndex >= 0 &&
+          this.valuesEqual(mergedFields[0].values[lastMergedIndex], rowValues[0]) &&
+          rowValues.every((value, fieldIndex) =>
+            this.valuesEqual(mergedFields[fieldIndex].values[lastMergedIndex], value)
+          );
+
+        if (!isDuplicateBoundaryRow) {
+          for (let i = 0; i < rowValues.length; i++) {
+            mergedFields[i].values.push(rowValues[i]);
+          }
+        }
+      }
+    }
+
+    return {
+      ...firstFrame,
+      refId: this.getOriginalRefId(firstFrame.refId),
+      fields: mergedFields,
+    };
+  }
+
+  /** Deterministic JSON-like serializer for plain primitives, arrays, and objects.
+   * Does not unwrap wrapper types (e.g. `new Number(42)`) or `moment` instances. */
+  private stableSerialize(value: unknown): string {
+    if (value === undefined) {
+      return 'undefined';
+    }
+
+    if (value === null || typeof value !== 'object') {
+      return JSON.stringify(value);
+    }
+
+    if (Array.isArray(value)) {
+      return `[${value.map((item) => this.stableSerialize(item)).join(',')}]`;
+    }
+
+    const entries = Object.entries(value as Record<string, unknown>).sort(([a], [b]) => a.localeCompare(b));
+    return `{${entries.map(([key, v]) => `${JSON.stringify(key)}:${this.stableSerialize(v)}`).join(',')}}`;
+  }
+
+  private valuesEqual(left: unknown, right: unknown): boolean {
+    if (Object.is(left, right)) {
+      return true;
+    }
+
+    if (typeof left !== 'object' || typeof right !== 'object' || left === null || right === null) {
+      return false;
+    }
+
+    return this.stableSerialize(left) === this.stableSerialize(right);
+  }
+
+  private combineQueryResponses(
+    originalTargets: CHQuery[],
+    responses: DataQueryResponse[]
+  ): DataQueryResponse {
+    const combinedResponse = responses[0] || { data: [] };
+    const framesByRefId = new Map<string, DataFrame[]>();
+    const untrackedFrames: DataFrame[] = [];
+    const allErrors: DataQueryResponse['errors'] = [];
+
+    for (const response of responses) {
+      if (response.errors) {
+        allErrors.push(...response.errors);
+      }
+
+      for (const frame of response.data) {
+        if (!frame.refId) {
+          untrackedFrames.push(frame);
+          continue;
+        }
+
+        const groupedFrames = framesByRefId.get(frame.refId) || [];
+        groupedFrames.push(frame);
+        framesByRefId.set(frame.refId, groupedFrames);
+      }
+    }
+
+    const orderedFrames = originalTargets.flatMap((target) => framesByRefId.get(target.refId) || []);
+
+    return {
+      ...combinedResponse,
+      data: orderedFrames.concat(untrackedFrames),
+      ...(allErrors.length > 0 ? { errors: allErrors } : {}),
+    };
+  }
 
   private runQuery(request: Partial<CHQuery>, options?: any): Promise<DataFrame> {