Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 329 additions & 1 deletion src/data/CHDatasource.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import {
arrayToDataFrame,
CoreApp,
dateTime,
DataQueryRequest,
DataQueryResponse,
FieldType,
SupplementaryQueryType,
TimeRange,
toDataFrame,
Expand All @@ -12,7 +14,7 @@ import { DataSourceWithBackend } from '@grafana/runtime';
import { DataQuery } from '@grafana/schema';
import { mockDatasource } from '__mocks__/datasource';
import { cloneDeep } from 'lodash';
import { Observable, of } from 'rxjs';
import { firstValueFrom, Observable, of } from 'rxjs';
import { BuilderMode, ColumnHint, QueryBuilderOptions, QueryType } from 'types/queryBuilder';
import { CHBuilderQuery, CHQuery, CHSqlQuery, EditorType } from 'types/sql';
import { AdHocFilter } from './adHocFilter';
Expand Down Expand Up @@ -730,6 +732,10 @@ describe('ClickHouseDatasource', () => {
});

describe('query', () => {
// Undo any DataSourceWithBackend.query spies installed by individual tests.
afterEach(() => jest.restoreAllMocks());

it('attaches timezone metadata to targets', async () => {
const instance = cloneDeep(mockDatasource);
const spy = jest
Expand All @@ -748,6 +754,328 @@ describe('ClickHouseDatasource', () => {
timezone: 'UTC',
});
});

it('splits builder timeseries queries into aligned time chunks and merges the frames', async () => {
const instance = cloneDeep(mockDatasource);
const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
const frame = arrayToDataFrame([{ time: request.range.from.valueOf(), value: request.range.to.valueOf() }]);
frame.refId = request.targets[0].refId;
frame.fields[0].type = FieldType.time;
return of({ data: [frame] });
});

const result = await firstValueFrom(
instance.query({
range: {
from: dateTime(0),
to: dateTime(24 * 60 * 60 * 1000),
raw: { from: 'now-1d', to: 'now' },
},
requestId: 'request-1',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
],
timezone: 'UTC',
} as any)
);

expect(spy).toHaveBeenCalledTimes(3);
expect(spy.mock.calls.map(([request]) => request.requestId)).toEqual([
'request-1-chunk-0',
'request-1-chunk-1',
'request-1-chunk-2',
]);
expect(spy.mock.calls.map(([request]) => request.targets[0].refId)).toEqual([
'A__chunk__0',
'A__chunk__1',
'A__chunk__2',
]);
expect(spy.mock.calls.map(([request]) => [request.range.from.valueOf(), request.range.to.valueOf()])).toEqual([
[0, 43200000],
[43200000, 85500000],
[85500000, 86400000],
]);
expect(result.data).toHaveLength(1);
expect(result.data[0].refId).toBe('A');
expect(result.data[0].fields[0].values).toEqual([0, 43200000, 85500000]);
expect(result.data[0].fields[1].values).toEqual([43200000, 85500000, 86400000]);
});

it('does not split non-timeseries targets when chunking the request', async () => {
const instance = cloneDeep(mockDatasource);
const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
const frame = arrayToDataFrame([{ ref: request.targets[0].refId }]);
frame.refId = request.targets[0].refId;
return of({ data: [frame] });
});

const result = await firstValueFrom(
instance.query({
range: {
from: dateTime(0),
to: dateTime(24 * 60 * 60 * 1000),
raw: { from: 'now-1d', to: 'now' },
},
requestId: 'request-2',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
{
refId: 'B',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT *',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.Table,
},
},
],
timezone: 'UTC',
} as any)
);

expect(spy).toHaveBeenCalledTimes(4);
expect(spy.mock.calls[0][0].targets.map((target: CHQuery) => target.refId)).toEqual(['B']);
expect(spy.mock.calls.slice(1).map(([request]) => request.targets[0].refId)).toEqual([
'A__chunk__0',
'A__chunk__1',
'A__chunk__2',
]);
expect(result.data.map((frame) => frame.refId)).toEqual(['A', 'B']);
});

it('does not chunk when time range is smaller than the first chunk duration', async () => {
const instance = cloneDeep(mockDatasource);
const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
const frame = arrayToDataFrame([{ time: 0, value: 10 }]);
frame.refId = request.targets[0].refId;
frame.fields[0].type = FieldType.time;
return of({ data: [frame] });
});

await firstValueFrom(
instance.query({
range: {
from: dateTime(0),
to: dateTime(5 * 60 * 1000), // 5 minutes — less than the 15-min first chunk
raw: { from: 'now-5m', to: 'now' },
},
requestId: 'request-small',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
],
timezone: 'UTC',
} as any)
);

expect(spy).toHaveBeenCalledTimes(1);
expect(spy.mock.calls[0][0].targets[0].refId).toBe('A');
});

it('caps the number of chunks to MAX_CHUNKS for very long ranges', async () => {
const instance = cloneDeep(mockDatasource);
const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
const frame = arrayToDataFrame([{ time: 0, value: 10 }]);
frame.refId = request.targets[0].refId;
frame.fields[0].type = FieldType.time;
return of({ data: [frame] });
});

