2020import com .couchbase .client .core .Reactor ;
2121import com .couchbase .client .core .annotation .Stability ;
2222import com .couchbase .client .core .api .kv .CoreKvParamValidators ;
23- import com .couchbase .client .core .config .BucketCapabilities ;
24- import com .couchbase .client .core .config .BucketConfig ;
25- import com .couchbase .client .core .config .CouchbaseBucketConfig ;
2623import com .couchbase .client .core .error .AuthenticationFailureException ;
2724import com .couchbase .client .core .error .CollectionNotFoundException ;
2825import com .couchbase .client .core .error .CouchbaseException ;
3835import com .couchbase .client .core .io .CollectionIdentifier ;
3936import com .couchbase .client .core .msg .CancellationReason ;
4037import com .couchbase .client .core .msg .ResponseStatus ;
41- import com .couchbase .client .core .msg .kv .MutationToken ;
4238import com .couchbase .client .core .msg .kv .RangeScanCancelRequest ;
4339import com .couchbase .client .core .msg .kv .RangeScanContinueRequest ;
4440import com .couchbase .client .core .msg .kv .RangeScanCreateRequest ;
41+ import com .couchbase .client .core .topology .BucketCapability ;
42+ import com .couchbase .client .core .topology .ClusterTopologyWithBucket ;
43+ import com .couchbase .client .core .topology .CouchbaseBucketTopology ;
4544import reactor .core .Exceptions ;
4645import reactor .core .publisher .Flux ;
4746import reactor .core .publisher .Mono ;
5150import java .time .Duration ;
5251import java .util .ArrayList ;
5352import java .util .List ;
54- import java .util .Map ;
5553import java .util .concurrent .atomic .AtomicBoolean ;
5654import java .util .concurrent .atomic .AtomicLong ;
5755import java .util .concurrent .atomic .AtomicReference ;
5856import java .util .function .BiFunction ;
5957
58+ import static com .couchbase .client .core .util .BucketConfigUtil .waitForBucketTopology ;
6059import static com .couchbase .client .core .util .Validators .notNull ;
6160import static java .nio .charset .StandardCharsets .UTF_8 ;
6261
@@ -72,26 +71,9 @@ public class RangeScanOrchestrator {
7271 public static final int RANGE_SCAN_DEFAULT_BATCH_BYTE_LIMIT = 15000 ;
7372 public static final int RANGE_SCAN_DEFAULT_BATCH_ITEM_LIMIT = 50 ;
7473
75- /**
76- * Holds the reference to core.
77- */
7874 private final Core core ;
79-
80- /**
81- * Holds the pointer to the keyspace that should be used.
82- */
8375 private final CollectionIdentifier collectionIdentifier ;
8476
85- /**
86- * Stores the current configuration if already set.
87- */
88- private volatile BucketConfig currentBucketConfig ;
89-
90- /**
91- * Checks if run against older clusters the feature might not be supported.
92- */
93- private volatile boolean capabilityEnabled = false ;
94-
9577 /**
9678 * Creates a new {@link RangeScanOrchestrator} which can be shared across calls.
9779 *
@@ -101,86 +83,77 @@ public class RangeScanOrchestrator {
10183 public RangeScanOrchestrator (final Core core , final CollectionIdentifier collectionIdentifier ) {
10284 this .core = notNull (core , "Core" );
10385 this .collectionIdentifier = notNull (collectionIdentifier , "CollectionIdentifier" );
86+ }
10487
105- core .configurationProvider ().configs ().subscribe (cc -> {
106- BucketConfig bucketConfig = cc .bucketConfig (collectionIdentifier .bucket ());
107- if (bucketConfig != null ) {
108- currentBucketConfig = bucketConfig ;
109- capabilityEnabled = bucketConfig .bucketCapabilities ().contains (BucketCapabilities .RANGE_SCAN );
110- }
111- });
88+ private Mono <CouchbaseBucketTopology > currentBucketTopology (CoreScanOptions options ) {
89+ Duration scanTimeout = options .commonOptions ().timeout ().orElse (core .environment ().timeoutConfig ().kvScanTimeout ());
90+ Duration timeout = min (scanTimeout , core .environment ().timeoutConfig ().connectTimeout ()); // default scan timeout is 75 seconds; don't want to wait that long!
91+
92+ return waitForBucketTopology (core , collectionIdentifier .bucket (), timeout )
93+ .map (ClusterTopologyWithBucket ::bucket )
94+ .cast (CouchbaseBucketTopology .class )
95+ .onErrorMap (ClassCastException .class , t -> new FeatureNotAvailableException ("Only Couchbase buckets are supported with KV Range Scan" , t ))
96+
97+ .filter (bucket -> bucket .hasCapability (BucketCapability .RANGE_SCAN ))
98+ .switchIfEmpty (Mono .error (FeatureNotAvailableException ::rangeScan ));
11299 }
113100
114- /**
115- * Performs a range scan between a start and an end term (reactive).
116- *
117- * @param rangeScan
118- * @param options
119- * @return a {@link Flux} of returned items, or a failed flux during errors.
120- */
101+ private static <T extends Comparable <T >> T min (T a , T b ) {
102+ return a .compareTo (b ) < 0 ? a : b ;
103+ }
121104
122- public Flux <CoreRangeScanItem > rangeScan (CoreRangeScan rangeScan , CoreScanOptions options ) {
105+ public Flux <CoreRangeScanItem > rangeScan (CoreRangeScan rangeScan , CoreScanOptions options ) {
123106 return Flux .defer (() -> {
124107 CoreKvParamValidators .validateScanParams (rangeScan , options );
125108
126- if (currentBucketConfig == null ) {
127- // We might not have a config yet if bootstrap is still in progress, wait 100ms
128- // and then try again. In a steady state this should not happen.
129- return Mono
130- .delay (Duration .ofMillis (100 ), core .context ().environment ().scheduler ())
131- .flatMapMany (ign -> rangeScan (rangeScan , options ));
132- } else if (!(currentBucketConfig instanceof CouchbaseBucketConfig )) {
133- return Flux .error (new IllegalStateException ("Only Couchbase buckets are supported with KV Range Scan" ));
134- }
135- Map <Short , MutationToken > consistencyMap =options .consistencyMap ();
136- return streamForPartitions ((partition , start ) -> {
137- byte [] actualStartTerm = start == null ? rangeScan .from ().id ().getBytes (UTF_8 ) : start ;
138- return RangeScanCreateRequest .forRangeScan (actualStartTerm , rangeScan , options , partition , core .context (),
139- collectionIdentifier , consistencyMap );
140- }, options );
109+ return currentBucketTopology (options )
110+ .flatMapMany (topology -> streamForPartitions (
111+ topology ,
112+ options ,
113+ (partition , start ) -> {
114+ byte [] actualStartTerm = start == null ? rangeScan .from ().id ().getBytes (UTF_8 ) : start ;
115+ return RangeScanCreateRequest .forRangeScan (
116+ actualStartTerm ,
117+ rangeScan ,
118+ options ,
119+ partition ,
120+ core .context (),
121+ collectionIdentifier ,
122+ options .consistencyMap ()
123+ );
124+ }));
141125 });
142126 }
143127
144- /**
145- * Performs a sampling scan (reactive).
146- *
147- * @param samplingScan
148- * @param options
149- * @return a {@link Flux} of returned items, or a failed flux during errors.
150- */
151128 public Flux <CoreRangeScanItem > samplingScan (CoreSamplingScan samplingScan , CoreScanOptions options ) {
152- return Flux
153- .defer (() -> {
154- CoreKvParamValidators .validateScanParams (samplingScan , options );
155- return Mono .just (0 );
156- })
157- .thenMany (Flux .defer (() -> {
158- if (currentBucketConfig == null ) {
159- // We might not have a config yet if bootstrap is still in progress, wait 100ms
160- // and then try again. In a steady state this should not happen.
161- return Mono
162- .delay (Duration .ofMillis (100 ), core .context ().environment ().scheduler ())
163- .flatMapMany (ign -> samplingScan (samplingScan , options ));
164- } else if (!(currentBucketConfig instanceof CouchbaseBucketConfig )) {
165- return Flux .error (new IllegalStateException ("Only Couchbase buckets are supported with KV Range Scan" ));
166- }
167- Map <Short , MutationToken > consistencyMap =options .consistencyMap ();
168- return streamForPartitions ((partition , ignored ) -> RangeScanCreateRequest .forSamplingScan (samplingScan ,
169- options , partition , core .context (), collectionIdentifier , consistencyMap ), options );
170- }).take (samplingScan .limit ()));
171- }
172-
173- @ SuppressWarnings ("unchecked" )
174- private Flux <CoreRangeScanItem > streamForPartitions (final BiFunction <Short , byte [], RangeScanCreateRequest > createSupplier ,
175- final CoreScanOptions options ) {
129+ return Flux .defer (() -> {
130+ CoreKvParamValidators .validateScanParams (samplingScan , options );
176131
177- if (!capabilityEnabled ) {
178- return Flux .error (FeatureNotAvailableException .rangeScan ());
179- }
132+ return currentBucketTopology (options )
133+ .flatMapMany (topology -> streamForPartitions (
134+ topology ,
135+ options ,
136+ (partition , ignored ) -> RangeScanCreateRequest .forSamplingScan (
137+ samplingScan ,
138+ options ,
139+ partition ,
140+ core .context (),
141+ collectionIdentifier ,
142+ options .consistencyMap ()
143+ )
144+ ))
145+ .take (samplingScan .limit ());
146+ });
147+ }
180148
149+ private Flux <CoreRangeScanItem > streamForPartitions (
150+ CouchbaseBucketTopology topology ,
151+ CoreScanOptions options ,
152+ BiFunction <Short , byte [], RangeScanCreateRequest > createSupplier
153+ ) {
181154 final AtomicLong itemsStreamed = new AtomicLong ();
182155
183- int numPartitions = (( CouchbaseBucketConfig ) currentBucketConfig ) .numberOfPartitions ();
156+ int numPartitions = topology .numberOfPartitions ();
184157
185158 List <Flux <CoreRangeScanItem >> partitionStreams = new ArrayList <>(numPartitions );
186159 for (short i = 0 ; i < numPartitions ; i ++) {
0 commit comments