Skip to content

Commit d086024

Browse files
committed
SCBC-460: Support Binary Objects in Transactions (FIT)
FIT changes. Change-Id: I29993d317ebde11a60e80f602cf42a647a73f196 Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/228741 Tested-by: Build Bot <[email protected]> Reviewed-by: Graham Pople <[email protected]>
1 parent 4a4c0e2 commit d086024

File tree

4 files changed

+388
-31
lines changed

4 files changed

+388
-31
lines changed

scala-fit-performer/src/main/scala/com/couchbase/client/performer/scala/ScalaPerformer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class ScalaPerformer extends CorePerformer {
9494
.setPerformerUserAgent("scala")
9595
.addPerformerCaps(Caps.CLUSTER_CONFIG_CERT)
9696
.addPerformerCaps(Caps.CLUSTER_CONFIG_INSECURE)
97+
.addPerformerCaps(Caps.CONTENT_AS_PERFORMER_VALIDATION)
9798
.addAllSdkImplementationCaps(Capabilities.sdkImplementationCaps)
9899

99100
// [if:1.7.2]

scala-fit-performer/src/main/scala/com/couchbase/client/performer/scala/transaction/TransactionBlocking.scala

Lines changed: 206 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import com.couchbase.client.performer.scala.transaction.TransactionShared.Expect
2727
import com.couchbase.client.performer.scala.util.{ClusterConnection, OptionsUtil, ResultValidation}
2828
import com.couchbase.client.protocol.shared.API
2929
import com.couchbase.client.protocol.transactions.{ExpectedResult, TransactionCommand, TransactionCreateRequest, TransactionStreamPerformerToDriver}
30+
import com.couchbase.client.scala.codec._
31+
import com.couchbase.client.scala.codec.Conversions._
32+
import com.couchbase.client.scala.json._
3033
import com.couchbase.client.scala.json.JsonObject
3134
import com.couchbase.client.scala.transactions.TransactionAttemptContext
3235
import io.grpc.stub.StreamObserver
@@ -115,7 +118,10 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
115118

116119
if (op.hasInsert) {
117120
val request = op.getInsert
118-
val content = JsonObject.fromJson(request.getContentJson)
121+
val content = readContent(
122+
if (request.hasContentJson) Some(request.getContentJson) else None,
123+
if (request.hasContent) Some(request.getContent) else None
124+
)
119125
val collection = connection.collection(request.getDocId)
120126
performOperation(
121127
dbg + "insert " + request.getDocId.getDocId,
@@ -131,7 +137,39 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
131137
request.getDocId.getBucketName,
132138
request.getDocId.getCollectionName
133139
)
134-
ctx.insert(collection, request.getDocId.getDocId, content).get
140+
// [if:1.9.0]
141+
val options = TransactionOptionsUtil.transactionInsertOptions(request)
142+
content match {
143+
case ContentString(value) =>
144+
options match {
145+
case Some(opts) => ctx.insert(collection, request.getDocId.getDocId, value, opts).get
146+
case None => ctx.insert(collection, request.getDocId.getDocId, value).get
147+
}
148+
case ContentJson(value) =>
149+
options match {
150+
case Some(opts) => ctx.insert(collection, request.getDocId.getDocId, value, opts).get
151+
case None => ctx.insert(collection, request.getDocId.getDocId, value).get
152+
}
153+
case ContentByteArray(value) =>
154+
options match {
155+
case Some(opts) => ctx.insert(collection, request.getDocId.getDocId, value, opts).get
156+
case None => ctx.insert(collection, request.getDocId.getDocId, value).get
157+
}
158+
case ContentNull(value) =>
159+
options match {
160+
case Some(opts) => ctx.insert(collection, request.getDocId.getDocId, value, opts).get
161+
case None => ctx.insert(collection, request.getDocId.getDocId, value).get
162+
}
163+
}
164+
// [end]
165+
// [if:<1.9.0]
166+
//? content match {
167+
//? case ContentString(value) => ctx.insert(collection, request.getDocId.getDocId, value).get
168+
//? case ContentJson(value) => ctx.insert(collection, request.getDocId.getDocId, value).get
169+
//? case ContentByteArray(value) => ctx.insert(collection, request.getDocId.getDocId, value).get
170+
//? case ContentNull(value) => ctx.insert(collection, request.getDocId.getDocId, value).get
171+
//? }
172+
// [end]
135173
}
136174
)
137175
} else if (op.hasInsertV2) {
@@ -145,48 +183,149 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
145183
performanceMode,
146184
() => {
147185
val collection = connection.collection(request.getLocation)
186+
val docId = executor.get.getDocId(request.getLocation)
148187
content match {
149-
case ContentString(value) =>
150-
ctx.insert(collection, executor.get.getDocId(request.getLocation), value).get
151-
case ContentJson(value) =>
152-
ctx.insert(collection, executor.get.getDocId(request.getLocation), value).get
153-
case ContentByteArray(value) =>
154-
ctx.insert(collection, executor.get.getDocId(request.getLocation), value).get
155-
case ContentNull(value) =>
156-
ctx.insert(collection, executor.get.getDocId(request.getLocation), value).get
188+
case ContentString(value) => ctx.insert(collection, docId, value).get
189+
case ContentJson(value) => ctx.insert(collection, docId, value).get
190+
case ContentByteArray(value) => ctx.insert(collection, docId, value).get
191+
case ContentNull(value) => ctx.insert(collection, docId, value).get
157192
}
158193
}
159194
)
160195
} else if (op.hasReplace) {
161196
val request = op.getReplace
162-
val content = JsonObject.fromJson(request.getContentJson)
197+
val content = readContent(
198+
if (request.hasContentJson) Some(request.getContentJson) else None,
199+
if (request.hasContent) Some(request.getContent) else None
200+
)
163201
performOperation(
164202
dbg + "replace " + request.getDocId.getDocId,
165203
ctx,
166204
request.getExpectedResultList.asScala,
167205
op.getDoNotPropagateError,
168206
performanceMode,
169207
() => {
208+
// [if:1.9.0]
209+
val options = TransactionOptionsUtil.transactionReplaceOptions(request)
170210
if (request.getUseStashedResult) {
171-
ctx.replace(stashedGet.get, content).get
211+
content match {
212+
case ContentString(value) =>
213+
options match {
214+
case Some(opts) => ctx.replace(stashedGet.get, value, opts).get
215+
case None => ctx.replace(stashedGet.get, value).get
216+
}
217+
case ContentJson(value) =>
218+
options match {
219+
case Some(opts) => ctx.replace(stashedGet.get, value, opts).get
220+
case None => ctx.replace(stashedGet.get, value).get
221+
}
222+
case ContentByteArray(value) =>
223+
options match {
224+
case Some(opts) => ctx.replace(stashedGet.get, value, opts).get
225+
case None => ctx.replace(stashedGet.get, value).get
226+
}
227+
case ContentNull(value) =>
228+
options match {
229+
case Some(opts) => ctx.replace(stashedGet.get, value, opts).get
230+
case None => ctx.replace(stashedGet.get, value).get
231+
}
232+
}
172233
} else if (request.hasUseStashedSlot) {
173234
if (!stashedGetMap.contains(request.getUseStashedSlot))
174235
throw new IllegalStateException(
175236
"Do not have a stashed get in slot " + request.getUseStashedSlot
176237
)
177-
ctx.replace(stashedGetMap(request.getUseStashedSlot), content).get
238+
content match {
239+
case ContentString(value) =>
240+
options match {
241+
case Some(opts) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value, opts).get
242+
case None => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
243+
}
244+
case ContentJson(value) =>
245+
options match {
246+
case Some(opts) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value, opts).get
247+
case None => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
248+
}
249+
case ContentByteArray(value) =>
250+
options match {
251+
case Some(opts) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value, opts).get
252+
case None => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
253+
}
254+
case ContentNull(value) =>
255+
options match {
256+
case Some(opts) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value, opts).get
257+
case None => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
258+
}
259+
}
178260
} else {
179261
val collection = connection.collection(request.getDocId)
180262
logger.info(
181-
"{} Performing replace operation on docId {} to new content {} on collection {}",
263+
"{} Performing replace operation on docId {} to new content on collection {}",
182264
dbg,
183265
request.getDocId.getDocId,
184-
request.getContentJson,
185266
request.getDocId.getCollectionName
186267
)
187268
val r = ctx.get(collection, request.getDocId.getDocId).get
188-
ctx.replace(r, content).get
269+
content match {
270+
case ContentString(value) =>
271+
options match {
272+
case Some(opts) => ctx.replace(r, value, opts).get
273+
case None => ctx.replace(r, value).get
274+
}
275+
case ContentJson(value) =>
276+
options match {
277+
case Some(opts) => ctx.replace(r, value, opts).get
278+
case None => ctx.replace(r, value).get
279+
}
280+
case ContentByteArray(value) =>
281+
options match {
282+
case Some(opts) => ctx.replace(r, value, opts).get
283+
case None => ctx.replace(r, value).get
284+
}
285+
case ContentNull(value) =>
286+
options match {
287+
case Some(opts) => ctx.replace(r, value, opts).get
288+
case None => ctx.replace(r, value).get
289+
}
290+
}
189291
}
292+
// [end]
293+
// [if:<1.9.0]
294+
//? if (request.getUseStashedResult) {
295+
//? content match {
296+
//? case ContentString(value) => ctx.replace(stashedGet.get, value).get
297+
//? case ContentJson(value) => ctx.replace(stashedGet.get, value).get
298+
//? case ContentByteArray(value) => ctx.replace(stashedGet.get, value).get
299+
//? case ContentNull(value) => ctx.replace(stashedGet.get, value).get
300+
//? }
301+
//? } else if (request.hasUseStashedSlot) {
302+
//? if (!stashedGetMap.contains(request.getUseStashedSlot))
303+
//? throw new IllegalStateException(
304+
//? "Do not have a stashed get in slot " + request.getUseStashedSlot
305+
//? )
306+
//? content match {
307+
//? case ContentString(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
308+
//? case ContentJson(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
309+
//? case ContentByteArray(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
310+
//? case ContentNull(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
311+
//? }
312+
//? } else {
313+
//? val collection = connection.collection(request.getDocId)
314+
//? logger.info(
315+
//? "{} Performing replace operation on docId {} to new content on collection {}",
316+
//? dbg,
317+
//? request.getDocId.getDocId,
318+
//? request.getDocId.getCollectionName
319+
//? )
320+
//? val r = ctx.get(collection, request.getDocId.getDocId).get
321+
//? content match {
322+
//? case ContentString(value) => ctx.replace(r, value).get
323+
//? case ContentJson(value) => ctx.replace(r, value).get
324+
//? case ContentByteArray(value) => ctx.replace(r, value).get
325+
//? case ContentNull(value) => ctx.replace(r, value).get
326+
//? }
327+
//? }
328+
// [end]
190329

