Skip to content

Commit 52ed50a

Browse files
authored
[PECOBLR-314] add thrift protocol version handling (#292)
* add thrift protocol version handling
* added e2e test for all protocol version
* updated test
* fix lint failure
* lint fix
* addressed pr comments
1 parent 056ed0b commit 52ed50a

File tree

8 files changed

+749
-20
lines changed

8 files changed

+749
-20
lines changed

lib/DBSQLClient.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -227,6 +227,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
227227
const session = new DBSQLSession({
228228
handle: definedOrError(response.sessionHandle),
229229
context: this,
230+
serverProtocolVersion: response.serverProtocolVersion,
230231
});
231232
this.sessions.add(session);
232233
return session;

lib/DBSQLSession.ts

Lines changed: 59 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,8 @@ import {
1212
TSparkDirectResults,
1313
TSparkArrowTypes,
1414
TSparkParameter,
15+
TProtocolVersion,
16+
TExecuteStatementReq,
1517
} from '../thrift/TCLIService_types';
1618
import IDBSQLSession, {
1719
ExecuteStatementOptions,
@@ -29,7 +31,7 @@ import IOperation from './contracts/IOperation';
2931
import DBSQLOperation from './DBSQLOperation';
3032
import Status from './dto/Status';
3133
import InfoValue from './dto/InfoValue';
32-
import { definedOrError, LZ4 } from './utils';
34+
import { definedOrError, LZ4, ProtocolVersion } from './utils';
3335
import CloseableCollection from './utils/CloseableCollection';
3436
import { LogLevel } from './contracts/IDBSQLLogger';
3537
import HiveDriverError from './errors/HiveDriverError';
@@ -74,13 +76,16 @@ function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undef
7476
};
7577
}
7678

