Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ export default class DBSQLSession implements IDBSQLSession {
request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters);
}

if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion)) {
request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4);
}

if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) {
request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch;
}

if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion) && request.canDownloadResult !== true) {
request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a debug log mentioning that, even though LZ4 is set, we're disabling it because cloud fetch is enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it is a good idea to expose internal limitations. Whether data is downloaded compressed or uncompressed is an internal detail.


const operationPromise = driver.executeStatement(request);
const response = await this.handleResponse(operationPromise);
const operation = this.createOperation(response);
Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/arrow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ describe('Arrow support', () => {
'should handle LZ4 compressed data',
createTest(
async (session) => {
const operation = await session.executeStatement(`SELECT * FROM ${tableName}`);
const operation = await session.executeStatement(
`SELECT * FROM ${tableName}`,
{ useCloudFetch: false }, // Explicitly disable cloud fetch to test LZ4 compression
);
const result = await operation.fetchAll();
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);

Expand Down
7 changes: 4 additions & 3 deletions tests/e2e/cloudfetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ describe('CloudFetch', () => {
expect(fetchedRowCount).to.be.equal(queriedRowsCount);
});

it('should handle LZ4 compressed data', async () => {
it('should not use LZ4 compression with cloud fetch', async () => {
const cloudFetchConcurrentDownloads = 5;
const session = await openSession({
cloudFetchConcurrentDownloads,
useLZ4Compression: true,
useLZ4Compression: true, // This is ignored when cloud fetch is enabled
});

const queriedRowsCount = 10000000; // result has to be quite big to enable CloudFetch
Expand All @@ -126,7 +126,8 @@ describe('CloudFetch', () => {
expect(resultHandler).to.be.instanceof(ResultSlicer);
expect(resultHandler.source).to.be.instanceof(ArrowResultConverter);
expect(resultHandler.source.source).to.be.instanceOf(CloudFetchResultHandler);
expect(resultHandler.source.source.isLZ4Compressed).to.be.true;
// LZ4 compression should not be enabled with cloud fetch
expect(resultHandler.source.source.isLZ4Compressed).to.be.false;

const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
expect(chunk.length).to.be.gt(0);
Expand Down
85 changes: 81 additions & 4 deletions tests/unit/DBSQLSession.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ describe('DBSQLSession', () => {
});

it('should apply defaults for Arrow options', async () => {
case1: {
// case 1
{
const session = new DBSQLSession({
handle: sessionHandleStub,
context: new ClientContextStub({ arrowEnabled: true }),
Expand All @@ -95,7 +96,8 @@ describe('DBSQLSession', () => {
expect(result).instanceOf(DBSQLOperation);
}

case2: {
// case 2
{
const session = new DBSQLSession({
handle: sessionHandleStub,
context: new ClientContextStub({ arrowEnabled: true, useArrowNativeTypes: false }),
Expand Down Expand Up @@ -158,9 +160,14 @@ describe('DBSQLSession', () => {
}

if (version >= TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6) {
expect(req.canDecompressLZ4Result).to.be.true;
// Since cloud fetch is enabled, canDecompressLZ4Result should not be set
if (req.canDownloadResult === true) {
expect(req.canDecompressLZ4Result).to.not.be.true;
} else {
expect(req.canDecompressLZ4Result).to.be.true;
}
} else {
expect(req.canDecompressLZ4Result).to.not.exist;
expect(req.canDecompressLZ4Result).to.not.be.true;
}

if (version >= TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5) {
Expand All @@ -180,6 +187,76 @@ describe('DBSQLSession', () => {
});
}
});

describe('LZ4 compression with cloud fetch', () => {
  const statement = 'SELECT * FROM table';

  /**
   * Runs `executeStatement` once on a fresh session whose client config has
   * `useLZ4Compression: true`, and returns the request object that was handed
   * to the (spied) driver.
   *
   * @param serverProtocolVersion protocol version the session is created with
   * @param useCloudFetch value passed as the per-statement `useCloudFetch` option
   * @returns the first argument of the single `driver.executeStatement` call
   */
  async function captureExecuteStatementRequest(serverProtocolVersion: TProtocolVersion, useCloudFetch: boolean) {
    const context = new ClientContextStub({ useLZ4Compression: true });
    const spiedDriver = sinon.spy(context.driver);

    const session = new DBSQLSession({
      handle: sessionHandleStub,
      context,
      serverProtocolVersion,
    });

    await session.executeStatement(statement, { useCloudFetch });

    // Exactly one request must have reached the driver
    expect(spiedDriver.executeStatement.callCount).to.eq(1);
    return spiedDriver.executeStatement.firstCall.args[0];
  }

  it('should not set canDecompressLZ4Result when cloud fetch is enabled (canDownloadResult=true)', async () => {
    // V6 is used as a protocol version that supports Arrow compression; cloud fetch is turned on
    const req = await captureExecuteStatementRequest(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, true);

    // The download flag is on, so the LZ4 flag must not be true
    expect(req.canDownloadResult).to.be.true;
    expect(req.canDecompressLZ4Result).to.not.be.true;
  });

  it('should set canDecompressLZ4Result when cloud fetch is disabled (canDownloadResult=false)', async () => {
    // Same protocol version as above, but cloud fetch is turned off
    const req = await captureExecuteStatementRequest(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, false);

    // The download flag is off, so the LZ4 flag is expected to be true
    expect(req.canDownloadResult).to.be.false;
    expect(req.canDecompressLZ4Result).to.be.true;
  });

  it('should not set canDecompressLZ4Result when server protocol does not support Arrow compression', async () => {
    // V5 is used as a protocol version without Arrow compression support; cloud fetch is turned off
    const req = await captureExecuteStatementRequest(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5, false);

    // The LZ4 flag must not be true on this protocol version
    expect(req.canDecompressLZ4Result).to.not.be.true;
  });
});
});

describe('getTypeInfo', () => {
Expand Down
Loading