Skip to content

Commit 05fcdca

Browse files
authored
fix: Put temp table strategy drop under lock to avoid missing table r… (#6642)
* fix: Put temp table strategy drop under lock to avoid missing table race conditions * Add tests * Always suffix tables created by driver tests so they don't interfere with each other * Do not run core and driver suites to test if it affects full * Run suite sequential vs in parallel to avoid db use clashes * Run suite sequential vs in parallel to avoid db use clashes: use only full test * Run suite sequential vs in parallel to avoid db use clashes: increase jdbc test connection timeout
1 parent 8382ea5 commit 05fcdca

File tree

5 files changed

+100
-39
lines changed

5 files changed

+100
-39
lines changed

.github/workflows/drivers-tests.yml

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,6 @@ jobs:
108108
- mysql
109109
- postgres
110110
- snowflake
111-
suite:
112-
- driver
113-
- core
114-
- full
115111
fail-fast: false
116112

117113
steps:
@@ -157,23 +153,23 @@ jobs:
157153
158154
- name: Run tests
159155
env:
160-
# Athena
161-
DRIVERS_TESTS_ATHENA_CUBEJS_AWS_KEY: ${{ secrets.DRIVERS_TESTS_ATHENA_CUBEJS_AWS_KEY }}
162-
DRIVERS_TESTS_ATHENA_CUBEJS_AWS_SECRET: ${{ secrets.DRIVERS_TESTS_ATHENA_CUBEJS_AWS_SECRET }}
163-
164-
# BigQuery
165-
DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS }}
166-
167-
# Databricks
168-
DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL }}
169-
DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN }}
170-
DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY }}
171-
DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET }}
172-
173-
# Snowflake
174-
DRIVERS_TESTS_CUBEJS_DB_USER: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_USER }}
175-
DRIVERS_TESTS_CUBEJS_DB_PASS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_PASS }}
156+
# Athena
157+
DRIVERS_TESTS_ATHENA_CUBEJS_AWS_KEY: ${{ secrets.DRIVERS_TESTS_ATHENA_CUBEJS_AWS_KEY }}
158+
DRIVERS_TESTS_ATHENA_CUBEJS_AWS_SECRET: ${{ secrets.DRIVERS_TESTS_ATHENA_CUBEJS_AWS_SECRET }}
159+
160+
# BigQuery
161+
DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS }}
162+
163+
# Databricks
164+
DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL }}
165+
DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN }}
166+
DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY }}
167+
DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET }}
168+
169+
# Snowflake
170+
DRIVERS_TESTS_CUBEJS_DB_USER: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_USER }}
171+
DRIVERS_TESTS_CUBEJS_DB_PASS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_PASS }}
176172
run: |
177173
cd ./packages/cubejs-testing-drivers
178174
export DEBUG=testcontainers
179-
yarn ${{ matrix.database }}-${{ matrix.suite }}
175+
yarn ${{ matrix.database }}-full

packages/cubejs-jdbc-driver/src/JDBCDriver.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ export class JDBCDriver extends BaseDriver {
8888

8989
/**
9090
* Time to wait for a response from a connection after validation
91-
* request before determining it as not valid. Default - 10000 ms.
91+
* request before determining it as not valid. Default - 30000 ms.
9292
*/
9393
testConnectionTimeout?: number,
9494
} = {}
9595
) {
9696
super({
97-
testConnectionTimeout: config.testConnectionTimeout,
97+
testConnectionTimeout: config.testConnectionTimeout || 30000,
9898
});
9999

100100
const dataSource =

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
FROM_PARTITION_RANGE,
99
getEnv,
1010
inDbTimeZone,
11-
MAX_SOURCE_ROW_LIMIT, reformatInIsoLocal,
11+
MAX_SOURCE_ROW_LIMIT, MaybeCancelablePromise, reformatInIsoLocal,
1212
timeSeries,
1313
TO_PARTITION_RANGE,
1414
utcToLocalTimeZone,
@@ -1062,11 +1062,15 @@ export class PreAggregationLoader {
10621062
dropSourceTempTable: boolean,
10631063
) {
10641064
if (withTempTable && dropSourceTempTable) {
1065-
const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema);
1066-
const mappedActualTables = actualTables.map(t => `${this.preAggregation.preAggregationsSchema}.${t.table_name || t.TABLE_NAME}`);
1067-
if (mappedActualTables.includes(targetTableName)) {
1068-
await client.dropTable(targetTableName);
1069-
}
1065+
await this.withDropLock(false, async () => {
1066+
this.logger('Dropping source temp table', queryOptions);
1067+
1068+
const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema);
1069+
const mappedActualTables = actualTables.map(t => `${this.preAggregation.preAggregationsSchema}.${t.table_name || t.TABLE_NAME}`);
1070+
if (mappedActualTables.includes(targetTableName)) {
1071+
await client.dropTable(targetTableName);
1072+
}
1073+
});
10701074
}
10711075

