@@ -67,7 +67,7 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
6767 val options = createOptions(request)
6868 result.setInitiated(getTimeNow)
6969 val start = System .nanoTime
70- val r = if (options == null ) content match {
70+ val r = if (options == null ) content match {
7171 case ContentString (value) => collection.insert(docId, value).block()
7272 case ContentJson (value) => collection.insert(docId, value).block()
7373 }
@@ -86,7 +86,7 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
8686 val options = createOptions(request)
8787 result.setInitiated(getTimeNow)
8888 val start = System .nanoTime
89- val r =
89+ val r =
9090 if (options == null ) collection.get(docId).block()
9191 else collection.get(docId, options).block()
9292 result.setElapsedNanos(System .nanoTime - start)
@@ -99,7 +99,7 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
9999 val options = createOptions(request)
100100 result.setInitiated(getTimeNow)
101101 val start = System .nanoTime
102- val r =
102+ val r =
103103 if (options == null ) collection.remove(docId).block()
104104 else collection.remove(docId, options).block()
105105 result.setElapsedNanos(System .nanoTime - start)
@@ -113,7 +113,7 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
113113 val content = convertContent(request.getContent)
114114 result.setInitiated(getTimeNow)
115115 val start = System .nanoTime
116- val r = if (options == null ) content match {
116+ val r = if (options == null ) content match {
117117 case ContentString (value) => collection.replace(docId, value).block()
118118 case ContentJson (value) => collection.replace(docId, value).block()
119119 }
@@ -133,7 +133,7 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
133133 val content = convertContent(request.getContent)
134134 result.setInitiated(getTimeNow)
135135 val start = System .nanoTime
136- val r = if (options == null ) content match {
136+ val r = if (options == null ) content match {
137137 case ContentString (value) => collection.upsert(docId, value).block()
138138 case ContentJson (value) => collection.upsert(docId, value).block()
139139 }
@@ -145,15 +145,15 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
145145 result.setElapsedNanos(System .nanoTime - start)
146146 if (op.getReturnResult) populateResult(result, r)
147147 else setSuccess(result)
148- // [start:1.5.0]
148+ // [start:1.5.0]
149149 } else if (op.hasRangeScan) {
150150 val request = op.getRangeScan
151151 val collection = connection.collection(request.getCollection).reactive
152152 val options = createOptions(request)
153153 val scanType = convertScanType(request)
154154 result.setInitiated(getTimeNow)
155155 val start = System .nanoTime
156- val flux =
156+ val flux =
157157 if (options == null ) collection.scan(scanType)
158158 else collection.scan(scanType, options)
159159 result.setElapsedNanos(System .nanoTime - start)
@@ -174,76 +174,85 @@ class ReactiveScalaSdkCommandExecutor(val connection: ClusterConnection, val cou
174174 .setStreamId(streamer.streamId)
175175 )
176176 )
177- // [end:1.5.0]
177+ // [end:1.5.0]
178178 } else if (op.hasClusterCommand) {
179- val clc = op.getClusterCommand
180- val cluster = connection.cluster.reactive
181-
182- if (clc.hasWaitUntilReady) {
183- val request = clc.getWaitUntilReady
184- logger.info(" Calling waitUntilReady with timeout " + request.getTimeoutMillis + " milliseconds." )
185- val timeout = request.getTimeoutMillis.milliseconds
186-
187- var response : SMono [Unit ] = null
188-
189- if (request.hasOptions) {
190- val options = waitUntilReadyOptions(request)
191- response = cluster.waitUntilReady(timeout, options)
192- } else {
193- response = cluster.waitUntilReady(timeout)
194- }
195-
196- response.doOnSuccess(_ => {
197- setSuccess(result)
198- result.build()
199- }).block()
200- }
201- // [start:1.4.11]
202- else if (clc.hasBucketManager) {
203- return BucketManagerHelper .handleBucketManagerReactive(cluster, op).block()
179+ val clc = op.getClusterCommand
180+ val cluster = connection.cluster.reactive
181+
182+ if (clc.hasWaitUntilReady) {
183+ val request = clc.getWaitUntilReady
184+ logger.info(
185+ " Calling waitUntilReady with timeout " + request.getTimeoutMillis + " milliseconds."
186+ )
187+ val timeout = request.getTimeoutMillis.milliseconds
188+
189+ var response : SMono [Unit ] = null
190+
191+ if (request.hasOptions) {
192+ val options = waitUntilReadyOptions(request)
193+ response = cluster.waitUntilReady(timeout, options)
194+ } else {
195+ response = cluster.waitUntilReady(timeout)
204196 }
205- // [end:1.4.11]
197+
198+ response
199+ .doOnSuccess(_ => {
200+ setSuccess(result)
201+ result.build()
202+ })
203+ .block()
204+ }
205+ // [start:1.4.11]
206+ else if (clc.hasBucketManager) {
207+ return BucketManagerHelper .handleBucketManagerReactive(cluster, op).block()
208+ }
209+ // [end:1.4.11]
206210 } else if (op.hasBucketCommand) {
207- val blc = op.getBucketCommand
208- val bucket = connection.cluster.reactive.bucket(blc.getBucketName)
209-
210- if (blc.hasWaitUntilReady) {
211- val request = blc.getWaitUntilReady
212- logger.info(" Calling waitUntilReady on bucket " + bucket + " with timeout " + request.getTimeoutMillis + " milliseconds." )
213- val timeout = request.getTimeoutMillis.milliseconds
214-
215- var response : SMono [Unit ] = null
216-
217- if (request.hasOptions) {
218- val options = waitUntilReadyOptions(request)
219- response = bucket.waitUntilReady(timeout, options)
220- } else {
221- response = bucket.waitUntilReady(timeout)
222- }
223-
224- response.`then`[Result ](SMono .fromCallable[Result ](() => {
225- setSuccess(result)
226- result.build()
227- })).block()
228- }
229- }
230- else if (op.hasCollectionCommand) {
231- val clc = op.getCollectionCommand
232- val collection = if (clc.hasCollection) {
233- val coll = clc.getCollection
234- Some (connection.cluster
235- .bucket(coll.getBucketName)
236- .scope(coll.getScopeName)
237- .collection(coll.getCollectionName))
238- }
239- else None
211+ val blc = op.getBucketCommand
212+ val bucket = connection.cluster.reactive.bucket(blc.getBucketName)
213+
214+ if (blc.hasWaitUntilReady) {
215+ val request = blc.getWaitUntilReady
216+ logger.info(
217+ " Calling waitUntilReady on bucket " + bucket + " with timeout " + request.getTimeoutMillis + " milliseconds."
218+ )
219+ val timeout = request.getTimeoutMillis.milliseconds
220+
221+ var response : SMono [Unit ] = null
240222
241- if (clc.hasLookupIn || clc.hasLookupInAllReplicas || clc.hasLookupInAnyReplica) {
242- result = LookupInHelper .handleLookupInReactive(perRun, connection, op, (loc) => getDocId(loc)).block()
223+ if (request.hasOptions) {
224+ val options = waitUntilReadyOptions(request)
225+ response = bucket.waitUntilReady(timeout, options)
226+ } else {
227+ response = bucket.waitUntilReady(timeout)
243228 }
244- else throw new UnsupportedOperationException ()
245- }
246- else throw new UnsupportedOperationException (new IllegalArgumentException (" Unknown operation" ))
229+
230+ response
231+ .`then`[Result ](SMono .fromCallable[Result ](() => {
232+ setSuccess(result)
233+ result.build()
234+ }))
235+ .block()
236+ }
237+ } else if (op.hasCollectionCommand) {
238+ val clc = op.getCollectionCommand
239+ val collection = if (clc.hasCollection) {
240+ val coll = clc.getCollection
241+ Some (
242+ connection.cluster
243+ .bucket(coll.getBucketName)
244+ .scope(coll.getScopeName)
245+ .collection(coll.getCollectionName)
246+ )
247+ } else None
248+
249+ if (clc.hasLookupIn || clc.hasLookupInAllReplicas || clc.hasLookupInAnyReplica) {
250+ result = LookupInHelper
251+ .handleLookupInReactive(perRun, connection, op, (loc) => getDocId(loc))
252+ .block()
253+ } else throw new UnsupportedOperationException ()
254+ } else
255+ throw new UnsupportedOperationException (new IllegalArgumentException (" Unknown operation" ))
247256
248257 result.build
249258 }
0 commit comments