await firstValueFrom(
instance.query({
range: {
from: dateTime(0),
to: dateTime(90 * 24 * 60 * 60 * 1000), // 90 days
raw: { from: 'now-90d', to: 'now' },
},
requestId: 'request-large',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
],
timezone: 'UTC',
} as any)
);

// Should never exceed MAX_CHUNKS (20) regardless of range duration
expect(spy.mock.calls.length).toBeLessThanOrEqual(20);
expect(spy.mock.calls.length).toBeGreaterThan(1);

// First chunk should start at fromMs and last chunk should end at toMs
const firstCall = spy.mock.calls[0][0];
const lastCall = spy.mock.calls[spy.mock.calls.length - 1][0];
expect(firstCall.range.from.valueOf()).toBe(0);
expect(lastCall.range.to.valueOf()).toBe(90 * 24 * 60 * 60 * 1000);
});

it('returns a single chunk when from equals to', async () => {
const instance = cloneDeep(mockDatasource);
const spy = jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
const frame = arrayToDataFrame([{ time: 0, value: 10 }]);
frame.refId = request.targets[0].refId;
frame.fields[0].type = FieldType.time;
return of({ data: [frame] });
});

const ts = 1000000;
await firstValueFrom(
instance.query({
range: {
from: dateTime(ts),
to: dateTime(ts),
raw: { from: 'now', to: 'now' },
},
requestId: 'request-zero',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
],
timezone: 'UTC',
} as any)
);

// from >= to means no chunking, single request
expect(spy).toHaveBeenCalledTimes(1);
expect(spy.mock.calls[0][0].targets[0].refId).toBe('A');
});

it('deduplicates exact boundary rows when merging overlapping chunk responses', async () => {
const instance = cloneDeep(mockDatasource);
// Closed-over counter: the mock hands out a different canned frame per call,
// in call order. NOTE(review): this assumes chunk requests are issued (or at
// least subscribed) sequentially — confirm against the datasource implementation.
let callIndex = 0;
jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation(() => {
// Each frame repeats its neighbour's boundary row (43200000, 85500000) to
// simulate inclusive chunk edges returned twice by the backend.
const frames = [
arrayToDataFrame([{ time: 0, value: 10 }, { time: 43200000, value: 20 }]),
arrayToDataFrame([{ time: 43200000, value: 20 }, { time: 85500000, value: 30 }]),
arrayToDataFrame([{ time: 85500000, value: 30 }, { time: 86400000, value: 40 }]),
];
const frame = frames[callIndex++];
// callIndex was already incremented above, so subtract 1 for this call's index.
frame.refId = `A__chunk__${callIndex - 1}`;
frame.fields[0].type = FieldType.time;
return of({ data: [frame] });
});

const result = await firstValueFrom(
instance.query({
range: {
from: dateTime(0),
to: dateTime(24 * 60 * 60 * 1000),
raw: { from: 'now-1d', to: 'now' },
},
requestId: 'request-3',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
],
timezone: 'UTC',
} as any)
);

// Merged output keeps each boundary timestamp exactly once, in time order.
expect(result.data[0].fields[0].values).toEqual([0, 43200000, 85500000, 86400000]);
expect(result.data[0].fields[1].values).toEqual([10, 20, 30, 40]);
});

it('propagates errors from all chunk responses', async () => {
const instance = cloneDeep(mockDatasource);
// Closed-over counter: only the second chunk (callIndex === 1) carries an
// error, to prove errors from a middle chunk survive the merge.
let callIndex = 0;
jest.spyOn(DataSourceWithBackend.prototype, 'query').mockImplementation((request: any) => {
const frame = arrayToDataFrame([{ time: callIndex * 1000, value: callIndex * 10 }]);
frame.refId = request.targets[0].refId;
frame.fields[0].type = FieldType.time;
const response: DataQueryResponse = { data: [frame] };
if (callIndex === 1) {
response.errors = [{ message: 'chunk error from middle chunk' }];
}
callIndex++;
return of(response);
});

const result = await firstValueFrom(
instance.query({
range: {
from: dateTime(0),
to: dateTime(24 * 60 * 60 * 1000),
raw: { from: 'now-1d', to: 'now' },
},
requestId: 'request-err',
targets: [
{
refId: 'A',
pluginVersion: '',
editorType: EditorType.Builder,
rawSql: 'SELECT count()',
builderOptions: {
database: 'default',
table: 'events',
queryType: QueryType.TimeSeries,
},
},
],
timezone: 'UTC',
} as any)
);

// The merged response surfaces the single error from the middle chunk.
expect(result.errors).toBeDefined();
expect(result.errors!.length).toBe(1);
expect(result.errors![0].message).toBe('chunk error from middle chunk');
});
});

describe('SupplementaryQueriesSupport', () => {
Expand Down
Loading