Skip to content

Commit 8d8ef78

Browse files
committed
wait for rev reply when necessary
1 parent 36110d4 commit 8d8ef78

File tree

4 files changed

+53
-70
lines changed

4 files changed

+53
-70
lines changed

rest/attachment_test.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,11 +2443,10 @@ func TestAttachmentWithErroneousRevPos(t *testing.T) {
24432443
btcRunner.AttachmentsLock(btc.id).Unlock()
24442444

24452445
// Put doc with an erroneous revpos 1 but with a different digest, referring to the above attachment
2446-
_, err := btcRunner.PushRevWithHistory(btc.id, docID, &version, []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0)
2446+
updatedVersion, err := btcRunner.PushRevWithHistory(btc.id, docID, &version, []byte(`{"_attachments": {"hello.txt": {"revpos":1,"stub":true,"length": 19,"digest":"sha1-l+N7VpXGnoxMm8xfvtWPbz2YvDc="}}}`), 1, 0)
24472447
require.NoError(t, err)
24482448

2449-
// Ensure message and attachment is pushed up
2450-
btc.pushReplication.WaitForMessage(2)
2449+
rt.WaitForVersion(docID, *updatedVersion)
24512450

24522451
// Get the attachment and ensure the data is updated
24532452
resp := btc.rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/doc/hello.txt", "")
@@ -2610,24 +2609,24 @@ func TestCBLRevposHandling(t *testing.T) {
26102609
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &opts)
26112610
defer btc.Close()
26122611

2613-
doc1Version := btc.rt.PutDoc(doc1ID, `{}`)
2614-
doc2Version := btc.rt.PutDoc(doc2ID, `{}`)
2612+
doc1Version1 := btc.rt.PutDoc(doc1ID, `{}`)
2613+
doc2Version1 := btc.rt.PutDoc(doc2ID, `{}`)
26152614

26162615
btc.rt.WaitForPendingChanges()
26172616
btcRunner.StartOneshotPull(btc.id)
2618-
btcRunner.WaitForVersion(btc.id, doc1ID, doc1Version)
2619-
btcRunner.WaitForVersion(btc.id, doc2ID, doc2Version)
2617+
btcRunner.WaitForVersion(btc.id, doc1ID, doc1Version1)
2618+
btcRunner.WaitForVersion(btc.id, doc2ID, doc2Version1)
26202619

26212620
btcRunner.StartPush(btc.id)
26222621

26232622
attachmentAData := base64.StdEncoding.EncodeToString([]byte("attachmentA"))
26242623
attachmentBData := base64.StdEncoding.EncodeToString([]byte("attachmentB"))
26252624

2626-
doc1Version = btcRunner.AddRev(btc.id, doc1ID, &doc1Version, []byte(`{"key": "val", "_attachments": {"attachment": {"data": "`+attachmentAData+`"}}}`))
2627-
doc2Version = btcRunner.AddRev(btc.id, doc2ID, &doc2Version, []byte(`{"key": "val", "_attachments": {"attachment": {"data": "`+attachmentBData+`"}}}`))
2625+
doc1Version2 := btcRunner.AddRev(btc.id, doc1ID, &doc1Version1, []byte(`{"key": "val", "_attachments": {"attachment": {"data": "`+attachmentAData+`"}}}`))
2626+
doc2Version2 := btcRunner.AddRev(btc.id, doc2ID, &doc2Version1, []byte(`{"key": "val", "_attachments": {"attachment": {"data": "`+attachmentBData+`"}}}`))
26282627

2629-
btc.rt.WaitForVersion(doc1ID, doc1Version)
2630-
btc.rt.WaitForVersion(doc2ID, doc2Version)
2628+
btc.rt.WaitForVersion(doc1ID, doc1Version2)
2629+
btc.rt.WaitForVersion(doc2ID, doc2Version2)
26312630

26322631
collection, ctx := btc.rt.GetSingleTestDatabaseCollection()
26332632
_, err := collection.GetDocument(ctx, "doc1", db.DocUnmarshalAll)
@@ -2636,12 +2635,12 @@ func TestCBLRevposHandling(t *testing.T) {
26362635
require.NoError(t, err)
26372636

26382637
// Update doc1, don't change attachment, use correct revpos
2639-
doc1Version = btcRunner.AddRev(btc.id, doc1ID, &doc1Version, []byte(`{"key": "val", "_attachments":{"attachment":{"digest":"sha1-wzp8ZyykdEuZ9GuqmxQ7XDrY7Co=","length":11,"content_type":"","stub":true,"revpos":2}}}`))
2640-
btc.rt.WaitForVersion(doc1ID, doc1Version)
2638+
doc1Version3 := btcRunner.AddRev(btc.id, doc1ID, &doc1Version2, []byte(`{"key": "val", "_attachments":{"attachment":{"digest":"sha1-wzp8ZyykdEuZ9GuqmxQ7XDrY7Co=","length":11,"content_type":"","stub":true,"revpos":2}}}`))
2639+
btc.rt.WaitForVersion(doc1ID, doc1Version3)
26412640

26422641
// Update doc1, don't change attachment, use revpos=generation of revid, as CBL 2.x does. Should not proveAttachment on digest match.
2643-
doc1Version = btcRunner.AddRev(btc.id, doc1ID, &doc1Version, []byte(`{"key": "val", "_attachments":{"attachment":{"digest":"sha1-wzp8ZyykdEuZ9GuqmxQ7XDrY7Co=","length":11,"content_type":"","stub":true,"revpos":4}}}`))
2644-
rt.WaitForVersion(doc1ID, doc1Version)
2642+
doc1Version4 := btcRunner.AddRev(btc.id, doc1ID, &doc1Version3, []byte(`{"key": "val", "_attachments":{"attachment":{"digest":"sha1-wzp8ZyykdEuZ9GuqmxQ7XDrY7Co=","length":11,"content_type":"","stub":true,"revpos":4}}}`))
2643+
rt.WaitForVersion(doc1ID, doc1Version4)
26452644

26462645
// Validate attachment exists
26472646
attResponse := btc.rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1/attachment", "")
@@ -2650,8 +2649,8 @@ func TestCBLRevposHandling(t *testing.T) {
26502649

26512650
attachmentPushCount := btc.rt.GetDatabase().DbStats.CBLReplicationPushStats.AttachmentPushCount.Value()
26522651
// Update doc1, change attachment digest with CBL revpos=generation. Should getAttachment
2653-
doc1Version = btcRunner.AddRev(btc.id, doc1ID, &doc1Version, []byte(`{"key": "val", "_attachments":{"attachment":{"digest":"sha1-SKk0IV40XSHW37d3H0xpv2+z9Ck=","length":11,"content_type":"","stub":true,"revpos":5}}}`))
2654-
rt.WaitForVersion(doc1ID, doc1Version)
2652+
doc1Version5 := btcRunner.AddRev(btc.id, doc1ID, &doc1Version4, []byte(`{"key": "val", "_attachments":{"attachment":{"digest":"sha1-SKk0IV40XSHW37d3H0xpv2+z9Ck=","length":11,"content_type":"","stub":true,"revpos":5}}}`))
2653+
rt.WaitForVersion(doc1ID, doc1Version5)
26552654

26562655
// Validate attachment exists and is updated
26572656
attResponse = btc.rt.SendAdminRequest("GET", "/{{.keyspace}}/doc1/attachment", "")

rest/blip_api_attachment_test.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,18 @@ func TestBlipPushPullV2AttachmentV2Client(t *testing.T) {
6464

6565
// Create doc revision with attachment on SG.
6666
bodyText := `{"greetings":[{"hi": "alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}`
67-
version := btc.rt.PutDoc(docID, bodyText)
67+
version1 := btc.rt.PutDoc(docID, bodyText)
6868

69-
data := btcRunner.WaitForVersion(btc.id, docID, version)
69+
data := btcRunner.WaitForVersion(btc.id, docID, version1)
7070
bodyTextExpected := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
7171
require.JSONEq(t, bodyTextExpected, string(data))
7272

7373
// Update the replicated doc at client along with keeping the same attachment stub.
7474
bodyText = `{"greetings":[{"hi":"bob"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
75-
version = btcRunner.AddRev(btc.id, docID, &version, []byte(bodyText))
75+
version2 := btcRunner.AddRev(btc.id, docID, &version1, []byte(bodyText))
7676

77-
// TODO: Replace with rt.WaitForVersion
78-
// Wait for the document to be replicated at SG
79-
btc.pushReplication.WaitForMessage(2)
80-
81-
respBody := btc.rt.GetDocVersion(docID, version)
77+
rt.WaitForVersion(docID, version2)
78+
respBody := btc.rt.GetDocVersion(docID, version2)
8279

8380
assert.Equal(t, docID, respBody[db.BodyId])
8481
greetings := respBody["greetings"].([]interface{})
@@ -135,20 +132,19 @@ func TestBlipPushPullV2AttachmentV3Client(t *testing.T) {
135132

136133
// Create doc revision with attachment on SG.
137134
bodyText := `{"greetings":[{"hi": "alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}`
138-
version := btc.rt.PutDoc(docID, bodyText)
135+
version1 := btc.rt.PutDoc(docID, bodyText)
139136

140-
data := btcRunner.WaitForVersion(btc.id, docID, version)
137+
data := btcRunner.WaitForVersion(btc.id, docID, version1)
141138
bodyTextExpected := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
142139
require.JSONEq(t, bodyTextExpected, string(data))
143140

144141
// Update the replicated doc at client along with keeping the same attachment stub.
145142
bodyText = `{"greetings":[{"hi":"bob"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
146-
version = btcRunner.AddRev(btc.id, docID, &version, []byte(bodyText))
143+
version2 := btcRunner.AddRev(btc.id, docID, &version1, []byte(bodyText))
147144

148-
// Wait for the document to be replicated at SG
149-
btc.pushReplication.WaitForMessage(2)
145+
rt.WaitForVersion(docID, version2)
150146

151-
respBody := btc.rt.GetDocVersion(docID, version)
147+
respBody := btc.rt.GetDocVersion(docID, version2)
152148

153149
assert.Equal(t, docID, respBody[db.BodyId])
154150
greetings := respBody["greetings"].([]interface{})

rest/blip_api_delta_sync_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,20 +130,18 @@ func TestBlipDeltaSyncPushPullNewAttachment(t *testing.T) {
130130

131131
// Create doc1 rev 1-77d9041e49931ceef58a1eef5fd032e8 on SG with an attachment
132132
bodyText := `{"greetings":[{"hi": "alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}`
133-
version := rt.PutDoc(docID, bodyText)
134-
data := btcRunner.WaitForVersion(btc.id, docID, version)
133+
version1 := rt.PutDoc(docID, bodyText)
134+
data := btcRunner.WaitForVersion(btc.id, docID, version1)
135135

136136
bodyTextExpected := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
137137
require.JSONEq(t, bodyTextExpected, string(data))
138138

139139
// Update the replicated doc at client by adding another attachment.
140140
bodyText = `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":1,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="},"world.txt":{"data":"bGVsbG8gd29ybGQ="}}}`
141-
version = btcRunner.AddRev(btc.id, docID, &version, []byte(bodyText))
141+
version2 := btcRunner.AddRev(btc.id, docID, &version1, []byte(bodyText))
142142

143-
// Wait for the document to be replicated at SG
144-
btc.pushReplication.WaitForMessage(2)
145-
146-
respBody := rt.GetDocVersion(docID, version)
143+
rt.WaitForVersion(docID, version2)
144+
respBody := rt.GetDocVersion(docID, version2)
147145

148146
assert.Equal(t, docID, respBody[db.BodyId])
149147
greetings := respBody["greetings"].([]interface{})
@@ -848,6 +846,8 @@ func TestBlipDeltaSyncPush(t *testing.T) {
848846
assert.NotEqual(t, `{"greetings":{"2-":[{"howdy":"bob"}]}}`, string(msgBody))
849847
assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`, string(msgBody))
850848
}
849+
// wait for response body, indicating rev was written to server
850+
_ = msg.Response()
851851

852852
respBody := rt.GetDocVersion(docID, newRev)
853853
assert.Equal(t, "doc1", respBody[db.BodyId])
@@ -922,24 +922,28 @@ func TestBlipNonDeltaSyncPush(t *testing.T) {
922922
btcRunner.StartPush(client.id)
923923

924924
// create doc1 rev 1-0335a345b6ffed05707ccc4cbc1b67f4
925-
version := rt.PutDoc(docID, `{"greetings": [{"hello": "world!"}, {"hi": "alice"}]}`)
925+
version1 := rt.PutDoc(docID, `{"greetings": [{"hello": "world!"}, {"hi": "alice"}]}`)
926926

927-
data := btcRunner.WaitForVersion(client.id, docID, version)
927+
data := btcRunner.WaitForVersion(client.id, docID, version1)
928928
assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"}]}`, string(data))
929929
// create doc1 rev 2-abcxyz on client
930-
newRev := btcRunner.AddRev(client.id, docID, &version, []byte(`{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`))
931-
// Check EE is delta, and CE is full-body replication
930+
version2 := btcRunner.AddRev(client.id, docID, &version1, []byte(`{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`))
931+
// MSG1: proposeChanges
932+
// MSG2: rev
932933
msg := client.waitForReplicationMessage(collection, 2)
934+
require.Equal(t, db.MessageRev, msg.Profile())
935+
936+
// wait for the reply, indicating the message was written
937+
_ = msg.Response()
933938

934939
// Check the request was NOT sent with a deltaSrc property
935940
assert.Equal(t, "", msg.Properties[db.RevMessageDeltaSrc])
936941
// Check the request body was NOT the delta
937942
msgBody, err := msg.Body()
938943
assert.NoError(t, err)
939-
assert.NotEqual(t, `{"greetings":{"2-":[{"howdy":"bob"}]}}`, string(msgBody))
940944
assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`, string(msgBody))
941945

942-
body := rt.GetDocVersion("doc1", newRev)
946+
body := rt.GetDocVersion("doc1", version2)
943947
require.Equal(t, "bob", body["greetings"].([]interface{})[2].(map[string]interface{})["howdy"])
944948
})
945949
}

rest/blip_client_test.go

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,12 +1089,8 @@ func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOpt
10891089
proposeChangesResponse := proposeChangesRequest.Response()
10901090
rspBody, err := proposeChangesResponse.Body()
10911091
require.NoError(btcc.TB(), err)
1092-
errorDomain := proposeChangesResponse.Properties["Error-Domain"]
1093-
errorCode := proposeChangesResponse.Properties["Error-Code"]
1094-
if errorDomain != "" && errorCode != "" {
1095-
btcc.TB().Errorf("error %s %s from proposeChanges with body: %s", errorDomain, errorCode, string(rspBody))
1096-
return
1097-
}
1092+
require.NotContains(btcc.TB(), proposeChangesResponse.Properties, "Error-Domain", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody)
1093+
require.NotContains(btcc.TB(), proposeChangesResponse.Properties, "Error-Code", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody)
10981094

10991095
base.DebugfCtx(ctx, base.KeySGTest, "proposeChanges response: %s", string(rspBody))
11001096

@@ -1125,10 +1121,7 @@ func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOpt
11251121
revRequest.Properties[db.RevMessageHistory] = change.historyStr()
11261122

11271123
doc, ok := btcc.getClientDoc(change.docID)
1128-
if !ok {
1129-
btcc.TB().Errorf("doc %s not found in _seqFromDocID", change.docID)
1130-
return
1131-
}
1124+
require.True(btcc.TB(), ok, "docID %q not found in _seqFromDocID", change.docID)
11321125
doc.lock.RLock()
11331126
serverRev := doc._revisionsBySeq[doc._seqsByVersions[change.latestServerVersion]]
11341127
docBody := doc._revisionsBySeq[doc._seqsByVersions[change.version]].body
@@ -1153,17 +1146,11 @@ func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOpt
11531146
base.DebugfCtx(ctx, base.KeySGTest, "sent doc %s / %v", change.docID, change.version)
11541147
// block until remote has actually processed the rev and sent a response
11551148
revResp := revRequest.Response()
1156-
if revResp.Properties[db.BlipErrorCode] != "" {
1157-
btcc.TB().Errorf("error response from rev: %s", revResp.Properties["Error-Domain"])
1158-
return
1159-
}
1149+
require.NotContains(btcc.TB(), revResp.Properties, "Error-Domain", "unexpected error response from rev %v: %s", revResp)
11601150
base.DebugfCtx(ctx, base.KeySGTest, "peer acked rev %s / %v", change.docID, change.version)
11611151
btcc.updateLastReplicatedRev(change.docID, change.version)
11621152
doc, ok = btcc.getClientDoc(change.docID)
1163-
if !ok {
1164-
btcc.TB().Errorf("doc %s not found in _seqFromDocID", change.docID)
1165-
return
1166-
}
1153+
require.True(btcc.TB(), ok, "docID %q not found in _seqFromDocID", change.docID)
11671154
doc.lock.Lock()
11681155
rev := doc._revisionsBySeq[doc._seqsByVersions[change.version]]
11691156
rev.message = revRequest
@@ -1390,14 +1377,9 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe
13901377
proposeChangesResponse := proposeChangesRequest.Response()
13911378
rspBody, err := proposeChangesResponse.Body()
13921379
require.NoError(btc.TB(), err)
1393-
errorDomain := proposeChangesResponse.Properties["Error-Domain"]
1394-
errorCode := proposeChangesResponse.Properties["Error-Code"]
1395-
if errorDomain != "" && errorCode != "" {
1396-
return nil, fmt.Errorf("error %s %s from proposeChanges with body: %s", errorDomain, errorCode, string(rspBody))
1397-
}
1398-
if string(rspBody) != `[]` {
1399-
return nil, fmt.Errorf("unexpected body in proposeChangesResponse: %s", string(rspBody))
1400-
}
1380+
require.NotContains(btc.TB(), proposeChangesResponse.Properties, "Error-Domain", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody)
1381+
require.NotContains(btc.TB(), proposeChangesResponse.Properties, "Error-Code", "unexpected error response from proposeChanges: %v, %s", proposeChangesResponse, rspBody)
1382+
require.Equal(btc.TB(), "[]", string(rspBody))
14011383

14021384
// send msg rev with new doc
14031385
revRequest := blip.NewRequest()
@@ -1591,6 +1573,7 @@ func (btc *BlipTesterCollectionClient) WaitForBlipRevMessage(docID string, docVe
15911573
msg, ok = btc.GetBlipRevMessage(docID, docVersion)
15921574
assert.True(c, ok, "Could not find docID:%+v, RevID: %+v", docID, docVersion.RevID)
15931575
}, 10*time.Second, 5*time.Millisecond, "BlipTesterReplicator timed out waiting for BLIP message")
1576+
require.NotNil(btc.TB(), msg)
15941577
return msg
15951578
}
15961579

@@ -1604,6 +1587,7 @@ func (btc *BlipTesterCollectionClient) GetBlipRevMessage(docID string, version D
16041587
defer doc.lock.RUnlock()
16051588
if seq, ok := doc._seqsByVersions[version]; ok {
16061589
if rev, ok := doc._revisionsBySeq[seq]; ok {
1590+
require.NotNil(btc.TB(), rev.message, "rev.message is nil for docID:%+v, version: %+v", docID, version)
16071591
return rev.message, true
16081592
}
16091593
}

0 commit comments

Comments
 (0)