Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ export class QueryOrchestrator {
// TODO (@MikeNitsenko): Metadata queries need object params like [{ schema, table }]
// but QueryWithParams expects string[]. This forces JSON.stringify workaround.
[JSON.stringify(params)],
{ external: false, renewalThreshold: 24 * 60 * 60 }
{ external: false }
];
}

Expand All @@ -456,40 +456,31 @@ export class QueryOrchestrator {
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
syncJobId?: string;
expiration?: number;
} = {}
): Promise<T> {
const {
requestId,
forceRefresh = false,
renewalThreshold = 24 * 60 * 60,
expiration = 7 * 24 * 60 * 60,
syncJobId,
expiration = 30 * 24 * 60 * 60,
} = options;

const metadataQuery = this.createMetadataQuery(operation, params);
const cacheKey: CacheKey = [metadataQuery, dataSource];

const renewalKey = forceRefresh ? undefined : [
`METADATA_RENEWAL:${operation}`,
dataSource,
Math.floor(Date.now() / (renewalThreshold * 1000))
];
const cacheKey: CacheKey = syncJobId
? [metadataQuery, dataSource, syncJobId]
: [metadataQuery, dataSource];

return this.queryCache.cacheQueryResult(
metadataQuery,
[],
cacheKey,
expiration,
{
renewalThreshold,
renewalKey,
forceNoCache: forceRefresh,
requestId,
dataSource,
forceNoCache: !syncJobId,
useInMemory: true,
waitForRenew: true,
}
);
}
Expand All @@ -501,8 +492,7 @@ export class QueryOrchestrator {
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
syncJobId?: string;
expiration?: number;
} = {}
): Promise<QuerySchemasResult[]> {
Expand All @@ -522,8 +512,7 @@ export class QueryOrchestrator {
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
syncJobId?: string;
expiration?: number;
} = {}
): Promise<QueryTablesResult[]> {
Expand All @@ -543,8 +532,7 @@ export class QueryOrchestrator {
dataSource: string = 'default',
options: {
requestId?: string;
forceRefresh?: boolean;
renewalThreshold?: number;
syncJobId?: string;
expiration?: number;
} = {}
): Promise<QueryColumnsResult[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1955,30 +1955,34 @@ describe('QueryOrchestrator', () => {
]);
});

