diff --git a/packages/cubejs-prestodb-driver/src/PrestoDriver.ts b/packages/cubejs-prestodb-driver/src/PrestoDriver.ts index 1989bec444bfb..8232349ddbd3e 100644 --- a/packages/cubejs-prestodb-driver/src/PrestoDriver.ts +++ b/packages/cubejs-prestodb-driver/src/PrestoDriver.ts @@ -10,7 +10,8 @@ import { StreamOptions, StreamTableData, TableStructure, - BaseDriver + BaseDriver, + UnloadOptions } from '@cubejs-backend/base-driver'; import { getEnv, @@ -27,7 +28,14 @@ import SqlString from 'sqlstring'; const presto = require('presto-client'); -export type PrestoDriverConfiguration = { +export type PrestoDriverExportBucket = { + exportBucket?: string, + bucketType?: 'gcs', + credentials?: any, + exportBucketCsvEscapeSymbol?: string, +}; + +export type PrestoDriverConfiguration = PrestoDriverExportBucket & { host?: string; port?: string; catalog?: string; @@ -40,6 +48,7 @@ export type PrestoDriverConfiguration = { queryTimeout?: number; }; +const SUPPORTED_BUCKET_TYPES = ['gcs']; /** * Presto driver class. */ @@ -84,6 +93,9 @@ export class PrestoDriver extends BaseDriver implements DriverInterface { } : undefined, ssl: this.getSslOptions(dataSource), + bucketType: getEnv('dbExportBucketType', { supported: ['gcs'], dataSource }), + exportBucket: getEnv('dbExportBucket', { dataSource }), + credentials: getEnv('dbExportGCSCredentials', { dataSource }), ...config }; this.catalog = this.config.catalog; @@ -201,4 +213,124 @@ export class PrestoDriver extends BaseDriver implements DriverInterface { unloadWithoutTempTable: true }; } + + public async createSchemaIfNotExists(schemaName: string) { + await this.query( + `CREATE SCHEMA IF NOT EXISTS ${this.config.catalog}.${schemaName}`, + [], + ); + } + + // Export bucket methods + public async isUnloadSupported() { + return this.config.exportBucket !== undefined; + } + + public async unload(tableName: string, options: UnloadOptions) { + if (!this.config.exportBucket) { + throw new Error('Export bucket is not configured.'); + } + + if (!SUPPORTED_BUCKET_TYPES.includes(this.config.bucketType as string)) { + throw new Error(`Unsupported export bucket type: ${ + this.config.bucketType + }`); + } + + const types = options.query + ? await this.unloadWithSql(tableName, options.query.sql, options.query.params) + : await this.unloadWithTable(tableName); + + const csvFile = await this.getCsvFiles(tableName); + + return { + exportBucketCsvEscapeSymbol: this.config.exportBucketCsvEscapeSymbol, + csvFile, + types, + csvNoHeader: true, + }; + } + + private splitTableFullName(tableFullName: string) { + const [schema, tableName] = tableFullName.split('.'); + return { schema, tableName }; + } + + private generateTableColumnsForExport(types: {name: string, type: string}[]) { + return types.map((c) => `CAST(${c.name} AS varchar) ${c.name}`).join(', '); + } + + private async unloadWithSql(tableFullName: string, sql: string, params: any[]) { + return this.unloadGeneric({ + tableFullName, + typeSql: sql, + typeParams: params, + fromSql: sql, + fromParams: params + }); + } + + private async unloadWithTable(tableFullName: string) { + return this.unloadGeneric({ + tableFullName, + typeSql: `SELECT * FROM ${tableFullName}`, + typeParams: [], + fromSql: tableFullName, + fromParams: [] + }); + } + + private async unloadGeneric(params: {tableFullName: string, typeSql: string, typeParams: any[], fromSql: string, fromParams: any[]}) { + if (!this.config.exportBucket) { + throw new Error('Export bucket is not configured.'); + } + + const { bucketType, exportBucket } = this.config; + const types = await this.queryColumnTypes(params.typeSql, params.typeParams); + + const { schema, tableName } = this.splitTableFullName(params.tableFullName); + const tableWithCatalogAndSchema = `${this.config.catalog}.${schema}.${tableName}`; + const protocol = bucketType === 'gcs' ? 'gs' : bucketType; + const externalLocation = `${protocol}://${exportBucket}/${schema}/${tableName}`; + const withParams = `( external_location = '${externalLocation}', format = 'CSV')`; + const select = `SELECT ${this.generateTableColumnsForExport(types)} FROM (${params.fromSql})`; + const createTableQuery = `CREATE TABLE ${tableWithCatalogAndSchema} WITH ${withParams} AS (${select})`; + + try { + await this.query( + createTableQuery, + params.fromParams, + ); + } finally { + await this.query(`DROP TABLE IF EXISTS ${tableWithCatalogAndSchema}`, []); + } + + return types; + } + + public async queryColumnTypes(sql: string, params: unknown[]): Promise<{ name: string; type: string; }[]> { + const response = await this.stream(`${sql} LIMIT 0`, params || [], { highWaterMark: 1 }); + const result = []; + for (const column of response.types || []) { + result.push({ name: column.name, type: this.toGenericType(column.type) }); + } + return result; + } + + private async getCsvFiles( + tableFullName: string, + ): Promise { + if (!this.config.exportBucket) { + throw new Error('Export bucket is not configured.'); + } + const { bucketType, exportBucket, credentials } = this.config; + const { schema, tableName } = this.splitTableFullName(tableFullName); + + switch (bucketType) { + case 'gcs': + return this.extractFilesFromGCS({ credentials }, exportBucket, `${schema}/${tableName}`); + default: + throw new Error(`Unsupported export bucket type: ${bucketType}`); + } + } } diff --git a/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts b/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts index 6852f94e52f0a..fac22d846293f 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts @@ -129,4 +129,8 @@ export class PrestodbQuery extends BaseQuery { templates.types.binary = 'VARBINARY'; return templates; } + + public castToString(sql: any): string { + return `CAST(${sql} as VARCHAR)`; + } }