Skip to content

Commit 39e577b

Browse files
committed
JVMCBC-1651: Support accessDeleted reads for subdoc replica reads
Sub-document reads from replicas do not support the accessDeleted flag until server 8.0 (MB-63241 and MB-66949). This combination is required for transactions; using them here. Setting accessDeleted with sub-document reads continues to be unsupported from the user-facing API, since this is niche functionality. Change-Id: I36febfe05f9add2456549e6a57614135bb998a1f Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/228666 Tested-by: Build Bot <[email protected]> Reviewed-by: David Nault <[email protected]>
1 parent d086024 commit 39e577b

File tree

7 files changed

+67
-15
lines changed

7 files changed

+67
-15
lines changed

core-io/src/main/java/com/couchbase/client/core/classic/kv/ClassicCoreKvOps.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,8 @@ public Flux<CoreSubdocGetResult> subdocGetAllReplicasReactive(CoreCommonOptions
768768
retryStrategy,
769769
common.clientContext(),
770770
common.parentSpan().orElse(null),
771-
readPreference
771+
readPreference,
772+
(byte) 0
772773
);
773774
}
774775

core-io/src/main/java/com/couchbase/client/core/config/BucketCapabilities.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ public enum BucketCapabilities {
4747
DCP_IGNORE_PURGED_TOMBSTONES("dcp.IgnorePurgedTombstones"),
4848
RANGE_SCAN("rangeScan"),
4949
SUBDOC_READ_REPLICA("subdoc.ReplicaRead"),
50-
NON_DEDUPED_HISTORY("nonDedupedHistory");
50+
NON_DEDUPED_HISTORY("nonDedupedHistory"),
51+
SUBDOC_BINARY_XATTR("subdoc.BinaryXattr"),
52+
// Added in 8.0 (MB-66949)
53+
SUBDOC_ACCESS_DELETED("subdoc.AccessDeleted");
5154

5255
private final String raw;
5356

core-io/src/main/java/com/couchbase/client/core/service/kv/ReplicaHelper.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,15 @@ public static Flux<CoreSubdocGetResult> lookupInAllReplicasReactive(
153153
final RetryStrategy retryStrategy,
154154
Map<String, Object> clientContext,
155155
RequestSpan parentSpan,
156-
CoreReadPreference readPreference
156+
CoreReadPreference readPreference,
157+
byte flags
157158
) {
158159
notNullOrEmpty(documentId, "Id", () -> ReducedKeyValueErrorContext.create(documentId, collectionIdentifier));
159160

160161
CoreEnvironment env = core.context().environment();
161162
RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ALL_REPLICAS, parentSpan);
162163
return Reactor
163-
.toMono(() -> lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, getAllSpan, readPreference))
164+
.toMono(() -> lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, getAllSpan, readPreference, flags))
164165
.flux()
165166
.flatMap(Flux::fromStream)
166167
.flatMap(request -> Reactor
@@ -239,11 +240,12 @@ public static <R> CompletableFuture<List<CompletableFuture<R>>> lookupInAllRepli
239240
final Map<String, Object> clientContext,
240241
final RequestSpan parentSpan,
241242
final CoreReadPreference readPreference,
243+
final byte flags,
242244
final Function<CoreSubdocGetResult, R> responseMapper
243245
) {
244246
RequestSpan getAllSpan = core.context().coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ALL_REPLICAS, parentSpan);
245247