10721076
// We must clean orphaned in any cases: success or exception
@@ -1369,6 +1373,11 @@ export class PreAggregationLoader {
13691373
});
13701374
}
13711375

1376+
private async withDropLock<T>(external: boolean, lockFn: () => MaybeCancelablePromise<T>): Promise<boolean> {
1377+
const lockKey = this.dropLockKey(external);
1378+
return this.queryCache.withLock(lockKey, 60 * 5, lockFn);
1379+
}
1380+
13721381
protected async dropOrphanedTables(
13731382
client: DriverInterface,
13741383
justCreatedTable: string,
@@ -1378,12 +1387,8 @@ export class PreAggregationLoader {
13781387
) {
13791388
await this.preAggregations.addTableUsed(justCreatedTable);
13801389

1381-
const lockKey = external
1382-
? 'drop-orphaned-tables-external'
1383-
: `drop-orphaned-tables:${this.preAggregation.dataSource}`;
1384-
1385-
return this.queryCache.withLock(lockKey, 60 * 5, async () => {
1386-
this.logger('Dropping orphaned tables', queryOptions);
1390+
return this.withDropLock(external, async () => {
1391+
this.logger('Dropping orphaned tables', { ...queryOptions, external });
13871392
const actualTables = await client.getTablesQuery(
13881393
this.preAggregation.preAggregationsSchema,
13891394
);
@@ -1436,10 +1441,17 @@ export class PreAggregationLoader {
14361441
await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table))));
14371442
this.logger('Dropping orphaned tables completed', {
14381443
...queryOptions,
1444+
external,
14391445
tablesToDrop: JSON.stringify(toDrop),
14401446
});
14411447
});
14421448
}
1449+
1450+
private dropLockKey(external: boolean) {
1451+
return external
1452+
? 'drop-orphaned-tables-external'
1453+
: `drop-orphaned-tables:${this.preAggregation.dataSource}`;
1454+
}
14431455
}
14441456

