diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index 98b9696ffe414..fe53c59a55888 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -446,7 +446,7 @@ export class QueryOrchestrator { // TODO (@MikeNitsenko): Metadata queries need object params like [{ schema, table }] // but QueryWithParams expects string[]. This forces JSON.stringify workaround. [JSON.stringify(params)], - { external: false, renewalThreshold: 24 * 60 * 60 } + { external: false } ]; } @@ -456,26 +456,20 @@ export class QueryOrchestrator { dataSource: string = 'default', options: { requestId?: string; - forceRefresh?: boolean; - renewalThreshold?: number; + syncJobId?: string; expiration?: number; } = {} ): Promise { const { requestId, - forceRefresh = false, - renewalThreshold = 24 * 60 * 60, - expiration = 7 * 24 * 60 * 60, + syncJobId, + expiration = 30 * 24 * 60 * 60, } = options; const metadataQuery = this.createMetadataQuery(operation, params); - const cacheKey: CacheKey = [metadataQuery, dataSource]; - - const renewalKey = forceRefresh ? undefined : [ - `METADATA_RENEWAL:${operation}`, - dataSource, - Math.floor(Date.now() / (renewalThreshold * 1000)) - ]; + const cacheKey: CacheKey = syncJobId + ? [metadataQuery, dataSource, syncJobId] + : [metadataQuery, dataSource]; return this.queryCache.cacheQueryResult( metadataQuery, @@ -483,13 +477,10 @@ export class QueryOrchestrator { cacheKey, expiration, { - renewalThreshold, - renewalKey, - forceNoCache: forceRefresh, requestId, dataSource, + forceNoCache: !syncJobId, useInMemory: true, - waitForRenew: true, } ); } @@ -501,8 +492,7 @@ export class QueryOrchestrator { dataSource: string = 'default', options: { requestId?: string; - forceRefresh?: boolean; - renewalThreshold?: number; + syncJobId?: string; expiration?: number; } = {} ): Promise { @@ -522,8 +512,7 @@ export class QueryOrchestrator { dataSource: string = 'default', options: { requestId?: string; - forceRefresh?: boolean; - renewalThreshold?: number; + syncJobId?: string; expiration?: number; } = {} ): Promise { @@ -543,8 +532,7 @@ export class QueryOrchestrator { dataSource: string = 'default', options: { requestId?: string; - forceRefresh?: boolean; - renewalThreshold?: number; + syncJobId?: string; expiration?: number; } = {} ): Promise { diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js b/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js index 08196c142739d..6e1346de0acd8 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js +++ b/packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js @@ -1955,30 +1955,34 @@ describe('QueryOrchestrator', () => { ]); }); - test('should use cache on second call', async () => { - // First call - await metadataOrchestrator.queryDataSourceSchemas(); - // Second call should use cache - const result = await metadataOrchestrator.queryDataSourceSchemas(); + test('should use cache when syncJobId is provided', async () => { + // First call with syncJobId + await metadataOrchestrator.queryDataSourceSchemas('default', { syncJobId: 'job-123' }); + + // Clear the mock calls + metadataMockDriver.getSchemas.mockClear(); + + // Second call with same syncJobId should use cache + const result = await metadataOrchestrator.queryDataSourceSchemas('default', { syncJobId: 'job-123' }); expect(result).toEqual([ { schema_name: 'public' }, { schema_name: 'analytics' }, { schema_name: 'staging' } ]); + + // Verify driver wasn't called again + expect(metadataMockDriver.getSchemas).not.toHaveBeenCalled(); }); - test('should force refresh when requested', async () => { + test('should refresh when syncJobId is not provided', async () => { // First call await metadataOrchestrator.queryDataSourceSchemas(); - // Second call with forceRefresh - const result = await metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true }); + // Second call without syncJobId should refresh + await metadataOrchestrator.queryDataSourceSchemas(); - expect(result).toEqual([ - { schema_name: 'public' }, - { schema_name: 'analytics' }, - { schema_name: 'staging' } - ]); + // Driver should be called twice + expect(metadataMockDriver.getSchemas).toHaveBeenCalledTimes(2); }); test('should pass requestId option', async () => { @@ -2008,11 +2012,11 @@ describe('QueryOrchestrator', () => { expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledWith(schemas); }); - test('should cache results based on schema list', async () => { + test('should use cache when syncJobId is provided', async () => { const schemas = [{ schema_name: 'public' }]; - // First call - will execute and store in cache - await metadataOrchestrator.queryTablesForSchemas(schemas); + // First call with syncJobId - will execute and store in cache + await metadataOrchestrator.queryTablesForSchemas(schemas, 'default', { syncJobId: 'job-123' }); // Add a delay to ensure the first query has completed and cached its result await new Promise(resolve => setTimeout(resolve, 100)); @@ -2024,8 +2028,8 @@ describe('QueryOrchestrator', () => { // Our hash function should handle this correctly const schemas2 = [{ schema_name: 'public' }]; - // Second call should use cache - const result = await metadataOrchestrator.queryTablesForSchemas(schemas2); + // Second call with same syncJobId should use cache + const result = await metadataOrchestrator.queryTablesForSchemas(schemas2, 'default', { syncJobId: 'job-123' }); expect(result).toEqual([ { schema_name: 'public', table_name: 'users' }, @@ -2044,11 +2048,11 @@ describe('QueryOrchestrator', () => { expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledWith([]); }); - test('should force refresh when requested', async () => { + test('should refresh when syncJobId is not provided', async () => { const schemas = [{ schema_name: 'public' }]; await metadataOrchestrator.queryTablesForSchemas(schemas); - await metadataOrchestrator.queryTablesForSchemas(schemas, 'default', { forceRefresh: true }); + await metadataOrchestrator.queryTablesForSchemas(schemas); expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledTimes(2); }); @@ -2111,11 +2115,11 @@ describe('QueryOrchestrator', () => { expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledWith(tables); }); - test('should cache results based on table list', async () => { + test('should use cache when syncJobId is provided', async () => { const tables = [{ schema_name: 'public', table_name: 'users' }]; - // First call - will execute and store in cache - await metadataOrchestrator.queryColumnsForTables(tables); + // First call with syncJobId - will execute and store in cache + await metadataOrchestrator.queryColumnsForTables(tables, 'default', { syncJobId: 'job-123' }); // Add a delay to ensure the first query has completed and cached its result await new Promise(resolve => setTimeout(resolve, 100)); @@ -2127,8 +2131,8 @@ describe('QueryOrchestrator', () => { // Our hash function should handle this correctly const tables2 = [{ schema_name: 'public', table_name: 'users' }]; - // Second call should use cache - const result = await metadataOrchestrator.queryColumnsForTables(tables2); + // Second call with same syncJobId should use cache + const result = await metadataOrchestrator.queryColumnsForTables(tables2, 'default', { syncJobId: 'job-123' }); expect(result).toEqual([ { @@ -2165,11 +2169,11 @@ describe('QueryOrchestrator', () => { expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledWith([]); }); - test('should force refresh when requested', async () => { + test('should refresh when syncJobId is not provided', async () => { const tables = [{ schema_name: 'public', table_name: 'users' }]; await metadataOrchestrator.queryColumnsForTables(tables); - await metadataOrchestrator.queryColumnsForTables(tables, 'default', { forceRefresh: true }); + await metadataOrchestrator.queryColumnsForTables(tables); expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledTimes(2); }); @@ -2217,11 +2221,11 @@ describe('QueryOrchestrator', () => { // Mock driver error metadataMockDriver.getSchemas.mockRejectedValueOnce(new Error('Database connection failed')); - await expect(metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true })).rejects.toThrow('Database connection failed'); + await expect(metadataOrchestrator.queryDataSourceSchemas()).rejects.toThrow('Database connection failed'); // Should retry on next call metadataMockDriver.getSchemas.mockResolvedValueOnce([{ schema_name: 'recovered' }]); - const result = await metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true }); + const result = await metadataOrchestrator.queryDataSourceSchemas(); expect(result).toEqual([{ schema_name: 'recovered' }]); }); });