Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/drivers-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,12 @@ jobs:
snowflake
snowflake-encrypted-pk
snowflake-export-bucket-s3
snowflake-export-bucket-s3-prefix
snowflake-export-bucket-azure
snowflake-export-bucket-azure-prefix
snowflake-export-bucket-azure-via-storage-integration
snowflake-export-bucket-gcs
snowflake-export-bucket-gcs-prefix
# As per docs:
# Secrets cannot be directly referenced in if: conditionals. Instead, consider setting
# secrets as job-level environment variables, then referencing the environment variables
Expand Down Expand Up @@ -259,9 +262,12 @@ jobs:
- snowflake
- snowflake-encrypted-pk
- snowflake-export-bucket-s3
- snowflake-export-bucket-s3-prefix
- snowflake-export-bucket-azure
- snowflake-export-bucket-azure-prefix
- snowflake-export-bucket-azure-via-storage-integration
- snowflake-export-bucket-gcs
- snowflake-export-bucket-gcs-prefix
fail-fast: false

steps:
Expand Down
53 changes: 53 additions & 0 deletions packages/cubejs-base-driver/src/BaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ export type GoogleStorageClientConfig = {
credentials: any,
};

export type ParsedBucketUrl = {
/**
 * URL scheme of the bucket address, e.g. 's3', 'wasbs', 'gs', 'azure';
 * undefined when the input contained no explicit scheme
 */
schema?: string;
// clean bucket name (the host portion of the URL, without scheme or path)
bucketName: string;
/**
 * prefix/path without leading or trailing '/', or an empty string if not present
 */
path: string;
// credentials embedded in the URL (e.g. wasbs://container@account — the
// container ends up as username); undefined when absent
username?: string;
password?: string;
// the original input string, trimmed; empty string for null/undefined input
original: string;
};

const sortByKeys = (unordered: any) => {
const ordered: any = {};

Expand Down Expand Up @@ -695,6 +710,44 @@ export abstract class BaseDriver implements DriverInterface {
query.query = `SELECT * FROM (${query.query}) AS t LIMIT ${query.limit}`;
}

/**
 * Returns the parsed bucket structure for an export-bucket address.
 * Supported variants:
 *   s3://my-bucket-name/prefix/longer/
 *   s3://my-bucket-name
 *   my-bucket-name/some-path
 *   my-bucket-name
 *   wasbs://[email protected]
 */
protected parseBucketUrl(input: string | null | undefined): ParsedBucketUrl {
  const original = input?.trim() || '';

  // Empty/null/undefined input yields an empty structure rather than throwing.
  if (!original) {
    return {
      bucketName: '',
      path: '',
      original,
    };
  }

  // Detect an explicit scheme (per RFC 3986: ALPHA *(ALPHA / DIGIT / "+" / "-" / ".")).
  // If absent, prepend a placeholder scheme so the WHATWG URL parser accepts it.
  const hasSchema = /^[a-zA-Z][a-zA-Z0-9+\-.]*:\/\//.test(original);
  const normalized = hasSchema ? original : `schema://${original}`;

  const url = new URL(normalized);

  // Strip leading/trailing slashes so `path` is '' or 'a/b/c'.
  const path = url.pathname.replace(/^\/+|\/+$/g, '');
  const schema = url.protocol.replace(/:$/, '');

  return {
    // Only report a schema that was actually present in the input; otherwise
    // the internal `schema://` placeholder would leak to callers.
    schema: hasSchema ? (schema || undefined) : undefined,
    bucketName: url.hostname,
    path,
    username: url.username || undefined,
    password: url.password || undefined,
    original,
  };
}

/**
* Returns an array of signed AWS S3 URLs of the unloaded csv files.
*/
Expand Down
22 changes: 4 additions & 18 deletions packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,33 +584,19 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
);
}

