Skip to content

Commit 0afb808

Browse files
authored
CBG-4578: have maximum threshold for releasing sequences in nextSequenceGreaterThan (#7436)
1 parent 06ce005 commit 0afb808

File tree

8 files changed

+227
-0
lines changed

8 files changed

+227
-0
lines changed

base/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ var (
7777

7878
// ErrSkippedSequencesMissing is returned when attempting to remove a sequence range from the skipped sequence list and at least one sequence in that range is not present
7979
ErrSkippedSequencesMissing = &sgError{"Sequence range has sequences that aren't present in skipped list"}
80+
81+
// ErrMaxSequenceReleasedExceeded is returned when the maximum number of sequences to be released as part of nextSequenceGreaterThan is exceeded
82+
ErrMaxSequenceReleasedExceeded = &sgError{"Maximum number of sequences to release to catch up with document sequence exceeded"}
8083
)
8184

8285
func (e *sgError) Error() string {

base/stats.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,8 @@ type DatabaseStats struct {
645645
SequenceReleasedCount *SgwIntStat `json:"sequence_released_count"`
646646
// The total number of sequences reserved by Sync Gateway.
647647
SequenceReservedCount *SgwIntStat `json:"sequence_reserved_count"`
648+
// The total number of corrupt sequences above the MaxSequencesToRelease threshold seen at the sequence allocator
649+
CorruptSequenceCount *SgwIntStat `json:"corrupt_sequence_count"`
648650
// The total number of warnings relating to the channel name size.
649651
WarnChannelNameSizeCount *SgwIntStat `json:"warn_channel_name_size_count"`
650652
// The total number of warnings relating to the channel count exceeding the channel count threshold.
@@ -1754,6 +1756,10 @@ func (d *DbStats) initDatabaseStats() error {
17541756
if err != nil {
17551757
return err
17561758
}
1759+
resUtil.CorruptSequenceCount, err = NewIntStat(SubsystemDatabaseKey, "corrupt_sequence_count", StatUnitNoUnits, CorruptSequenceCountDesc, StatAddedVersion3dot2dot4, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
1760+
if err != nil {
1761+
return err
1762+
}
17571763
resUtil.WarnChannelNameSizeCount, err = NewIntStat(SubsystemDatabaseKey, "warn_channel_name_size_count", StatUnitNoUnits, WarnChannelNameSizeCountDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0)
17581764
if err != nil {
17591765
return err
@@ -1839,6 +1845,7 @@ func (d *DbStats) unregisterDatabaseStats() {
18391845
prometheus.Unregister(d.DatabaseStats.SequenceIncrCount)
18401846
prometheus.Unregister(d.DatabaseStats.SequenceReleasedCount)
18411847
prometheus.Unregister(d.DatabaseStats.SequenceReservedCount)
1848+
prometheus.Unregister(d.DatabaseStats.CorruptSequenceCount)
18421849
prometheus.Unregister(d.DatabaseStats.WarnChannelNameSizeCount)
18431850
prometheus.Unregister(d.DatabaseStats.WarnChannelsPerDocCount)
18441851
prometheus.Unregister(d.DatabaseStats.WarnGrantsPerDocCount)

base/stats_descriptions.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ const (
320320

321321
TotalInitFatalErrorsDesc = "The total number of errors that occurred that prevented the database from being initialized."
322322
TotalOnlineFatalErrorsDesc = "The total number of errors that occurred that prevented the database from being brought online."
323+
324+
CorruptSequenceCountDesc = "The total number of corrupt sequences detected at the sequence allocator. Documents that have a corrupt " +
325+
"sequence that triggers release of sequences above the MaxSequencesToRelease threshold will have their update cancelled."
323326
)
324327

325328
// Delta Sync stats descriptions

db/crud.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1882,6 +1882,9 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d
18821882

18831883
unusedSequences, err = col.assignSequence(ctx, previousDocSequenceIn, doc, unusedSequences)
18841884
if err != nil {
1885+
if errors.Is(err, base.ErrMaxSequenceReleasedExceeded) {
1886+
base.ErrorfCtx(ctx, "Doc %s / %s had a much larger sequence (%d) than the current sequence number. Document update will be cancelled, since we don't want to allocate sequences to fill a gap this large. This may indicate document metadata being migrated between databases where it should've been stripped and re-imported.", base.UD(newDoc.ID), prevCurrentRev, doc.Sequence)
1887+
}
18851888
return
18861889
}
18871890

db/crud_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,3 +1761,46 @@ func TestReleaseSequenceOnDocWriteFailure(t *testing.T) {
17611761
assert.Equal(t, int64(1), db.DbStats.Database().SequenceReleasedCount.Value())
17621762
}, time.Second*10, time.Millisecond*100)
17631763
}
1764+
1765+
func TestDocUpdateCorruptSequence(t *testing.T) {
1766+
if !base.TestUseXattrs() {
1767+
t.Skip("This test only works with XATTRS enabled")
1768+
}
1769+
base.SetUpTestLogging(t, base.LevelInfo, base.KeyCache, base.KeyChanges, base.KeyCRUD, base.KeyDCP)
1770+
1771+
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{})
1772+
defer db.Close(ctx)
1773+
1774+
// create a sequence much higher than the _sync:seq value
1775+
const corruptSequence = MaxSequencesToRelease + 1000
1776+
1777+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
1778+
rev, doc, err := collection.Put(ctx, "doc1", Body{"foo": "bar"})
1779+
require.NoError(t, err)
1780+
docRev := doc.RevID
1781+
t.Logf("doc sequence: %d", doc.Sequence)
1782+
1783+
// but we can fiddle with the sequence in the metadata of the doc write to simulate a doc from a different cluster (with a higher sequence)
1784+
_, xattrs, _, err := collection.dataStore.GetWithXattrs(ctx, "doc1", []string{base.SyncXattrName})
1785+
require.NoError(t, err)
1786+
var newSyncData map[string]interface{}
1787+
err = json.Unmarshal(xattrs[base.SyncXattrName], &newSyncData)
1788+
require.NoError(t, err)
1789+
newSyncData["sequence"] = corruptSequence
1790+
_, err = collection.dataStore.UpdateXattrs(ctx, doc.ID, 0, doc.Cas, map[string][]byte{base.SyncXattrName: base.MustJSONMarshal(t, newSyncData)}, DefaultMutateInOpts())
1791+
require.NoError(t, err)
1792+
1793+
_, _, err = collection.Put(ctx, "doc1", Body{"foo": "buzz", BodyRev: rev})
1794+
require.Error(t, err)
1795+
require.ErrorIs(t, err, base.ErrMaxSequenceReleasedExceeded)
1796+
1797+
// assert update to doc was cancelled thus doc1 is its original version
1798+
doc, err = collection.GetDocument(ctx, "doc1", DocUnmarshalAll)
1799+
require.NoError(t, err)
1800+
assert.Equal(t, uint64(corruptSequence), doc.Sequence)
1801+
assert.Equal(t, docRev, doc.RevID)
1802+
1803+
base.RequireWaitForStat(t, func() int64 {
1804+
return db.DbStats.Database().CorruptSequenceCount.Value()
1805+
}, 1)
1806+
}

db/import_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ package db
1212

1313
import (
1414
"context"
15+
"encoding/json"
1516
"fmt"
1617
"log"
18+
"net/http"
1719
"testing"
1820
"time"
1921

@@ -1019,3 +1021,99 @@ func getSyncAndMou(t *testing.T, collection *DatabaseCollectionWithUser, key str
10191021
return syncData, mou, cas
10201022

10211023
}
1024+
1025+
func TestImportCancelOnDocWithCorruptSequenceOverImportFeed(t *testing.T) {
1026+
if !base.TestUseXattrs() {
1027+
t.Skip("This test only works with XATTRS enabled")
1028+
}
1029+
1030+
base.SetUpTestLogging(t, base.LevelInfo, base.KeyImport, base.KeyCRUD)
1031+
db, ctx := setupTestDBWithOptionsAndImport(t, nil, DatabaseContextOptions{})
1032+
defer db.Close(ctx)
1033+
1034+
// create a sequence much higher than the _sync:seq value
1035+
const corruptSequence = MaxSequencesToRelease + 1000
1036+
1037+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
1038+
1039+
key := t.Name()
1040+
bodyBytes := []byte(`{"foo":"bar"}`)
1041+
// Create via the SDK
1042+
_, err := collection.dataStore.AddRaw(key, 0, bodyBytes)
1043+
require.NoError(t, err)
1044+
1045+
base.RequireWaitForStat(t, func() int64 {
1046+
return db.DbStats.SharedBucketImport().ImportCount.Value()
1047+
}, 1)
1048+
1049+
_, xattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, key, []string{base.SyncXattrName})
1050+
require.NoError(t, err)
1051+
1052+
// corrupt the document sequence
1053+
var newSyncData map[string]interface{}
1054+
err = json.Unmarshal(xattrs[base.SyncXattrName], &newSyncData)
1055+
require.NoError(t, err)
1056+
newSyncData["sequence"] = corruptSequence
1057+
_, err = collection.dataStore.UpdateXattrs(ctx, key, 0, cas, map[string][]byte{base.SyncXattrName: base.MustJSONMarshal(t, newSyncData)}, DefaultMutateInOpts())
1058+
require.NoError(t, err)
1059+
1060+
// sdk update to trigger import
1061+
require.NoError(t, collection.dataStore.SetRaw(key, 0, nil, []byte(`{"foo":"baz"}`)))
1062+
1063+
base.RequireWaitForStat(t, func() int64 {
1064+
return db.DbStats.SharedBucketImport().ImportErrorCount.Value()
1065+
}, 1)
1066+
base.RequireWaitForStat(t, func() int64 {
1067+
return db.DbStats.Database().CorruptSequenceCount.Value()
1068+
}, 1)
1069+
}
1070+
1071+
func TestImportCancelOnDocWithCorruptSequenceOndemand(t *testing.T) {
1072+
if !base.TestUseXattrs() {
1073+
t.Skip("This test only works with XATTRS enabled")
1074+
}
1075+
1076+
base.SetUpTestLogging(t, base.LevelInfo, base.KeyImport, base.KeyCRUD)
1077+
tb := base.GetTestBucket(t)
1078+
defer tb.Close(base.TestCtx(t))
1079+
db, ctx := SetupTestDBForDataStoreWithOptions(t, tb, DatabaseContextOptions{})
1080+
key := t.Name()
1081+
1082+
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
1083+
1084+
_, _, err := collection.Put(ctx, key, Body{"foo": "bar"})
1085+
require.NoError(t, err)
1086+
1087+
// create a sequence much higher than the _sync:seq value
1088+
const corruptSequence = MaxSequencesToRelease + 1000
1089+
1090+
_, xattrs, cas, err := collection.dataStore.GetWithXattrs(ctx, key, []string{base.SyncXattrName})
1091+
require.NoError(t, err)
1092+
1093+
// corrupt the document sequence
1094+
var newSyncData map[string]interface{}
1095+
err = json.Unmarshal(xattrs[base.SyncXattrName], &newSyncData)
1096+
require.NoError(t, err)
1097+
newSyncData["sequence"] = corruptSequence
1098+
_, err = collection.dataStore.UpdateXattrs(ctx, key, 0, cas, map[string][]byte{base.SyncXattrName: base.MustJSONMarshal(t, newSyncData)}, DefaultMutateInOpts())
1099+
require.NoError(t, err)
1100+
1101+
// sdk update
1102+
require.NoError(t, collection.dataStore.SetRaw(key, 0, nil, []byte(`{"foo":"baz"}`)))
1103+
1104+
// trigger on demand import
1105+
_, err = collection.GetDocument(ctx, key, DocUnmarshalAll)
1106+
require.Error(t, err)
1107+
var httpErr *base.HTTPError
1108+
require.ErrorAs(t, err, &httpErr)
1109+
assert.Equal(t, http.StatusNotFound, httpErr.Status)
1110+
1111+
// verify that the document was not imported
1112+
base.RequireWaitForStat(t, func() int64 {
1113+
return db.DbStats.SharedBucketImport().ImportErrorCount.Value()
1114+
}, 1)
1115+
base.RequireWaitForStat(t, func() int64 {
1116+
return db.DbStats.Database().CorruptSequenceCount.Value()
1117+
}, 1)
1118+
1119+
}

db/sequence_allocator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ const (
3939
// Accounts for a maximum SG cluster size of 50 nodes, each allocating a full batch size of 10
4040
// if this value is too low and this correction has potential to allocate sequences that other nodes have already reserved a batch for
4141
syncSeqCorrectionValue = 500
42+
43+
// Maximum number of sequences to release as part of nextSequenceGreaterThan
44+
MaxSequencesToRelease = 10000000
4245
)
4346

4447
// MaxSequenceIncrFrequency is the maximum frequency we want to perform incr operations.
@@ -286,6 +289,14 @@ func (s *sequenceAllocator) nextSequenceGreaterThan(ctx context.Context, existin
286289
numberToRelease := existingSequence - syncSeq
287290
numberToAllocate := s.sequenceBatchSize
288291
incrVal := numberToRelease + numberToAllocate
292+
293+
// if sequences to release are above the max allowed, return error to cancel update
294+
if numberToRelease > MaxSequencesToRelease {
295+
s.mutex.Unlock()
296+
s.dbStats.CorruptSequenceCount.Add(1) // increment corrupt sequence count
297+
return 0, 0, base.ErrMaxSequenceReleasedExceeded
298+
}
299+
289300
allocatedToSeq, err := s.incrementSequence(incrVal)
290301
if err != nil {
291302
base.WarnfCtx(ctx, "Error from incrementSequence in nextSequenceGreaterThan(%d): %v", existingSequence, err)

rest/adminapitest/resync_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package adminapitest
1010

1111
import (
12+
"encoding/json"
1213
"fmt"
1314
"net/http"
1415
"testing"
@@ -69,6 +70,64 @@ func TestResyncRollback(t *testing.T) {
6970
status = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
7071
}
7172

73+
func TestResyncRegenerateSequencesCorruptDocumentSequence(t *testing.T) {
74+
if base.UnitTestUrlIsWalrus() {
75+
t.Skip("This test doesn't work with walrus")
76+
}
77+
base.SetUpTestLogging(t, base.LevelInfo, base.KeyCRUD, base.KeyChanges, base.KeyAccess)
78+
rt := rest.NewRestTester(t, &rest.RestTesterConfig{
79+
AutoImport: base.BoolPtr(false),
80+
})
81+
82+
defer rt.Close()
83+
ctx := base.TestCtx(t)
84+
ds := rt.GetSingleDataStore()
85+
86+
// create a sequence much higher than the _sync:seq value
87+
const corruptSequence = db.MaxSequencesToRelease + 1000
88+
89+
numDocs := 10
90+
for i := 0; i < numDocs; i++ {
91+
rt.CreateTestDoc(fmt.Sprintf("doc%v", i))
92+
}
93+
94+
// corrupt doc0 sequence
95+
_, xattrs, cas, err := ds.GetWithXattrs(ctx, "doc0", []string{base.SyncXattrName})
96+
require.NoError(t, err)
97+
98+
// corrupt the document sequence
99+
var newSyncData map[string]interface{}
100+
err = json.Unmarshal(xattrs[base.SyncXattrName], &newSyncData)
101+
require.NoError(t, err)
102+
newSyncData["sequence"] = corruptSequence
103+
_, err = ds.UpdateXattrs(ctx, "doc0", 0, cas, map[string][]byte{base.SyncXattrName: base.MustJSONMarshal(t, newSyncData)}, nil)
104+
require.NoError(t, err)
105+
106+
response := rt.SendAdminRequest("POST", "/{{.db}}/_offline", "")
107+
rest.RequireStatus(t, response, http.StatusOK)
108+
rt.WaitForDBState(db.RunStateString[db.DBOffline])
109+
110+
// we need to wait for the resync to start and not finish
111+
resp := rt.SendAdminRequest("POST", "/{{.db}}/_resync?action=start&regenerate_sequences=true", "")
112+
rest.RequireStatus(t, resp, http.StatusOK)
113+
_ = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateRunning)
114+
115+
_ = rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)
116+
117+
_, xattrs, _, err = ds.GetWithXattrs(ctx, "doc0", []string{base.SyncXattrName})
118+
require.NoError(t, err)
119+
// assert doc sequence wasn't changed
120+
var bucketSync map[string]interface{}
121+
err = json.Unmarshal(xattrs[base.SyncXattrName], &bucketSync)
122+
require.NoError(t, err)
123+
seq := newSyncData["sequence"].(int)
124+
require.Equal(t, uint64(corruptSequence), uint64(seq))
125+
126+
base.RequireWaitForStat(t, func() int64 {
127+
return rt.GetDatabase().DbStats.Database().CorruptSequenceCount.Value()
128+
}, 1)
129+
}
130+
72131
func TestResyncRegenerateSequencesPrincipals(t *testing.T) {
73132
base.TestRequiresDCPResync(t)
74133
if !base.TestsUseNamedCollections() {

0 commit comments

Comments
 (0)