test('should use cache on second call', async () => {
// First call
await metadataOrchestrator.queryDataSourceSchemas();
// Second call should use cache
const result = await metadataOrchestrator.queryDataSourceSchemas();
test('should use cache when syncJobId is provided', async () => {
// First call with syncJobId
await metadataOrchestrator.queryDataSourceSchemas('default', { syncJobId: 'job-123' });

// Clear the mock calls
metadataMockDriver.getSchemas.mockClear();

// Second call with same syncJobId should use cache
const result = await metadataOrchestrator.queryDataSourceSchemas('default', { syncJobId: 'job-123' });

expect(result).toEqual([
{ schema_name: 'public' },
{ schema_name: 'analytics' },
{ schema_name: 'staging' }
]);

// Verify driver wasn't called again
expect(metadataMockDriver.getSchemas).not.toHaveBeenCalled();
});

test('should force refresh when requested', async () => {
test('should refresh when syncJobId is not provided', async () => {
// First call
await metadataOrchestrator.queryDataSourceSchemas();
// Second call with forceRefresh
const result = await metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true });
// Second call without syncJobId should refresh
await metadataOrchestrator.queryDataSourceSchemas();

expect(result).toEqual([
{ schema_name: 'public' },
{ schema_name: 'analytics' },
{ schema_name: 'staging' }
]);
// Driver should be called twice
expect(metadataMockDriver.getSchemas).toHaveBeenCalledTimes(2);
});

test('should pass requestId option', async () => {
Expand Down Expand Up @@ -2008,11 +2012,11 @@ describe('QueryOrchestrator', () => {
expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledWith(schemas);
});

test('should cache results based on schema list', async () => {
test('should use cache when syncJobId is provided', async () => {
const schemas = [{ schema_name: 'public' }];

// First call - will execute and store in cache
await metadataOrchestrator.queryTablesForSchemas(schemas);
// First call with syncJobId - will execute and store in cache
await metadataOrchestrator.queryTablesForSchemas(schemas, 'default', { syncJobId: 'job-123' });

// Add a delay to ensure the first query has completed and cached its result
await new Promise(resolve => setTimeout(resolve, 100));
Expand All @@ -2024,8 +2028,8 @@ describe('QueryOrchestrator', () => {
// Our hash function should handle this correctly
const schemas2 = [{ schema_name: 'public' }];

// Second call should use cache
const result = await metadataOrchestrator.queryTablesForSchemas(schemas2);
// Second call with same syncJobId should use cache
const result = await metadataOrchestrator.queryTablesForSchemas(schemas2, 'default', { syncJobId: 'job-123' });

expect(result).toEqual([
{ schema_name: 'public', table_name: 'users' },
Expand All @@ -2044,11 +2048,11 @@ describe('QueryOrchestrator', () => {
expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledWith([]);
});

test('should force refresh when requested', async () => {
test('should refresh when syncJobId is not provided', async () => {
const schemas = [{ schema_name: 'public' }];

await metadataOrchestrator.queryTablesForSchemas(schemas);
await metadataOrchestrator.queryTablesForSchemas(schemas, 'default', { forceRefresh: true });
await metadataOrchestrator.queryTablesForSchemas(schemas);

expect(metadataMockDriver.getTablesForSpecificSchemas).toHaveBeenCalledTimes(2);
});
Expand Down Expand Up @@ -2111,11 +2115,11 @@ describe('QueryOrchestrator', () => {
expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledWith(tables);
});

test('should cache results based on table list', async () => {
test('should use cache when syncJobId is provided', async () => {
const tables = [{ schema_name: 'public', table_name: 'users' }];

// First call - will execute and store in cache
await metadataOrchestrator.queryColumnsForTables(tables);
// First call with syncJobId - will execute and store in cache
await metadataOrchestrator.queryColumnsForTables(tables, 'default', { syncJobId: 'job-123' });

// Add a delay to ensure the first query has completed and cached its result
await new Promise(resolve => setTimeout(resolve, 100));
Expand All @@ -2127,8 +2131,8 @@ describe('QueryOrchestrator', () => {
// Our hash function should handle this correctly
const tables2 = [{ schema_name: 'public', table_name: 'users' }];

// Second call should use cache
const result = await metadataOrchestrator.queryColumnsForTables(tables2);
// Second call with same syncJobId should use cache
const result = await metadataOrchestrator.queryColumnsForTables(tables2, 'default', { syncJobId: 'job-123' });

expect(result).toEqual([
{
Expand Down Expand Up @@ -2165,11 +2169,11 @@ describe('QueryOrchestrator', () => {
expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledWith([]);
});

test('should force refresh when requested', async () => {
test('should refresh when syncJobId is not provided', async () => {
const tables = [{ schema_name: 'public', table_name: 'users' }];

await metadataOrchestrator.queryColumnsForTables(tables);
await metadataOrchestrator.queryColumnsForTables(tables, 'default', { forceRefresh: true });
await metadataOrchestrator.queryColumnsForTables(tables);

expect(metadataMockDriver.getColumnsForSpecificTables).toHaveBeenCalledTimes(2);
});
Expand Down Expand Up @@ -2217,11 +2221,11 @@ describe('QueryOrchestrator', () => {
// Mock driver error
metadataMockDriver.getSchemas.mockRejectedValueOnce(new Error('Database connection failed'));

await expect(metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true })).rejects.toThrow('Database connection failed');
await expect(metadataOrchestrator.queryDataSourceSchemas()).rejects.toThrow('Database connection failed');

// Should retry on next call
metadataMockDriver.getSchemas.mockResolvedValueOnce([{ schema_name: 'recovered' }]);
const result = await metadataOrchestrator.queryDataSourceSchemas('default', { forceRefresh: true });
const result = await metadataOrchestrator.queryDataSourceSchemas();
expect(result).toEqual([{ schema_name: 'recovered' }]);
});
});
Expand Down
Loading