Skip to content

Commit 5ca878f

Browse files
committed
feat: add promise based signatures for execute method
1 parent 7b8a1f7 commit 5ca878f

File tree

4 files changed

+154
-18
lines changed

4 files changed

+154
-18
lines changed

src/batch-transaction.ts

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@ import {PreciseDate} from '@google-cloud/precise-date';
1818
import {promisifyAll} from '@google-cloud/promisify';
1919
import * as extend from 'extend';
2020
import * as is from 'is';
21-
import {ReadRequest, ExecuteSqlRequest, Snapshot} from './transaction';
21+
import {
22+
ExecuteSqlRequest,
23+
ReadCallback,
24+
ReadRequest,
25+
Rows,
26+
RunCallback,
27+
Snapshot,
28+
} from './transaction';
2229
import {google} from '../protos/protos';
2330
import {Session, Database} from '.';
2431
import {
@@ -55,6 +62,12 @@ export type CreateQueryPartitionsCallback = ResourceCallback<
5562
google.spanner.v1.IPartitionResponse
5663
>;
5764

65+
type executeResponse = [
66+
Rows,
67+
google.spanner.v1.ResultSetStats?,
68+
google.spanner.v1.ResultSetMetadata?,
69+
];
70+
5871
/**
5972
* Use a BatchTransaction object to create partitions and read/query against
6073
* your Cloud Spanner database.
@@ -364,12 +377,27 @@ class BatchTransaction extends Snapshot {
364377
* @example <caption>include:samples/batch.js</caption>
365378
* region_tag:spanner_batch_execute_partitions
366379
*/
367-
execute(partition, callback) {
368-
if (is.string(partition.table)) {
369-
this.read(partition.table, partition, callback);
380+
execute(partition: ReadRequest | ExecuteSqlRequest): Promise<executeResponse>;
381+
execute(
382+
partition: ReadRequest | ExecuteSqlRequest,
383+
callback: ReadCallback | RunCallback,
384+
): void;
385+
execute(
386+
partition: ReadRequest | ExecuteSqlRequest,
387+
cb?: ReadCallback | RunCallback,
388+
): void | Promise<executeResponse> {
389+
const isRead = typeof (partition as ReadRequest).table === 'string';
390+
391+
if (isRead) {
392+
this.read(
393+
(partition as ReadRequest).table!,
394+
partition as ReadRequest,
395+
cb as ReadCallback,
396+
);
370397
return;
371398
}
372-
this.run(partition, callback);
399+
400+
this.run(partition as ExecuteSqlRequest, cb as RunCallback);
373401
}
374402
/**
375403
* Executes partition in streaming mode.

test/batch-transaction.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ describe('BatchTransaction', () => {
441441
},
442442
};
443443

444-
it('should make read requests for read partitions', () => {
444+
it('should make read requests for read partitions using callback', () => {
445445
const partition = {table: 'abc'};
446446
const stub = sandbox.stub(batchTransaction, 'read');
447447

@@ -490,6 +490,17 @@ describe('BatchTransaction', () => {
490490
const query = stub.lastCall.args[0];
491491
assert.strictEqual(query, partition);
492492
});
493+
494+
it('should make read requests for read partitions using await', async () => {
495+
const partition = {table: 'abc'};
496+
const stub = sandbox.stub(batchTransaction, 'read');
497+
498+
await batchTransaction.execute(partition);
499+
500+
const [table, options] = stub.lastCall.args;
501+
assert.strictEqual(table, partition.table);
502+
assert.strictEqual(options, partition);
503+
});
493504
});
494505

495506
describe('executeStream', () => {

test/mockserver/mockspanner.ts

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -863,11 +863,102 @@ export class MockSpanner {
863863

864864
streamingRead(call: grpc.ServerWritableStream<protobuf.ReadRequest, {}>) {
865865
this.pushRequest(call.request!, call.metadata);
866-
call.emit(
867-
'error',
868-
createUnimplementedError('StreamingRead is not yet implemented'),
869-
);
870-
call.end();
866+
867+
this.simulateExecutionTime(this.streamingRead.name)
868+
.then(() => {
869+
if (call.request!.transaction) {
870+
const fullTransactionId = `${call.request!.session}/transactions/${
871+
call.request!.transaction.id
872+
}`;
873+
if (this.abortedTransactions.has(fullTransactionId)) {
874+
call.sendMetadata(new Metadata());
875+
call.emit(
876+
'error',
877+
MockSpanner.createTransactionAbortedError(`${fullTransactionId}`),
878+
);
879+
call.end();
880+
return;
881+
}
882+
}
883+
const key = `${call.request!.table}|${JSON.stringify(call.request!.keySet)}`;
884+
const res = this.statementResults.get(key);
885+
if (res) {
886+
if (call.request!.transaction?.begin) {
887+
const txn = this._updateTransaction(
888+
call.request!.session,
889+
call.request!.transaction.begin,
890+
);
891+
if (txn instanceof Error) {
892+
call.sendMetadata(new Metadata());
893+
call.emit('error', txn);
894+
call.end();
895+
return;
896+
}
897+
if (res.type === StatementResultType.RESULT_SET) {
898+
(res.resultSet as protobuf.ResultSet).metadata!.transaction = txn;
899+
}
900+
}
901+
let partialResultSets;
902+
let resumeIndex;
903+
let streamErr;
904+
switch (res.type) {
905+
case StatementResultType.RESULT_SET:
906+
if (Array.isArray(res.resultSet)) {
907+
partialResultSets = res.resultSet;
908+
} else {
909+
partialResultSets = MockSpanner.toPartialResultSets(
910+
res.resultSet,
911+
'NORMAL',
912+
);
913+
}
914+
// Resume on the next index after the last one seen by the client.
915+
resumeIndex =
916+
call.request!.resumeToken.length === 0
917+
? 0
918+
: Number.parseInt(call.request!.resumeToken.toString(), 10) +
919+
1;
920+
for (
921+
let index = resumeIndex;
922+
index < partialResultSets.length;
923+
index++
924+
) {
925+
const streamErr = this.shiftStreamError(
926+
this.executeStreamingSql.name,
927+
index,
928+
);
929+
if (streamErr) {
930+
call.sendMetadata(new Metadata());
931+
call.emit('error', streamErr);
932+
break;
933+
}
934+
call.write(partialResultSets[index]);
935+
}
936+
break;
937+
case StatementResultType.ERROR:
938+
call.sendMetadata(new Metadata());
939+
call.emit('error', res.error);
940+
break;
941+
default:
942+
call.emit(
943+
'error',
944+
new Error(`Unknown StatementResult type: ${res.type}`),
945+
);
946+
}
947+
} else {
948+
call.emit(
949+
'error',
950+
new Error(
951+
`There is no result registered for ${call.request!.table}`,
952+
),
953+
);
954+
}
955+
call.end();
956+
})
957+
.catch(err => {
958+
call.sendMetadata(new Metadata());
959+
call.emit('error', err);
960+
call.end();
961+
});
871962
}
872963

873964
beginTransaction(

test/spanner.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import {TEST_INSTANCE_NAME} from './mockserver/mockinstanceadmin';
4040
import * as mockDatabaseAdmin from './mockserver/mockdatabaseadmin';
4141
import * as sinon from 'sinon';
4242
import {google} from '../protos/protos';
43-
import {ExecuteSqlRequest, RunResponse} from '../src/transaction';
43+
import {ExecuteSqlRequest, ReadRequest, RunResponse} from '../src/transaction';
4444
import {Row} from '../src/partial-result-stream';
4545
import {GetDatabaseOperationsOptions} from '../src/instance';
4646
import {
@@ -420,9 +420,7 @@ describe('Spanner with mock server', () => {
420420
},
421421
});
422422
} catch (e) {
423-
// Ignore the fact that streaming read is unimplemented on the mock
424-
// server. We just want to verify that the correct request is sent.
425-
assert.strictEqual((e as ServiceError).code, Status.UNIMPLEMENTED);
423+
assert.strictEqual((e as ServiceError).code, Status.UNKNOWN);
426424
assert.deepStrictEqual(
427425
(e as RequestIDError).requestID,
428426
`1.${randIdForProcess}.1.1.3.1`,
@@ -583,9 +581,7 @@ describe('Spanner with mock server', () => {
583581
},
584582
});
585583
} catch (e) {
586-
// Ignore the fact that streaming read is unimplemented on the mock
587-
// server. We just want to verify that the correct request is sent.
588-
assert.strictEqual((e as ServiceError).code, Status.UNIMPLEMENTED);
584+
assert.strictEqual((e as ServiceError).code, Status.UNKNOWN);
589585
assert.deepStrictEqual(
590586
(e as RequestIDError).requestID,
591587
`1.${randIdForProcess}.1.1.2.1`,
@@ -3683,6 +3679,16 @@ describe('Spanner with mock server', () => {
36833679
});
36843680
});
36853681

3682+
describe('execute', () => {
3683+
it('should create set of read partitions', async () => {
3684+
const database = newTestDatabase({min: 0, incStep: 1});
3685+
const [transaction] = await database.createBatchTransaction();
3686+
const [partitions] = await transaction.createQueryPartitions(selectSql);
3687+
const [resp] = await transaction.execute(partitions[0]);
3688+
assert.strictEqual(resp.length, 3);
3689+
});
3690+
});
3691+
36863692
describe('pdml', () => {
36873693
it('should retry on aborted error', async () => {
36883694
const database = newTestDatabase();

0 commit comments

Comments
 (0)