@@ -258,6 +258,10 @@ class StatusOnlyResultSetSource : public spanner::ResultSourceInterface {
258258 absl::optional<google::spanner::v1::ResultSetStats> Stats () const override {
259259 return {};
260260 }
261+ absl::optional<google::spanner::v1::MultiplexedSessionPrecommitToken>
262+ PrecommitToken () const override {
263+ return absl::nullopt ;
264+ }
261265
262266 private:
263267 google::cloud::Status status_;
@@ -298,6 +302,14 @@ class DmlResultSetSource : public spanner::ResultSourceInterface {
298302 return {};
299303 }
300304
305+ absl::optional<google::spanner::v1::MultiplexedSessionPrecommitToken>
306+ PrecommitToken () const override {
307+ if (result_set_.has_precommit_token ()) {
308+ return result_set_.precommit_token ();
309+ }
310+ return absl::nullopt ;
311+ }
312+
301313 private:
302314 google::spanner::v1::ResultSet result_set_;
303315};
@@ -358,6 +370,15 @@ absl::variant<Status, spanner::BatchedCommitResult> FromProto(
358370 return result;
359371}
360372
373+ template <typename T>
374+ absl::optional<T> GetRandomElement (protobuf::RepeatedPtrField<T> const & m) {
375+ if (m.empty ()) return absl::nullopt ;
376+ std::uniform_int_distribution<decltype (m.size ())> d (0 , m.size () - 1 );
377+ auto rng = internal::MakeDefaultPRNG ();
378+ auto index = d (rng);
379+ return m[index];
380+ }
381+
361382} // namespace
362383
363384using ::google::cloud::Idempotency;
@@ -539,12 +560,16 @@ std::shared_ptr<SpannerStub> ConnectionImpl::GetStubBasedOnSessionMode(
539560 *
540561 * @param session identifies the Session to use.
541562 * @param options `TransactionOptions` to use in the request.
563+ * @param mutation Required for read-write transactions on a multiplexed session
564+ * that commit mutations but do not perform any reads or queries. Should be
565+ * selected at random from mutations.
542566 * @param func identifies the calling function for logging purposes.
543567 * It should generally be passed the value of `__func__`.
544568 */
545569StatusOr<google::spanner::v1::Transaction> ConnectionImpl::BeginTransaction (
546570 SessionHolder& session, google::spanner::v1::TransactionOptions options,
547- std::string request_tag, TransactionContext& ctx, char const * func) {
571+ std::string request_tag, TransactionContext& ctx,
572+ absl::optional<google::spanner::v1::Mutation> mutation, char const * func) {
548573 google::spanner::v1::BeginTransactionRequest begin;
549574 begin.set_session (session->session_name ());
550575 *begin.mutable_options () = std::move (options);
@@ -553,6 +578,9 @@ StatusOr<google::spanner::v1::Transaction> ConnectionImpl::BeginTransaction(
553578 // the transaction instead.
554579 begin.mutable_request_options ()->set_request_tag (std::move (request_tag));
555580 begin.mutable_request_options ()->set_transaction_tag (ctx.tag );
581+ if (mutation) {
582+ *begin.mutable_mutation_key () = *mutation;
583+ }
556584
557585 auto stub = GetStubBasedOnSessionMode (*session, ctx);
558586 auto const & current = internal::CurrentOptions ();
@@ -571,9 +599,20 @@ StatusOr<google::spanner::v1::Transaction> ConnectionImpl::BeginTransaction(
571599 if (IsSessionNotFound (status)) session->set_bad ();
572600 return status;
573601 }
602+
603+ if (response->has_precommit_token ()) {
604+ ctx.precommit_token = response->precommit_token ();
605+ }
574606 return *response;
575607}
576608
609+ StatusOr<google::spanner::v1::Transaction> ConnectionImpl::BeginTransaction (
610+ SessionHolder& session, google::spanner::v1::TransactionOptions options,
611+ std::string request_tag, TransactionContext& ctx, char const * func) {
612+ return BeginTransaction (session, std::move (options), std::move (request_tag),
613+ ctx, absl::nullopt , func);
614+ }
615+
577616spanner::RowStream ConnectionImpl::ReadImpl (
578617 SessionHolder& session,
579618 StatusOr<google::spanner::v1::TransactionSelector>& selector,
@@ -645,6 +684,9 @@ spanner::RowStream ConnectionImpl::ReadImpl(
645684 factory, Idempotency::kIdempotent , RetryPolicyPrototype ()->clone (),
646685 BackoffPolicyPrototype ()->clone ());
647686 auto reader = PartialResultSetSource::Create (std::move (rpc));
687+ if (reader.ok ()) {
688+ ctx.precommit_token = (*reader)->PrecommitToken ();
689+ }
648690 if (selector->has_begin ()) {
649691 if (reader.ok ()) {
650692 auto metadata = (*reader)->Metadata ();
@@ -803,6 +845,9 @@ StatusOr<ResultType> ConnectionImpl::ExecuteSqlImpl(
803845
804846 for (;;) {
805847 auto reader = retry_resume_fn (request);
848+ if (reader.ok ()) {
849+ ctx.precommit_token = (*reader)->PrecommitToken ();
850+ }
806851 if (selector->has_begin ()) {
807852 if (reader.ok ()) {
808853 auto metadata = (*reader)->Metadata ();
@@ -1097,6 +1142,9 @@ StatusOr<spanner::BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
10971142 return stub->ExecuteBatchDml (context, options, request);
10981143 },
10991144 current, request, __func__);
1145+ if (response.ok () && response->has_precommit_token ()) {
1146+ ctx.precommit_token = response->precommit_token ();
1147+ }
11001148 if (selector->has_begin ()) {
11011149 if (response.ok () && response->result_sets_size () > 0 ) {
11021150 if (!response->result_sets (0 ).metadata ().has_transaction ()) {
@@ -1210,8 +1258,16 @@ StatusOr<spanner::CommitResult> ConnectionImpl::CommitImpl(
12101258 break ;
12111259 }
12121260 case google::spanner::v1::TransactionSelector::kBegin : {
1261+ absl::optional<google::spanner::v1::Mutation> mutation = absl::nullopt ;
1262+ if (session->is_multiplexed ()) {
1263+ // Commit requests containing Mutations on multiplexed sessions require
1264+ // a random mutation key in order for the service to generate a
1265+ // precommit token.
1266+ mutation = GetRandomElement (request.mutations ());
1267+ }
1268+
12131269 auto begin = BeginTransaction (session, selector->begin (), std::string (),
1214- ctx, __func__);
1270+ ctx, std::move (mutation), __func__);
12151271 if (!begin.ok ()) {
12161272 selector = begin.status (); // invalidate the transaction
12171273 return begin.status ();
@@ -1231,20 +1287,47 @@ StatusOr<spanner::CommitResult> ConnectionImpl::CommitImpl(
12311287
12321288 auto stub = GetStubBasedOnSessionMode (*session, ctx);
12331289 auto const & current = internal::CurrentOptions ();
1234- auto response = RetryLoop (
1235- RetryPolicyPrototype (current)->clone (),
1236- BackoffPolicyPrototype (current)->clone (), Idempotency::kIdempotent ,
1237- [&stub](grpc::ClientContext& context, Options const & options,
1238- google::spanner::v1::CommitRequest const & request) {
1239- RouteToLeader (context); // always for Commit()
1240- return stub->Commit (context, options, request);
1241- },
1242- current, request, __func__);
1243- if (!response) {
1244- auto status = std::move (response).status ();
1245- if (IsSessionNotFound (status)) session->set_bad ();
1246- return status;
1247- }
1290+
1291+ char const * calling_func = __func__;
1292+ auto retry_loop_fn =
1293+ [&, func = std::move (calling_func)](
1294+ absl::optional<
1295+ google::spanner::v1::MultiplexedSessionPrecommitToken> const &
1296+ token) {
1297+ if (token.has_value ()) {
1298+ *request.mutable_precommit_token () = *token;
1299+ }
1300+
1301+ return RetryLoop (
1302+ RetryPolicyPrototype (current)->clone (),
1303+ BackoffPolicyPrototype (current)->clone (), Idempotency::kIdempotent ,
1304+ [&stub](grpc::ClientContext& context, Options const & options,
1305+ google::spanner::v1::CommitRequest const & request) {
1306+ RouteToLeader (context); // always for Commit()
1307+ return stub->Commit (context, options, request);
1308+ },
1309+ current, request, func);
1310+ };
1311+
1312+ // If the CommitResponse contains a precommit token, it's a signal from the
1313+ // SpannerFE that it wants us to retry the commit with the new token. It is
1314+ // technically possible for this to occur more than 0 or 1 times, but however
1315+ // unlikely, the SDK has to account for the possibility. Additionally, the
1316+ // mutations do not need to be sent on retries, saving some resources.
1317+ decltype (retry_loop_fn (ctx.precommit_token )) response;
1318+ do {
1319+ response = retry_loop_fn (ctx.precommit_token );
1320+ if (!response) {
1321+ auto status = std::move (response).status ();
1322+ if (IsSessionNotFound (status)) session->set_bad ();
1323+ return status;
1324+ }
1325+ if (response->has_precommit_token ()) {
1326+ ctx.precommit_token = response->precommit_token ();
1327+ request.mutable_mutations ()->Clear ();
1328+ }
1329+ } while (response->has_precommit_token ());
1330+
12481331 spanner::CommitResult r;
12491332 r.commit_timestamp = MakeTimestamp (response->commit_timestamp ());
12501333 if (response->has_commit_stats ()) {
0 commit comments