Skip to content

Commit 4782d5f

Browse files
committed
fix: Possible race condition on Orchestrator API initialization
1 parent 0d13673 commit 4782d5f

File tree

2 files changed

+106
-87
lines changed

2 files changed

+106
-87
lines changed

packages/cubejs-server-core/src/core/OrchestratorStorage.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ export class OrchestratorStorage {
1212
});
1313
}
1414

15+
protected readonly initializers: Map<string, Promise<OrchestratorApi>> = new Map();
16+
1517
public has(orchestratorId: string) {
1618
return this.storage.has(orchestratorId);
1719
}
@@ -24,6 +26,29 @@ export class OrchestratorStorage {
2426
return this.storage.set(orchestratorId, orchestratorApi);
2527
}
2628

29+
public async getOrInit(orchestratorId: string, init: () => Promise<OrchestratorApi>): Promise<OrchestratorApi> {
30+
if (this.storage.has(orchestratorId)) {
31+
return this.storage.get(orchestratorId);
32+
}
33+
34+
if (this.initializers.has(orchestratorId)) {
35+
return this.initializers.get(orchestratorId);
36+
}
37+
38+
try {
39+
const initPromise = init();
40+
this.initializers.set(orchestratorId, initPromise);
41+
42+
const instance = await initPromise;
43+
44+
this.storage.set(orchestratorId, instance);
45+
46+
return instance;
47+
} finally {
48+
this.initializers.delete(orchestratorId);
49+
}
50+
}
51+
2752
public clear() {
2853
this.storage.clear();
2954
}

packages/cubejs-server-core/src/core/server.ts

