
Commit f023484

fix(bigquery-driver): Revert read-only implementation as LIMIT 1 queries generate too much usage (#6523)
1 parent 40195c9 commit f023484
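The revert rationale in the commit title can be illustrated with a BigQuery dry run: on on-demand pricing, billed bytes depend on the columns and partitions scanned, not on the number of rows returned, so appending LIMIT 1 to a query does not shrink the bytes processed. The sketch below is not part of this commit; it is a minimal TypeScript illustration assuming default application credentials and a hypothetical my_dataset.events table.

import { BigQuery } from '@google-cloud/bigquery';

// Estimate scanned bytes for a query without running it (dryRun: true).
async function estimateBytes(query: string): Promise<number> {
  const bigquery = new BigQuery();
  const [job] = await bigquery.createQueryJob({ query, dryRun: true });
  // For dry runs, the job metadata reports what a real run would scan and bill.
  return Number(job.metadata.statistics.totalBytesProcessed);
}

async function main() {
  const full = await estimateBytes('SELECT id, value FROM my_dataset.events');
  const limited = await estimateBytes('SELECT id, value FROM my_dataset.events LIMIT 1');
  // Both estimates are expected to match: LIMIT does not reduce scanned (billed) bytes,
  // which is why the per-query `LIMIT 1` type probing removed in this diff was costly.
  console.log({ fullBytes: full, limitedBytes: limited });
}

main().catch(console.error);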

File tree

1 file changed: +62 −276 lines changed


packages/cubejs-bigquery-driver/src/BigQueryDriver.ts

Lines changed: 62 additions & 276 deletions
@@ -20,41 +20,12 @@ import {
 } from '@google-cloud/bigquery';
 import { Bucket, Storage } from '@google-cloud/storage';
 import {
-  BaseDriver,
-  DownloadTableCSVData,
-  DriverInterface,
-  QueryOptions,
-  UnloadOptions,
-  StreamTableDataWithTypes,
-  DownloadQueryResultsOptions,
-  DownloadQueryResultsResult,
-  DownloadTableMemoryData,
-  TableStructure,
-  Row,
-  DriverCapabilities,
+  BaseDriver, DownloadTableCSVData,
+  DriverInterface, QueryOptions, StreamTableData,
 } from '@cubejs-backend/base-driver';
 import { Query } from '@google-cloud/bigquery/build/src/bigquery';
 import { HydrationStream } from './HydrationStream';

-const BQTypeToGenericType: Record<string, string> = {
-  array: 'text', // ?
-  bignumeric: 'double', // ?
-  bool: 'boolean',
-  bytes: 'text', // ?
-  date: 'date',
-  datetime: 'timestamp',
-  float64: 'double',
-  geography: 'text', // ?
-  int64: 'bigint',
-  interval: 'text', // ?
-  json: 'text', // ?
-  numeric: 'double', // ?
-  string: 'text',
-  struct: 'text',
-  time: 'timestamp',
-  timestamp: 'timestamp',
-};
-
 interface BigQueryDriverOptions extends BigQueryOptions {
   readOnly?: boolean
   projectId?: string,
@@ -141,10 +112,7 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
         getEnv('dbExportBucket', { dataSource }) ||
         getEnv('bigqueryExportBucket', { dataSource }),
       location: getEnv('bigqueryLocation', { dataSource }),
-      readOnly: true,
-
       ...config,
-
       pollTimeout: (
         config.pollTimeout ||
         getEnv('dbPollTimeout', { dataSource }) ||
@@ -154,8 +122,7 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
         config.pollMaxInterval ||
         getEnv('dbPollMaxInterval', { dataSource })
       ) * 1000,
-      exportBucketCsvEscapeSymbol:
-        getEnv('dbExportBucketCsvEscapeSymbol', { dataSource }),
+      exportBucketCsvEscapeSymbol: getEnv('dbExportBucketCsvEscapeSymbol', { dataSource }),
     };

     getEnv('dbExportBucketType', {
@@ -179,262 +146,27 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
     ];
   }

-  /**
-   * Driver read-only flag.
-   */
-  public readOnly() {
-    return !!this.options.readOnly;
-  }
-
-  /**
-   * Returns driver's capabilities object.
-   */
-  public capabilities(): DriverCapabilities {
-    return { unloadWithoutTempTable: true };
-  }
-
-  /**
-   * Test driver's connection.
-   */
   public async testConnection() {
     await this.bigquery.query({
       query: 'SELECT ? AS number', params: ['1']
     });
   }

-  /**
-   * Determines whether export bucket feature is configured or not.
-   */
-  public async isUnloadSupported() {
-    return this.bucket !== null;
-  }
-
-  /**
-   * Returns to the Cubestore an object with links to unloaded to the
-   * export bucket data.
-   */
-  public async unload(
-    table: string,
-    options: UnloadOptions,
-  ): Promise<DownloadTableCSVData> {
-    if (!this.bucket) {
-      throw new Error('Export bucket misconfigured.');
-    }
-    const types = options.query
-      ? await this.unloadWithSql(table, options)
-      : await this.unloadWithTable(table);
-    const csvFile = await this.getCsvFiles(table);
-    return {
-      types,
-      csvFile,
-      csvNoHeader: false,
-      exportBucketCsvEscapeSymbol:
-        this.options.exportBucketCsvEscapeSymbol,
-    };
-  }
-
-  /**
-   * Unload data from a SQL query to an export bucket.
-   */
-  private async unloadWithSql(
-    table: string,
-    options: UnloadOptions,
-  ): Promise<TableStructure> {
-    if (!this.bucket) {
-      throw new Error('Export bucket misconfigured.');
-    }
-    if (!options.query) {
-      throw new Error('Unload query is missed.');
-    }
-    const types = await this.queryColumnTypes(options.query.sql);
-    const unloadSql = `
-      EXPORT DATA
-      OPTIONS (
-        uri='gs://${this.options.exportBucket}/${table}/*.csv.gz',
-        format='CSV',
-        overwrite=true,
-        header=true,
-        field_delimiter=',',
-        compression='GZIP'
-      ) AS
-      ${options.query.sql}`;
-    await this.query(unloadSql, []);
-    return types;
-  }
-
-  /**
-   * Unload data from a temp table to an export bucket.
-   * : Promise<TableStructure>
-   */
-  private async unloadWithTable(table: string) {
-    if (!this.bucket) {
-      throw new Error('Export bucket misconfigured.');
-    }
-    const types = await this.tableColumnTypes(table);
-    const [schema, tableName] = table.split('.');
-    const bigQueryTable = this.bigquery
-      .dataset(schema)
-      .table(tableName);
-    const [job] = await bigQueryTable.createExtractJob(
-      this.bucket.file(`${table}/*.csv.gz`),
-      { format: 'CSV', gzip: true },
-    );
-    await this.waitForJobResult(job, { table }, false);
-    return types;
-  }
-
-  /**
-   * Returns an array of signed URLs of the unloaded csv files.
-   */
-  private async getCsvFiles(table: string): Promise<string[]> {
-    if (!this.bucket) {
-      throw new Error('Export bucket misconfigured.');
-    }
-    const [files] = await this.bucket.getFiles({
-      prefix: `${table}/`,
-    });
-    const csvFiles = await Promise.all(files.map(async file => {
-      const [url] = await file.getSignedUrl({
-        action: 'read',
-        expires: new Date(new Date().getTime() + 60 * 60 * 1000),
-      });
-      return url;
-    }));
-    return csvFiles;
-  }
-
-  /**
-   * Executes a query and returns either query result memory data or
-   * query result stream, depending on options.
-   */
-  public async downloadQueryResults(
-    query: string,
-    values: unknown[],
-    options: DownloadQueryResultsOptions,
-  ): Promise<DownloadQueryResultsResult> {
-    if (!options.streamImport) {
-      return this.memory(query, values);
-    } else {
-      return this.stream(query, values);
-    }
-  }
-
-  /**
-   * Executes query and returns table memory data that includes rows
-   * and queried fields types.
-   */
-  public async memory(
-    query: string,
-    values: unknown[],
-  ): Promise<DownloadTableMemoryData & { types: TableStructure }> {
-    const types = await this.queryColumnTypes(query);
-    const rows: Row[] = await this.query(query, values);
-    return { types, rows };
-  }
-
-  /**
-   * Returns stream table object that includes query result stream and
-   * queried fields types.
-   */
-  public async stream(
-    query: string,
-    values: unknown[]
-  ): Promise<StreamTableDataWithTypes> {
-    const types = await this.queryColumnTypes(query);
-    const stream = await this.bigquery.createQueryStream({
-      query,
-      params: values,
-      parameterMode: 'positional',
-      useLegacySql: false
-    });
-    const rowStream = new HydrationStream();
-    stream.pipe(rowStream);
-    return {
-      rowStream,
-      types,
-      release: async () => {
-        //
-      }
-    };
-  }
-
-  /**
-   * Returns an array of queried fields meta info.
-   */
-  public async queryColumnTypes(sql: string): Promise<TableStructure> {
-    const rowSql = `${sql} LIMIT 1`;
-    const row = (await this.runQueryJob({
-      query: rowSql,
-      params: [],
-      parameterMode: 'positional',
-      useLegacySql: false
-    }, {}))[0][0];
-    const cols = Object.keys(row)
-      .map((f) => (`bqutil.fn.typeof(${f}) AS ${f}`))
-      .join(', ');
-    const typesSql = `
-      WITH ORIGIN AS (${rowSql})
-      SELECT ${cols}
-      FROM ORIGIN`;
-    const types = (await this.runQueryJob({
-      query: typesSql,
-      params: [],
-      parameterMode: 'positional',
-      useLegacySql: false
-    }, {}))[0][0];
-    return Object.keys(types).map((f) => ({
-      name: f,
-      type: this.toGenericType(types[f]),
-    }));
-  }
-
-  /**
-   * Returns an array of table fields meta info.
-   */
-  public async tableColumnTypes(table: string) {
-    const [schema, name] = table.split('.');
-    const [bigQueryTable] = await this.bigquery
-      .dataset(schema)
-      .table(name)
-      .getMetadata();
-    return bigQueryTable.schema.fields.map((c: any) => ({
-      name: c.name,
-      type: this.toGenericType(c.type),
-    }));
-  }
-
-  /**
-   * Returns generic type for the provided BQ type.
-   */
-  public toGenericType(type: string) {
-    return BQTypeToGenericType[type.toLowerCase()] ||
-      super.toGenericType(type.toLowerCase());
+  public readOnly() {
+    return !!this.options.readOnly;
   }

-  /**
-   * Executes query and returns queried rows.
-   */
-  public async query<R = unknown>(
-    query: string,
-    values: unknown[],
-    options?: QueryOptions,
-  ): Promise<R[]> {
+  public async query<R = unknown>(query: string, values: unknown[], options?: QueryOptions): Promise<R[]> {
     const data = await this.runQueryJob({
       query,
       params: values,
       parameterMode: 'positional',
       useLegacySql: false
     }, options);
+
     return <any>(
       data[0] && data[0].map(
-        row => R.map(
-          value => (
-            value && value.value && typeof value.value === 'string'
-              ? value.value
-              : value
-          ),
-          row,
-        ),
+        row => R.map(value => (value && value.value && typeof value.value === 'string' ? value.value : value), row)
       )
     );
   }
@@ -493,10 +225,64 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
     }
   }

+  public async tableColumnTypes(table: string) {
+    const [schema, name] = table.split('.');
+    const [bigQueryTable] = await this.bigquery.dataset(schema).table(name).getMetadata();
+    return bigQueryTable.schema.fields.map((c: any) => ({ name: c.name, type: this.toGenericType(c.type) }));
+  }
+
   public async createSchemaIfNotExists(schemaName: string): Promise<void> {
     await this.bigquery.dataset(schemaName).get({ autoCreate: true });
   }

+  public async isUnloadSupported() {
+    return this.bucket !== null;
+  }
+
+  public async stream(
+    query: string,
+    values: unknown[]
+  ): Promise<StreamTableData> {
+    const stream = await this.bigquery.createQueryStream({
+      query,
+      params: values,
+      parameterMode: 'positional',
+      useLegacySql: false
+    });
+
+    const rowStream = new HydrationStream();
+    stream.pipe(rowStream);
+
+    return {
+      rowStream,
+    };
+  }
+
+  public async unload(table: string): Promise<DownloadTableCSVData> {
+    if (!this.bucket) {
+      throw new Error('Unload is not configured');
+    }
+
+    const destination = this.bucket.file(`${table}-*.csv.gz`);
+    const [schema, tableName] = table.split('.');
+    const bigQueryTable = this.bigquery.dataset(schema).table(tableName);
+    const [job] = await bigQueryTable.createExtractJob(destination, { format: 'CSV', gzip: true });
+    await this.waitForJobResult(job, { table }, false);
+    const [files] = await this.bucket.getFiles({ prefix: `${table}-` });
+    const urls = await Promise.all(files.map(async file => {
+      const [url] = await file.getSignedUrl({
+        action: 'read',
+        expires: new Date(new Date().getTime() + 60 * 60 * 1000),
+      });
+      return url;
+    }));
+
+    return {
+      exportBucketCsvEscapeSymbol: this.options.exportBucketCsvEscapeSymbol,
+      csvFile: urls,
+    };
+  }
+
   public async loadPreAggregationIntoTable(
     preAggregationTableName: string,
     loadSql: string,
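After the revert, column types come from the restored tableColumnTypes shown above, which reads the table schema through getMetadata(); a tables.get metadata call does not run a query, so it incurs no scanned-bytes charges. Below is a minimal standalone sketch of the same idea, not taken from this commit, assuming default credentials and a hypothetical my_dataset.my_preagg table.

import { BigQuery } from '@google-cloud/bigquery';

// Resolve column names and BigQuery types from table metadata only
// (tables.get), without issuing any query against the data.
async function columnTypes(table: string): Promise<Array<{ name: string; type: string }>> {
  const bigquery = new BigQuery();
  const [schema, name] = table.split('.');
  const [metadata] = await bigquery.dataset(schema).table(name).getMetadata();
  return metadata.schema.fields.map((f: any) => ({ name: f.name, type: f.type }));
}

// Example usage with a hypothetical table name.
columnTypes('my_dataset.my_preagg').then(console.log).catch(console.error);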
