Skip to content

Commit 764b111

Browse files
Brett Lawsonbrett19
authored andcommitted
ING-1072: Added initial support for XDCR RPCs.
1 parent 1d4680a commit 764b111

File tree

12 files changed

+2030
-43
lines changed

12 files changed

+2030
-43
lines changed

docs/XDCR.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# XDCR Protostellar Behaviour
2+
3+
### PushDocument
4+
5+
PushDocument exposes two values for CAS, one is the CheckCas and one is the StoreCAS. The
6+
CheckCas represents the typical Compare And Swap operation of other memcached commands. If
7+
a CheckCas is specified, conflict resolution is disabled and assuming the CAS check completes,
8+
the write will be accepted. If CheckCas is set to the value of '0' explicitly, it is expected
9+
that the document must not exist and a AddWithMeta will be performed. StoreCAS represents the
10+
new CAS value for the document being pushed, and in cases where conflict resolution is in use,
11+
this is the value that is compared for conflict resolution.
12+
13+
A pseudocode explanation of the logic is:
14+
15+
```
16+
if req.CheckCas != nil:
17+
if *req.CheckCas == 0:
18+
AddWithMeta()
19+
else:
20+
SetWithMeta(CheckCas: req.CheckCas, Options: SkipConflictResolution)
21+
else:
22+
SetWithMeta(CheckCas: 0, Options: 0)
23+
```
24+
25+
Additionally, if IsDeleted is specified on the PushDocument operation, it is assumed that this
26+
is an attempt to delete the document, and as such CheckCas will optionally control whether a
27+
pure CAS check is performed (and 0 is an invalid value).
28+
29+
There are a number of conflict related errors that can be returned by the PushDocument operation
30+
based on a number of different factors. Namely whether or not CheckCas is being applied as well as
31+
whether conflict logging is enabled for the operation.
32+
33+
ConflictLogging=true:
34+
35+
- Error:Aborted Code:TRUE_CONFLICT
36+
This case will occur when the operation would otherwise succeed, but a true conflict was detected
37+
and the client has not provided an explicit CAS value (to verify they have the existing document
38+
already available to them locally). This error will also include additional error context which
39+
includes the conflicted documents meta-data and contents. Note that this error can only be
40+
returned if the TrueConflicts parameter is set to true.
41+
42+
- Error:Aborted Code:CAS_MISMATCH
43+
This case will occur when a CAS was explicitly provided to the operation, but the target document
44+
has been updated and has a different CAS value.
45+
46+
- Error:Aborted Code:DOC_NEWER
47+
This case occurs when a new document is rejected because it is older than the document that
48+
already exists. The concept of 'newer' uses the buckets configured conflict resolution mode.
49+
50+
ConflictLogging=false:
51+
52+
### CheckDocument
53+
54+
CheckDocument has precisely the same mechanics as PushDocument does, with the exception that in the
55+
cases where a PushDocument operation would write changes to the document, CheckDocument will return
56+
success and do no write.

gateway/dataimpl/dataimpl.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Servers struct {
2626
AdminQueryIndexV1Server *server_v1.QueryIndexAdminServer
2727
AdminSearchIndexV1Server *server_v1.SearchIndexAdminServer
2828
TransactionsV1Server *server_v1.TransactionsServer
29+
XdcrV1Server *server_v1.XdcrServer
2930
}
3031

3132
func New(opts *NewOptions) *Servers {
@@ -87,5 +88,10 @@ func New(opts *NewOptions) *Servers {
8788
v1ErrHandler,
8889
v1AuthHandler,
8990
),
91+
XdcrV1Server: server_v1.NewXdcrServer(
92+
opts.Logger.Named("xdcr"),
93+
v1ErrHandler,
94+
v1AuthHandler,
95+
),
9096
}
9197
}

gateway/dataimpl/server_v1/errorhandler.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,17 @@ func (e ErrorHandler) NewDocCasMismatchStatus(baseErr error, bucketName, scopeNa
602602
return st
603603
}
604604

605+
func (e ErrorHandler) NewDocConflictStatus(baseErr error, bucketName, scopeName, collectionName, docId string) *status.Status {
606+
st := status.New(codes.Aborted,
607+
fmt.Sprintf("Conflict resolution rejected '%s' in '%s/%s/%s'.",
608+
docId, bucketName, scopeName, collectionName))
609+
st = e.tryAttachStatusDetails(st, &epb.ErrorInfo{
610+
Reason: "DOC_NEWER",
611+
})
612+
st = e.tryAttachExtraContext(st, baseErr)
613+
return st
614+
}
615+
605616
func (e ErrorHandler) NewZeroCasStatus() *status.Status {
606617
st := status.New(codes.InvalidArgument, "CAS value cannot be zero.")
607618
return st
@@ -1002,3 +1013,21 @@ func (e ErrorHandler) NewInvalidKeyLengthStatus(key string) *status.Status {
10021013
fmt.Sprintf("Length of document key '%s' must be between 1 and 251 characters.", key))
10031014
return st
10041015
}
1016+
1017+
func (e ErrorHandler) NewVbUuidDivergenceStatus(baseErr error, bucketName, scopeName, collectionName, docId string) *status.Status {
1018+
st := status.New(codes.Aborted,
1019+
fmt.Sprintf("The specified vbuuid for '%s' in '%s/%s/%s' did not match.",
1020+
docId, bucketName, scopeName, collectionName))
1021+
st = e.tryAttachStatusDetails(st, &epb.ErrorInfo{
1022+
Reason: "VBUUID_MISMATCH",
1023+
})
1024+
st = e.tryAttachExtraContext(st, baseErr)
1025+
return st
1026+
}
1027+
1028+
func (e ErrorHandler) NewUnimplementedServerVersionStatus() *status.Status {
1029+
st := status.New(
1030+
codes.Unimplemented,
1031+
"The requested feature is not available on this server version.")
1032+
return st
1033+
}

gateway/dataimpl/server_v1/kvserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (s *KvServer) GetAndLock(ctx context.Context, in *kv_v1.GetAndLockRequest)
192192
opts.ScopeName = in.ScopeName
193193
opts.CollectionName = in.CollectionName
194194
opts.Key = []byte(in.Key)
195-
opts.LockTime = in.LockTime
195+
opts.LockTime = in.LockTimeSecs
196196

197197
result, err := bucketAgent.GetAndLock(ctx, &opts)
198198
if err != nil {

0 commit comments

Comments
 (0)