2222import com .google .cloud .spanner .Options .QueryOption ;
2323import com .google .cloud .spanner .Options .ReadOption ;
2424import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
25+ import com .google .common .annotations .VisibleForTesting ;
2526import com .google .common .base .Preconditions ;
2627import com .google .common .collect .ImmutableList ;
2728import com .google .protobuf .Struct ;
3536import java .time .Instant ;
3637import java .util .List ;
3738import java .util .Map ;
39+ import java .util .concurrent .atomic .AtomicBoolean ;
3840import java .util .concurrent .atomic .AtomicReference ;
3941import java .util .concurrent .locks .ReentrantLock ;
4042import javax .annotation .Nullable ;
@@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
5961 @ GuardedBy ("multiplexedSessionLock" )
6062 private final AtomicReference <SessionImpl > multiplexedSessionReference ;
6163
64+ /**
65+ * This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
66+ * executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
67+ */
68+ @ VisibleForTesting
69+ static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean (false );
70+
6271 BatchClientImpl (SessionClient sessionClient , boolean isMultiplexedSessionEnabled ) {
6372 this .sessionClient = checkNotNull (sessionClient );
6473 this .isMultiplexedSessionEnabled = isMultiplexedSessionEnabled ;
@@ -85,7 +94,7 @@ public String getDatabaseRole() {
8594 @ Override
8695 public BatchReadOnlyTransaction batchReadOnlyTransaction (TimestampBound bound ) {
8796 SessionImpl session ;
88- if (isMultiplexedSessionEnabled ) {
97+ if (canUseMultiplexedSession () ) {
8998 session = getMultiplexedSession ();
9099 } else {
91100 session = sessionClient .createSession ();
@@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
131140 batchTransactionId );
132141 }
133142
143+ private boolean canUseMultiplexedSession () {
144+ return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps .get ();
145+ }
146+
134147 private SessionImpl getMultiplexedSession () {
135148 this .multiplexedSessionLock .lock ();
136149 try {
@@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
216229 builder .setPartitionOptions (pbuilder .build ());
217230
218231 final PartitionReadRequest request = builder .build ();
219- PartitionResponse response = rpc .partitionRead (request , options );
220- ImmutableList .Builder <Partition > partitions = ImmutableList .builder ();
221- for (com .google .spanner .v1 .Partition p : response .getPartitionsList ()) {
222- Partition partition =
223- Partition .createReadPartition (
224- p .getPartitionToken (), partitionOptions , table , index , keys , columns , readOptions );
225- partitions .add (partition );
232+ try {
233+ PartitionResponse response = rpc .partitionRead (request , options );
234+ ImmutableList .Builder <Partition > partitions = ImmutableList .builder ();
235+ for (com .google .spanner .v1 .Partition p : response .getPartitionsList ()) {
236+ Partition partition =
237+ Partition .createReadPartition (
238+ p .getPartitionToken (),
239+ partitionOptions ,
240+ table ,
241+ index ,
242+ keys ,
243+ columns ,
244+ readOptions );
245+ partitions .add (partition );
246+ }
247+ return partitions .build ();
248+ } catch (SpannerException e ) {
249+ maybeMarkUnimplementedForPartitionedOps (e );
250+ throw e ;
226251 }
227- return partitions .build ();
228252 }
229253
230254 @ Override
@@ -256,15 +280,27 @@ public List<Partition> partitionQuery(
256280 builder .setPartitionOptions (pbuilder .build ());
257281
258282 final PartitionQueryRequest request = builder .build ();
259- PartitionResponse response = rpc .partitionQuery (request , options );
260- ImmutableList .Builder <Partition > partitions = ImmutableList .builder ();
261- for (com .google .spanner .v1 .Partition p : response .getPartitionsList ()) {
262- Partition partition =
263- Partition .createQueryPartition (
264- p .getPartitionToken (), partitionOptions , statement , queryOptions );
265- partitions .add (partition );
283+ try {
284+ PartitionResponse response = rpc .partitionQuery (request , options );
285+ ImmutableList .Builder <Partition > partitions = ImmutableList .builder ();
286+ for (com .google .spanner .v1 .Partition p : response .getPartitionsList ()) {
287+ Partition partition =
288+ Partition .createQueryPartition (
289+ p .getPartitionToken (), partitionOptions , statement , queryOptions );
290+ partitions .add (partition );
291+ }
292+ return partitions .build ();
293+ } catch (SpannerException e ) {
294+ maybeMarkUnimplementedForPartitionedOps (e );
295+ throw e ;
296+ }
297+ }
298+
299+ void maybeMarkUnimplementedForPartitionedOps (SpannerException spannerException ) {
300+ if (MultiplexedSessionDatabaseClient .verifyErrorMessage (
301+ spannerException , "Partitioned operations are not supported with multiplexed sessions" )) {
302+ unimplementedForPartitionedOps .set (true );
266303 }
267- return partitions .build ();
268304 }
269305
270306 @ Override
0 commit comments