-
Notifications
You must be signed in to change notification settings - Fork 44
[PECOBLR-314] add thrift protocol version handling #292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+749
−20
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
11860bd
add thrift protocol version handling
shivam2680 c5e8d2f
added e2e test for all protocol version
shivam2680 67e7243
updated test
shivam2680 82dbddf
fix lint failure
shivam2680 ad059d7
lint fix
shivam2680 8a607c4
addressed comments
shivam2680 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,8 @@ import { | |
| TSparkDirectResults, | ||
| TSparkArrowTypes, | ||
| TSparkParameter, | ||
| TProtocolVersion, | ||
| TExecuteStatementReq, | ||
| } from '../thrift/TCLIService_types'; | ||
| import IDBSQLSession, { | ||
| ExecuteStatementOptions, | ||
|
|
@@ -29,7 +31,7 @@ import IOperation from './contracts/IOperation'; | |
| import DBSQLOperation from './DBSQLOperation'; | ||
| import Status from './dto/Status'; | ||
| import InfoValue from './dto/InfoValue'; | ||
| import { definedOrError, LZ4 } from './utils'; | ||
| import { definedOrError, LZ4, ProtocolVersion } from './utils'; | ||
| import CloseableCollection from './utils/CloseableCollection'; | ||
| import { LogLevel } from './contracts/IDBSQLLogger'; | ||
| import HiveDriverError from './errors/HiveDriverError'; | ||
|
|
@@ -74,13 +76,16 @@ function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undef | |
| }; | ||
| } | ||
|
|
||
| function getArrowOptions(config: ClientConfig): { | ||
| function getArrowOptions( | ||
| config: ClientConfig, | ||
| serverProtocolVersion: TProtocolVersion | undefined | null, | ||
| ): { | ||
| canReadArrowResult: boolean; | ||
| useArrowNativeTypes?: TSparkArrowTypes; | ||
| } { | ||
| const { arrowEnabled = true, useArrowNativeTypes = true } = config; | ||
|
|
||
| if (!arrowEnabled) { | ||
| if (!arrowEnabled || !ProtocolVersion.supportsArrowMetadata(serverProtocolVersion)) { | ||
| return { | ||
| canReadArrowResult: false, | ||
| }; | ||
|
|
@@ -136,6 +141,7 @@ function getQueryParameters( | |
| interface DBSQLSessionConstructorOptions { | ||
| handle: TSessionHandle; | ||
| context: IClientContext; | ||
| serverProtocolVersion?: TProtocolVersion; | ||
| } | ||
|
|
||
| export default class DBSQLSession implements IDBSQLSession { | ||
|
|
@@ -145,14 +151,22 @@ export default class DBSQLSession implements IDBSQLSession { | |
|
|
||
| private isOpen = true; | ||
|
|
||
| private serverProtocolVersion?: TProtocolVersion; | ||
|
|
||
| public onClose?: () => void; | ||
|
|
||
| private operations = new CloseableCollection<DBSQLOperation>(); | ||
|
|
||
| constructor({ handle, context }: DBSQLSessionConstructorOptions) { | ||
| constructor({ handle, context, serverProtocolVersion }: DBSQLSessionConstructorOptions) { | ||
| this.sessionHandle = handle; | ||
| this.context = context; | ||
| // Get the server protocol version from the provided parameter (from TOpenSessionResp) | ||
| // rather than from the handle | ||
| this.serverProtocolVersion = serverProtocolVersion; | ||
| this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.id}`); | ||
| if (this.serverProtocolVersion) { | ||
| this.context.getLogger().log(LogLevel.debug, `Server protocol version: ${this.serverProtocolVersion}`); | ||
| } | ||
| } | ||
|
|
||
| public get id() { | ||
|
|
@@ -193,17 +207,29 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
| const operationPromise = driver.executeStatement({ | ||
|
|
||
| const request = new TExecuteStatementReq({ | ||
| sessionHandle: this.sessionHandle, | ||
| statement, | ||
| queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined, | ||
| runAsync: true, | ||
| ...getDirectResultsOptions(options.maxRows, clientConfig), | ||
| ...getArrowOptions(clientConfig), | ||
| canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch, | ||
| parameters: getQueryParameters(options.namedParameters, options.ordinalParameters), | ||
| canDecompressLZ4Result: (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4), | ||
| ...getArrowOptions(clientConfig, this.serverProtocolVersion), | ||
| }); | ||
|
|
||
| if (ProtocolVersion.supportsParameterizedQueries(this.serverProtocolVersion)) { | ||
| request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters); | ||
| } | ||
|
|
||
| if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion)) { | ||
| request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4); | ||
| } | ||
|
|
||
| if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) { | ||
| request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch; | ||
| } | ||
|
|
||
| const operationPromise = driver.executeStatement(request); | ||
| const response = await this.handleResponse(operationPromise); | ||
| const operation = this.createOperation(response); | ||
|
|
||
|
|
@@ -352,9 +378,13 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getTypeInfo({ | ||
| sessionHandle: this.sessionHandle, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -371,9 +401,13 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getCatalogs({ | ||
| sessionHandle: this.sessionHandle, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -390,11 +424,15 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getSchemas({ | ||
| sessionHandle: this.sessionHandle, | ||
| catalogName: request.catalogName, | ||
| schemaName: request.schemaName, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -411,13 +449,17 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getTables({ | ||
| sessionHandle: this.sessionHandle, | ||
| catalogName: request.catalogName, | ||
| schemaName: request.schemaName, | ||
| tableName: request.tableName, | ||
| tableTypes: request.tableTypes, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -434,9 +476,13 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getTableTypes({ | ||
| sessionHandle: this.sessionHandle, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -453,13 +499,17 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
||
|
|
||
| const operationPromise = driver.getColumns({ | ||
| sessionHandle: this.sessionHandle, | ||
| catalogName: request.catalogName, | ||
| schemaName: request.schemaName, | ||
| tableName: request.tableName, | ||
| columnName: request.columnName, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -476,12 +526,16 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getFunctions({ | ||
| sessionHandle: this.sessionHandle, | ||
| catalogName: request.catalogName, | ||
| schemaName: request.schemaName, | ||
| functionName: request.functionName, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -492,12 +546,16 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getPrimaryKeys({ | ||
| sessionHandle: this.sessionHandle, | ||
| catalogName: request.catalogName, | ||
| schemaName: request.schemaName, | ||
| tableName: request.tableName, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
@@ -514,6 +572,10 @@ export default class DBSQLSession implements IDBSQLSession { | |
| await this.failIfClosed(); | ||
| const driver = await this.context.getDriver(); | ||
| const clientConfig = this.context.getConfig(); | ||
|
|
||
| // Set runAsync only if supported by protocol version | ||
| const runAsync = ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; | ||
|
|
||
| const operationPromise = driver.getCrossReference({ | ||
| sessionHandle: this.sessionHandle, | ||
| parentCatalogName: request.parentCatalogName, | ||
|
|
@@ -522,7 +584,7 @@ export default class DBSQLSession implements IDBSQLSession { | |
| foreignCatalogName: request.foreignCatalogName, | ||
| foreignSchemaName: request.foreignSchemaName, | ||
| foreignTableName: request.foreignTableName, | ||
| runAsync: true, | ||
| runAsync, | ||
| ...getDirectResultsOptions(request.maxRows, clientConfig), | ||
| }); | ||
| const response = await this.handleResponse(operationPromise); | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| import { TProtocolVersion } from '../../thrift/TCLIService_types'; | ||
|
|
||
| /** | ||
| * Protocol version information from Thrift TCLIService | ||
| * Each version adds certain features to the Spark/Hive API | ||
| * | ||
| * Databricks only supports SPARK_CLI_SERVICE_PROTOCOL_V1 (0xA501) or higher | ||
| */ | ||
|
|
||
| /** | ||
| * Check if the current protocol version supports a specific feature | ||
| * @param serverProtocolVersion The protocol version received from server in TOpenSessionResp | ||
| * @param requiredVersion The minimum protocol version required for a feature | ||
| * @returns boolean indicating if the feature is supported | ||
| */ | ||
| export function isFeatureSupported( | ||
| serverProtocolVersion: TProtocolVersion | undefined | null, | ||
| requiredVersion: TProtocolVersion, | ||
| ): boolean { | ||
| if (serverProtocolVersion === undefined || serverProtocolVersion === null) { | ||
| return false; | ||
| } | ||
|
|
||
| return serverProtocolVersion >= requiredVersion; | ||
| } | ||
|
|
||
| /** | ||
| * Check if parameterized queries are supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V8 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if parameterized queries are supported | ||
| */ | ||
| export function supportsParameterizedQueries(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8); | ||
| } | ||
|
|
||
| /** | ||
| * Check if async metadata operations are supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V6 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if async metadata operations are supported | ||
| */ | ||
| export function supportsAsyncMetadataOperations(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6); | ||
| } | ||
|
|
||
| /** | ||
| * Check if result persistence mode is supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V7 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if result persistence mode is supported | ||
| */ | ||
| export function supportsResultPersistenceMode(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7); | ||
| } | ||
|
|
||
| /** | ||
| * Check if Arrow compression is supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V6 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if compressed Arrow batches are supported | ||
| */ | ||
| export function supportsArrowCompression(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6); | ||
| } | ||
|
|
||
| /** | ||
| * Check if Arrow metadata is supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V5 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if Arrow metadata is supported | ||
| */ | ||
| export function supportsArrowMetadata(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5); | ||
| } | ||
|
|
||
| /** | ||
| * Check if multiple catalogs are supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V4 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if multiple catalogs are supported | ||
| */ | ||
| export function supportsMultipleCatalogs(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V4); | ||
| } | ||
|
|
||
| /** | ||
| * Check if cloud object storage fetching is supported | ||
| * (Requires SPARK_CLI_SERVICE_PROTOCOL_V3 or higher) | ||
| * @param serverProtocolVersion The protocol version from server | ||
| * @returns boolean indicating if cloud fetching is supported | ||
| */ | ||
| export function supportsCloudFetch(serverProtocolVersion: TProtocolVersion | undefined | null): boolean { | ||
| return isFeatureSupported(serverProtocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V3); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we always log the protocol version, even if `serverProtocolVersion` is not defined?
That would let us tell from the logs whether it was defined or not.