14451457
interface PreAggsPartitionRangeLoaderOpts {

packages/cubejs-query-orchestrator/test/unit/QueryOrchestrator.test.js

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class MockDriver {
88
this.tablesReady = [];
99
this.executedQueries = [];
1010
this.cancelledQueries = [];
11+
this.droppedTables = [];
1112
this.csvImport = csvImport;
1213
this.now = new Date().getTime();
1314
this.schemaData = schemaData;
@@ -95,8 +96,22 @@ class MockDriver {
9596
}
9697

9798
async dropTable(tableName) {
99+
if (this.droppedTables.indexOf(tableName) !== -1) {
100+
throw new Error(`Can't drop table twice: ${tableName}`);
101+
}
102+
this.droppedTables.push(tableName);
103+
console.log(`Driver drops ${tableName}`);
104+
if (!this.tablesObj.find(t => (t.tableName || t) === tableName)) {
105+
throw new Error(`Can't drop missing table: ${tableName}`);
106+
}
107+
await this.query(`DROP TABLE ${tableName}`);
108+
if (this.tablesDropDelay) {
109+
await this.delay(this.tablesDropDelay);
110+
}
111+
if (!this.tablesObj.find(t => (t.tableName || t) === tableName)) {
112+
throw new Error(`Can't drop missing table: ${tableName}`);
113+
}
98114
this.tablesObj = this.tablesObj.filter(t => (t.tableName || t) !== tableName);
99-
return this.query(`DROP TABLE ${tableName}`);
100115
}
101116

102117
async downloadTable(table, { csvImport } = {}) {
@@ -276,6 +291,7 @@ describe('QueryOrchestrator', () => {
276291
preAggregationsOptions: {
277292
...options('p1').preAggregationsOptions,
278293
dropPreAggregationsWithoutTouch: true,
294+
touchTablePersistTime: 1,
279295
},
280296
});
281297
mockDriver = mockDriverLocal;
@@ -1704,4 +1720,40 @@ describe('QueryOrchestrator', () => {
17041720
expect(data['Foo.query']).toMatch(/orders_d/);
17051721
}));
17061722
});
1723+
1724+
test('drop lock', async () => {
1725+
mockDriver.tablesDropDelay = 300;
1726+
for (let i = 0; i < 10; i++) {
1727+
const promises = [];
1728+
for (let j = 0; j < 10; j++) {
1729+
// eslint-disable-next-line no-loop-func
1730+
promises.push((async () => {
1731+
await mockDriver.delay(100 * j);
1732+
await queryOrchestratorDropWithoutTouch.fetchQuery({
1733+
query: `SELECT * FROM stb_pre_aggregations.orders_d2018110${j}`,
1734+
values: [],
1735+
cacheKeyQueries: {
1736+
renewalThreshold: 21600,
1737+
queries: []
1738+
},
1739+
preAggregations: [{
1740+
preAggregationsSchema: 'stb_pre_aggregations',
1741+
tableName: `stb_pre_aggregations.orders_d2018110${j}`,
1742+
loadSql: [`CREATE TABLE stb_pre_aggregations.orders_d2018110${j} AS SELECT * FROM public.orders_d`, []],
1743+
invalidateKeyQueries: [['SELECT NOW()', [], {
1744+
renewalThreshold: 0.001
1745+
}]],
1746+
external: true,
1747+
}],
1748+
requestId: `drop lock ${i}-${j}`
1749+
});
1750+
})());
1751+
}
1752+
1753+
await Promise.all(promises);
1754+
1755+
await mockDriver.delay(200);
1756+
}
1757+
// expect(mockDriver.tables).toContainEqual(expect.stringMatching(/orders_delay/));
1758+
});
17071759
});

packages/cubejs-testing-drivers/src/tests/testQueries.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ export function testQueries(type: string): void {
3030
}
3131
const apiToken = sign({}, 'mysupersecret');
3232

33+
const suffix = `_${new Date().getTime().toString(32)}`;
3334
beforeAll(async () => {
34-
env = await runEnvironment(type);
35+
env = await runEnvironment(type, suffix);
3536
process.env.CUBEJS_REFRESH_WORKER = 'true';
3637
process.env.CUBEJS_CUBESTORE_HOST = '127.0.0.1';
3738
process.env.CUBEJS_CUBESTORE_PORT = `${env.store.port}`;
@@ -46,7 +47,7 @@ export function testQueries(type: string): void {
4647
apiUrl: `http://127.0.0.1:${env.cube.port}/cubejs-api/v1`,
4748
});
4849
driver = (await getDriver(type)).source;
49-
query = getCreateQueries(type);
50+
query = getCreateQueries(type, suffix);
5051
await Promise.all(query.map(async (q) => {
5152
await driver.query(q);
5253
}));
@@ -55,7 +56,7 @@ export function testQueries(type: string): void {
5556
afterAll(async () => {
5657
const tables = Object
5758
.keys(fixtures.tables)
58-
.map((key: string) => fixtures.tables[<'products' | 'customers' | 'ecommerce'>key]);
59+
.map((key: string) => `${fixtures.tables[<'products' | 'customers' | 'ecommerce'>key]}${suffix}`);
5960
await Promise.all(
6061
tables.map(async (t) => {
6162
await driver.dropTable(t);

0 commit comments

Comments
 (0)