Skip to content

Commit 97c61f6

Browse files
committed
SCBC-460: Support Binary Objects in Transactions
Add support for reading and writing binary content within transactions. Change-Id: I3a9b951e90ba1cd72b553562cc1880300626209f Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/228739 Tested-by: Build Bot <[email protected]> Reviewed-by: David Nault <[email protected]>
1 parent bda25f5 commit 97c61f6

10 files changed

+531
-35
lines changed

scala-client/src/main/scala/com/couchbase/client/scala/transactions/AsyncTransactionAttemptContext.scala

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ import com.couchbase.client.core.transaction.CoreTransactionAttemptContext
2828
import com.couchbase.client.core.transaction.support.SpanWrapper
2929
import com.couchbase.client.scala.codec.JsonSerializer
3030
import com.couchbase.client.scala.env.ClusterEnvironment
31+
import com.couchbase.client.scala.transactions.config.{
32+
TransactionGetOptions,
33+
TransactionGetReplicaFromPreferredServerGroupOptions,
34+
TransactionInsertOptions,
35+
TransactionReplaceOptions
36+
}
3137
import com.couchbase.client.scala.transactions.internal.EncodingUtil.encode
3238
import com.couchbase.client.scala.util.FutureConversions
3339
import com.couchbase.client.scala.{AsyncCollection, AsyncScope}
@@ -59,9 +65,27 @@ class AsyncTransactionAttemptContext private[scala] (
5965
* @return a <code>TransactionGetResult</code> containing the document
6066
*/
6167
def get(collection: AsyncCollection, id: String): Future[TransactionGetResult] = {
68+
get(collection, id, TransactionGetOptions.Default)
69+
}
70+
71+
/**
72+
* Gets a document with the specified <code>id</code> and from the specified Couchbase <code>collection</code>.
73+
* <p>
74+
* If the document does not exist it will raise a [[com.couchbase.client.core.error.DocumentNotFoundException]].
75+
*
76+
* @param collection the Couchbase collection the document exists on
77+
* @param id the document's ID
78+
* @param options options controlling the operation
79+
* @return a <code>TransactionGetResult</code> containing the document
80+
*/
81+
def get(
82+
collection: AsyncCollection,
83+
id: String,
84+
options: TransactionGetOptions
85+
): Future[TransactionGetResult] = {
6286
FutureConversions
6387
.javaMonoToScalaFuture(internal.get(collection.collectionIdentifier, id))
64-
.map(TransactionGetResult)
88+
.map(result => TransactionGetResult(result, options.transcoder))
6589
}
6690

6791
/** Gets a document from the specified Couchbase <code>collection</code> matching the specified <code>id</code>.
@@ -81,12 +105,38 @@ class AsyncTransactionAttemptContext private[scala] (
81105
def getReplicaFromPreferredServerGroup(
82106
collection: AsyncCollection,
83107
id: String
108+
): Future[TransactionGetResult] =
109+
getReplicaFromPreferredServerGroup(
110+
collection,
111+
id,
112+
TransactionGetReplicaFromPreferredServerGroupOptions.Default
113+
)
114+
115+
/** Gets a document from the specified Couchbase <code>collection</code> matching the specified <code>id</code>.
116+
* <p>
117+
* It will be fetched only from document copies that on nodes in the preferred server group, which can
118+
* be configured with [[com.couchbase.client.scala.env.ClusterEnvironment.Builder.preferredServerGroup]].
119+
* <p>
120+
* If no replica can be retrieved, which can include for reasons such as this preferredServerGroup not being set,
121+
* and misconfigured server groups, then [[com.couchbase.client.core.error.DocumentUnretrievableException]]
122+
* can be raised. It is strongly recommended that this method always be used with a fallback strategy to use
123+
* ctx.get() on failure.
124+
*
125+
* @param collection the Couchbase collection the document exists on
126+
* @param id the document's ID
127+
* @param options options controlling the operation
128+
* @return a <code>TransactionGetResult</code> containing the document
129+
*/
130+
def getReplicaFromPreferredServerGroup(
131+
collection: AsyncCollection,
132+
id: String,
133+
options: TransactionGetReplicaFromPreferredServerGroupOptions
84134
): Future[TransactionGetResult] =
85135
FutureConversions
86136
.javaMonoToScalaFuture(
87137
internal.getReplicaFromPreferredServerGroup(collection.collectionIdentifier, id)
88138
)
89-
.map(TransactionGetResult)
139+
.map(result => TransactionGetResult(result, options.transcoder))
90140

91141
/**
92142
* Inserts a new document into the specified Couchbase <code>collection</code>.
@@ -98,10 +148,30 @@ class AsyncTransactionAttemptContext private[scala] (
98148
*/
99149
def insert[T](collection: AsyncCollection, id: String, content: T)(
100150
implicit serializer: JsonSerializer[T]
151+
): Future[TransactionGetResult] = {
152+
insert(collection, id, content, TransactionInsertOptions.Default)
153+
}
154+
155+
/**
156+
* Inserts a new document into the specified Couchbase <code>collection</code>.
157+
*
158+
* @param collection the Couchbase collection in which to insert the doc
159+
* @param id the document's unique ID
160+
* @param content $SupportedTypes
161+
* @param options options controlling the operation
162+
* @return the doc, updated with its new CAS value and ID, and converted to a <code>TransactionGetResult</code>
163+
*/
164+
def insert[T](
165+
collection: AsyncCollection,
166+
id: String,
167+
content: T,
168+
options: TransactionInsertOptions
169+
)(
170+
implicit serializer: JsonSerializer[T]
101171
): Future[TransactionGetResult] = {
102172
val span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_INSERT, internal.span())
103173
span.lowCardinalityAttribute(TracingIdentifiers.ATTR_OPERATION, TRANSACTION_OP_INSERT)
104-
encode(content, span, serializer, internal.core.context) match {
174+
encode(content, span, serializer, options.transcoder, internal.core.context) match {
105175
case Failure(exception) => Future.failed(exception)
106176
case Success(encoded) =>
107177
closeSpan(
@@ -111,12 +181,12 @@ class AsyncTransactionAttemptContext private[scala] (
111181
internal.insert(
112182
collection.collectionIdentifier,
113183
id,
114-
encoded,
115-
CodecFlags.JSON_COMPAT_FLAGS,
184+
encoded.encoded,
185+
encoded.flags,
116186
new SpanWrapper(span)
117187
)
118188
)
119-
.map(TransactionGetResult)
189+
.map(result => TransactionGetResult(result, options.transcoder))
120190
)
121191
}
122192
}
@@ -131,20 +201,35 @@ class AsyncTransactionAttemptContext private[scala] (
131201
*/
132202
def replace[T](doc: TransactionGetResult, content: T)(
133203
implicit serializer: JsonSerializer[T]
204+
): Future[TransactionGetResult] = {
205+
replace(doc, content, TransactionReplaceOptions.Default)
206+
}
207+
208+
/**
209+
* Mutates the specified <code>doc</code> with new content.
210+
*
211+
* @param doc the doc to be mutated
212+
* @param content $SupportedTypes
213+
* @param options options controlling the operation
214+
* @return the doc, updated with its new CAS value. For performance a copy is not created and the original doc
215+
* object is modified.
216+
*/
217+
def replace[T](doc: TransactionGetResult, content: T, options: TransactionReplaceOptions)(
218+
implicit serializer: JsonSerializer[T]
134219
): Future[TransactionGetResult] = {
135220
val span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_REPLACE, internal.span())
136221
span.lowCardinalityAttribute(TracingIdentifiers.ATTR_OPERATION, TRANSACTION_OP_REPLACE)
137-
encode(content, span, serializer, internal.core.context) match {
222+
encode(content, span, serializer, options.transcoder, internal.core.context) match {
138223
case Failure(exception) => Future.failed(exception)
139224
case Success(encoded) =>
140225
closeSpan(
141226
span,
142227
FutureConversions
143228
.javaMonoToScalaFuture(
144229
internal
145-
.replace(doc.internal, encoded, CodecFlags.JSON_COMPAT_FLAGS, new SpanWrapper(span))
230+
.replace(doc.internal, encoded.encoded, encoded.flags, new SpanWrapper(span))
146231
)
147-
.map(TransactionGetResult)
232+
.map(result => TransactionGetResult(result, options.transcoder))
148233
)
149234
}
150235
}

scala-client/src/main/scala/com/couchbase/client/scala/transactions/ReactiveTransactionAttemptContext.scala

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ import com.couchbase.client.core.msg.kv.CodecFlags
2727
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext
2828
import com.couchbase.client.core.transaction.support.SpanWrapper
2929
import com.couchbase.client.scala.codec.JsonSerializer
30+
import com.couchbase.client.scala.transactions.config.{
31+
TransactionGetOptions,
32+
TransactionGetReplicaFromPreferredServerGroupOptions,
33+
TransactionInsertOptions,
34+
TransactionReplaceOptions
35+
}
3036
import com.couchbase.client.scala.transactions.internal.EncodingUtil.encode
3137
import com.couchbase.client.scala.util.FutureConversions
3238
import com.couchbase.client.scala.{ReactiveCollection, ReactiveScope}
@@ -56,9 +62,27 @@ class ReactiveTransactionAttemptContext private[scala] (
5662
* @return a <code>TransactionGetResult</code> containing the document
5763
*/
5864
def get(collection: ReactiveCollection, id: String): SMono[TransactionGetResult] = {
65+
get(collection, id, TransactionGetOptions.Default)
66+
}
67+
68+
/**
69+
* Gets a document with the specified <code>id</code> and from the specified Couchbase <code>collection</code>.
70+
* <p>
71+
* If the document does not exist it will raise a [[com.couchbase.client.core.error.DocumentNotFoundException]].
72+
*
73+
* @param collection the Couchbase collection the document exists on
74+
* @param id the document's ID
75+
* @param options options controlling the operation
76+
* @return a <code>TransactionGetResult</code> containing the document
77+
*/
78+
def get(
79+
collection: ReactiveCollection,
80+
id: String,
81+
options: TransactionGetOptions
82+
): SMono[TransactionGetResult] = {
5983
FutureConversions
6084
.javaMonoToScalaMono(internal.get(collection.collectionIdentifier, id))
61-
.map(TransactionGetResult)
85+
.map(result => TransactionGetResult(result, options.transcoder))
6286
}
6387

6488
/** Gets a document from the specified Couchbase <code>collection</code> matching the specified <code>id</code>.
@@ -78,12 +102,38 @@ class ReactiveTransactionAttemptContext private[scala] (
78102
def getReplicaFromPreferredServerGroup(
79103
collection: ReactiveCollection,
80104
id: String
105+
): SMono[TransactionGetResult] =
106+
getReplicaFromPreferredServerGroup(
107+
collection,
108+
id,
109+
TransactionGetReplicaFromPreferredServerGroupOptions.Default
110+
)
111+
112+
/** Gets a document from the specified Couchbase <code>collection</code> matching the specified <code>id</code>.
113+
* <p>
114+
* It will be fetched only from document copies that on nodes in the preferred server group, which can
115+
* be configured with [[com.couchbase.client.scala.env.ClusterEnvironment.Builder.preferredServerGroup]].
116+
* <p>
117+
* If no replica can be retrieved, which can include for reasons such as this preferredServerGroup not being set,
118+
* and misconfigured server groups, then [[com.couchbase.client.core.error.DocumentUnretrievableException]]
119+
* can be raised. It is strongly recommended that this method always be used with a fallback strategy to use
120+
* ctx.get() on failure.
121+
*
122+
* @param collection the Couchbase collection the document exists on
123+
* @param id the document's ID
124+
* @param options options controlling the operation
125+
* @return a <code>TransactionGetResult</code> containing the document
126+
*/
127+
def getReplicaFromPreferredServerGroup(
128+
collection: ReactiveCollection,
129+
id: String,
130+
options: TransactionGetReplicaFromPreferredServerGroupOptions
81131
): SMono[TransactionGetResult] =
82132
FutureConversions
83133
.javaMonoToScalaMono(
84134
internal.getReplicaFromPreferredServerGroup(collection.collectionIdentifier, id)
85135
)
86-
.map(TransactionGetResult)
136+
.map(result => TransactionGetResult(result, options.transcoder))
87137

88138
/**
89139
* Inserts a new document into the specified Couchbase <code>collection</code>.
@@ -95,23 +145,43 @@ class ReactiveTransactionAttemptContext private[scala] (
95145
*/
96146
def insert[T](collection: ReactiveCollection, id: String, content: T)(
97147
implicit serializer: JsonSerializer[T]
148+
): SMono[TransactionGetResult] = {
149+
insert(collection, id, content, TransactionInsertOptions.Default)
150+
}
151+
152+
/**
153+
* Inserts a new document into the specified Couchbase <code>collection</code>.
154+
*
155+
* @param collection the Couchbase collection in which to insert the doc
156+
* @param id the document's unique ID
157+
* @param content $SupportedTypes
158+
* @param options options controlling the operation
159+
* @return the doc, updated with its new CAS value and ID, and converted to a <code>TransactionGetResult</code>
160+
*/
161+
def insert[T](
162+
collection: ReactiveCollection,
163+
id: String,
164+
content: T,
165+
options: TransactionInsertOptions
166+
)(
167+
implicit serializer: JsonSerializer[T]
98168
): SMono[TransactionGetResult] = {
99169
val span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_INSERT, internal.span())
100170
span.lowCardinalityAttribute(TracingIdentifiers.ATTR_OPERATION, TRANSACTION_OP_INSERT)
101-
encode(content, span, serializer, internal.core.context) match {
171+
encode(content, span, serializer, options.transcoder, internal.core.context) match {
102172
case Failure(exception) => SMono.raiseError(exception)
103173
case Success(encoded) =>
104174
FutureConversions
105175
.javaMonoToScalaMono(
106176
internal.insert(
107177
collection.collectionIdentifier,
108178
id,
109-
encoded,
110-
CodecFlags.JSON_COMPAT_FLAGS,
179+
encoded.encoded,
180+
encoded.flags,
111181
new SpanWrapper(span)
112182
)
113183
)
114-
.map(TransactionGetResult)
184+
.map(result => TransactionGetResult(result, options.transcoder))
115185
.doOnError(_ => span.status(RequestSpan.StatusCode.ERROR))
116186
.doOnTerminate(() => span.end())
117187
}
@@ -127,18 +197,33 @@ class ReactiveTransactionAttemptContext private[scala] (
127197
*/
128198
def replace[T](doc: TransactionGetResult, content: T)(
129199
implicit serializer: JsonSerializer[T]
200+
): SMono[TransactionGetResult] = {
201+
replace(doc, content, TransactionReplaceOptions.Default)
202+
}
203+
204+
/**
205+
* Mutates the specified <code>doc</code> with new content.
206+
*
207+
* @param doc the doc to be mutated
208+
* @param content $SupportedTypes
209+
* @param options options controlling the operation
210+
* @return the doc, updated with its new CAS value. For performance a copy is not created and the original doc
211+
* object is modified.
212+
*/
213+
def replace[T](doc: TransactionGetResult, content: T, options: TransactionReplaceOptions)(
214+
implicit serializer: JsonSerializer[T]
130215
): SMono[TransactionGetResult] = {
131216
val span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_REPLACE, internal.span())
132217
span.lowCardinalityAttribute(TracingIdentifiers.ATTR_OPERATION, TRANSACTION_OP_REPLACE)
133-
encode(content, span, serializer, internal.core.context) match {
218+
encode(content, span, serializer, options.transcoder, internal.core.context) match {
134219
case Failure(exception) => SMono.raiseError(exception)
135220
case Success(encoded) =>
136221
FutureConversions
137222
.javaMonoToScalaMono(
138223
internal
139-
.replace(doc.internal, encoded, CodecFlags.JSON_COMPAT_FLAGS, new SpanWrapper(span))
224+
.replace(doc.internal, encoded.encoded, encoded.flags, new SpanWrapper(span))
140225
)
141-
.map(TransactionGetResult)
226+
.map(result => TransactionGetResult(result, options.transcoder))
142227
.doOnError(_ => span.status(RequestSpan.StatusCode.ERROR))
143228
.doOnTerminate(() => span.end())
144229
}

0 commit comments

Comments
 (0)