191330
}
192331
)
@@ -206,18 +345,20 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
206345
"Do not have a stashed get in slot " + request.getUseStashedSlot
207346
)
208347
content match {
209-
case ContentString(value) =>
210-
ctx.replace(stashedGetMap(request.getUseStashedSlot), value)
211-
case ContentJson(value) =>
212-
ctx.replace(stashedGetMap(request.getUseStashedSlot), value)
348+
case ContentString(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
349+
case ContentJson(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
350+
case ContentByteArray(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
351+
case ContentNull(value) => ctx.replace(stashedGetMap(request.getUseStashedSlot), value).get
213352
}
214353
} else {
215354
val collection = connection.collection(request.getLocation)
216355
val r =
217356
ctx.get(collection, executor.get.getDocId(request.getLocation)).get
218357
content match {
219358
case ContentString(value) => ctx.replace(r, value).get
220-
case ContentJson(value) => ctx.replace(r, value).get
359+
case ContentJson(value) => ctx.replace(r, value).get
360+
case ContentByteArray(value) => ctx.replace(r, value).get
361+
case ContentNull(value) => ctx.replace(r, value).get
221362
}
222363
}
223364

@@ -292,9 +433,18 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
292433
request.getDocId.getBucketName,
293434
request.getDocId.getCollectionName
294435
)
295-
val out =
296-
ctx.get(collection, request.getDocId.getDocId).get
297-
handleGetResult(request, out, connection)
436+
// [if:1.9.0]
437+
val options = TransactionOptionsUtil.transactionGetOptions(request)
438+
val out = options match {
439+
case Some(opts) => ctx.get(collection, request.getDocId.getDocId, opts).get
440+
case None => ctx.get(collection, request.getDocId.getDocId).get
441+
}
442+
// [end]
443+
// [if:<1.9.0]
444+
//? val out = ctx.get(collection, request.getDocId.getDocId).get
445+
// [end]
446+
val contentAsValidation = if (request.hasContentAsValidation) Some(request.getContentAsValidation) else None
447+
handleGetResult(request, out, connection, contentAsValidation)
298448

299449
}
300450
)
@@ -308,7 +458,16 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
308458
performanceMode,
309459
() => {
310460
val collection = connection.collection(request.getLocation)
311-
ctx.get(collection, executor.get.getDocId(request.getLocation)).get
461+
// [if:1.9.0]
462+
val options = TransactionOptionsUtil.transactionGetOptions(request)
463+
options match {
464+
case Some(opts) => ctx.get(collection, executor.get.getDocId(request.getLocation), opts).get
465+
case None => ctx.get(collection, executor.get.getDocId(request.getLocation)).get
466+
}
467+
// [end]
468+
// [if:<1.9.0]
469+
//? ctx.get(collection, executor.get.getDocId(request.getLocation)).get
470+
// [end]
312471
}
313472
)
314473
} else if (op.hasGetOptional) {
@@ -329,8 +488,18 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
329488
request.getDocId.getBucketName,
330489
request.getDocId.getCollectionName
331490
)
332-
val out = ctx.get(collection, request.getDocId.getDocId)
333-
handleGetOptionalResult(request, req, out, connection)
491+
// [if:1.9.0]
492+
val options = TransactionOptionsUtil.transactionGetOptions(request)
493+
val out = options match {
494+
case Some(opts) => ctx.get(collection, request.getDocId.getDocId, opts)
495+
case None => ctx.get(collection, request.getDocId.getDocId)
496+
}
497+
// [end]
498+
// [if:<1.9.0]
499+
//? val out = ctx.get(collection, request.getDocId.getDocId)
500+
// [end]
501+
val contentAsValidation = if (request.hasContentAsValidation) Some(request.getContentAsValidation) else None
502+
handleGetOptionalResult(request, req, out, connection, contentAsValidation)
334503
}
335504
)
336505
// [start:1.8.0]
@@ -344,7 +513,16 @@ class TransactionBlocking(executor: Option[TransactionCommandExecutor])
344513
performanceMode,
345514
() => {
346515
val collection = connection.collection(request.getDocId)
347-
val result = ctx.getReplicaFromPreferredServerGroup(collection, request.getDocId.getDocId).get
516+
// [if:1.9.0]
517+
val options = TransactionOptionsUtil.transactionGetReplicaFromPreferredServerGroupOptions(request)
518+
val result = options match {
519+
case Some(opts) => ctx.getReplicaFromPreferredServerGroup(collection, request.getDocId.getDocId, opts).get
520+
case None => ctx.getReplicaFromPreferredServerGroup(collection, request.getDocId.getDocId).get
521+
}
522+
// [end]
523+
// [if:<1.9.0]
524+
//? val result = ctx.getReplicaFromPreferredServerGroup(collection, request.getDocId.getDocId).get
525+
// [end]
348526
handleGetReplicaFromPreferredServerGroupResult(request, result, connection)
349527
}
350528
)

0 commit comments

Comments
 (0)