77-
function getArrowOptions(config: ClientConfig): {
79+
function getArrowOptions(
80+
config: ClientConfig,
81+
serverProtocolVersion: TProtocolVersion | undefined | null,
82+
): {
7883
canReadArrowResult: boolean;
7984
useArrowNativeTypes?: TSparkArrowTypes;
8085
} {
8186
const { arrowEnabled = true, useArrowNativeTypes = true } = config;
8287

83-
if (!arrowEnabled) {
88+
if (!arrowEnabled || !ProtocolVersion.supportsArrowMetadata(serverProtocolVersion)) {
8489
return {
8590
canReadArrowResult: false,
8691
};
@@ -136,6 +141,7 @@ function getQueryParameters(
136141
/**
 * Options accepted by the DBSQLSession constructor.
 * - handle: stored as the session's sessionHandle.
 * - context: stored as the session's context (used for logger, driver and config access).
 * - serverProtocolVersion: optional; stored on the session and passed to the
 *   ProtocolVersion feature checks. DBSQLClient fills it from response.serverProtocolVersion.
 */
interface DBSQLSessionConstructorOptions {
137142
handle: TSessionHandle;
138143
context: IClientContext;
144+
serverProtocolVersion?: TProtocolVersion;
139145
}
140146

141147
export default class DBSQLSession implements IDBSQLSession {
@@ -145,14 +151,28 @@ export default class DBSQLSession implements IDBSQLSession {
145151

146152
private isOpen = true;
147153

154+
private serverProtocolVersion?: TProtocolVersion;
155+
148156
public onClose?: () => void;
149157

150158
private operations = new CloseableCollection<DBSQLOperation>();
151159

152-
constructor({ handle, context }: DBSQLSessionConstructorOptions) {
160+
/**
161+
* Computes the runAsync value sent with metadata requests (getTypeInfo, getCatalogs, getSchemas, getTables, and the other metadata calls)
162+
* @private
163+
* @returns true when ProtocolVersion.supportsAsyncMetadataOperations accepts this.serverProtocolVersion; undefined otherwise (never false)
164+
*/
165+
private getRunAsyncForMetadataOperations(): boolean | undefined {
166+
return ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined;
167+
}
168+
169+
constructor({ handle, context, serverProtocolVersion }: DBSQLSessionConstructorOptions) {
153170
this.sessionHandle = handle;
154171
this.context = context;
172+
// Keep the protocol version supplied by the caller (DBSQLClient forwards response.serverProtocolVersion); it is optional, so this may be undefined
173+
this.serverProtocolVersion = serverProtocolVersion;
155174
this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.id}`);
175+
this.context.getLogger().log(LogLevel.debug, `Server protocol version: ${this.serverProtocolVersion}`);
156176
}
157177

158178
public get id() {
@@ -193,17 +213,29 @@ export default class DBSQLSession implements IDBSQLSession {
193213
await this.failIfClosed();
194214
const driver = await this.context.getDriver();
195215
const clientConfig = this.context.getConfig();
196-
const operationPromise = driver.executeStatement({
216+
217+
const request = new TExecuteStatementReq({
197218
sessionHandle: this.sessionHandle,
198219
statement,
199220
queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined,
200221
runAsync: true,
201222
...getDirectResultsOptions(options.maxRows, clientConfig),
202-
...getArrowOptions(clientConfig),
203-
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
204-
parameters: getQueryParameters(options.namedParameters, options.ordinalParameters),
205-
canDecompressLZ4Result: (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4),
223+
...getArrowOptions(clientConfig, this.serverProtocolVersion),
206224
});
225+
226+
if (ProtocolVersion.supportsParameterizedQueries(this.serverProtocolVersion)) {
227+
request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters);
228+
}
229+
230+
if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion)) {
231+
request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4);
232+
}
233+
234+
if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) {
235+
request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch;
236+
}
237+
238+
const operationPromise = driver.executeStatement(request);
207239
const response = await this.handleResponse(operationPromise);
208240
const operation = this.createOperation(response);
209241

@@ -352,9 +384,10 @@ export default class DBSQLSession implements IDBSQLSession {
352384
await this.failIfClosed();
353385
const driver = await this.context.getDriver();
354386
const clientConfig = this.context.getConfig();
387+
355388
const operationPromise = driver.getTypeInfo({
356389
sessionHandle: this.sessionHandle,
357-
runAsync: true,
390+
runAsync: this.getRunAsyncForMetadataOperations(),
358391
...getDirectResultsOptions(request.maxRows, clientConfig),
359392
});
360393
const response = await this.handleResponse(operationPromise);
@@ -371,9 +404,10 @@ export default class DBSQLSession implements IDBSQLSession {
371404
await this.failIfClosed();
372405
const driver = await this.context.getDriver();
373406
const clientConfig = this.context.getConfig();
407+
374408
const operationPromise = driver.getCatalogs({
375409
sessionHandle: this.sessionHandle,
376-
runAsync: true,
410+
runAsync: this.getRunAsyncForMetadataOperations(),
377411
...getDirectResultsOptions(request.maxRows, clientConfig),
378412
});
379413
const response = await this.handleResponse(operationPromise);
@@ -390,11 +424,12 @@ export default class DBSQLSession implements IDBSQLSession {
390424
await this.failIfClosed();
391425
const driver = await this.context.getDriver();
392426
const clientConfig = this.context.getConfig();
427+
393428
const operationPromise = driver.getSchemas({
394429
sessionHandle: this.sessionHandle,
395430
catalogName: request.catalogName,
396431
schemaName: request.schemaName,
397-
runAsync: true,
432+
runAsync: this.getRunAsyncForMetadataOperations(),
398433
...getDirectResultsOptions(request.maxRows, clientConfig),
399434
});
400435
const response = await this.handleResponse(operationPromise);
@@ -411,13 +446,14 @@ export default class DBSQLSession implements IDBSQLSession {
411446
await this.failIfClosed();
412447
const driver = await this.context.getDriver();
413448
const clientConfig = this.context.getConfig();
449+
414450
const operationPromise = driver.getTables({
415451
sessionHandle: this.sessionHandle,
416452
catalogName: request.catalogName,
417453
schemaName: request.schemaName,
418454
tableName: request.tableName,
419455
tableTypes: request.tableTypes,
420-
runAsync: true,
456+
runAsync: this.getRunAsyncForMetadataOperations(),
421457
...getDirectResultsOptions(request.maxRows, clientConfig),
422458
});
423459
const response = await this.handleResponse(operationPromise);
@@ -434,9 +470,10 @@ export default class DBSQLSession implements IDBSQLSession {
434470
await this.failIfClosed();
435471
const driver = await this.context.getDriver();
436472
const clientConfig = this.context.getConfig();
473+
437474
const operationPromise = driver.getTableTypes({
438475
sessionHandle: this.sessionHandle,
439-
runAsync: true,
476+
runAsync: this.getRunAsyncForMetadataOperations(),
440477
...getDirectResultsOptions(request.maxRows, clientConfig),
441478
});
442479
const response = await this.handleResponse(operationPromise);
@@ -453,13 +490,14 @@ export default class DBSQLSession implements IDBSQLSession {
453490
await this.failIfClosed();
454491
const driver = await this.context.getDriver();
455492
const clientConfig = this.context.getConfig();
493+
456494
const operationPromise = driver.getColumns({
457495
sessionHandle: this.sessionHandle,
458496
catalogName: request.catalogName,
459497
schemaName: request.schemaName,
460498
tableName: request.tableName,
461499
columnName: request.columnName,
462-
runAsync: true,
500+
runAsync: this.getRunAsyncForMetadataOperations(),
463501
...getDirectResultsOptions(request.maxRows, clientConfig),
464502
});
465503
const response = await this.handleResponse(operationPromise);
@@ -476,12 +514,13 @@ export default class DBSQLSession implements IDBSQLSession {
476514
await this.failIfClosed();
477515
const driver = await this.context.getDriver();
478516
const clientConfig = this.context.getConfig();
517+
479518
const operationPromise = driver.getFunctions({
480519
sessionHandle: this.sessionHandle,
481520
catalogName: request.catalogName,
482521
schemaName: request.schemaName,
483522
functionName: request.functionName,
484-
runAsync: true,
523+
runAsync: this.getRunAsyncForMetadataOperations(),
485524
...getDirectResultsOptions(request.maxRows, clientConfig),
486525
});
487526
const response = await this.handleResponse(operationPromise);
@@ -492,12 +531,13 @@ export default class DBSQLSession implements IDBSQLSession {
492531
await this.failIfClosed();
493532
const driver = await this.context.getDriver();
494533
const clientConfig = this.context.getConfig();
534+
495535
const operationPromise = driver.getPrimaryKeys({
496536
sessionHandle: this.sessionHandle,
497537
catalogName: request.catalogName,
498538
schemaName: request.schemaName,
499539
tableName: request.tableName,
500-
runAsync: true,
540+
runAsync: this.getRunAsyncForMetadataOperations(),
501541
...getDirectResultsOptions(request.maxRows, clientConfig),
502542
});
503543
const response = await this.handleResponse(operationPromise);
@@ -514,6 +554,7 @@ export default class DBSQLSession implements IDBSQLSession {
514554
await this.failIfClosed();
515555
const driver = await this.context.getDriver();
516556
const clientConfig = this.context.getConfig();
557+
517558
const operationPromise = driver.getCrossReference({
518559
sessionHandle: this.sessionHandle,
519560
parentCatalogName: request.parentCatalogName,
@@ -522,7 +563,7 @@ export default class DBSQLSession implements IDBSQLSession {
522563
foreignCatalogName: request.foreignCatalogName,
523564
foreignSchemaName: request.foreignSchemaName,
524565
foreignTableName: request.foreignTableName,
525-
runAsync: true,
566+
runAsync: this.getRunAsyncForMetadataOperations(),
526567
...getDirectResultsOptions(request.maxRows, clientConfig),
527568
});
528569
const response = await this.handleResponse(operationPromise);

lib/utils/index.ts

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,5 +2,6 @@ import definedOrError from './definedOrError';
22
import buildUserAgentString from './buildUserAgentString';
33
import formatProgress, { ProgressUpdateTransformer } from './formatProgress';
44
import LZ4 from './lz4';
5+
import * as ProtocolVersion from './protocolVersion';
56

6-
export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4 };
7+
export { definedOrError, buildUserAgentString, formatProgress, ProgressUpdateTransformer, LZ4, ProtocolVersion };

lib/utils/protocolVersion.ts

Lines changed: 95 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,95 @@
1+
import { TProtocolVersion } from '../../thrift/TCLIService_types';
2+
3+
/**
4+
* Protocol version information from Thrift TCLIService
5+
* Each version adds certain features to the Spark/Hive API
6+
*
7+
* Databricks only supports SPARK_CLI_SERVICE_PROTOCOL_V1 (0xA501) or higher
8+
*/
9+
10+
/**
11+
* Check if the server's protocol version is at least the version a feature requires
12+
* @param serverProtocolVersion The protocol version received from server in TOpenSessionResp; undefined or null when it is not known
13+
* @param requiredVersion The minimum protocol version required for a feature
14+
* @returns true only when serverProtocolVersion is defined and >= requiredVersion; false when it is lower, undefined or null
15+
*/
16+
export function isFeatureSupported(
17+
serverProtocolVersion: TProtocolVersion | undefined | null,
18+
requiredVersion: TProtocolVersion,
19+
): boolean {
20+
if (serverProtocolVersion === undefined || serverProtocolVersion === null) {
21+
return false;
22+
}
23+
24+
return serverProtocolVersion >= requiredVersion;
25+
}
26+
27+
/**
28+
* Check if parameterized queries are supported
29+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V8 or higher)
30+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
31+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V8; false otherwise, including for undefined or null
32+
*/
33+
export function supportsParameterizedQueries(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
34+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8);
35+
}
36+
37+
/**
38+
* Check if async metadata operations are supported
39+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V6 or higher)
40+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
41+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V6; false otherwise, including for undefined or null
42+
*/
43+
export function supportsAsyncMetadataOperations(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
44+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6);
45+
}
46+
47+
/**
48+
* Check if result persistence mode is supported
49+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V7 or higher)
50+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
51+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V7; false otherwise, including for undefined or null
52+
*/
53+
export function supportsResultPersistenceMode(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
54+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7);
55+
}
56+
57+
/**
58+
* Check if Arrow compression is supported
59+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V6 or higher)
60+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
61+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V6; false otherwise, including for undefined or null
62+
*/
63+
export function supportsArrowCompression(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
64+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6);
65+
}
66+
67+
/**
68+
* Check if Arrow metadata is supported
69+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V5 or higher)
70+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
71+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V5; false otherwise, including for undefined or null
72+
*/
73+
export function supportsArrowMetadata(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
74+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5);
75+
}
76+
77+
/**
78+
* Check if multiple catalogs are supported
79+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V4 or higher)
80+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
81+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V4; false otherwise, including for undefined or null
82+
*/
83+
export function supportsMultipleCatalogs(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
84+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V4);
85+
}
86+
87+
/**
88+
* Check if cloud object storage fetching is supported
89+
* (Requires SPARK_CLI_SERVICE_PROTOCOL_V3 or higher)
90+
* @param serverProtocolVersion The protocol version from server; may be undefined or null when unknown
91+
* @returns true when the version is known and at least SPARK_CLI_SERVICE_PROTOCOL_V3; false otherwise, including for undefined or null
92+
*/
93+
export function supportsCloudFetch(serverProtocolVersion: TProtocolVersion | undefined | null): boolean {
94+
return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V3);
95+
}

0 commit comments

Comments
 (0)