/**
* Returns clean S3 bucket name and prefix path ending with / (if set)
*/
private parseS3Path(input: string): { bucket: string; prefix: string | null } {
let trimmed = input.startsWith('s3://') ? input.slice(5) : input;
trimmed = trimmed.endsWith('/') ? trimmed.slice(0, -1) : trimmed;
const parts = trimmed.split('/');
const bucket = parts[0];
const prefixParts = parts.slice(1);
const prefix = prefixParts.length > 0 ? `${prefixParts.join('/')}/` : null;

return { bucket, prefix };
}

public async unloadFromQuery(sql: string, params: unknown[], _options: UnloadOptions): Promise<DownloadTableCSVData> {
if (!this.config.exportBucket) {
throw new Error('Unload is not configured');
}

const types = await this.queryColumnTypes(`(${sql})`, params);
const { bucket, prefix } = this.parseS3Path(this.config.exportBucket.bucketName);
const exportPrefix = prefix ? `${prefix}${uuidv4()}` : uuidv4();
const { bucketName, path } = this.parseBucketUrl(this.config.exportBucket.bucketName);
const exportPrefix = path ? `${path}/${uuidv4()}` : uuidv4();

const formattedQuery = sqlstring.format(`
INSERT INTO FUNCTION
s3(
'https://${bucket}.s3.${this.config.exportBucket.region}.amazonaws.com/${exportPrefix}/export.csv.gz',
'https://${bucketName}.s3.${this.config.exportBucket.region}.amazonaws.com/${exportPrefix}/export.csv.gz',
'${this.config.exportBucket.keyId}',
'${this.config.exportBucket.secretKey}',
'CSV'
Expand All @@ -628,7 +614,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
},
region: this.config.exportBucket.region,
},
bucket,
bucketName,
exportPrefix,
);

Expand Down
27 changes: 17 additions & 10 deletions packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -721,10 +721,6 @@ export class DatabricksDriver extends JDBCDriver {
// s3://real-bucket-name
// wasbs://[email protected]
// The extractors in BaseDriver expect just clean bucket name
const url = new URL(this.config.exportBucket || '');
const prefix = url.pathname.slice(1);
const delimiter = (prefix && !prefix.endsWith('/')) ? '/' : '';
const objectSearchPrefix = `${prefix}${delimiter}${tableName}`;

if (this.config.bucketType === 'azure') {
const {
Expand All @@ -733,14 +729,22 @@ export class DatabricksDriver extends JDBCDriver {
azureTenantId: tenantId,
azureClientSecret: clientSecret
} = this.config;

const { bucketName, path, username } = this.parseBucketUrl(this.config.exportBucket);
const azureBucketPath = `${bucketName}/${username}`;
const exportPrefix = path ? `${path}/${tableName}` : tableName;

return this.extractFilesFromAzure(
{ azureKey, clientId, tenantId, clientSecret },
// Databricks uses different bucket address form, so we need to transform it
// to the one understandable by extractFilesFromAzure implementation
`${url.host}/${url.username}`,
objectSearchPrefix,
azureBucketPath,
exportPrefix,
);
} else if (this.config.bucketType === 's3') {
const { bucketName, path } = this.parseBucketUrl(this.config.exportBucket);
const exportPrefix = path ? `${path}/${tableName}` : tableName;

return this.extractUnloadedFilesFromS3(
{
credentials: {
Expand All @@ -749,14 +753,17 @@ export class DatabricksDriver extends JDBCDriver {
},
region: this.config.awsRegion || '',
},
url.host,
objectSearchPrefix,
bucketName,
exportPrefix,
);
} else if (this.config.bucketType === 'gcs') {
const { bucketName, path } = this.parseBucketUrl(this.config.exportBucket);
const exportPrefix = path ? `${path}/${tableName}` : tableName;

return this.extractFilesFromGCS(
{ credentials: this.config.gcsCredentials },
url.host,
objectSearchPrefix,
bucketName,
exportPrefix,
);
} else {
throw new Error(`Unsupported export bucket type: ${
Expand Down
72 changes: 58 additions & 14 deletions packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
}

/**
* Executes query and rerutns queried rows.
* Executes query and returns queried rows.
*/
public async query<R = unknown>(query: string, values?: unknown[]): Promise<R> {
return this.getConnection().then((connection) => this.execute<R>(connection, query, values));
Expand Down Expand Up @@ -545,10 +545,25 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
} else {
const types = await this.queryColumnTypes(options.query.sql, options.query.params);
const connection = await this.getConnection();
const { bucketType, bucketName } =
const { bucketType } =
<SnowflakeDriverExportBucket> this.config.exportBucket;

let bucketName: string;
let exportPrefix: string;
let path: string;

if (bucketType === 'azure') {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
const pathArr = path.split('/');
bucketName = `${bucketName}/${pathArr[0]}`;
exportPrefix = pathArr.length > 1 ? `${pathArr.slice(1).join('/')}/${tableName}` : tableName;
} else {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
exportPrefix = path ? `${path}/${tableName}` : tableName;
}

const unloadSql = `
COPY INTO '${bucketType}://${bucketName}/${tableName}/'
COPY INTO '${bucketType}://${bucketName}/${exportPrefix}/'
FROM (${options.query.sql})
${this.exportOptionsClause(options)}`;
const result = await this.execute<UnloadResponse[]>(
Expand Down Expand Up @@ -594,10 +609,25 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
): Promise<TableStructure> {
const types = await this.tableColumnTypes(tableName);
const connection = await this.getConnection();
const { bucketType, bucketName } =
const { bucketType } =
<SnowflakeDriverExportBucket> this.config.exportBucket;

let bucketName: string;
let exportPrefix: string;
let path: string;

if (bucketType === 'azure') {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
const pathArr = path.split('/');
bucketName = `${bucketName}/${pathArr[0]}`;
exportPrefix = pathArr.length > 1 ? `${pathArr.slice(1).join('/')}/${tableName}` : tableName;
} else {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
exportPrefix = path ? `${path}/${tableName}` : tableName;
}

const unloadSql = `
COPY INTO '${bucketType}://${bucketName}/${tableName}/'
COPY INTO '${bucketType}://${bucketName}/${exportPrefix}/'
FROM ${tableName}
${this.exportOptionsClause(options)}`;
const result = await this.execute<UnloadResponse[]>(
Expand Down Expand Up @@ -695,36 +725,50 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
* Returns an array of signed URLs of the unloaded csv files.
*/
private async getCsvFiles(tableName: string): Promise<string[]> {
const { bucketType, bucketName } =
const { bucketType } =
<SnowflakeDriverExportBucket> this.config.exportBucket;

if (bucketType === 's3') {
const cfg = <SnowflakeDriverExportAWS> this.config.exportBucket;
const { keyId, secretKey, region } = <SnowflakeDriverExportAWS> this.config.exportBucket;

const { bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName);
const exportPrefix = path ? `${path}/${tableName}` : tableName;

return this.extractUnloadedFilesFromS3(
{
credentials: {
accessKeyId: cfg.keyId,
secretAccessKey: cfg.secretKey,
accessKeyId: keyId,
secretAccessKey: secretKey,
},
region: cfg.region,
region,
},
bucketName,
tableName,
exportPrefix,
);
} else if (bucketType === 'gcs') {
const { credentials } = (
<SnowflakeDriverExportGCS> this.config.exportBucket
);
return this.extractFilesFromGCS({ credentials }, bucketName, tableName);

const { bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName);
const exportPrefix = path ? `${path}/${tableName}` : tableName;

return this.extractFilesFromGCS({ credentials }, bucketName, exportPrefix);
} else if (bucketType === 'azure') {
const { azureKey, sasToken, clientId, tenantId, tokenFilePath } = (
<SnowflakeDriverExportAzure> this.config.exportBucket
);

const { bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName);
const pathArr = path.split('/');
const azureBucketPath = `${bucketName}/${pathArr[0]}`;

const exportPrefix = pathArr.length > 1 ? `${pathArr.slice(1).join('/')}/${tableName}` : tableName;

return this.extractFilesFromAzure(
{ azureKey, sasToken, clientId, tenantId, tokenFilePath },
bucketName,
tableName,
azureBucketPath,
exportPrefix,
);
} else {
throw new Error(`Unsupported export bucket type: ${bucketType}`);
Expand Down
31 changes: 31 additions & 0 deletions packages/cubejs-testing-drivers/fixtures/snowflake.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@
}
}
},
"export-bucket-s3-prefix": {
"cube": {
"environment": {
"CUBEJS_DB_EXPORT_BUCKET_TYPE": "s3",
"CUBEJS_DB_EXPORT_BUCKET": "snowflake-drivers-tests-preaggs/testing_prefix/for_export_buckets",
"CUBEJS_DB_EXPORT_BUCKET_AWS_KEY": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_KEY}",
"CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AWS_SECRET}",
"CUBEJS_DB_EXPORT_BUCKET_AWS_REGION": "us-west-1"
}
}
},
"export-bucket-azure": {
"cube": {
"environment": {
Expand All @@ -21,6 +32,16 @@
}
}
},
"export-bucket-azure-prefix": {
"cube": {
"environment": {
"CUBEJS_DB_EXPORT_BUCKET_TYPE": "azure",
"CUBEJS_DB_EXPORT_BUCKET": "coreteamdevtest.blob.core.windows.net/snowflake-drivers-tests-preaggs/testing_prefix/for_export_buckets/",
"CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY}",
"CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN}"
}
}
},
"export-bucket-azure-via-storage-integration": {
"cube": {
"environment": {
Expand All @@ -41,6 +62,16 @@
}
}
},
"export-bucket-gcs-prefix": {
"cube": {
"environment": {
"CUBEJS_DB_EXPORT_BUCKET_TYPE": "gcs",
"CUBEJS_DB_EXPORT_BUCKET": "snowflake-drivers-tests-preaggs/testing_prefix/for_export_buckets",
"CUBEJS_DB_EXPORT_INTEGRATION": "drivers_tests_preaggs_gcs",
"CUBEJS_DB_EXPORT_GCS_CREDENTIALS": "${DRIVERS_TESTS_CUBEJS_DB_EXPORT_GCS_CREDENTIALS}"
}
}
},
"encrypted-pk": {
"cube": {
"environment": {
Expand Down
3 changes: 3 additions & 0 deletions packages/cubejs-testing-drivers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@
"snowflake-full": "yarn test-driver -i dist/test/snowflake-full.test.js",
"snowflake-encrypted-pk-full": "yarn test-driver -i dist/test/snowflake-encrypted-pk-full.test.js",
"snowflake-export-bucket-s3-full": "yarn test-driver -i dist/test/snowflake-export-bucket-s3-full.test.js",
"snowflake-export-bucket-s3-prefix-full": "yarn test-driver -i dist/test/snowflake-export-bucket-s3-prefix-full.test.js",
"snowflake-export-bucket-azure-full": "yarn test-driver -i dist/test/snowflake-export-bucket-azure-full.test.js",
"snowflake-export-bucket-azure-prefix-full": "yarn test-driver -i dist/test/snowflake-export-bucket-azure-prefix-full.test.js",
"snowflake-export-bucket-azure-via-storage-integration-full": "yarn test-driver -i dist/test/snowflake-export-bucket-azure-via-storage-integration-full.test.js",
"snowflake-export-bucket-gcs-full": "yarn test-driver -i dist/test/snowflake-export-bucket-gcs-full.test.js",
"snowflake-export-bucket-gcs-prefix-full": "yarn test-driver -i dist/test/snowflake-export-bucket-gcs-prefix-full.test.js",
"redshift-driver": "yarn test-driver -i dist/test/redshift-driver.test.js",
"redshift-core": "yarn test-driver -i dist/test/redshift-core.test.js",
"redshift-full": "yarn test-driver -i dist/test/redshift-full.test.js",
Expand Down
Loading
Loading