Lines changed: 81 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -569,88 +569,49 @@ export class CubejsServerCore {
569569
public async getOrchestratorApi(context: RequestContext): Promise<OrchestratorApi> {
570570
const orchestratorId = await this.contextToOrchestratorId(context);
571571

572-
if (this.orchestratorStorage.has(orchestratorId)) {
573-
return this.orchestratorStorage.get(orchestratorId);
574-
}
575-
576-
/**
577-
* Hash table to store promises which will be resolved with the
578-
* datasource drivers. DriverFactoryByDataSource function is closure
579-
* this constant.
580-
*/
581-
const driverPromise: Record<string, Promise<BaseDriver>> = {};
582-
583-
let externalPreAggregationsDriverPromise: Promise<BaseDriver> | null = null;
584-
585-
const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this);
586-
const externalDbType = this.contextToExternalDbType(context);
587-
588-
// orchestrator options can be empty, if user didn't define it.
589-
// so we are adding default and configuring queues concurrency.
590-
const orchestratorOptions =
591-
this.optsHandler.getOrchestratorInitializedOptions(
592-
context,
593-
(await this.orchestratorOptions(context)) || {},
594-
);
595-
596-
const orchestratorApi = this.createOrchestratorApi(
572+
return this.orchestratorStorage.getOrInit(orchestratorId, async () => {
597573
/**
598-
* Driver factory function `DriverFactoryByDataSource`.
574+
* Hash table to store promises which will be resolved with the
575+
* datasource drivers. DriverFactoryByDataSource function is a closure
576+
* this constant.
599577
*/
600-
async (dataSource = 'default') => {
601-
if (driverPromise[dataSource]) {
602-
return driverPromise[dataSource];
603-
}
578+
const driverPromise: Record<string, Promise<BaseDriver>> = {};
604579

605-
// eslint-disable-next-line no-return-assign
606-
return driverPromise[dataSource] = (async () => {
607-
let driver: BaseDriver | null = null;
608-
609-
try {
610-
driver = await this.resolveDriver(
611-
{
612-
...context,
613-
dataSource,
614-
},
615-
orchestratorOptions,
616-
);
617-
618-
if (typeof driver === 'object' && driver != null) {
619-
if (driver.setLogger) {
620-
driver.setLogger(this.logger);
621-
}
580+
let externalPreAggregationsDriverPromise: Promise<BaseDriver> | null = null;
622581

623-
await driver.testConnection();
582+
const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this);
583+
const externalDbType = this.contextToExternalDbType(context);
624584

625-
return driver;
626-
}
627-
628-
throw new Error(
629-
`Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}`
630-
);
631-
} catch (e) {
632-
driverPromise[dataSource] = null;
633-
634-
if (driver) {
635-
await driver.release();
636-
}
585+
// orchestrator options can be empty, if user didn't define it.
586+
// so we are adding default and configuring queues concurrency.
587+
const orchestratorOptions =
588+
this.optsHandler.getOrchestratorInitializedOptions(
589+
context,
590+
(await this.orchestratorOptions(context)) || {},
591+
);
637592

638-
throw e;
639-
}
640-
})();
641-
},
642-
{
643-
externalDriverFactory: this.options.externalDriverFactory && (async () => {
644-
if (externalPreAggregationsDriverPromise) {
645-
return externalPreAggregationsDriverPromise;
593+
return this.createOrchestratorApi(
594+
/**
595+
* Driver factory function `DriverFactoryByDataSource`.
596+
*/
597+
async (dataSource = 'default') => {
598+
if (driverPromise[dataSource]) {
599+
return driverPromise[dataSource];
646600
}
647601

648602
// eslint-disable-next-line no-return-assign
649-
return externalPreAggregationsDriverPromise = (async () => {
603+
return driverPromise[dataSource] = (async () => {
650604
let driver: BaseDriver | null = null;
651605

652606
try {
653-
driver = await this.options.externalDriverFactory(context);
607+
driver = await this.resolveDriver(
608+
{
609+
...context,
610+
dataSource,
611+
},
612+
orchestratorOptions,
613+
);
614+
654615
if (typeof driver === 'object' && driver != null) {
655616
if (driver.setLogger) {
656617
driver.setLogger(this.logger);
@@ -662,10 +623,10 @@ export class CubejsServerCore {
662623
}
663624

664625
throw new Error(
665-
`Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}`
626+
`Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}`
666627
);
667628
} catch (e) {
668-
externalPreAggregationsDriverPromise = null;
629+
driverPromise[dataSource] = null;
669630

670631
if (driver) {
671632
await driver.release();
@@ -674,23 +635,56 @@ export class CubejsServerCore {
674635
throw e;
675636
}
676637
})();
677-
}),
678-
contextToDbType: async (dataSource) => contextToDbType({
679-
...context,
680-
dataSource
681-
}),
682-
// speedup with cache
683-
contextToExternalDbType: () => externalDbType,
684-
redisPrefix: orchestratorId,
685-
skipExternalCacheAndQueue: externalDbType === 'cubestore',
686-
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
687-
...orchestratorOptions,
688-
}
689-
);
638+
},
639+
{
640+
externalDriverFactory: this.options.externalDriverFactory && (async () => {
641+
if (externalPreAggregationsDriverPromise) {
642+
return externalPreAggregationsDriverPromise;
643+
}
644+
645+
// eslint-disable-next-line no-return-assign
646+
return externalPreAggregationsDriverPromise = (async () => {
647+
let driver: BaseDriver | null = null;
648+
649+
try {
650+
driver = await this.options.externalDriverFactory(context);
651+
if (typeof driver === 'object' && driver != null) {
652+
if (driver.setLogger) {
653+
driver.setLogger(this.logger);
654+
}
690655

691-
this.orchestratorStorage.set(orchestratorId, orchestratorApi);
656+
await driver.testConnection();
657+
658+
return driver;
659+
}
692660

693-
return orchestratorApi;
661+
throw new Error(
662+
`Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}`
663+
);
664+
} catch (e) {
665+
externalPreAggregationsDriverPromise = null;
666+
667+
if (driver) {
668+
await driver.release();
669+
}
670+
671+
throw e;
672+
}
673+
})();
674+
}),
675+
contextToDbType: async (dataSource) => contextToDbType({
676+
...context,
677+
dataSource
678+
}),
679+
// speedup with cache
680+
contextToExternalDbType: () => externalDbType,
681+
redisPrefix: orchestratorId,
682+
skipExternalCacheAndQueue: externalDbType === 'cubestore',
683+
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
684+
...orchestratorOptions,
685+
}
686+
);
687+
});
694688
}
695689

696690
protected createCompilerApi(repository, options: Record<string, any> = {}) {

0 commit comments

Comments
 (0)