Skip to content

Commit 07cf814

Browse files
committed
add tests for pre-agg jobs
1 parent d804630 commit 07cf814

File tree

1 file changed

+175
-4
lines changed

1 file changed

+175
-4
lines changed

packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts

Lines changed: 175 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import R from 'ramda';
22
import { BaseDriver } from '@cubejs-backend/query-orchestrator';
33
import { pausePromise, SchemaFileRepository } from '@cubejs-backend/shared';
4-
import { CubejsServerCore } from '../../src';
5-
import { RefreshScheduler } from '../../src/core/RefreshScheduler';
6-
import { CompilerApi } from '../../src/core/CompilerApi';
4+
import { clearInterval } from 'timers';
5+
import { CubejsServerCore, CompilerApi, RefreshScheduler } from '../../src';
76

87
const schemaContent = `
98
cube('Foo', {
@@ -234,6 +233,16 @@ cube('Bar', {
234233
]),
235234
};
236235

236+
function createDeferred() {
237+
let resolve;
238+
let reject;
239+
const promise = new Promise((res, rej) => {
240+
resolve = res;
241+
reject = rej;
242+
});
243+
return { promise, resolve, reject };
244+
}
245+
237246
class MockDriver extends BaseDriver {
238247
public tables: any[] = [];
239248

@@ -395,7 +404,7 @@ const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertS
395404
jest.spyOn(serverCore, 'getCompilerApi').mockImplementation(async () => compilerApi);
396405

397406
const refreshScheduler = new RefreshScheduler(serverCore);
398-
return { refreshScheduler, compilerApi, mockDriver };
407+
return { refreshScheduler, compilerApi, mockDriver, serverCore };
399408
};
400409

401410
describe('Refresh Scheduler', () => {
@@ -819,6 +828,168 @@ describe('Refresh Scheduler', () => {
819828
expect(refreshResult.finished).toEqual(true);
820829
});
821830

831+
describe('Manual pre-aggregations rebuild via postBuildJobs', () => {
832+
test('All pre-aggregations', async () => {
833+
process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
834+
process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';
835+
836+
const {
837+
refreshScheduler, mockDriver, serverCore
838+
} = setupScheduler({ repository: repositoryWithPreAggregations });
839+
840+
const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' };
841+
842+
let finish = false;
843+
844+
while (!finish) {
845+
try {
846+
await refreshScheduler.postBuildJobs(
847+
ctx,
848+
{
849+
metadata: undefined,
850+
preAggregations: [],
851+
timezones: ['UTC', 'America/Los_Angeles'],
852+
forceBuildPreAggregations: false,
853+
throwErrors: false,
854+
preAggregationLoadConcurrency: 1,
855+
}
856+
);
857+
finish = true;
858+
} catch (err: any) {
859+
if (err.error !== 'Continue wait') {
860+
throw err;
861+
}
862+
}
863+
}
864+
865+
const deferred = createDeferred();
866+
const orchestrator = await serverCore.getOrchestratorApi(ctx);
867+
868+
const interval = setInterval(async () => {
869+
const queuedList = await orchestrator.getPreAggregationQueueStates();
870+
871+
if (queuedList.length === 0) {
872+
deferred.resolve();
873+
}
874+
}, 500);
875+
876+
await deferred.promise;
877+
clearInterval(interval);
878+
879+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5);
880+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(5);
881+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'UTC').length).toEqual(5);
882+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned') && o.timezone === 'America/Los_Angeles').length).toEqual(5);
883+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'UTC').length).toEqual(5);
884+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second') && o.timezone === 'America/Los_Angeles').length).toEqual(5);
885+
});
886+
887+
test('Only `first` pre-aggregation', async () => {
888+
process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
889+
process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';
890+
891+
const {
892+
refreshScheduler, mockDriver, serverCore
893+
} = setupScheduler({ repository: repositoryWithPreAggregations });
894+
895+
const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' };
896+
897+
let finish = false;
898+
899+
while (!finish) {
900+
try {
901+
await refreshScheduler.postBuildJobs(
902+
ctx,
903+
{
904+
metadata: undefined,
905+
preAggregations: [{ id: 'Foo.first' }],
906+
timezones: ['UTC', 'America/Los_Angeles'],
907+
forceBuildPreAggregations: false,
908+
throwErrors: false,
909+
}
910+
);
911+
finish = true;
912+
} catch (err: any) {
913+
if (err.error !== 'Continue wait') {
914+
throw err;
915+
}
916+
}
917+
}
918+
919+
const deferred = createDeferred();
920+
const orchestrator = await serverCore.getOrchestratorApi(ctx);
921+
922+
const interval = setInterval(async () => {
923+
const queuedList = await orchestrator.getPreAggregationQueueStates();
924+
925+
if (queuedList.length === 0) {
926+
deferred.resolve();
927+
}
928+
}, 500);
929+
930+
await deferred.promise;
931+
clearInterval(interval);
932+
933+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(5);
934+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(5);
935+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned')).length).toEqual(0);
936+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second')).length).toEqual(0);
937+
});
938+
939+
test('Only `first` pre-aggregation with dateRange', async () => {
940+
process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
941+
process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';
942+
943+
const {
944+
refreshScheduler, mockDriver, serverCore
945+
} = setupScheduler({ repository: repositoryWithPreAggregations });
946+
947+
const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' };
948+
949+
let finish = false;
950+
951+
while (!finish) {
952+
try {
953+
await refreshScheduler.postBuildJobs(
954+
ctx,
955+
{
956+
metadata: undefined,
957+
preAggregations: [{ id: 'Foo.first' }],
958+
timezones: ['UTC', 'America/Los_Angeles'],
959+
dateRange: ['2020-12-29T00:00:00.000', '2021-01-01T00:00:00.000'],
960+
forceBuildPreAggregations: false,
961+
throwErrors: false,
962+
}
963+
);
964+
finish = true;
965+
} catch (err: any) {
966+
if (err.error !== 'Continue wait') {
967+
throw err;
968+
}
969+
}
970+
}
971+
972+
const deferred = createDeferred();
973+
const orchestrator = await serverCore.getOrchestratorApi(ctx);
974+
975+
const interval = setInterval(async () => {
976+
const queuedList = await orchestrator.getPreAggregationQueueStates();
977+
978+
if (queuedList.length === 0) {
979+
deferred.resolve();
980+
}
981+
}, 500);
982+
983+
await deferred.promise;
984+
clearInterval(interval);
985+
986+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'UTC').length).toEqual(3);
987+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_first') && o.timezone === 'America/Los_Angeles').length).toEqual(3);
988+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_orphaned')).length).toEqual(0);
989+
expect(mockDriver.createdTables.filter(o => o.tableName.includes('foo_second')).length).toEqual(0);
990+
});
991+
});
992+
822993
test('Iterator waits before advance', async () => {
823994
process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
824995
process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';

0 commit comments

Comments
 (0)