246-
return lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, getAllSpan, readPreference)
248+
return lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, getAllSpan, readPreference, flags)
247249
.thenApply(stream ->
248250
stream.map(request ->
249251
get(core, request)
@@ -312,13 +314,14 @@ public static <R> CompletableFuture<R> lookupInAnyReplicaAsync(
312314
final Map<String, Object> clientContext,
313315
final RequestSpan parentSpan,
314316
final CoreReadPreference readPreference,
317+
final byte flags,
315318
final Function<CoreSubdocGetResult, R> responseMapper) {
316319

317320
RequestSpan getAnySpan = core.context().coreResources().requestTracer()
318321
.requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ANY_REPLICA, parentSpan);
319322

320323
CompletableFuture<List<CompletableFuture<R>>> listOfFutures = lookupInAllReplicasAsync(
321-
core, collectionIdentifier, documentId, commands, timeout, retryStrategy, clientContext, getAnySpan, readPreference, responseMapper
324+
core, collectionIdentifier, documentId, commands, timeout, retryStrategy, clientContext, getAnySpan, readPreference, flags, responseMapper
322325
);
323326

324327
// Aggregating the futures here will discard the individual errors, which we don't need
@@ -476,7 +479,8 @@ public static CompletableFuture<Stream<SubdocGetRequest>> lookupInAllReplicasReq
476479
final RetryStrategy retryStrategy,
477480
final Duration timeout,
478481
final RequestSpan parent,
479-
final CoreReadPreference readPreference
482+
final CoreReadPreference readPreference,
483+
final byte flags
480484
) {
481485
notNullOrEmpty(documentId, "Id");
482486

@@ -490,7 +494,7 @@ public static CompletableFuture<Stream<SubdocGetRequest>> lookupInAllReplicasReq
490494
return failedFuture(FeatureNotAvailableException.subdocReadReplica());
491495
}
492496

493-
List<SubdocGetRequest> requests = lookupInAllReplicasRequestsWithFallback(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, parent, readPreference, topology);
497+
List<SubdocGetRequest> requests = lookupInAllReplicasRequestsWithFallback(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, parent, readPreference, topology, flags);
494498
if (requests.isEmpty()) {
495499
return failedFuture(DocumentUnretrievableException.noReplicasSuitable());
496500
}
@@ -501,7 +505,7 @@ public static CompletableFuture<Stream<SubdocGetRequest>> lookupInAllReplicasReq
501505
final Duration retryDelay = Duration.ofMillis(100);
502506
final CompletableFuture<Stream<SubdocGetRequest>> future = new CompletableFuture<>();
503507
coreContext.environment().timer().schedule(() -> {
504-
lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout.minus(retryDelay), parent, readPreference).whenComplete((getRequestStream, throwable) -> {
508+
lookupInAllReplicasRequests(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout.minus(retryDelay), parent, readPreference, flags).whenComplete((getRequestStream, throwable) -> {
505509
if (throwable != null) {
506510
future.completeExceptionally(throwable);
507511
} else {
@@ -524,15 +528,16 @@ private static List<SubdocGetRequest> lookupInAllReplicasRequestsWithFallback(Co
524528
Duration timeout,
525529
RequestSpan parent,
526530
CoreReadPreference readPreference,
527-
CouchbaseBucketConfig topology) {
531+
CouchbaseBucketConfig topology,
532+
byte flags) {
528533
CoreContext coreContext = core.context();
529534

530535
int numReplicas = topology.numberOfReplicas();
531536
List<SubdocGetRequest> requests = new ArrayList<>(numReplicas + 1);
532537
NodeIndexCalculator allowedNodeIndexes = new NodeIndexCalculator(readPreference, topology, coreContext);
533538
if (allowedNodeIndexes.canUseNodeForActive(documentId)) {
534539
RequestSpan span = coreContext.coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_LOOKUP_IN, parent);
535-
SubdocGetRequest activeRequest = SubdocGetRequest.create(timeout, coreContext, collectionIdentifier, retryStrategy, documentId, (byte) 0, commands, span);
540+
SubdocGetRequest activeRequest = SubdocGetRequest.create(timeout, coreContext, collectionIdentifier, retryStrategy, documentId, flags, commands, span);
536541
activeRequest.context().clientContext(clientContext);
537542
requests.add(activeRequest);
538543
}
@@ -541,15 +546,15 @@ private static List<SubdocGetRequest> lookupInAllReplicasRequestsWithFallback(Co
541546
if (allowedNodeIndexes.canUseNodeForReplica(documentId, replica - 1)) {
542547
RequestSpan replicaSpan = coreContext.coreResources().requestTracer().requestSpan(TracingIdentifiers.SPAN_LOOKUP_IN_ALL_REPLICAS, parent);
543548
ReplicaSubdocGetRequest replicaRequest = ReplicaSubdocGetRequest.create(
544-
timeout, coreContext, collectionIdentifier, retryStrategy, documentId, (byte) 0, commands, replica, replicaSpan
549+
timeout, coreContext, collectionIdentifier, retryStrategy, documentId, flags, commands, replica, replicaSpan
545550
);
546551
replicaRequest.context().clientContext(clientContext);
547552
requests.add(replicaRequest);
548553
}
549554
}
550555
if (requests.isEmpty() && readPreference == CoreReadPreference.PREFERRED_SERVER_GROUP_OR_ALL_AVAILABLE) {
551556
logger.trace("Unable to find suitable replicas with PREFERRED_SERVER_GROUP_WITH_FALLBACK, falling back to NO_PREFERENCE");
552-
return lookupInAllReplicasRequestsWithFallback(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, parent, CoreReadPreference.NO_PREFERENCE, topology);
557+
return lookupInAllReplicasRequestsWithFallback(core, collectionIdentifier, documentId, commands, clientContext, retryStrategy, timeout, parent, CoreReadPreference.NO_PREFERENCE, topology, flags);
553558
}
554559
return requests;
555560
}

core-io/src/main/java/com/couchbase/client/core/topology/BucketCapability.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public enum BucketCapability {
4141
RANGE_SCAN("rangeScan"),
4242
SUBDOC_READ_REPLICA("subdoc.ReplicaRead"),
4343
NON_DEDUPED_HISTORY("nonDedupedHistory"),
44+
SUBDOC_BINARY_XATTR("subdoc.BinaryXattr"),
45+
// Added in 8.0 (MB-66949)
46+
SUBDOC_ACCESS_DELETED("subdoc.AccessDeleted")
4447
;
4548

4649
private final String wireName;

core-io/src/main/java/com/couchbase/client/core/transaction/util/TransactionKVHandler.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
import static com.couchbase.client.core.error.DefaultErrorUtil.keyValueStatusToException;
5555
import static com.couchbase.client.core.msg.kv.SubdocGetRequest.convertCommandsToCore;
56+
import static com.couchbase.client.core.topology.BucketCapability.SUBDOC_ACCESS_DELETED;
5657

5758
/**
5859
* Transactions does a lot of KV work from core-io. This logic is essentially a mini version of java-client, providing
@@ -153,8 +154,20 @@ public static Mono<CoreSubdocGetResult> lookupIn(final Core core,
153154

154155

155156
if (preferredReplicaMode) {
156-
CompletableFuture<CoreSubdocGetResult> replicas = ReplicaHelper.lookupInAnyReplicaAsync(core, collectionIdentifier, id, convertCommandsToCore(commands), timeout, BestEffortRetryStrategy.INSTANCE,
157-
clientContext, pspan == null ? null : pspan.span(), CoreReadPreference.PREFERRED_SERVER_GROUP_OR_ALL_AVAILABLE, (r) -> r);
157+
CompletableFuture<CoreSubdocGetResult> replicas =
158+
BucketConfigUtil.waitForBucketTopology(core, collectionIdentifier.bucket(), timeout).toFuture()
159+
.thenCompose(bucketConfig -> {
160+
byte flags = 0;
161+
if (accessDeleted) {
162+
// We can only accessDeleted when the server supports it (8.0+).
163+
// Otherwise we will proceed with the operation though it is sub-optimal.
164+
if (bucketConfig.bucket().capabilities().contains(SUBDOC_ACCESS_DELETED)) {
165+
flags = SubdocMutateRequest.SUBDOC_DOC_FLAG_ACCESS_DELETED;
166+
}
167+
}
168+
return ReplicaHelper.lookupInAnyReplicaAsync(core, collectionIdentifier, id, convertCommandsToCore(commands), timeout, BestEffortRetryStrategy.INSTANCE,
169+
clientContext, pspan == null ? null : pspan.span(), CoreReadPreference.PREFERRED_SERVER_GROUP_OR_ALL_AVAILABLE, flags, (r) -> r);
170+
});
158171

159172
return Reactor.wrap(replicas, () -> {})
160173
.switchIfEmpty(Mono.error(new DocumentUnretrievableException(ReducedKeyValueErrorContext.create(id, collectionIdentifier))))

core-io/src/main/java/com/couchbase/client/core/util/BucketConfigUtil.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.couchbase.client.core.retry.reactor.Backoff;
2323
import com.couchbase.client.core.retry.reactor.Retry;
2424
import com.couchbase.client.core.retry.reactor.RetryExhaustedException;
25+
import com.couchbase.client.core.topology.ClusterTopologyWithBucket;
26+
import reactor.core.Exceptions;
2527
import reactor.core.publisher.Mono;
2628

2729
import java.time.Duration;
@@ -39,6 +41,7 @@ private BucketConfigUtil() {}
3941
* A bucket config can be null while the bucket has not been opened. This method allows easily for a config to be
4042
* available.
4143
*/
44+
@Deprecated
4245
public static Mono<BucketConfig> waitForBucketConfig(final Core core,
4346
final String bucketName,
4447
final Duration timeout) {
@@ -60,4 +63,26 @@ public static Mono<BucketConfig> waitForBucketConfig(final Core core,
6063
}
6164
});
6265
}
66+
67+
public static Mono<ClusterTopologyWithBucket> waitForBucketTopology(final Core core,
68+
final String bucketName,
69+
final Duration timeout) {
70+
return Mono.fromCallable(() -> {
71+
final ClusterTopologyWithBucket bucketConfig = core.clusterConfig().bucketTopology(bucketName);
72+
if (bucketConfig == null) {
73+
throw new NullPointerException();
74+
}
75+
return bucketConfig;
76+
}).retryWhen(Retry.anyOf(NullPointerException.class)
77+
.timeout(timeout)
78+
.backoff(Backoff.fixed(retryDelay))
79+
.toReactorRetry())
80+
.onErrorResume(err -> {
81+
if (Exceptions.isRetryExhausted(err)) {
82+
return Mono.error(new UnambiguousTimeoutException("Timed out while waiting for bucket config", null));
83+
} else {
84+
return Mono.error(err);
85+
}
86+
});
87+
}
6388
}

java-client/src/main/java/com/couchbase/client/java/AsyncCollection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ public CompletableFuture<List<CompletableFuture<LookupInReplicaResult>>> lookupI
712712
opts.clientContext(),
713713
opts.parentSpan().orElse(null),
714714
opts.readPreference(),
715+
(byte) 0,
715716
response -> LookupInReplicaResult.from(response, serializer));
716717
}
717718

@@ -752,6 +753,7 @@ public CompletableFuture<LookupInReplicaResult> lookupInAnyReplica(final String
752753
opts.clientContext(),
753754
opts.parentSpan().orElse(null),
754755
opts.readPreference(),
756+
(byte) 0,
755757
response -> LookupInReplicaResult.from(response, serializer));
756758
}
757759

0 commit comments

Comments
 (0)