Skip to content

Commit 4006fa4

Browse files
bbrks and torcolvin authored
CBG-4884: Skip removal of obsolete attachments if ECCV is set (#7783)
Co-authored-by: Tor Colvin <[email protected]>
1 parent 43129a0 commit 4006fa4

File tree

5 files changed

+260
-73
lines changed

5 files changed

+260
-73
lines changed

db/crud.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2685,6 +2685,11 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
26852685
skipObsoleteAttachmentsRemoval := false
26862686
isNewDocCreation := false
26872687

2688+
// Don't remove obsolete attachments if using ECCV - the other cluster may still need them!
2689+
if db.dbCtx.CachedCCVEnabled.Load() {
2690+
skipObsoleteAttachmentsRemoval = true
2691+
}
2692+
26882693
if db.UseXattrs() || upgradeInProgress {
26892694
var casOut uint64
26902695
// Update the document, storing metadata in extended attribute

rest/attachment_test.go

Lines changed: 96 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,60 +2055,76 @@ func TestAttachmentRemovalWithConflicts(t *testing.T) {
20552055
rt := NewRestTester(t, nil)
20562056

20572057
defer rt.Close()
2058-
20592058
rt.GetDatabase().EnableAllowConflicts(rt.TB())
2060-
const docID = "doc"
2061-
// Create doc rev 1
2062-
version := rt.PutDoc(docID, `{"test": "x"}`)
20632059

2064-
// Create doc rev 2 with attachment
2065-
version = rt.UpdateDoc(docID, version, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)
2060+
testCases := []struct {
2061+
name string
2062+
eccv bool
2063+
}{
2064+
{name: "no eccv", eccv: false},
2065+
{name: "eccv", eccv: true},
2066+
}
2067+
for _, tc := range testCases {
2068+
t.Run(tc.name, func(t *testing.T) {
2069+
rt.GetDatabase().CachedCCVEnabled.Store(tc.eccv)
20662070

2067-
// Create doc rev 3 referencing previous attachment
2068-
losingVersion3 := rt.UpdateDoc(docID, version, `{"_attachments": {"hello.txt": {"revpos":2,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`)
2071+
docID := db.SafeDocumentName(t, t.Name())
2072+
// Create doc rev 1
2073+
version := rt.PutDoc(docID, `{"test": "x"}`)
20692074

2070-
// Create doc conflicting with previous revid referencing previous attachment too
2071-
winningVersion3 := rt.PutNewEditsFalse(docID, NewDocVersionFromFakeRev("3-b"), &version, `{"_attachments": {"hello.txt": {"revpos":2,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}, "Winning Rev": true}`)
2075+
// Create doc rev 2 with attachment
2076+
version = rt.UpdateDoc(docID, version, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)
20722077

2073-
// Update the winning rev 3 and ensure attachment remains around as the other leaf still references this attachment
2074-
finalVersion4 := rt.UpdateDoc(docID, *winningVersion3, `{"update": 2}`)
2078+
// Create doc rev 3 referencing previous attachment
2079+
losingVersion3 := rt.UpdateDoc(docID, version, `{"_attachments": {"hello.txt": {"revpos":2,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`)
20752080

2076-
type docResp struct {
2077-
Attachments db.AttachmentMap `json:"_attachments"`
2078-
}
2081+
// Create doc conflicting with previous revid referencing previous attachment too
2082+
winningVersion3 := rt.PutNewEditsFalse(docID, NewDocVersionFromFakeRev("3-b"), &version, `{"_attachments": {"hello.txt": {"revpos":2,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}, "Winning Rev": true}`)
20792083

2080-
var doc1 docResp
2081-
// Get losing rev and ensure attachment is still there and has not been deleted
2082-
resp := rt.SendAdminRequestWithHeaders("GET", "/{{.keyspace}}/doc?attachments=true&rev="+losingVersion3.RevTreeID, "", map[string]string{"Accept": "application/json"})
2083-
RequireStatus(t, resp, http.StatusOK)
2084+
// Update the winning rev 3 and ensure attachment remains around as the other leaf still references this attachment
2085+
finalVersion4 := rt.UpdateDoc(docID, *winningVersion3, `{"update": 2}`)
20842086

2085-
err := base.JSONUnmarshal(resp.BodyBytes(), &doc1)
2086-
assert.NoError(t, err)
2087-
require.Contains(t, doc1.Attachments, "hello.txt")
2088-
require.Equal(t, db.DocAttachment{
2089-
Digest: "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=",
2090-
Length: 11,
2091-
Revpos: 2,
2092-
Data: []byte("hello world"),
2093-
}, doc1.Attachments["hello.txt"])
2094-
2095-
attachmentKey := db.MakeAttachmentKey(2, "doc", doc1.Attachments["hello.txt"].Digest)
2096-
2097-
var doc2 docResp
2098-
// Get winning rev and ensure attachment is indeed removed from this rev
2099-
resp = rt.SendAdminRequestWithHeaders("GET", "/{{.keyspace}}/doc?attachments=true&rev="+finalVersion4.RevTreeID, "", map[string]string{"Accept": "application/json"})
2100-
RequireStatus(t, resp, http.StatusOK)
2087+
type docResp struct {
2088+
Attachments db.AttachmentMap `json:"_attachments"`
2089+
}
21012090

2102-
err = base.JSONUnmarshal(resp.BodyBytes(), &doc2)
2103-
assert.NoError(t, err)
2104-
require.NotContains(t, doc2.Attachments, "hello.txt")
2091+
var doc1 docResp
2092+
// Get losing rev and ensure attachment is still there and has not been deleted
2093+
resp := rt.SendAdminRequestWithHeaders("GET", "/{{.keyspace}}/"+docID+"?attachments=true&rev="+losingVersion3.RevTreeID, "", map[string]string{"Accept": "application/json"})
2094+
RequireStatus(t, resp, http.StatusOK)
2095+
2096+
err := base.JSONUnmarshal(resp.BodyBytes(), &doc1)
2097+
assert.NoError(t, err)
2098+
require.Contains(t, doc1.Attachments, "hello.txt")
2099+
require.Equal(t, db.DocAttachment{
2100+
Digest: "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=",
2101+
Length: 11,
2102+
Revpos: 2,
2103+
Data: []byte("hello world"),
2104+
}, doc1.Attachments["hello.txt"])
2105+
2106+
attachmentKey := db.MakeAttachmentKey(2, docID, doc1.Attachments["hello.txt"].Digest)
2107+
2108+
var doc2 docResp
2109+
// Get winning rev and ensure attachment is indeed removed from this rev
2110+
resp = rt.SendAdminRequestWithHeaders("GET", "/{{.keyspace}}/"+docID+"?attachments=true&rev="+finalVersion4.RevTreeID, "", map[string]string{"Accept": "application/json"})
2111+
RequireStatus(t, resp, http.StatusOK)
2112+
2113+
err = base.JSONUnmarshal(resp.BodyBytes(), &doc2)
2114+
assert.NoError(t, err)
2115+
require.NotContains(t, doc2.Attachments, "hello.txt")
21052116

2106-
// Now remove the attachment in the losing rev by deleting the revision and ensure the attachment gets deleted
2107-
rt.DeleteDoc(docID, losingVersion3)
2117+
// Now remove the attachment in the losing rev by deleting the revision and ensure the attachment gets deleted (or is retained when ECCV is enabled)
2118+
rt.DeleteDoc(docID, losingVersion3)
21082119

2109-
_, _, err = rt.GetSingleDataStore().GetRaw(attachmentKey)
2110-
assert.Error(t, err)
2111-
assert.True(t, base.IsDocNotFoundError(err))
2120+
_, _, err = rt.GetSingleDataStore().GetRaw(attachmentKey)
2121+
if rt.GetDatabase().CachedCCVEnabled.Load() {
2122+
require.NoError(t, err, "Attachment should not be deleted as eccv is enabled")
2123+
} else {
2124+
base.RequireDocNotFoundError(t, err)
2125+
}
2126+
})
2127+
}
21122128
}
21132129

21142130
func TestAttachmentsMissing(t *testing.T) {
@@ -2208,36 +2224,45 @@ func TestAttachmentDeleteOnExpiry(t *testing.T) {
22082224

22092225
dataStore := rt.GetSingleDataStore()
22102226

2211-
// Create doc with attachment and expiry
2212-
resp := rt.SendAdminRequest("PUT", "/{{.keyspace}}/"+t.Name(), `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}, "_exp": 1}`)
2213-
RequireStatus(t, resp, http.StatusCreated)
2214-
2215-
// Wait for document to be expired - this bucket get should also trigger the expiry purge interval
2216-
require.EventuallyWithT(t, func(c *assert.CollectT) {
2217-
_, _, err := dataStore.GetRaw(t.Name())
2218-
assert.True(c, base.IsDocNotFoundError(err), "expected err %v to be doc not found", err)
2219-
}, time.Second*10, time.Millisecond*10)
2220-
2221-
if base.TestUseXattrs() {
2222-
require.EventuallyWithT(t, func(c *assert.CollectT) {
2223-
assert.Equal(c, int64(1), rt.GetDatabase().DbStats.SharedBucketImport().ImportCount.Value())
2224-
}, time.Second*10, time.Millisecond*5)
2225-
} else {
2226-
// Trigger OnDemand Import for that doc to trigger tombstone
2227-
resp := rt.SendAdminRequest("GET", "/{{.keyspace}}/"+t.Name(), "")
2228-
RequireStatus(t, resp, http.StatusNotFound)
2227+
testCases := []struct {
2228+
name string
2229+
eccv bool
2230+
}{
2231+
{name: "no-eccv", eccv: false},
2232+
{name: "eccv", eccv: true},
22292233
}
2230-
att2Key := db.MakeAttachmentKey(db.AttVersion2, t.Name(), "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=")
2231-
2232-
// With xattrs doc will be imported and will be captured as tombstone and therefore purge attachments
2233-
// Otherwise attachment will not be purged
2234-
_, _, err := dataStore.GetRaw(att2Key)
2235-
if base.TestUseXattrs() {
2236-
base.RequireDocNotFoundError(t, err)
2237-
} else {
2238-
assert.NoError(t, err)
2234+
for _, tc := range testCases {
2235+
t.Run(tc.name, func(t *testing.T) {
2236+
lastImportCount := rt.GetDatabase().DbStats.SharedBucketImport().ImportCount.Value()
2237+
2238+
rt.GetDatabase().CachedCCVEnabled.Store(tc.eccv)
2239+
docID := db.SafeDocumentName(t, t.Name())
2240+
// Create doc with attachment and expiry
2241+
resp := rt.SendAdminRequest("PUT", "/{{.keyspace}}/"+docID, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}, "_exp": 1}`)
2242+
RequireStatus(t, resp, http.StatusCreated)
2243+
2244+
// Wait for document to be expired - this bucket get should also trigger the expiry purge interval
2245+
require.EventuallyWithT(t, func(c *assert.CollectT) {
2246+
_, _, err := dataStore.GetRaw(docID)
2247+
assert.True(c, base.IsDocNotFoundError(err), "expected err %v to be doc not found", err)
2248+
}, time.Second*10, time.Millisecond*10)
2249+
2250+
require.EventuallyWithT(t, func(c *assert.CollectT) {
2251+
newImportCount := rt.GetDatabase().DbStats.SharedBucketImport().ImportCount.Value() - lastImportCount
2252+
assert.Equal(c, int64(1), newImportCount)
2253+
}, time.Second*10, time.Millisecond*5)
2254+
att2Key := db.MakeAttachmentKey(db.AttVersion2, docID, "sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0=")
2255+
2256+
// The expired doc is imported and captured as a tombstone, which normally purges its attachments.
2257+
// With ECCV enabled the attachment is not purged, since the other cluster may still need it.
2258+
_, _, err := dataStore.GetRaw(att2Key)
2259+
if rt.GetDatabase().CachedCCVEnabled.Load() {
2260+
assert.NoError(t, err)
2261+
} else {
2262+
base.RequireDocNotFoundError(t, err)
2263+
}
2264+
})
22392265
}
2240-
22412266
}
22422267

22432268
// TestUpdateViaBlipMigrateAttachment:

rest/audit_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,10 @@ func TestAuditAttachmentEvents(t *testing.T) {
11261126
requireAttachmentEvents(rt, base.AuditIDAttachmentCreate, output, docID, postAttachmentVersion.RevTreeID, attachmentName, testCase.attachmentCreateCount)
11271127
requireAttachmentEvents(rt, base.AuditIDAttachmentRead, output, docID, postAttachmentVersion.RevTreeID, attachmentName, testCase.attachmentReadCount)
11281128
requireAttachmentEvents(rt, base.AuditIDAttachmentUpdate, output, docID, postAttachmentVersion.RevTreeID, attachmentName, testCase.attachmentUpdateCount)
1129-
requireAttachmentEvents(rt, base.AuditIDAttachmentDelete, output, docID, postAttachmentVersion.RevTreeID, attachmentName, testCase.attachmentDeleteCount)
1129+
// attachments get auto-deleted only without CCV
1130+
if !rt.GetDatabase().CachedCCVEnabled.Load() {
1131+
requireAttachmentEvents(rt, base.AuditIDAttachmentDelete, output, docID, postAttachmentVersion.RevTreeID, attachmentName, testCase.attachmentDeleteCount)
1132+
}
11301133

11311134
})
11321135
}

xdcr/attachment_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Copyright 2025-Present Couchbase, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License included
4+
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
5+
// in that file, in accordance with the Business Source License, use of this
6+
// software will be governed by the Apache License, Version 2.0, included in
7+
// the file licenses/APL2.txt.
8+
9+
package xdcr
10+
11+
import (
12+
"encoding/base64"
13+
"net/http"
14+
"testing"
15+
"time"
16+
17+
"github.com/couchbase/sync_gateway/base"
18+
"github.com/couchbase/sync_gateway/rest"
19+
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
21+
)
22+
23+
// TestMultiActorLosingConflictUpdateRemovingAttachments
24+
// Removes attachments on a losing conflicting document and ensures that attachments remain present in the bucket and accessible on both documents on either side.
25+
//
26+
// 1. Create a document with an attachment on Actor A
27+
// 2. Replicate to Actor B
28+
// 3. Stop replications
29+
// 4. Update the document on Actor A, removing the attachment
30+
// 5. Update the document on Actor B, changing the body (twice to ensure MWW resolves this as the winner as well as LWW)
31+
// 6. Start replications
32+
// 7. Observe resolved conflict on both Actor A and Actor B, with the attachment still present
33+
func TestMultiActorLosingConflictUpdateRemovingAttachments(t *testing.T) {
34+
base.RequireNumTestBuckets(t, 2)
35+
36+
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)
37+
38+
// turn off auto import - since we want reliable XDCR stats and don't want MOU/import echos to interfere
39+
rtA := rest.NewRestTester(t, &rest.RestTesterConfig{AutoImport: base.Ptr(false)})
40+
defer rtA.Close()
41+
rtB := rest.NewRestTester(t, &rest.RestTesterConfig{AutoImport: base.Ptr(false)})
42+
defer rtB.Close()
43+
44+
ctx := base.TestCtx(t)
45+
opts := XDCROptions{Mobile: MobileOn}
46+
47+
// Set up bi-directional XDCR
48+
xdcrAtoB, err := NewXDCR(ctx, rtA.Bucket(), rtB.Bucket(), opts)
49+
require.NoError(t, err)
50+
require.NoError(t, xdcrAtoB.Start(ctx))
51+
xdcrBtoA, err := NewXDCR(ctx, rtB.Bucket(), rtA.Bucket(), opts)
52+
require.NoError(t, err)
53+
require.NoError(t, xdcrBtoA.Start(ctx))
54+
55+
defer func() {
56+
// stop XDCR, will already be stopped if test doesn't fail early
57+
if err := xdcrAtoB.Stop(ctx); err != nil {
58+
assert.Equal(t, ErrReplicationNotRunning, err)
59+
}
60+
if err := xdcrBtoA.Stop(ctx); err != nil {
61+
assert.Equal(t, ErrReplicationNotRunning, err)
62+
}
63+
}()
64+
65+
const (
66+
docID = "doc1"
67+
attachmentID = "hello.txt"
68+
)
69+
attachment := base64.StdEncoding.EncodeToString([]byte("Hello World!"))
70+
71+
rtAVersion := rtA.PutDocWithAttachment(docID, `{"key":"value"}`, attachmentID, attachment)
72+
73+
// fetch attachment via REST API
74+
attAResp := rtA.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+docID+"/"+attachmentID, "")
75+
rest.RequireStatus(t, attAResp, http.StatusOK)
76+
77+
// wait for doc to replicate to rtB
78+
var rtBVersion rest.DocVersion
79+
require.EventuallyWithT(t, func(c *assert.CollectT) {
80+
rtBVersion, _ = rtB.GetDoc(docID)
81+
assert.Equal(c, rtAVersion.CV.String(), rtBVersion.CV.String())
82+
assert.Equal(c, rtAVersion.RevTreeID, rtBVersion.RevTreeID)
83+
}, time.Second*5, time.Millisecond*100)
84+
85+
// wait for XDCR stats to ensure attachment data also made it over
86+
require.EventuallyWithT(t, func(c *assert.CollectT) {
87+
stats, err := xdcrAtoB.Stats(ctx)
88+
require.NoError(c, err)
89+
// doc and attachment doc
90+
assert.Equalf(c, uint64(2), stats.DocsWritten, "expected doc and attachment to be replicated")
91+
}, time.Second*5, time.Millisecond*100)
92+
93+
// stop replication
94+
require.NoError(t, xdcrAtoB.Stop(ctx))
95+
require.NoError(t, xdcrBtoA.Stop(ctx))
96+
97+
// fetch attachment via REST API
98+
attBResp := rtB.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+docID+"/"+attachmentID, "")
99+
rest.RequireStatus(t, attBResp, http.StatusOK)
100+
101+
// update doc on A, removing attachment
102+
rtAVersion = rtA.UpdateDoc(docID, rtAVersion, `{"key":"value2"}`)
103+
104+
// update doc on B, changing body but keeping attachment stub (twice to ensure MWW resolves this as the winner as well as LWW)
105+
rtBVersion = rtB.UpdateDoc(docID, rtBVersion, `{"key":"value3","_attachments":{"`+attachmentID+`":{"stub":true}}}`)
106+
rtBVersion = rtB.UpdateDoc(docID, rtBVersion, `{"key":"value4","_attachments":{"`+attachmentID+`":{"stub":true}}}`)
107+
108+
// start replication
109+
require.NoError(t, xdcrAtoB.Start(ctx))
110+
require.NoError(t, xdcrBtoA.Start(ctx))
111+
112+
// check XDCR stats to ensure no additional mutation (such as an attachment delete) was replicated from A to B
113+
var expectedDocsWritten uint64 = 0 // attachment deletion (or lack of)
114+
// Rosmar's XDCR implementation differs in two ways:
115+
// 1. Stats don't get reset on restart
116+
// 2. No DCP checkpointing - so there's always more TargetNewerDocs than expected even if we reset stats
117+
if !base.TestUseCouchbaseServer() {
118+
expectedDocsWritten = expectedDocsWritten + 2 // (old replication stats: 1 doc + 1 attachment)
119+
}
120+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
121+
stats, err := xdcrAtoB.Stats(ctx)
122+
require.NoError(c, err)
123+
assert.Equalf(c, expectedDocsWritten, stats.DocsWritten, "unexpected additional mutation replicated (an attachment delete?)")
124+
}, time.Second*5, time.Millisecond*100)
125+
126+
// wait for XDCR stats to ensure the resolved conflict was replicated back from B to A
127+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
128+
stats, err := xdcrBtoA.Stats(ctx)
129+
require.NoError(c, err)
130+
assert.Equalf(c, uint64(1), stats.DocsWritten, "expected resolved conflict to be replicated back to A")
131+
}, time.Second*5, time.Millisecond*100)
132+
133+
// wait for doc (resolved conflict) to replicate back to rtA
134+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
135+
currentRtAVersion, _ := rtA.GetDoc(docID)
136+
assert.Equal(c, rtBVersion.CV.String(), currentRtAVersion.CV.String())
137+
}, time.Second*10, time.Millisecond*100)
138+
139+
// check attachment metadata exists
140+
docA := rtA.GetDocument(docID)
141+
docB := rtB.GetDocument(docID)
142+
assert.Equal(t, docA.Attachments(), docB.Attachments())
143+
assert.Contains(t, docA.Attachments(), attachmentID)
144+
assert.Contains(t, docB.Attachments(), attachmentID)
145+
146+
// check attachment contents are retrievable
147+
attAResp = rtA.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+docID+"/"+attachmentID, "")
148+
rest.AssertStatus(t, attAResp, http.StatusOK)
149+
attBResp = rtB.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/"+docID+"/"+attachmentID, "")
150+
rest.AssertStatus(t, attBResp, http.StatusOK)
151+
152+
require.NoError(t, xdcrAtoB.Stop(ctx))
153+
require.NoError(t, xdcrBtoA.Stop(ctx))
154+
}

xdcr/rosmar_xdcr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
178178
*/
179179

180180
if conflictResolutionSourceCas <= conflictResolutionTargetCas {
181-
base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s skipping replication since sourceCas (%d) < targetCas (%d)", docID, conflictResolutionSourceCas, conflictResolutionTargetCas)
181+
base.InfofCtx(ctx, base.KeyVV, "XDCR doc:%s skipping replication since sourceCas (%d) <= targetCas (%d)", docID, conflictResolutionSourceCas, conflictResolutionTargetCas)
182182
r.targetNewerDocs.Add(1)
183183
return true
184184
} /* else if sourceCas == targetCas {

0 commit comments

Comments
 (0)