Skip to content

Commit cdd2ee9

Browse files
authored
feat: new transaction feature (#1239)
* Move the state up, make it protected Transaction state should move up to the super class * Build new transaction options method Add a method for building transaction options. Also add a method for detecting if a request is a transaction. Build new transaction options * Correct the fn that builds transactions Corrected getTransactionRequest. Build the request options properly. * Set transaction to readOnly in system test. When this system test isn’t set to be a readOnly transaction then the error occurs which says too much contention. * Change #parseRunSuccess Change parseRunSuccess so that it can be used more universally. * Move parseRunSuccess up to protected parseRunSuccess should move up to the super class because it now needs to be used there. * Saves the transaction id This change saves the transaction id returned from the server for read calls * Add tests for testing read only transactions Read only tests are needed * Add tests that measure read time Make sure that code using the new transaction option has better performance than code that doesn’t have it. * ran linter * Add a test for testing requests Use the MockedTransactionWrapper to test requests being passed into the Gapic layer. * Final changes to make test work Mock out begin transaction. Test for newTransaction consistency type. Mock out commit. * Add the transaction.run test A transaction.run test is needed for lookup, lookup, put, commit. * run linter * runQuery, lookup, put, commit Add a test for this sequence of operations and ensure it works properly. * runAggregationQuery, lookup, put, commit Four operations that get all the results for running an aggregation query. Adding another test for these four operations. * put, put, lookup, commit Last test suite regarding new transaction unit tests. * Add tests for the commits Add a bunch of tests for the commit case. Check the commit gapic input. * Add testing to ensure begin tx is called Begin transaction should be called at least once. Add code here to increment the counter. * Document #blockWithMutex Remove commented code too. * Document transaction state * feat: new transaction feature branch * Add a check for expired Check for expired on most functions and write tests for the expired check. * Add commit and rollback blocks Check for expired state in the commit and rollback blocks. * run the linter * Don’t allow readtime to be specified in a txn Matches the python client. * throw error for both read time and consistency * Add test for specifying readtime * Remove the console logs * Refactor the test * Improve test to make sure Gapic layer isn’t called Also, fix bug with read consistency. * Run linter Reorganize tests New test for readtime and consistency * Make change to allow the runAggregationQuery go up * Should error when get is used * Move tests over and refactor initialized datastore * Remove only and remove imports * Introduce parameterized testing for errors * Use parameters in parameterized tests * Migrate error tests to parameterized testing * Change description * Prepare second describe block for tests * Revert "Prepare second describe block for tests" This reverts commit 974117b. * Always return after the error is sent back This ensures that an extra call does not get made to the server. * Add headers * Tests and implementation for expired on rollback * fix test * Wrap rollbacks with a withBeginTransaction. * Update the test so that it begins the tx before * Throw error if transaction not started on rollback * Remove only * Add a comment to the test regarding new txn.run() * Ensure that the errors get bubbled up * Run. linter * read time and consistency error * Remove unnecessary change * Eliminate the call to withBeginTransaction * Throw error reported in documentation * Work on streaming errors Fix the concurrency tests and move the error reporting outside of the stream, add two tests for the streams to make sure errors get reported. * Move the error throwing up the stack This makes it so that it is reported in the streams. * Fix linting errors * Generate unit tests for getTransactionRequest I am going to rewrite the getTransactionRequest function so it is a good idea not to change how it works. * Generate unit tests Fix the compiler error Change code so that when readOnly is specified then readwrite options are ignored. Fix the unit test to adopt this change. * Make function one ternary operation Add comments, simplify logic in the function. * Add two comments * Update the comment * Run the linter
1 parent 91237e0 commit cdd2ee9

File tree

9 files changed

+1760
-206
lines changed

9 files changed

+1760
-206
lines changed

src/request.ts

Lines changed: 195 additions & 39 deletions
Large diffs are not rendered by default.

src/transaction.ts

Lines changed: 66 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,34 @@
1515
*/
1616

1717
import {promisifyAll} from '@google-cloud/promisify';
18-
import arrify = require('arrify');
1918
import {CallOptions} from 'google-gax';
2019

2120
import {google} from '../protos/protos';
2221

2322
import {Datastore, TransactionOptions} from '.';
24-
import {entity, Entity, Entities} from './entity';
23+
import {Entities, Entity, entity} from './entity';
2524
import {
2625
Query,
2726
RunQueryCallback,
28-
RunQueryInfo,
2927
RunQueryOptions,
3028
RunQueryResponse,
3129
} from './query';
3230
import {
3331
CommitCallback,
3432
CommitResponse,
35-
DatastoreRequest,
36-
RequestOptions,
37-
PrepareEntityObjectResponse,
3833
CreateReadStreamOptions,
39-
GetResponse,
34+
DatastoreRequest,
4035
GetCallback,
36+
GetResponse,
37+
getTransactionRequest,
38+
PrepareEntityObjectResponse,
4139
RequestCallback,
40+
transactionExpiredError,
41+
TransactionState,
4242
} from './request';
4343
import {AggregateQuery} from './aggregate';
4444
import {Mutex} from 'async-mutex';
45+
import arrify = require('arrify');
4546

4647
/*
4748
* This type matches the value returned by the promise in the
@@ -53,11 +54,6 @@ interface BeginAsyncResponse {
5354
resp?: google.datastore.v1.IBeginTransactionResponse;
5455
}
5556

56-
enum TransactionState {
57-
NOT_STARTED,
58-
IN_PROGRESS, // IN_PROGRESS currently tracks the expired state as well
59-
}
60-
6157
/**
6258
* A transaction is a set of Datastore operations on one or more entities. Each
6359
* transaction is guaranteed to be atomic, which means that transactions are
@@ -85,7 +81,6 @@ class Transaction extends DatastoreRequest {
8581
modifiedEntities_: ModifiedEntities;
8682
skipCommit?: boolean;
8783
#mutex = new Mutex();
88-
#state = TransactionState.NOT_STARTED;
8984
constructor(datastore: Datastore, options?: TransactionOptions) {
9085
super();
9186
/**
@@ -115,6 +110,7 @@ class Transaction extends DatastoreRequest {
115110

116111
// Queue the requests to make when we send the transactional commit.
117112
this.requests_ = [];
113+
this.state = TransactionState.NOT_STARTED;
118114
}
119115

120116
/*! Developer Documentation
@@ -177,6 +173,10 @@ class Transaction extends DatastoreRequest {
177173
: () => {};
178174
const gaxOptions =
179175
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};
176+
if (this.state === TransactionState.EXPIRED) {
177+
callback(new Error(transactionExpiredError));
178+
return;
179+
}
180180
// This ensures that the transaction is started before calling runCommit
181181
this.#withBeginTransaction(
182182
gaxOptions,
@@ -355,13 +355,9 @@ class Transaction extends DatastoreRequest {
355355
const callback =
356356
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
357357
// This ensures that the transaction is started before calling get
358-
this.#withBeginTransaction(
359-
options.gaxOptions,
360-
() => {
361-
super.get(keys, options, callback);
362-
},
363-
callback
364-
);
358+
this.#blockWithMutex(() => {
359+
super.get(keys, options, callback);
360+
});
365361
}
366362

367363
/**
@@ -434,6 +430,14 @@ class Transaction extends DatastoreRequest {
434430
const callback =
435431
typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!;
436432

433+
if (this.state === TransactionState.EXPIRED) {
434+
callback(new Error(transactionExpiredError));
435+
return;
436+
}
437+
if (this.state === TransactionState.NOT_STARTED) {
438+
callback(new Error('Transaction is not started'));
439+
return;
440+
}
437441
this.request_(
438442
{
439443
client: 'DatastoreClient',
@@ -442,6 +446,7 @@ class Transaction extends DatastoreRequest {
442446
},
443447
(err, resp) => {
444448
this.skipCommit = true;
449+
this.state = TransactionState.EXPIRED;
445450
callback(err || null, resp);
446451
}
447452
);
@@ -511,7 +516,7 @@ class Transaction extends DatastoreRequest {
511516
const callback =
512517
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
513518
this.#mutex.runExclusive(async () => {
514-
if (this.#state === TransactionState.NOT_STARTED) {
519+
if (this.state === TransactionState.NOT_STARTED) {
515520
const runResults = await this.#beginTransactionAsync(options);
516521
this.#processBeginResults(runResults, callback);
517522
} else {
@@ -635,6 +640,7 @@ class Transaction extends DatastoreRequest {
635640
return;
636641
}
637642

643+
this.state = TransactionState.EXPIRED;
638644
// The `callbacks` array was built previously. These are the callbacks
639645
// that handle the API response normally when using the
640646
// DatastoreRequest.save and .delete methods.
@@ -666,24 +672,11 @@ class Transaction extends DatastoreRequest {
666672
if (err) {
667673
callback(err, null, resp);
668674
} else {
669-
this.#parseRunSuccess(runResults);
675+
this.parseTransactionResponse(resp);
670676
callback(null, this, resp);
671677
}
672678
}
673679

674-
/**
675-
* This function saves results from a successful beginTransaction call.
676-
*
677-
* @param {BeginAsyncResponse} [response] The response from a call to
678-
* begin a transaction that completed successfully.
679-
*
680-
**/
681-
#parseRunSuccess(runResults: BeginAsyncResponse) {
682-
const resp = runResults.resp;
683-
this.id = resp!.transaction;
684-
this.#state = TransactionState.IN_PROGRESS;
685-
}
686-
687680
/**
688681
* This async function makes a beginTransaction call and returns a promise with
689682
* the information returned from the call that was made.
@@ -696,24 +689,10 @@ class Transaction extends DatastoreRequest {
696689
async #beginTransactionAsync(
697690
options: RunOptions
698691
): Promise<BeginAsyncResponse> {
699-
const reqOpts: RequestOptions = {
700-
transactionOptions: {},
701-
};
702-
703-
if (options.readOnly || this.readOnly) {
704-
reqOpts.transactionOptions!.readOnly = {};
705-
}
706-
707-
if (options.transactionId || this.id) {
708-
reqOpts.transactionOptions!.readWrite = {
709-
previousTransaction: options.transactionId || this.id,
710-
};
711-
}
712-
713-
if (options.transactionOptions) {
714-
reqOpts.transactionOptions = options.transactionOptions;
715-
}
716692
return new Promise((resolve: (value: BeginAsyncResponse) => void) => {
693+
const reqOpts = {
694+
transactionOptions: getTransactionRequest(this, options),
695+
};
717696
this.request_(
718697
{
719698
client: 'DatastoreClient',
@@ -766,13 +745,9 @@ class Transaction extends DatastoreRequest {
766745
const callback =
767746
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
768747
// This ensures that the transaction is started before calling runAggregationQuery
769-
this.#withBeginTransaction(
770-
options.gaxOptions,
771-
() => {
772-
super.runAggregationQuery(query, options, callback);
773-
},
774-
callback
775-
);
748+
this.#blockWithMutex(() => {
749+
super.runAggregationQuery(query, options, callback);
750+
});
776751
}
777752

778753
/**
@@ -805,13 +780,9 @@ class Transaction extends DatastoreRequest {
805780
const callback =
806781
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
807782
// This ensures that the transaction is started before calling runQuery
808-
this.#withBeginTransaction(
809-
options.gaxOptions,
810-
() => {
811-
super.runQuery(query, options, callback);
812-
},
813-
callback
814-
);
783+
this.#blockWithMutex(() => {
784+
super.runQuery(query, options, callback);
785+
});
815786
}
816787

817788
/**
@@ -1019,7 +990,7 @@ class Transaction extends DatastoreRequest {
1019990
* @param {CallOptions | undefined} [gaxOptions] Gax options provided by the
1020991
* user that are used for the beginTransaction grpc call.
1021992
* @param {function} [fn] A function which is run after ensuring a
1022-
* beginTransaction call is made.
993+
* transaction has begun.
1023994
* @param {function} [callback] A callback provided by the user that expects
1024995
* an error in the first argument and a custom data type for the rest of the
1025996
* arguments.
@@ -1031,10 +1002,10 @@ class Transaction extends DatastoreRequest {
10311002
callback: (...args: [Error | null, ...T] | [Error | null]) => void
10321003
): void {
10331004
(async () => {
1034-
if (this.#state === TransactionState.NOT_STARTED) {
1005+
if (this.state === TransactionState.NOT_STARTED) {
10351006
try {
10361007
await this.#mutex.runExclusive(async () => {
1037-
if (this.#state === TransactionState.NOT_STARTED) {
1008+
if (this.state === TransactionState.NOT_STARTED) {
10381009
// This sends an rpc call to get the transaction id
10391010
const runResults = await this.#beginTransactionAsync({
10401011
gaxOptions,
@@ -1044,7 +1015,7 @@ class Transaction extends DatastoreRequest {
10441015
// Do not call the wrapped function.
10451016
throw runResults.err;
10461017
}
1047-
this.#parseRunSuccess(runResults);
1018+
this.parseTransactionResponse(runResults.resp);
10481019
// The rpc saving the transaction id was successful.
10491020
// Now the wrapped function fn will be called.
10501021
}
@@ -1057,6 +1028,31 @@ class Transaction extends DatastoreRequest {
10571028
return fn();
10581029
})();
10591030
}
1031+
1032+
/*
1033+
* Some rpc calls require that the transaction has been started (i.e, has a
1034+
* valid id) before they can be sent. #withBeginTransaction acts as a wrapper
1035+
* over those functions.
1036+
*
1037+
* If the transaction has not begun yet, `#blockWithMutex` will call the
1038+
* wrapped function which will begin the transaction in the rpc call it sends.
1039+
* If the transaction has begun, the wrapped function will be called, but it
1040+
* will not begin a transaction.
1041+
*
1042+
* @param {function} [fn] A function which is run after ensuring a
1043+
* transaction has begun.
1044+
*/
1045+
#blockWithMutex(fn: () => void) {
1046+
(async () => {
1047+
if (this.state === TransactionState.NOT_STARTED) {
1048+
await this.#mutex.runExclusive(async () => {
1049+
fn();
1050+
});
1051+
} else {
1052+
fn();
1053+
}
1054+
})();
1055+
}
10601056
}
10611057

10621058
export type ModifiedEntities = Array<{

0 commit comments

Comments
 (0)