diff --git a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts index 34eb5547fa429..d229b4a5ce8ee 100644 --- a/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts +++ b/packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts @@ -4,6 +4,7 @@ * @fileoverview The `DatabricksDriver` and related types declaration. */ +import fetch from 'node-fetch'; import { assertDataSource, getEnv, } from '@cubejs-backend/shared'; import { DatabaseStructure, @@ -20,6 +21,7 @@ import { JDBCDriver, JDBCDriverConfiguration, } from '@cubejs-backend/jdbc-drive import { DatabricksQuery } from './DatabricksQuery'; import { extractAndRemoveUidPwdFromJdbcUrl, + parseDatabricksJdbcUrl, resolveJDBCDriver } from './helpers'; @@ -124,6 +126,11 @@ type ColumnInfo = { type: GenericDataBaseType; }; +export type ParsedConnectionProperties = { + host: string, + warehouseId: string, +}; + const DatabricksToGenericType: Record = { binary: 'hll_datasketches', 'decimal(10,0)': 'bigint', @@ -143,6 +150,8 @@ export class DatabricksDriver extends JDBCDriver { */ protected readonly config: DatabricksDriverConfiguration; + private readonly parsedConnectionProperties: ParsedConnectionProperties; + public static dialectClass() { return DatabricksQuery; } @@ -262,38 +271,50 @@ export class DatabricksDriver extends JDBCDriver { super(config); this.config = config; + this.parsedConnectionProperties = parseDatabricksJdbcUrl(url); this.showSparkProtocolWarn = showSparkProtocolWarn; } - /** - * @override - */ - public readOnly() { + public override readOnly() { return !!this.config.readOnly; } - /** - * @override - */ - public capabilities(): DriverCapabilities { + public override capabilities(): DriverCapabilities { return { unloadWithoutTempTable: true, incrementalSchemaLoading: true }; } - /** - * @override - */ - public setLogger(logger: any) { + public override setLogger(logger: any) { super.setLogger(logger); 
this.showDeprecations(); } - /** - * @override - */ - public async loadPreAggregationIntoTable( + public override async testConnection() { + const token = `Bearer ${this.config.properties.PWD}`; + + const res = await fetch(`https://${this.parsedConnectionProperties.host}/api/2.0/sql/warehouses/${this.parsedConnectionProperties.warehouseId}`, { + headers: { Authorization: token }, + }); + + if (!res.ok) { + throw new Error(`Databricks API error: ${res.statusText}`); + } + + const data = await res.json(); + + if (['DELETING', 'DELETED'].includes(data.state)) { + throw new Error(`Warehouse is being deleted (current state: ${data.state})`); + } + + // A DEGRADED health status also exists, but it does not necessarily mean the warehouse is unusable, so only FAILED is treated as an error here. + if (data.health?.status === 'FAILED') { + throw new Error(`Warehouse is unhealthy: ${data.health?.summary}. Details: ${data.health?.details}`); + } + } + + public override async loadPreAggregationIntoTable( preAggregationTableName: string, loadSql: string, params: unknown[], @@ -320,10 +341,7 @@ export class DatabricksDriver extends JDBCDriver { } } - /** - * @override - */ - public async query( + public override async query( query: string, values: unknown[], ): Promise { @@ -357,10 +375,7 @@ export class DatabricksDriver extends JDBCDriver { } } - /** - * @override - */ - public dropTable(tableName: string, options?: QueryOptions): Promise { + public override dropTable(tableName: string, options?: QueryOptions): Promise { const tableFullName = `${ this.config?.catalog ?
`${this.config.catalog}.` : '' }${tableName}`; @@ -392,10 +407,7 @@ export class DatabricksDriver extends JDBCDriver { } } - /** - * @override - */ - protected async getCustomClassPath() { + protected override async getCustomClassPath() { return resolveJDBCDriver(); } diff --git a/packages/cubejs-databricks-jdbc-driver/src/helpers.ts b/packages/cubejs-databricks-jdbc-driver/src/helpers.ts index 35b05dba3298f..7e62951d32a9c 100644 --- a/packages/cubejs-databricks-jdbc-driver/src/helpers.ts +++ b/packages/cubejs-databricks-jdbc-driver/src/helpers.ts @@ -2,6 +2,7 @@ import fs from 'fs'; import path from 'path'; import { downloadJDBCDriver, OSS_DRIVER_VERSION } from './installer'; +import type { ParsedConnectionProperties } from './DatabricksDriver'; async function fileExistsOr( fsPath: string, @@ -51,3 +52,33 @@ export function extractAndRemoveUidPwdFromJdbcUrl(jdbcUrl: string): [uid: string return [uid, pwd, cleanedUrl]; } + +export function parseDatabricksJdbcUrl(jdbcUrl: string): ParsedConnectionProperties { + const jdbcPrefix = 'jdbc:databricks://'; + const urlWithoutPrefix = jdbcUrl.slice(jdbcPrefix.length); + + const [hostPortAndPath, ...params] = urlWithoutPrefix.split(';'); + const [host] = hostPortAndPath.split(':'); + + const paramMap = new Map(); + for (const param of params) { + const [key, value] = param.split('='); + if (key && value) { + paramMap.set(key, value); + } + } + + const httpPath = paramMap.get('httpPath'); + if (!httpPath) { + throw new Error('Missing httpPath in JDBC URL'); + } + + const warehouseMatch = httpPath.match(/\/warehouses\/([a-zA-Z0-9]+)/); + if (!warehouseMatch) { + throw new Error('Could not extract warehouseId from httpPath'); + } + + const warehouseId = warehouseMatch[1]; + + return { host, warehouseId }; +}