| 
32 | 32 | import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;  | 
33 | 33 | import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;  | 
34 | 34 | import com.google.cloud.spanner.Options.RpcPriority;  | 
 | 35 | +import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;  | 
35 | 36 | import com.google.cloud.spanner.connection.RandomResultSetGenerator;  | 
36 | 37 | import com.google.common.base.Stopwatch;  | 
37 | 38 | import com.google.common.collect.ImmutableList;  | 
38 | 39 | import com.google.protobuf.ByteString;  | 
 | 40 | +import com.google.spanner.v1.BeginTransactionRequest;  | 
39 | 41 | import com.google.spanner.v1.CommitRequest;  | 
40 | 42 | import com.google.spanner.v1.ExecuteSqlRequest;  | 
41 | 43 | import com.google.spanner.v1.RequestOptions.Priority;  | 
 | 
46 | 48 | import java.util.List;  | 
47 | 49 | import java.util.Set;  | 
48 | 50 | import java.util.UUID;  | 
 | 51 | +import java.util.concurrent.atomic.AtomicReference;  | 
49 | 52 | import java.util.stream.Collectors;  | 
50 | 53 | import org.junit.Before;  | 
51 | 54 | import org.junit.BeforeClass;  | 
@@ -526,55 +529,94 @@ public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithInlineBegin() thr  | 
526 | 529 | 
 
  | 
527 | 530 |   // TODO(sriharshach): Uncomment test once Lock order preservation proto is published  | 
528 | 531 |   /*  | 
529 |  | -    @Test  | 
530 |  | -    public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin() throws InterruptedException {  | 
531 |  | -      DatabaseClientImpl client =  | 
532 |  | -          (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));  | 
533 |  | -      // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared  | 
534 |  | -      // after the first call, so the retry should succeed.  | 
535 |  | -      mockSpanner.setCommitExecutionTime(  | 
536 |  | -          SimulatedExecutionTime.ofException(  | 
537 |  | -              mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));  | 
538 |  | -      Thread.sleep(10000);  | 
539 |  | -      TransactionRunner runner = client.readWriteTransaction();  | 
540 |  | -      Long updateCount = runner.run(  | 
541 |  | -          transaction -> {  | 
542 |  | -            // This update statement carries the BeginTransaction, but fails. This will  | 
543 |  | -            // cause the entire transaction to be retried with an explicit  | 
544 |  | -            // BeginTransaction RPC to ensure all statements in the transaction are  | 
545 |  | -            // actually executed against the same transaction.  | 
546 |  | -            SpannerException e =  | 
547 |  | -                assertThrows(  | 
548 |  | -                    SpannerException.class,  | 
549 |  | -                    () -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT));  | 
550 |  | -            assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());  | 
551 |  | -            return transaction.executeUpdate(UPDATE_STATEMENT);  | 
552 |  | -          });  | 
553 |  | -
  | 
554 |  | -      assertThat(updateCount).isEqualTo(1L);  | 
555 |  | -      List<BeginTransactionRequest> beginTransactionRequests = mockSpanner.getRequestsOfType(BeginTransactionRequest.class);  | 
556 |  | -      assertEquals(2, beginTransactionRequests.size());  | 
557 |  | -
  | 
558 |  | -      // Verify the requests are executed using multiplexed sessions  | 
559 |  | -      for (BeginTransactionRequest request : beginTransactionRequests) {  | 
560 |  | -        assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());  | 
561 |  | -      }  | 
562 |  | -
  | 
563 |  | -      // Verify that explicit begin transaction is called during retry, and the previous transaction ID is set to ByteString.EMPTY  | 
564 |  | -      assertTrue(beginTransactionRequests.get(0).hasOptions());  | 
565 |  | -      assertTrue(beginTransactionRequests.get(0).getOptions().hasReadWrite());  | 
566 |  | -      assertNotNull(beginTransactionRequests.get(0).getOptions().getReadWrite()  | 
567 |  | -          .getMultiplexedSessionPreviousTransactionId());  | 
568 |  | -      assertThat(beginTransactionRequests.get(0).getOptions().getReadWrite().getMultiplexedSessionPreviousTransactionId()).isEqualTo(ByteString.EMPTY);  | 
569 |  | -
  | 
570 |  | -      // The previous transaction with id (txn1) fails during commit operation with ABORTED error.  | 
571 |  | -      // Verify that explicit begin transaction is called during retry, and the previous transaction ID is not ByteString.EMPTY (should be set to txn1)  | 
572 |  | -      assertTrue(beginTransactionRequests.get(1).hasOptions());  | 
573 |  | -      assertTrue(beginTransactionRequests.get(1).getOptions().hasReadWrite());  | 
574 |  | -      assertNotNull(beginTransactionRequests.get(1).getOptions().getReadWrite().getMultiplexedSessionPreviousTransactionId());  | 
575 |  | -      assertThat(beginTransactionRequests.get(1).getOptions().getReadWrite().getMultiplexedSessionPreviousTransactionId()).isNotEqualTo(ByteString.EMPTY);  | 
 | 532 | +  @Test  | 
 | 533 | +  public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin()  | 
 | 534 | +      throws InterruptedException {  | 
 | 535 | +    DatabaseClientImpl client =  | 
 | 536 | +        (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));  | 
 | 537 | +    // Force the Commit RPC to return Aborted the first time it is called. The exception is cleared  | 
 | 538 | +    // after the first call, so the retry should succeed.  | 
 | 539 | +    mockSpanner.setCommitExecutionTime(  | 
 | 540 | +        SimulatedExecutionTime.ofException(  | 
 | 541 | +            mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));  | 
 | 542 | +    Thread.sleep(10000);  | 
 | 543 | +    TransactionRunner runner = client.readWriteTransaction();  | 
 | 544 | +    AtomicReference<ByteString> validTransactionId = new AtomicReference<>();  | 
 | 545 | +    Long updateCount =  | 
 | 546 | +        runner.run(  | 
 | 547 | +            transaction -> {  | 
 | 548 | +              // This update statement carries the BeginTransaction, but fails. This will  | 
 | 549 | +              // cause the entire transaction to be retried with an explicit  | 
 | 550 | +              // BeginTransaction RPC to ensure all statements in the transaction are  | 
 | 551 | +              // actually executed against the same transaction.  | 
 | 552 | +              TransactionContextImpl impl = (TransactionContextImpl) transaction;  | 
 | 553 | +              if (validTransactionId.get() == null) {  | 
 | 554 | +                // Track the first not-null transactionId. This transaction gets ABORTED during  | 
 | 555 | +                // commit operation and gets retried.  | 
 | 556 | +                validTransactionId.set(impl.transactionId);  | 
 | 557 | +              }  | 
 | 558 | +              SpannerException e =  | 
 | 559 | +                  assertThrows(  | 
 | 560 | +                      SpannerException.class,  | 
 | 561 | +                      () -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT));  | 
 | 562 | +              assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());  | 
 | 563 | +              return transaction.executeUpdate(UPDATE_STATEMENT);  | 
 | 564 | +            });  | 
 | 565 | +
  | 
 | 566 | +    assertThat(updateCount).isEqualTo(1L);  | 
 | 567 | +    List<BeginTransactionRequest> beginTransactionRequests =  | 
 | 568 | +        mockSpanner.getRequestsOfType(BeginTransactionRequest.class);  | 
 | 569 | +    assertEquals(2, beginTransactionRequests.size());  | 
 | 570 | +
  | 
 | 571 | +    // Verify the requests are executed using multiplexed sessions  | 
 | 572 | +    for (BeginTransactionRequest request : beginTransactionRequests) {  | 
 | 573 | +      assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());  | 
576 | 574 |     }  | 
577 |  | -  */  | 
 | 575 | +
  | 
 | 576 | +    // Verify that explicit begin transaction is called during retry, and the previous transaction  | 
 | 577 | +    // ID is set to ByteString.EMPTY  | 
 | 578 | +    assertTrue(beginTransactionRequests.get(0).hasOptions());  | 
 | 579 | +    assertTrue(beginTransactionRequests.get(0).getOptions().hasReadWrite());  | 
 | 580 | +    assertNotNull(  | 
 | 581 | +        beginTransactionRequests  | 
 | 582 | +            .get(0)  | 
 | 583 | +            .getOptions()  | 
 | 584 | +            .getReadWrite()  | 
 | 585 | +            .getMultiplexedSessionPreviousTransactionId());  | 
 | 586 | +    assertEquals(  | 
 | 587 | +        ByteString.EMPTY,  | 
 | 588 | +        beginTransactionRequests  | 
 | 589 | +            .get(0)  | 
 | 590 | +            .getOptions()  | 
 | 591 | +            .getReadWrite()  | 
 | 592 | +            .getMultiplexedSessionPreviousTransactionId());  | 
 | 593 | +
  | 
 | 594 | +    // The previous transaction with id (txn1) fails during commit operation with ABORTED error.  | 
 | 595 | +    // Verify that explicit begin transaction is called during retry, and the previous transaction  | 
 | 596 | +    // ID is not ByteString.EMPTY (should be set to txn1)  | 
 | 597 | +    assertTrue(beginTransactionRequests.get(1).hasOptions());  | 
 | 598 | +    assertTrue(beginTransactionRequests.get(1).getOptions().hasReadWrite());  | 
 | 599 | +    assertNotNull(  | 
 | 600 | +        beginTransactionRequests  | 
 | 601 | +            .get(1)  | 
 | 602 | +            .getOptions()  | 
 | 603 | +            .getReadWrite()  | 
 | 604 | +            .getMultiplexedSessionPreviousTransactionId());  | 
 | 605 | +    assertNotEquals(  | 
 | 606 | +        ByteString.EMPTY,  | 
 | 607 | +        beginTransactionRequests  | 
 | 608 | +            .get(1)  | 
 | 609 | +            .getOptions()  | 
 | 610 | +            .getReadWrite()  | 
 | 611 | +            .getMultiplexedSessionPreviousTransactionId());  | 
 | 612 | +    assertEquals(  | 
 | 613 | +        validTransactionId.get(),  | 
 | 614 | +        beginTransactionRequests  | 
 | 615 | +            .get(1)  | 
 | 616 | +            .getOptions()  | 
 | 617 | +            .getReadWrite()  | 
 | 618 | +            .getMultiplexedSessionPreviousTransactionId());  | 
 | 619 | +  }*/  | 
578 | 620 | 
 
  | 
579 | 621 |   private void waitForSessionToBeReplaced(DatabaseClientImpl client) {  | 
580 | 622 |     assertNotNull(client.multiplexedSessionDatabaseClient);  | 
 | 
0 commit comments