From d93054abb9d1ee816a5ce7415aefa8433c2d6909 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 1 Aug 2025 20:52:59 +0100 Subject: [PATCH 01/18] Add version_type option for REST API changes feeds to allow consumers to get CV instead of RevIDs --- db/change_cache_test.go | 6 +- db/changes.go | 118 ++++++++++++------- db/changes_test.go | 10 +- db/crud.go | 32 +++-- db/database_test.go | 4 +- db/sequence_id.go | 4 +- docs/api/components/schemas.yaml | 7 +- docs/api/paths/admin/keyspace-_changes.yaml | 16 +++ rest/blip_api_crud_test.go | 12 +- rest/changes_api.go | 10 ++ rest/changes_test.go | 122 ++++++++++++++++++++ rest/replicatortest/replicator_test.go | 8 +- rest/utilities_testing.go | 17 ++- 13 files changed, 290 insertions(+), 76 deletions(-) diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 14879b7e6f..efb283e42e 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -534,7 +534,7 @@ func TestChannelCacheBackfill(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1, TriggeredBy: 0, LowSeq: 2}, ID: "doc-1", - Changes: []ChangeRev{{"rev": "1-a"}}, + Changes: []ChangeByVersionType{{"rev": "1-a"}}, collectionID: collectionID}, changes[0]) lastSeq := changes[len(changes)-1].Seq @@ -568,7 +568,7 @@ func TestChannelCacheBackfill(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 3, LowSeq: 3}, ID: "doc-3", - Changes: []ChangeRev{{"rev": "1-a"}}, + Changes: []ChangeByVersionType{{"rev": "1-a"}}, collectionID: collectionID, }, changes[0]) @@ -722,7 +722,7 @@ func TestLowSequenceHandling(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1, TriggeredBy: 0, LowSeq: 2}, ID: "doc-1", - Changes: []ChangeRev{{"rev": "1-a"}}, + Changes: []ChangeByVersionType{{"rev": "1-a"}}, collectionID: collectionID}, changes[0]) // Test backfill clear - sequence numbers go back to standard handling diff --git a/db/changes.go b/db/changes.go index 974905dfed..2efda9ca0b 100644 --- a/db/changes.go +++ b/db/changes.go @@ -26,32 +26,51 @@ import ( // Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as // changes processing currently assumes a deep copy when doing chanOpts := changesOptions. type ChangesOptions struct { - Since SequenceID // sequence # to start _after_ - Limit int // Max number of changes to return, if nonzero - Conflicts bool // Show all conflicting revision IDs, not just winning one? - IncludeDocs bool // Include doc body of each change? - Wait bool // Wait for results, instead of immediately returning empty result? - Continuous bool // Run continuously until terminated? - RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq - HeartbeatMs uint64 // How often to send a heartbeat to the client - TimeoutMs uint64 // After this amount of time, close the longpoll connection - ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions - Revocations bool // Specifies whether revocation messages should be sent on the changes feed - clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client - ChangesCtx context.Context // Used for cancelling checking the changes feed should stop + Since SequenceID // sequence # to start _after_ + Limit int // Max number of changes to return, if nonzero + Conflicts bool // Show all conflicting revision IDs, not just winning one? + IncludeDocs bool // Include doc body of each change? + Wait bool // Wait for results, instead of immediately returning empty result? + Continuous bool // Run continuously until terminated? + RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq + HeartbeatMs uint64 // How often to send a heartbeat to the client + TimeoutMs uint64 // After this amount of time, close the longpoll connection + ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions + Revocations bool // Specifies whether revocation messages should be sent on the changes feed + clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client + ChangesCtx context.Context // Used for cancelling checking the changes feed should stop + VersionType ChangesVersionType // The type of version to use for the changes feed. This is used to determine whether to use send revtree IDs or CV in the changes feed entries. +} + +type ChangesVersionType string + +const ( + ChangesVersionTypeRevTreeID ChangesVersionType = "rev" // Use revtree IDs in changes feed entries + ChangesVersionTypeCV ChangesVersionType = "cv" // Use current version in changes feed entries +) + +func ParseChangesVersionType(s string) (ChangesVersionType, error) { + switch ChangesVersionType(s) { + case "", ChangesVersionTypeRevTreeID: + return ChangesVersionTypeRevTreeID, nil + case ChangesVersionTypeCV: + return ChangesVersionTypeCV, nil + default: + return "", fmt.Errorf("unknown changes version type: %q", s) + } } // A changes entry; Database.GetChanges returns an array of these. // Marshals into the standard CouchDB _changes format. type ChangeEntry struct { - Seq SequenceID `json:"seq"` - ID string `json:"id"` - Deleted bool `json:"deleted,omitempty"` - Removed base.Set `json:"removed,omitempty"` - Doc json.RawMessage `json:"doc,omitempty"` - Changes []ChangeRev `json:"changes"` - Err error `json:"err,omitempty"` // Used to notify feed consumer of errors - allRemoved bool // Flag to track whether an entry is a removal in all channels visible to the user. + Seq SequenceID `json:"seq"` + ID string `json:"id"` + Deleted bool `json:"deleted,omitempty"` + Removed base.Set `json:"removed,omitempty"` + Doc json.RawMessage `json:"doc,omitempty"` + Changes []ChangeByVersionType `json:"changes"` + Err error `json:"err,omitempty"` // Used to notify feed consumer of errors + allRemoved bool // Flag to track whether an entry is a removal in all channels visible to the user. branched bool backfill backfillFlag // Flag used to identify non-client entries used for backfill synchronization (di only) principalDoc bool // Used to indicate _user/_role docs @@ -74,16 +93,12 @@ const ( BackfillFlag_Complete ) -type ChangeRev map[string]string // Key is always "rev", value is rev ID +type ChangeByVersionType map[ChangesVersionType]string // Keyed by the type of version in the value type ViewDoc struct { Json json.RawMessage // should be type 'document', but that fails to unmarshal correctly } -func (db *DatabaseCollectionWithUser) AddDocToChangeEntry(ctx context.Context, entry *ChangeEntry, options ChangesOptions) { - db.addDocToChangeEntry(ctx, entry, options) -} - // Adds a document body and/or its conflicts to a ChangeEntry func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, entry *ChangeEntry, options ChangesOptions) { @@ -159,7 +174,7 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co entry.Deleted = false } if !(options.ActiveOnly && leaf.Deleted) { - entry.Changes = append(entry.Changes, ChangeRev{"rev": leaf.ID}) + entry.Changes = append(entry.Changes, ChangeByVersionType{"rev": leaf.ID}) } } }) @@ -300,7 +315,7 @@ func (db *DatabaseCollectionWithUser) buildRevokedFeed(ctx context.Context, ch c continue } - change := makeRevocationChangeEntry(logEntry, seqID, singleChannelCache.ChannelID()) + change := makeRevocationChangeEntry(ctx, logEntry, seqID, singleChannelCache.ChannelID(), options.VersionType) base.DebugfCtx(ctx, base.KeyChanges, "Channel feed processing revocation seq: %v in channel %s ", seqID, base.UD(singleChannelCache.ChannelID().Name)) @@ -459,7 +474,7 @@ func (db *DatabaseCollectionWithUser) changesFeed(ctx context.Context, singleCha TriggeredBy: options.Since.TriggeredBy, } - change := makeChangeEntry(logEntry, seqID, singleChannelCache.ChannelID()) + change := makeChangeEntry(ctx, logEntry, seqID, singleChannelCache.ChannelID(), options.VersionType) lastSeq = logEntry.Sequence // Don't include deletes or removals during initial channel backfill @@ -494,20 +509,33 @@ func (db *DatabaseCollectionWithUser) changesFeed(ctx context.Context, singleCha return feed } -func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) ChangeEntry { +func makeChangeEntry(ctx context.Context, logEntry *LogEntry, seqID SequenceID, channel channels.ID, versionType ChangesVersionType) ChangeEntry { change := ChangeEntry{ Seq: seqID, ID: logEntry.DocID, Deleted: (logEntry.Flags & channels.Deleted) != 0, - Changes: []ChangeRev{{"rev": logEntry.RevID}}, + Changes: []ChangeByVersionType{{ChangesVersionTypeRevTreeID: logEntry.RevID}}, branched: (logEntry.Flags & channels.Branched) != 0, principalDoc: logEntry.IsPrincipal, collectionID: logEntry.CollectionID, } + + switch versionType { + case ChangesVersionTypeCV: + if logEntry.SourceID != "" { + change.Changes[0] = ChangeByVersionType{versionType: Version{SourceID: logEntry.SourceID, Value: logEntry.Version}.String()} + } + case ChangesVersionTypeRevTreeID: + fallthrough + default: + // already initialized with a 'rev' change entry + } + // populate CurrentVersion entry if log entry has sourceID and Version populated // This allows current version to be nil in event of CV not being populated on log entry // allowing omitempty to work as expected if logEntry.SourceID != "" { + // TODO: Remove this if we change BLIP generateBlipSyncChanges and tests to use versionType and read the value from the normal Changes array... no reason to store this info in two places. change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version} } if logEntry.Flags&channels.Removed != 0 { @@ -517,8 +545,8 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) return change } -func makeRevocationChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) ChangeEntry { - entry := makeChangeEntry(logEntry, seqID, channel) +func makeRevocationChangeEntry(ctx context.Context, logEntry *LogEntry, seqID SequenceID, channel channels.ID, versionType ChangesVersionType) ChangeEntry { + entry := makeChangeEntry(ctx, logEntry, seqID, channel, versionType) entry.Revoked = true return entry @@ -603,7 +631,7 @@ func (db *DatabaseCollectionWithUser) appendUserFeed(feeds []<-chan *ChangeEntry entry := ChangeEntry{ Seq: userSeq, ID: "_user/" + name, - Changes: []ChangeRev{}, + Changes: []ChangeByVersionType{}, principalDoc: true, } userFeed := make(chan *ChangeEntry, 1) @@ -799,7 +827,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex if useLateSequenceFeeds { lateSequenceFeedHandler := lateSequenceFeeds[chanID] if lateSequenceFeedHandler != nil { - latefeed, err := col.getLateFeed(options.ChangesCtx, lateSequenceFeedHandler, singleChannelCache) + latefeed, err := col.getLateFeed(options.ChangesCtx, lateSequenceFeedHandler, singleChannelCache, options.VersionType) if err != nil { base.WarnfCtx(ctx, "MultiChangesFeed got error reading late sequence feed %q, rolling back channel changes feed to last sent low sequence #%d.", base.UD(chanName), lastSentLowSeq) chanOpts.Since.LowSeq = lastSentLowSeq @@ -1188,7 +1216,7 @@ func (db *DatabaseCollectionWithUser) newLateSequenceFeed(singleChannelCache Sin // Feed to process late sequences for the channel. Updates lastSequence as it works the feed. Error indicates // previous position in late sequence feed isn't available, and caller should reset to low sequence. -func (db *DatabaseCollectionWithUser) getLateFeed(ctx context.Context, feedHandler *lateSequenceFeed, singleChannelCache SingleChannelCache) (<-chan *ChangeEntry, error) { +func (db *DatabaseCollectionWithUser) getLateFeed(ctx context.Context, feedHandler *lateSequenceFeed, singleChannelCache SingleChannelCache, versionType ChangesVersionType) (<-chan *ChangeEntry, error) { if !singleChannelCache.SupportsLateFeed() { return nil, errors.New("Cache doesn't support late feeds") @@ -1227,7 +1255,7 @@ func (db *DatabaseCollectionWithUser) getLateFeed(ctx context.Context, feedHandl seqID := SequenceID{ Seq: logEntry.Sequence, } - change := makeChangeEntry(logEntry, seqID, singleChannelCache.ChannelID()) + change := makeChangeEntry(ctx, logEntry, seqID, singleChannelCache.ChannelID(), versionType) select { case <-ctx.Done(): return @@ -1308,9 +1336,16 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio return nil } - changes := make([]ChangeRev, 1) - changes[0] = ChangeRev{"rev": populatedDoc.CurrentRev} - row.Changes = changes + row.Changes = []ChangeByVersionType{{ChangesVersionTypeRevTreeID: populatedDoc.CurrentRev}} + switch options.VersionType { + case ChangesVersionTypeCV: + row.Changes[0] = ChangeByVersionType{options.VersionType: populatedDoc.HLV.GetCurrentVersionString()} + case ChangesVersionTypeRevTreeID: + fallthrough + default: + // already initialized with a 'rev' change entry above + } + row.Deleted = populatedDoc.Deleted row.Seq = SequenceID{Seq: populatedDoc.Sequence} row.SetBranched((populatedDoc.Flags & channels.Branched) != 0) @@ -1360,7 +1395,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio func (options ChangesOptions) String() string { return fmt.Sprintf( - `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, Revocations: %t, RequestPlusSeq: %d}`, + `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, Revocations: %t, RequestPlusSeq: %d, VersionType: %s}`, options.Since, options.Limit, options.Conflicts, @@ -1372,6 +1407,7 @@ func (options ChangesOptions) String() string { options.ActiveOnly, options.Revocations, options.RequestPlusSeq, + options.VersionType, ) } diff --git a/db/changes_test.go b/db/changes_test.go index 0447893c0f..b2bc8d2d19 100644 --- a/db/changes_test.go +++ b/db/changes_test.go @@ -162,7 +162,7 @@ func TestChangesAfterChannelAdded(t *testing.T) { require.Len(t, changes, 1) assert.Equal(t, "doc2", changes[0].ID) - assert.Equal(t, []ChangeRev{{"rev": revid}}, changes[0].Changes) + assert.Equal(t, []ChangeByVersionType{{"rev": revid}}, changes[0].Changes) // validate from zero changes = getChanges(t, collection, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) @@ -234,7 +234,7 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1}, ID: "alpha", - Changes: []ChangeRev{{"rev": revid}}, + Changes: []ChangeByVersionType{{"rev": revid}}, collectionID: collectionID}, changes[0]) lastSeq := getLastSeq(changes) @@ -279,7 +279,7 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) { ID: "alpha", Removed: base.SetOf("A"), allRemoved: true, - Changes: []ChangeRev{{"rev": "2-e99405a23fa102238fa8c3fd499b15bc"}}, + Changes: []ChangeByVersionType{{"rev": "2-e99405a23fa102238fa8c3fd499b15bc"}}, collectionID: collectionID}, changes[0]) printChanges(changes) @@ -352,7 +352,7 @@ func TestDocDeletionFromChannelCoalesced(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1}, ID: "alpha", - Changes: []ChangeRev{{"rev": revid}}, + Changes: []ChangeByVersionType{{"rev": revid}}, collectionID: collectionID}, changes[0]) lastSeq := getLastSeq(changes) @@ -392,7 +392,7 @@ func TestDocDeletionFromChannelCoalesced(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 3}, ID: "alpha", - Changes: []ChangeRev{{"rev": "3-e99405a23fa102238fa8c3fd499b15bc"}}, + Changes: []ChangeByVersionType{{"rev": "3-e99405a23fa102238fa8c3fd499b15bc"}}, collectionID: collectionID}, changes[0]) printChanges(changes) diff --git a/db/crud.go b/db/crud.go index 171f2f5406..2ba302cf2f 100644 --- a/db/crud.go +++ b/db/crud.go @@ -361,20 +361,36 @@ func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Contex // - attachmentsSince is nil to return no attachment bodies, otherwise a (possibly empty) list of // revisions for which the client already has attachments and doesn't need bodies. Any attachment // that hasn't changed since one of those revisions will be returned as a stub. -func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string) (revision DocumentRevision, err error) { - if revid != "" { +func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revOrCV string, maxHistory int, historyFrom []string) (DocumentRevision, error) { + var ( + revID *string + cv *Version + ) + + var ( + revision DocumentRevision + getErr error + ) + if revOrCV != "" { // Get a specific revision body and history from the revision cache // (which will load them if necessary, by calling revCacheLoader, above) - revision, err = db.revisionCache.GetWithRev(ctx, docid, revid, RevCacheOmitDelta) + if currentVersion, parseErr := ParseVersion(revOrCV); parseErr != nil { + // try as a rev ID + revID = &revOrCV + revision, getErr = db.revisionCache.GetWithRev(ctx, docid, *revID, RevCacheOmitDelta) + } else { + cv = ¤tVersion + revision, getErr = db.revisionCache.GetWithCV(ctx, docid, cv, RevCacheOmitDelta) + } } else { - // No rev ID given, so load active revision - revision, err = db.revisionCache.GetActive(ctx, docid) + // No rev given, so load active revision + revision, getErr = db.revisionCache.GetActive(ctx, docid) } - if err != nil { - return DocumentRevision{}, err + if getErr != nil { + return DocumentRevision{}, getErr } - return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom) + return db.documentRevisionForRequest(ctx, docid, revision, revID, cv, maxHistory, historyFrom) } // documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc. diff --git a/db/database_test.go b/db/database_test.go index ff955263e2..59fdbf0473 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -1718,7 +1718,7 @@ func TestConflicts(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 3}, ID: "doc", - Changes: []ChangeRev{{"rev": "2-b"}, {"rev": "2-a"}}, + Changes: []ChangeByVersionType{{"rev": "2-b"}, {"rev": "2-a"}}, branched: true, collectionID: collectionID, CurrentVersion: &Version{SourceID: source, Value: version}, @@ -1753,7 +1753,7 @@ func TestConflicts(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 4}, ID: "doc", - Changes: []ChangeRev{{"rev": "2-a"}, {"rev": rev3}}, + Changes: []ChangeByVersionType{{"rev": "2-a"}, {"rev": rev3}}, branched: true, collectionID: collectionID, CurrentVersion: &Version{SourceID: bucketUUID, Value: doc.Cas}, diff --git a/db/sequence_id.go b/db/sequence_id.go index 3afeabea97..c7ef78984b 100644 --- a/db/sequence_id.go +++ b/db/sequence_id.go @@ -120,11 +120,11 @@ func parseIntegerSequenceID(str string) (SequenceID, error) { return SequenceID{}, err } } else { - return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence") + return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence: %q", str) } if err != nil { - return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence") + return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence: %q", str) } return s, nil } diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 090b33d8d5..48638462d9 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -551,8 +551,13 @@ Changes-feed: type: object properties: rev: - description: The new revision that was caused by that change. + description: The new Revision ID that was caused by that change. This is omitted when the `version_type` parameter is `cv`. type: string + optional: true + cv: + description: The new Current Version that was caused by that change. This is present only when the `version_type` parameter is `cv`. + type: string + optional: true uniqueItems: true uniqueItems: true last_seq: diff --git a/docs/api/paths/admin/keyspace-_changes.yaml b/docs/api/paths/admin/keyspace-_changes.yaml index 91ae88ca11..f00d02e23e 100644 --- a/docs/api/paths/admin/keyspace-_changes.yaml +++ b/docs/api/paths/admin/keyspace-_changes.yaml @@ -102,6 +102,15 @@ get: schema: type: boolean default: false + - name: version_type + in: query + description: The type of document versioning to use for the changes feed. + schema: + type: string + default: rev + enum: + - rev + - cv responses: '200': $ref: ../../components/responses.yaml#/changes-feed @@ -166,6 +175,13 @@ post: description: 'When true, ensures all valid documents written prior to the request being issued are included in the response. This is only applicable for non-continuous feeds.' type: boolean default: false + version_type: + description: The type of document versioning to use for the changes feed. + type: string + default: rev + enum: + - rev + - cv responses: '200': $ref: ../../components/responses.yaml#/changes-feed diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 9f60fbbeb6..86b78cc965 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -2342,7 +2342,7 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0&revocations=true", "user", true) assert.Equal(t, "doc", changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0]) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2351,7 +2351,7 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes = rt.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%s&revocations=true", changes.Last_Seq), "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0]) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2362,11 +2362,11 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes = rt.WaitForChanges(2, fmt.Sprintf("/{{.keyspace}}/_changes?since=%s&revocations=true", changes.Last_Seq), "user", true) assert.Equal(t, "doc", changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0]) assert.Equal(t, "3-1bc9dd04c8a257ba28a41eaad90d32de", changes.Results[0].Changes[0]["rev"]) assert.False(t, changes.Results[0].Revoked) assert.Equal(t, "docmarker", changes.Results[1].ID) - RequireChangeRevVersion(t, docMarkerVersion, changes.Results[1].Changes[0]) + RequireChangeRev(t, docMarkerVersion, changes.Results[1].Changes[0]) assert.Equal(t, "1-999bcad4aab47f0a8a24bd9d3598060c", changes.Results[1].Changes[0]["rev"]) assert.False(t, changes.Results[1].Revoked) @@ -2427,7 +2427,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0&revocations=true", "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0]) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2438,7 +2438,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi // At this point changes should send revocation, as document isn't in any of the user's channels changes = rt.WaitForChanges(1, "/{{.keyspace}}/_changes?filter=sync_gateway/bychannel&channels=A&since=0&revocations=true", "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0]) btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false}) _ = btcRunner.WaitForVersion(btc.id, docID, version) diff --git a/rest/changes_api.go b/rest/changes_api.go index 78ce4a03fb..697af9edf1 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -185,6 +185,10 @@ func (h *handler) handleChanges() error { options.ActiveOnly = h.getBoolQuery("active_only") options.IncludeDocs = h.getBoolQuery("include_docs") options.Revocations = h.getBoolQuery("revocations") + options.VersionType, err = db.ParseChangesVersionType(h.getQuery("version_type")) + if err != nil { + return base.HTTPErrorf(http.StatusBadRequest, "Invalid version_type: %v", err) + } useRequestPlus, _ := h.getOptBoolQuery("request_plus", h.db.Options.ChangesRequestPlus) if useRequestPlus && feed != feedTypeContinuous { @@ -602,6 +606,7 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti AcceptEncoding string `json:"accept_encoding"` ActiveOnly bool `json:"active_only"` // Return active revisions only RequestPlus *bool `json:"request_plus"` // Wait for sequence buffering to catch up to database seq value at time request was issued + VersionType string `json:"version_type"` // Version type to use for changes feed } // Initialize since clock and hasher ahead of unmarshalling sequence @@ -643,6 +648,11 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti true, ) + options.VersionType, err = db.ParseChangesVersionType(input.VersionType) + if err != nil { + return "", options, "", nil, nil, false, base.HTTPErrorf(http.StatusBadRequest, "Invalid version_type: %v", err) + } + compress = (input.AcceptEncoding == "gzip") if h.db != nil && feed != feedTypeContinuous { diff --git a/rest/changes_test.go b/rest/changes_test.go index 2b1c716c17..57bfb70efa 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -449,3 +449,125 @@ func TestCVPopulationOnDocIDChanges(t *testing.T) { assert.Equal(t, bucketUUID, changes.Results[0].CurrentVersion.SourceID) assert.Equal(t, fetchedDoc.Cas, changes.Results[0].CurrentVersion.Value) } + +// TestChangesVersionType tests the /_changes REST endpoint with different version_type parameters for each possible underlying feed type and HTTP method. +func TestChangesVersionType(t *testing.T) { + rt := NewRestTester(t, nil) + defer rt.Close() + + rt.PutDoc("doc1", `{"foo":"bar"}`) + rt.PutDoc("doc2", `{"buzz":"quux"}`) + + rt.WaitForPendingChanges() + + tests := []struct { + name string + changesRequestMethod string + changesRequestQueryParams string + changesRequestBody string + expectedStatus int + expectedVersionType string + expectedDocs int + }{ + { + name: "invalid version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=invalid", + expectedStatus: http.StatusBadRequest, + }, + { + name: "empty version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "", + expectedStatus: http.StatusOK, + expectedVersionType: "rev", + expectedDocs: 2, + }, + { + name: "rev version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=rev", + expectedStatus: http.StatusOK, + expectedVersionType: "rev", + expectedDocs: 2, + }, + { + name: "cv version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=cv", + expectedStatus: http.StatusOK, + expectedVersionType: "cv", + expectedDocs: 2, + }, + { + name: "rev docid filter", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=rev&filter=_doc_ids&doc_ids=doc1", + expectedStatus: http.StatusOK, + expectedVersionType: "rev", + expectedDocs: 1, + }, + { + name: "cv docid filter", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=cv&filter=_doc_ids&doc_ids=doc1", + expectedStatus: http.StatusOK, + expectedVersionType: "cv", + expectedDocs: 1, + }, + { + name: "rev post", + changesRequestMethod: http.MethodPost, + changesRequestQueryParams: "", + changesRequestBody: `{"version_type":"rev"}`, + expectedStatus: http.StatusOK, + expectedVersionType: "rev", + expectedDocs: 2, + }, + { + name: "cv post", + changesRequestMethod: http.MethodPost, + changesRequestQueryParams: "", + changesRequestBody: `{"version_type":"cv"}`, + expectedStatus: http.StatusOK, + expectedVersionType: "cv", + expectedDocs: 2, + }, + { + name: "cv docid filter post", + changesRequestMethod: http.MethodPost, + changesRequestQueryParams: "", + changesRequestBody: `{"version_type":"cv", "filter":"_doc_ids", "doc_ids":["doc1"]}`, + expectedStatus: http.StatusOK, + expectedVersionType: "cv", + expectedDocs: 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require.NotEmptyf(t, test.changesRequestMethod, "Test case %q requires a changesRequestMethod to be set", test.name) + + if test.expectedStatus != http.StatusOK { + resp := rt.SendAdminRequest(test.changesRequestMethod, fmt.Sprintf("/{{.keyspace}}/_changes%s", test.changesRequestQueryParams), test.changesRequestBody) + RequireStatus(t, resp, test.expectedStatus) + return + } + + resp := rt.SendAdminRequest(test.changesRequestMethod, fmt.Sprintf("/{{.keyspace}}/_changes%s", test.changesRequestQueryParams), test.changesRequestBody) + RequireStatus(t, resp, test.expectedStatus) + var changesResults ChangesResults + require.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &changesResults)) + require.Len(t, changesResults.Results, test.expectedDocs) + for _, changeEntry := range changesResults.Results { + for _, change := range changeEntry.Changes { + require.Len(t, change, 1) // ensure only one version type is present + // and that it was the expected one (and we have a value) + versionValue, ok := change[db.ChangesVersionType(test.expectedVersionType)] + require.Truef(t, ok, "Expected version type %s, got %v", test.expectedVersionType, change) + require.NotEmpty(t, versionValue) + } + } + }) + } +} diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 5f1b9e6867..acfbdf0d5d 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -4088,7 +4088,7 @@ func TestActiveReplicatorPullConflict(t *testing.T) { changesResults := rt1.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) t.Logf("Changes response is %+v", changesResults) rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() @@ -4307,7 +4307,7 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { // Validate results on the local (rt1) changesResults := rt1.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d", localDoc.Sequence), "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0]) t.Logf("Changes response is %+v", changesResults) rawDocResponse := rt1.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_raw/"+docID, "") @@ -4351,7 +4351,7 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { } changesResults = rt2.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d", rt2Since), "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0]) t.Logf("Changes response is %+v", changesResults) doc, err = rt2collection.GetDocument(rt2ctx, docID, db.DocUnmarshalAll) @@ -5922,7 +5922,7 @@ func TestActiveReplicatorPullConflictReadWriteIntlProps(t *testing.T) { // Should end up as winner under default conflict resolution. changesResults := rt1.WaitForChanges(1, "/{{.keyspace}}/_changes?&since=0", "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) t.Logf("Changes response is %+v", changesResults) rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 02a3c01625..ba29af3c66 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -883,10 +883,19 @@ func (cr ChangesResults) Summary() string { return strings.Join(revs, ", ") } -// RequireChangeRevVersion asserts that the given ChangeRev has the expected version for a given entry returned by _changes feed -func RequireChangeRevVersion(t *testing.T, expected DocVersion, changeRev db.ChangeRev) { - // CV will only be populated if changes requests it - require.Equal(t, expected.RevTreeID, changeRev["rev"], "Expected rev %s, got %s", expected.RevTreeID, changeRev["rev"]) +// RequireChangeRev asserts that the given db.ChangeByVersionType returned a /_changes feed has the expected DocVersion entry, for a given versionType (rev or cv) +func RequireChangeRev(t *testing.T, expected DocVersion, changeRev db.ChangeByVersionType, versionType db.ChangesVersionType) { + // Only one version type will be populated on a changes feed, based on what the original request demanded and what version types are available on that particular revision. + var expectedStr string + switch versionType { + case db.ChangesVersionTypeRevTreeID: + expectedStr = expected.RevTreeID + case db.ChangesVersionTypeCV: + expectedStr = expected.CV.String() + default: + t.Fatalf("Unexpected version type: %q", versionType) + } + require.Equalf(t, expectedStr, changeRev[versionType], "Expected changeRev[%q]==%s, got %s", versionType, expected.RevTreeID, changeRev[versionType]) } func (rt *RestTester) WaitForChanges(numChangesExpected int, changesURL, username string, useAdminPort bool) ChangesResults { From 6535ba7348c21c02f30dd08f63d92133923c266f Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 1 Aug 2025 22:49:17 +0100 Subject: [PATCH 02/18] Support sending changes for old rev-only docs on a cv changes feed --- db/changes.go | 42 +++++++++++++++++---- docs/api/components/schemas.yaml | 11 ++++-- docs/api/paths/admin/keyspace-_changes.yaml | 4 +- rest/changes_api.go | 10 +++-- rest/changes_test.go | 39 +++++++++++++++++++ 5 files changed, 89 insertions(+), 17 deletions(-) diff --git a/db/changes.go b/db/changes.go index 2efda9ca0b..546b501e15 100644 --- a/db/changes.go +++ b/db/changes.go @@ -42,6 +42,8 @@ type ChangesOptions struct { VersionType ChangesVersionType // The type of version to use for the changes feed. This is used to determine whether to use send revtree IDs or CV in the changes feed entries. } +// ChangesVersionType determines the preferred version type to use in changes feed entries. +// If the requested version type is not available, the feed will attempt to fall back to ChangesVersionTypeRevTreeID which should always be available. type ChangesVersionType string const ( @@ -60,6 +62,15 @@ func ParseChangesVersionType(s string) (ChangesVersionType, error) { } } +// ChangeVersionString attempts to return the version string for the preferred ChangesVersionType, but will fall back to rev if cv is not available when requested. +func (ce *ChangeEntry) ChangeVersionString(versionType ChangesVersionType) string { + if s, ok := ce.Changes[0][versionType]; ok { + return s + } + // requested version type not found, return `rev` as a fallback. + return ce.Changes[0][ChangesVersionTypeRevTreeID] +} + // A changes entry; Database.GetChanges returns an array of these. // Marshals into the standard CouchDB _changes format. type ChangeEntry struct { @@ -128,7 +139,10 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e base.WarnfCtx(ctx, "Changes feed: error getting doc %q: %v", base.UD(entry.ID), err) return } - db.AddDocInstanceToChangeEntry(ctx, entry, doc, options) + if err := db.AddDocInstanceToChangeEntry(ctx, entry, doc, options); err != nil { + base.WarnfCtx(ctx, "Changes feed: error adding doc %q to change entry: %v", base.UD(entry.ID), err) + return + } } else if includeConflicts { // Load doc metadata only @@ -139,11 +153,14 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e base.WarnfCtx(ctx, "Changes feed: error getting doc sync data %q: %v", base.UD(entry.ID), err) return } - db.AddDocInstanceToChangeEntry(ctx, entry, doc, options) + if err := db.AddDocInstanceToChangeEntry(ctx, entry, doc, options); err != nil { + base.WarnfCtx(ctx, "Changes feed: error adding doc %q to change entry: %v", base.UD(entry.ID), err) + return + } } else if options.IncludeDocs { // Retrieve document via rev cache - revID := entry.Changes[0]["rev"] + revID := entry.ChangeVersionString(options.VersionType) err := db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID) if err != nil { base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err) @@ -162,12 +179,16 @@ func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx conte } // Adds a document body and/or its conflicts to a ChangeEntry -func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Context, entry *ChangeEntry, doc *Document, options ChangesOptions) { +func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Context, entry *ChangeEntry, doc *Document, options ChangesOptions) error { includeConflicts := options.Conflicts && entry.branched - revID := entry.Changes[0]["rev"] + revID := entry.ChangeVersionString(options.VersionType) if includeConflicts { + // should've been validated in the handler layer but be defensive + if options.VersionType == ChangesVersionTypeCV { + return fmt.Errorf("changes feed does not support showing in-conflict revisions when using version_type=cv") + } doc.History.forEachLeaf(func(leaf *RevInfo) { if leaf.ID != revID { if !leaf.Deleted { @@ -187,6 +208,8 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co base.WarnfCtx(ctx, "Changes feed: error getting doc %q/%q: %v", base.UD(doc.ID), revID, err) } } + + return nil } // Parameters @@ -553,7 +576,7 @@ func makeRevocationChangeEntry(ctx context.Context, logEntry *LogEntry, seqID Se } // AuditReadEvent issues a read event for this change entry. If there is no document body, there will be no event used. -func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) { +func (ce *ChangeEntry) AuditReadEvent(ctx context.Context, versionType ChangesVersionType) { if ce.Err != nil { return } @@ -562,7 +585,7 @@ func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) { } base.Audit(ctx, base.AuditIDDocumentRead, base.AuditFields{ base.AuditFieldDocID: ce.ID, - base.AuditFieldDocVersion: ce.Changes[0]["rev"], + base.AuditFieldDocVersion: ce.ChangeVersionString(versionType), }) } @@ -1387,7 +1410,10 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio row.Removed = base.SetFromArray(removedChannels) if options.IncludeDocs || options.Conflicts { - db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options) + if err := db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options); err != nil { + base.WarnfCtx(ctx, "Unable to add doc instance to change entry for %s: %v", base.UD(docid), err) + return nil + } } return row diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 48638462d9..2ca73f6036 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -549,15 +549,18 @@ Changes-feed: type: array items: type: object + minProperties: 1 + maxProperties: 1 properties: rev: - description: The new Revision ID that was caused by that change. This is omitted when the `version_type` parameter is `cv`. + description: | + The new Revision ID associated with the change. + This is usually omitted in favour of the `cv` property when the `version_type` preference is set to `cv`, + however data written prior to Sync Gateway 4.0 can still emit `rev` only change entries if we do not have a `cv` available for that document revision. type: string - optional: true cv: - description: The new Current Version that was caused by that change. This is present only when the `version_type` parameter is `cv`. + description: The new Current Version associated with the change. This value requires the `version_type` preference set to `cv`. type: string - optional: true uniqueItems: true uniqueItems: true last_seq: diff --git a/docs/api/paths/admin/keyspace-_changes.yaml b/docs/api/paths/admin/keyspace-_changes.yaml index f00d02e23e..c51b2768cd 100644 --- a/docs/api/paths/admin/keyspace-_changes.yaml +++ b/docs/api/paths/admin/keyspace-_changes.yaml @@ -104,7 +104,7 @@ get: default: false - name: version_type in: query - description: The type of document versioning to use for the changes feed. + description: The preferred type of document versioning to use for the changes feed. schema: type: string default: rev @@ -176,7 +176,7 @@ post: type: boolean default: false version_type: - description: The type of document versioning to use for the changes feed. + description: The preferred type of document versioning to use for the changes feed. type: string default: rev enum: diff --git a/rest/changes_api.go b/rest/changes_api.go index 697af9edf1..81414c39eb 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -190,6 +190,10 @@ func (h *handler) handleChanges() error { return base.HTTPErrorf(http.StatusBadRequest, "Invalid version_type: %v", err) } + if options.Conflicts && options.VersionType == db.ChangesVersionTypeCV { + return base.HTTPErrorf(http.StatusBadRequest, "Cannot use 'style=all_docs' with 'version_type=cv'") + } + useRequestPlus, _ := h.getOptBoolQuery("request_plus", h.db.Options.ChangesRequestPlus) if useRequestPlus && feed != feedTypeContinuous { var seqErr error @@ -424,7 +428,7 @@ func (h *handler) sendSimpleChanges(channels base.Set, options db.ChangesOptions } _ = encoder.Encode(entry) lastSeq = entry.Seq - entry.AuditReadEvent(h.ctx()) + entry.AuditReadEvent(h.ctx(), options.VersionType) } case <-heartbeat: @@ -497,7 +501,7 @@ func (h *handler) sendContinuousChangesByHTTP(inChannels base.Set, options db.Ch break } - change.AuditReadEvent(h.ctx()) + change.AuditReadEvent(h.ctx(), options.VersionType) } } else { _, err = h.response.Write([]byte("\n")) @@ -574,7 +578,7 @@ func (h *handler) sendContinuousChangesByWebSocket(inChannels base.Set, options return err } for _, change := range changes { - change.AuditReadEvent(h.ctx()) + change.AuditReadEvent(h.ctx(), options.VersionType) } return err }) diff --git a/rest/changes_test.go b/rest/changes_test.go index 57bfb70efa..1b670869bd 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -571,3 +571,42 @@ func TestChangesVersionType(t *testing.T) { }) } } + +func TestChangesFeedCVWithOldRevOnlyData(t *testing.T) { + rt := NewRestTester(t, nil) + defer rt.Close() + + seq, err := db.AllocateTestSequence(rt.GetDatabase()) + require.NoError(t, err) + oldDoc := "oldDoc" + oldDocBody := []byte(`{"body_field":"1234"}`) + oldDocSyncData := []byte(fmt.Sprintf(`{"sequence":%d,"rev":{"rev": "1-abc"},"value_crc32c":"%s"}`, seq, base.Crc32cHashString(oldDocBody))) + _, err = rt.GetSingleDataStore().WriteWithXattrs(t.Context(), oldDoc, 0, 0, oldDocBody, map[string][]byte{base.SyncXattrName: oldDocSyncData}, nil, nil) + require.NoError(t, err) + + rt.PutDoc("newDoc", `{"foo":"bar"}`) + + rt.WaitForPendingChanges() + + resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_changes?version_type=cv", "") + RequireStatus(t, resp, http.StatusOK) + var changesResults ChangesResults + require.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &changesResults)) + require.Len(t, changesResults.Results, 2) + for i, changeEntry := range changesResults.Results { + for _, change := range changeEntry.Changes { + require.Len(t, change, 1) // ensure only one version type is present + // and that it was the expected one (and we have a value) + var expectedType db.ChangesVersionType + if i == 0 { + // first doc was written with a RevID and no CV available + expectedType = db.ChangesVersionTypeRevTreeID + } else { + expectedType = db.ChangesVersionTypeCV + } + versionValue, ok := change[expectedType] + require.Truef(t, ok, "Expected version type %s, got %v", expectedType, change) + require.NotEmpty(t, versionValue) + } + } +} From f2b25f80e332200a0d099ad84b920f91c544f653 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 1 Aug 2025 23:23:44 +0100 Subject: [PATCH 03/18] Use typed consts for expectedVersionType in subtest fields --- rest/changes_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/rest/changes_test.go b/rest/changes_test.go index 1b670869bd..6fc1212686 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -466,7 +466,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestQueryParams string changesRequestBody string expectedStatus int - expectedVersionType string + expectedVersionType db.ChangesVersionType expectedDocs int }{ { @@ -480,7 +480,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestMethod: http.MethodGet, changesRequestQueryParams: "", expectedStatus: http.StatusOK, - expectedVersionType: "rev", + expectedVersionType: db.ChangesVersionTypeRevTreeID, expectedDocs: 2, }, { @@ -488,7 +488,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestMethod: http.MethodGet, changesRequestQueryParams: "?version_type=rev", expectedStatus: http.StatusOK, - expectedVersionType: "rev", + expectedVersionType: db.ChangesVersionTypeRevTreeID, expectedDocs: 2, }, { @@ -496,7 +496,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestMethod: http.MethodGet, changesRequestQueryParams: "?version_type=cv", expectedStatus: http.StatusOK, - expectedVersionType: "cv", + expectedVersionType: db.ChangesVersionTypeCV, expectedDocs: 2, }, { @@ -504,7 +504,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestMethod: http.MethodGet, changesRequestQueryParams: "?version_type=rev&filter=_doc_ids&doc_ids=doc1", expectedStatus: http.StatusOK, - expectedVersionType: "rev", + expectedVersionType: db.ChangesVersionTypeRevTreeID, expectedDocs: 1, }, { @@ -512,7 +512,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestMethod: http.MethodGet, changesRequestQueryParams: "?version_type=cv&filter=_doc_ids&doc_ids=doc1", expectedStatus: http.StatusOK, - expectedVersionType: "cv", + expectedVersionType: db.ChangesVersionTypeCV, expectedDocs: 1, }, { @@ -521,7 +521,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestQueryParams: "", changesRequestBody: `{"version_type":"rev"}`, expectedStatus: http.StatusOK, - expectedVersionType: "rev", + expectedVersionType: db.ChangesVersionTypeRevTreeID, expectedDocs: 2, }, { @@ -530,7 +530,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestQueryParams: "", changesRequestBody: `{"version_type":"cv"}`, expectedStatus: http.StatusOK, - expectedVersionType: "cv", + expectedVersionType: db.ChangesVersionTypeCV, expectedDocs: 2, }, { @@ -539,7 +539,7 @@ func TestChangesVersionType(t *testing.T) { changesRequestQueryParams: "", changesRequestBody: `{"version_type":"cv", "filter":"_doc_ids", "doc_ids":["doc1"]}`, expectedStatus: http.StatusOK, - expectedVersionType: "cv", + expectedVersionType: db.ChangesVersionTypeCV, expectedDocs: 1, }, } @@ -563,7 +563,7 @@ func TestChangesVersionType(t *testing.T) { for _, change := range changeEntry.Changes { require.Len(t, change, 1) // ensure only one version type is present // and that it was the expected one (and we have a value) - versionValue, ok := change[db.ChangesVersionType(test.expectedVersionType)] + versionValue, ok := change[test.expectedVersionType] require.Truef(t, ok, "Expected version type %s, got %v", test.expectedVersionType, change) require.NotEmpty(t, versionValue) } From f6a16817853c97d762c1122f102909424cd73473 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 14 Aug 2025 15:13:47 +0100 Subject: [PATCH 04/18] Update OpenAPI spec --- docs/api/components/schemas.yaml | 26 ++++++++++---------- docs/api/paths/admin/keyspace-_changes.yaml | 6 +++++ docs/api/paths/public/keyspace-_changes.yaml | 14 ++++++++++- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 2ca73f6036..0fcf259904 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -548,19 +548,19 @@ Changes-feed: description: List of document leafs with each leaf containing only a `rev` field. type: array items: - type: object - minProperties: 1 - maxProperties: 1 - properties: - rev: - description: | - The new Revision ID associated with the change. - This is usually omitted in favour of the `cv` property when the `version_type` preference is set to `cv`, - however data written prior to Sync Gateway 4.0 can still emit `rev` only change entries if we do not have a `cv` available for that document revision. - type: string - cv: - description: The new Current Version associated with the change. This value requires the `version_type` preference set to `cv`. - type: string + oneOf: + - type: object + properties: + rev: + description: The Revision Tree ID associated with the change. This may be returned even when the `version_type` preference is set to `cv` if this document was an old revision written prior to upgrading to SG 4.0+ + type: string + title: Revision Tree ID + - type: object + properties: + cv: + description: The HLV Current Version associated with the change. This value requires the `version_type` preference set to `cv` and this document revision being written through SG 4.0+ + type: string + title: HLV Current Version uniqueItems: true uniqueItems: true last_seq: diff --git a/docs/api/paths/admin/keyspace-_changes.yaml b/docs/api/paths/admin/keyspace-_changes.yaml index c51b2768cd..9f328fb4ac 100644 --- a/docs/api/paths/admin/keyspace-_changes.yaml +++ b/docs/api/paths/admin/keyspace-_changes.yaml @@ -111,6 +111,9 @@ get: enum: - rev - cv + x-enumDescriptions: + rev: 'Revision Tree IDs. For example: 1-293a80ce8f4874724732f27d35b3959a13cd96e0' + cv: 'HLV Current Version. For example: 1854e4e557cc0000@zTWkmBiYZgNQo7BHVZrB/Q' responses: '200': $ref: ../../components/responses.yaml#/changes-feed @@ -182,6 +185,9 @@ post: enum: - rev - cv + x-enumDescriptions: + rev: 'Revision Tree IDs. For example: 1-293a80ce8f4874724732f27d35b3959a13cd96e0' + cv: 'HLV Current Version. For example: 1854e4e557cc0000@zTWkmBiYZgNQo7BHVZrB/Q' responses: '200': $ref: ../../components/responses.yaml#/changes-feed diff --git a/docs/api/paths/public/keyspace-_changes.yaml b/docs/api/paths/public/keyspace-_changes.yaml index b2a631d724..019f52f7ba 100644 --- a/docs/api/paths/public/keyspace-_changes.yaml +++ b/docs/api/paths/public/keyspace-_changes.yaml @@ -81,7 +81,7 @@ get: minimum: 0 - name: feed in: query - description: 'The type of changes feed to use. ' + description: 'The type of changes feed to use.' schema: type: string default: normal @@ -90,6 +90,18 @@ get: - longpoll - continuous - websocket + - name: version_type + in: query + description: 'The preferred type of document versioning to use for the changes feed.' + schema: + type: string + default: rev + enum: + - rev + - cv + x-enumDescriptions: + rev: 'Revision Tree IDs. For example: 1-293a80ce8f4874724732f27d35b3959a13cd96e0' + cv: 'HLV Current Version. For example: 1854e4e557cc0000@zTWkmBiYZgNQo7BHVZrB/Q' responses: '200': $ref: ../../components/responses.yaml#/changes-feed From c1d85c1c4f9e8f8430ff36a9062d3bcf04c8368f Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 14 Aug 2025 15:16:48 +0100 Subject: [PATCH 05/18] Pass through version type from RequireChangeRev helper --- rest/blip_api_crud_test.go | 12 ++++++------ rest/replicatortest/replicator_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 86b78cc965..a00769d0ff 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -2342,7 +2342,7 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0&revocations=true", "user", true) assert.Equal(t, "doc", changes.Results[0].ID) - RequireChangeRev(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2351,7 +2351,7 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes = rt.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%s&revocations=true", changes.Last_Seq), "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRev(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2362,11 +2362,11 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes = rt.WaitForChanges(2, fmt.Sprintf("/{{.keyspace}}/_changes?since=%s&revocations=true", changes.Last_Seq), "user", true) assert.Equal(t, "doc", changes.Results[0].ID) - RequireChangeRev(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) assert.Equal(t, "3-1bc9dd04c8a257ba28a41eaad90d32de", changes.Results[0].Changes[0]["rev"]) assert.False(t, changes.Results[0].Revoked) assert.Equal(t, "docmarker", changes.Results[1].ID) - RequireChangeRev(t, docMarkerVersion, changes.Results[1].Changes[0]) + RequireChangeRev(t, docMarkerVersion, changes.Results[1].Changes[0], db.ChangesVersionTypeRevTreeID) assert.Equal(t, "1-999bcad4aab47f0a8a24bd9d3598060c", changes.Results[1].Changes[0]["rev"]) assert.False(t, changes.Results[1].Revoked) @@ -2427,7 +2427,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0&revocations=true", "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRev(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2438,7 +2438,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi // At this point changes should send revocation, as document isn't in any of the user's channels changes = rt.WaitForChanges(1, "/{{.keyspace}}/_changes?filter=sync_gateway/bychannel&channels=A&since=0&revocations=true", "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRev(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false}) _ = btcRunner.WaitForVersion(btc.id, docID, version) diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index acfbdf0d5d..3c7372be63 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -4088,7 +4088,7 @@ func TestActiveReplicatorPullConflict(t *testing.T) { changesResults := rt1.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() @@ -4307,7 +4307,7 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { // Validate results on the local (rt1) changesResults := rt1.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d", localDoc.Sequence), "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) rawDocResponse := rt1.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_raw/"+docID, "") @@ -4351,7 +4351,7 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { } changesResults = rt2.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d", rt2Since), "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) doc, err = rt2collection.GetDocument(rt2ctx, docID, db.DocUnmarshalAll) @@ -5922,7 +5922,7 @@ func TestActiveReplicatorPullConflictReadWriteIntlProps(t *testing.T) { // Should end up as winner under default conflict resolution. changesResults := rt1.WaitForChanges(1, "/{{.keyspace}}/_changes?&since=0", "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() From 64202a8a0e2dead75dae3cbebbe2a5b3573b944a Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 13:18:06 +0100 Subject: [PATCH 06/18] Specify or `cv` in OpenAPI spec description --- docs/api/components/schemas.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 0fcf259904..194904f03d 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -545,7 +545,7 @@ Changes-feed: description: The document ID the change happened on. type: string changes: - description: List of document leafs with each leaf containing only a `rev` field. + description: List of document leafs with each leaf containing only a `rev` or `cv` field. type: array items: oneOf: From 5aefeac956c3b2026c0b27a1d9349d2576b0cb49 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 13:31:29 +0100 Subject: [PATCH 07/18] Godoc correction for revOrCV --- db/crud.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/crud.go b/db/crud.go index 2ba302cf2f..c493c92aa4 100644 --- a/db/crud.go +++ b/db/crud.go @@ -354,7 +354,7 @@ func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Contex // Underlying revision retrieval used by Get1xRevBody, Get1xRevBodyWithHistory, GetRevCopy. // Returns the revision of a document using the revision cache. -// - revid may be "", meaning the current revision. +// - revOrCV may be "", meaning the current revision. It can be a RevTree ID or a HLV CV. // - maxHistory is >0 if the caller wants a revision history; it's the max length of the history. // - historyFrom is an optional list of revIDs the client already has. If any of these are found // in the revision's history, it will be trimmed after that revID. From 9b483222ac95375cb9b6b6d00c58bc94f78bea87 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 14:19:43 +0100 Subject: [PATCH 08/18] Lookup ChangeEntry version instead of using preference to produce audit event --- db/changes.go | 21 +++++++++++---------- rest/changes_api.go | 2 +- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/db/changes.go b/db/changes.go index 546b501e15..9601ad7210 100644 --- a/db/changes.go +++ b/db/changes.go @@ -62,13 +62,14 @@ func ParseChangesVersionType(s string) (ChangesVersionType, error) { } } -// ChangeVersionString attempts to return the version string for the preferred ChangesVersionType, but will fall back to rev if cv is not available when requested. -func (ce *ChangeEntry) ChangeVersionString(versionType ChangesVersionType) string { - if s, ok := ce.Changes[0][versionType]; ok { - return s +// ChangeVersionString returns the first version string we found in the ChangeEntry. +func (ce *ChangeEntry) ChangeVersionString() string { + // pick just the first entry in changes + for _, changeVersionString := range ce.Changes[0] { + // whichever version is present we'll return it - only expected to have one version type populated - since the feed is initialized with a preferred version type. + return changeVersionString } - // requested version type not found, return `rev` as a fallback. - return ce.Changes[0][ChangesVersionTypeRevTreeID] + return "" } // A changes entry; Database.GetChanges returns an array of these. @@ -160,7 +161,7 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e } else if options.IncludeDocs { // Retrieve document via rev cache - revID := entry.ChangeVersionString(options.VersionType) + revID := entry.ChangeVersionString() err := db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID) if err != nil { base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err) @@ -183,7 +184,7 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co includeConflicts := options.Conflicts && entry.branched - revID := entry.ChangeVersionString(options.VersionType) + revID := entry.ChangeVersionString() if includeConflicts { // should've been validated in the handler layer but be defensive if options.VersionType == ChangesVersionTypeCV { @@ -576,7 +577,7 @@ func makeRevocationChangeEntry(ctx context.Context, logEntry *LogEntry, seqID Se } // AuditReadEvent issues a read event for this change entry. If there is no document body, there will be no event used. -func (ce *ChangeEntry) AuditReadEvent(ctx context.Context, versionType ChangesVersionType) { +func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) { if ce.Err != nil { return } @@ -585,7 +586,7 @@ func (ce *ChangeEntry) AuditReadEvent(ctx context.Context, versionType ChangesVe } base.Audit(ctx, base.AuditIDDocumentRead, base.AuditFields{ base.AuditFieldDocID: ce.ID, - base.AuditFieldDocVersion: ce.ChangeVersionString(versionType), + base.AuditFieldDocVersion: ce.ChangeVersionString(), }) } diff --git a/rest/changes_api.go b/rest/changes_api.go index 81414c39eb..4ad9421b40 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -428,7 +428,7 @@ func (h *handler) sendSimpleChanges(channels base.Set, options db.ChangesOptions } _ = encoder.Encode(entry) lastSeq = entry.Seq - entry.AuditReadEvent(h.ctx(), options.VersionType) + entry.AuditReadEvent(h.ctx()) } case <-heartbeat: From 49cb009e42e6f62e6b240d805bfecb8e9d72db53 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 14:20:17 +0100 Subject: [PATCH 09/18] Add TODOs - some rev-ID only code that is queued behind another ticket and BLIP sendChanges refactor --- db/changes.go | 10 +++++++++- db/revision_cache_interface.go | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/db/changes.go b/db/changes.go index 9601ad7210..8c283839e6 100644 --- a/db/changes.go +++ b/db/changes.go @@ -175,6 +175,7 @@ func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx conte if err != nil { return err } + // TODO: Stamp CV? entry.Doc, err = rev.As1xBytes(ctx, db, nil, nil, false) return err } @@ -203,6 +204,12 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co } if options.IncludeDocs { var err error + // TODO: CBG-4776 - fetch by CV + if !isLegacyRev(revID) { + base.AssertfCtx(ctx, "AddDocInstanceToChangeEntry: got IncludeDocs and a CV - can't fetch doc body yet") + return nil + } + entry.Doc, _, err = db.get1xRevFromDoc(ctx, doc, revID, false) db.dbStats().Database().NumDocReadsRest.Add(1) if err != nil { @@ -559,7 +566,8 @@ func makeChangeEntry(ctx context.Context, logEntry *LogEntry, seqID SequenceID, // This allows current version to be nil in event of CV not being populated on log entry // allowing omitempty to work as expected if logEntry.SourceID != "" { - // TODO: Remove this if we change BLIP generateBlipSyncChanges and tests to use versionType and read the value from the normal Changes array... no reason to store this info in two places. + // TODO: CBG-4804: Remove this if we change BLIP generateBlipSyncChanges and tests to use versionType and read the value from the normal Changes array... no reason to store this info in two places. + // this also allows us to do a revtree ID fallback in cases where the CV is not available on an old rev, since the type is chosen inside when producing logentry and building the change row. change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version} } if logEntry.Flags&channels.Removed != 0 { diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index b3fd5b215d..316c2a3d4d 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -241,6 +241,8 @@ func (rev *DocumentRevision) Inject1xBodyProperties(ctx context.Context, db *Dat {Key: BodyRev, Val: rev.RevID}, } + // TODO: Stamp CV if available? + if requestedHistory != nil { kvPairs = append(kvPairs, base.KVPair{Key: BodyRevisions, Val: requestedHistory}) } From d9f41b4b675d636dab45b47cebe4713c52b6a33c Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 14:21:18 +0100 Subject: [PATCH 10/18] missing AuditReadEvent params --- rest/changes_api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest/changes_api.go b/rest/changes_api.go index 4ad9421b40..a7c84f02d2 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -501,7 +501,7 @@ func (h *handler) sendContinuousChangesByHTTP(inChannels base.Set, options db.Ch break } - change.AuditReadEvent(h.ctx(), options.VersionType) + change.AuditReadEvent(h.ctx()) } } else { _, err = h.response.Write([]byte("\n")) @@ -578,7 +578,7 @@ func (h *handler) sendContinuousChangesByWebSocket(inChannels base.Set, options return err } for _, change := range changes { - change.AuditReadEvent(h.ctx(), options.VersionType) + change.AuditReadEvent(h.ctx()) } return err }) From 7f10a9b420bca41e4eac87658c5d220fb5413f17 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 15:22:03 +0100 Subject: [PATCH 11/18] Fetch by CV/RevID in include_docs style changes feed --- db/changes.go | 14 +++++-------- rest/changes_test.go | 47 +++++++++++++++++++++++++++++++++----------- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/db/changes.go b/db/changes.go index 8c283839e6..0f6e83f399 100644 --- a/db/changes.go +++ b/db/changes.go @@ -204,17 +204,13 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co } if options.IncludeDocs { var err error - // TODO: CBG-4776 - fetch by CV - if !isLegacyRev(revID) { - base.AssertfCtx(ctx, "AddDocInstanceToChangeEntry: got IncludeDocs and a CV - can't fetch doc body yet") - return nil - } - - entry.Doc, _, err = db.get1xRevFromDoc(ctx, doc, revID, false) - db.dbStats().Database().NumDocReadsRest.Add(1) + // TODO: CBG-4776 - fetch by CV with sane APIs + err = db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID) if err != nil { - base.WarnfCtx(ctx, "Changes feed: error getting doc %q/%q: %v", base.UD(doc.ID), revID, err) + base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err) + return err } + db.dbStats().Database().NumDocReadsRest.Add(1) } return nil diff --git a/rest/changes_test.go b/rest/changes_test.go index 6fc1212686..ca98bb2f71 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -13,6 +13,7 @@ import ( "log" "net/http" "net/http/httptest" + "strings" "sync" "sync/atomic" "testing" @@ -455,8 +456,12 @@ func TestChangesVersionType(t *testing.T) { rt := NewRestTester(t, nil) defer rt.Close() - rt.PutDoc("doc1", `{"foo":"bar"}`) - rt.PutDoc("doc2", `{"buzz":"quux"}`) + doc1 := "doc1" + doc1Body := `{"foo":"bar"}` + rt.PutDoc(doc1, doc1Body) + doc2 := "doc2" + doc2Body := `{"buzz":"quux"}` + rt.PutDoc(doc2, doc2Body) rt.WaitForPendingChanges() @@ -486,7 +491,7 @@ func TestChangesVersionType(t *testing.T) { { name: "rev version_type", changesRequestMethod: http.MethodGet, - changesRequestQueryParams: "?version_type=rev", + changesRequestQueryParams: "?version_type=rev&include_docs=true", expectedStatus: http.StatusOK, expectedVersionType: db.ChangesVersionTypeRevTreeID, expectedDocs: 2, @@ -494,7 +499,7 @@ func TestChangesVersionType(t *testing.T) { { name: "cv version_type", changesRequestMethod: http.MethodGet, - changesRequestQueryParams: "?version_type=cv", + changesRequestQueryParams: "?version_type=cv&include_docs=true", expectedStatus: http.StatusOK, expectedVersionType: db.ChangesVersionTypeCV, expectedDocs: 2, @@ -502,7 +507,7 @@ func TestChangesVersionType(t *testing.T) { { name: "rev docid filter", changesRequestMethod: http.MethodGet, - changesRequestQueryParams: "?version_type=rev&filter=_doc_ids&doc_ids=doc1", + changesRequestQueryParams: "?version_type=rev&filter=_doc_ids&doc_ids=doc1&include_docs=true", expectedStatus: http.StatusOK, expectedVersionType: db.ChangesVersionTypeRevTreeID, expectedDocs: 1, @@ -510,7 +515,7 @@ func TestChangesVersionType(t *testing.T) { { name: "cv docid filter", changesRequestMethod: http.MethodGet, - changesRequestQueryParams: "?version_type=cv&filter=_doc_ids&doc_ids=doc1", + changesRequestQueryParams: "?version_type=cv&filter=_doc_ids&doc_ids=doc1&include_docs=true", expectedStatus: http.StatusOK, expectedVersionType: db.ChangesVersionTypeCV, expectedDocs: 1, @@ -567,6 +572,16 @@ func TestChangesVersionType(t *testing.T) { require.Truef(t, ok, "Expected version type %s, got %v", test.expectedVersionType, change) require.NotEmpty(t, versionValue) } + if strings.Contains(test.changesRequestQueryParams, "include_docs=true") { + var expectedBody string + switch changeEntry.ID { + case doc1: + expectedBody = doc1Body + case doc2: + expectedBody = doc2Body + } + require.Contains(t, string(changeEntry.Doc), expectedBody[1:len(expectedBody)-1]) // strip {}s from doc body - 1.x API stamps additional properties so accommodate + } } }) } @@ -579,16 +594,18 @@ func TestChangesFeedCVWithOldRevOnlyData(t *testing.T) { seq, err := db.AllocateTestSequence(rt.GetDatabase()) require.NoError(t, err) oldDoc := "oldDoc" - oldDocBody := []byte(`{"body_field":"1234"}`) - oldDocSyncData := []byte(fmt.Sprintf(`{"sequence":%d,"rev":{"rev": "1-abc"},"value_crc32c":"%s"}`, seq, base.Crc32cHashString(oldDocBody))) - _, err = rt.GetSingleDataStore().WriteWithXattrs(t.Context(), oldDoc, 0, 0, oldDocBody, map[string][]byte{base.SyncXattrName: oldDocSyncData}, nil, nil) + oldDocBody := `{"body_field":"1234"}` + oldDocSyncData := []byte(fmt.Sprintf(`{"sequence":%d,"rev":{"rev": "1-abc"},"history":{"revs":["1-abc"],"parents":[-1],"channels":[null]},"value_crc32c":"%s"}`, seq, base.Crc32cHashString([]byte(oldDocBody)))) + _, err = rt.GetSingleDataStore().WriteWithXattrs(t.Context(), oldDoc, 0, 0, []byte(oldDocBody), map[string][]byte{base.SyncXattrName: oldDocSyncData}, nil, nil) require.NoError(t, err) - rt.PutDoc("newDoc", `{"foo":"bar"}`) + newDoc := "newDoc" + newDocBody := `{"foo":"bar"}` + rt.PutDoc(newDoc, newDocBody) rt.WaitForPendingChanges() - resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_changes?version_type=cv", "") + resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_changes?version_type=cv&include_docs=true", "") RequireStatus(t, resp, http.StatusOK) var changesResults ChangesResults require.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &changesResults)) @@ -608,5 +625,13 @@ func TestChangesFeedCVWithOldRevOnlyData(t *testing.T) { require.Truef(t, ok, "Expected version type %s, got %v", expectedType, change) require.NotEmpty(t, versionValue) } + var expectedBody string + switch changeEntry.ID { + case oldDoc: + expectedBody = oldDocBody + case newDoc: + expectedBody = newDocBody + } + require.Contains(t, string(changeEntry.Doc), expectedBody[1:len(expectedBody)-1]) // strip {}s from doc body - 1.x API stamps additional properties so accommodate } } From e4b21b8d230c535c33c34702ba5982062d2a68c7 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 15:27:37 +0100 Subject: [PATCH 12/18] Rewrite switch case in `createChangesEntry` to have assert case instead of silent rev fallback --- db/changes.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/db/changes.go b/db/changes.go index 0f6e83f399..4001562da4 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1364,14 +1364,13 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio return nil } - row.Changes = []ChangeByVersionType{{ChangesVersionTypeRevTreeID: populatedDoc.CurrentRev}} switch options.VersionType { case ChangesVersionTypeCV: - row.Changes[0] = ChangeByVersionType{options.VersionType: populatedDoc.HLV.GetCurrentVersionString()} - case ChangesVersionTypeRevTreeID: - fallthrough + row.Changes = []ChangeByVersionType{{options.VersionType: populatedDoc.HLV.GetCurrentVersionString()}} + case "", ChangesVersionTypeRevTreeID: + row.Changes = []ChangeByVersionType{{options.VersionType: populatedDoc.CurrentRev}} default: - // already initialized with a 'rev' change entry above + base.AssertfCtx(ctx, "createChangeEntry called with an unsupported VersionType: %s", options.VersionType) } row.Deleted = populatedDoc.Deleted From e79e7caf39ec70dadc8016bcceab6bf387e98b32 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 15:28:12 +0100 Subject: [PATCH 13/18] Continue returning metadata only changes `row` without doc body for error on fetch - consistent with old behaviour --- db/changes.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/changes.go b/db/changes.go index 4001562da4..b982a6c284 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1415,8 +1415,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio row.Removed = base.SetFromArray(removedChannels) if options.IncludeDocs || options.Conflicts { if err := db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options); err != nil { - base.WarnfCtx(ctx, "Unable to add doc instance to change entry for %s: %v", base.UD(docid), err) - return nil + base.WarnfCtx(ctx, "Unable to add doc instance to change entry for %s: %v - will return metadata only", base.UD(docid), err) } } From 09c2719b8c88081c00792071dbb7a67f23b6963f Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 15:31:32 +0100 Subject: [PATCH 14/18] Use consts in switch case for `createChangesEntry` (handle empty string case) --- db/changes.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/changes.go b/db/changes.go index b982a6c284..ce171a1768 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1366,9 +1366,9 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio switch options.VersionType { case ChangesVersionTypeCV: - row.Changes = []ChangeByVersionType{{options.VersionType: populatedDoc.HLV.GetCurrentVersionString()}} + row.Changes = []ChangeByVersionType{{ChangesVersionTypeCV: populatedDoc.HLV.GetCurrentVersionString()}} case "", ChangesVersionTypeRevTreeID: - row.Changes = []ChangeByVersionType{{options.VersionType: populatedDoc.CurrentRev}} + row.Changes = []ChangeByVersionType{{ChangesVersionTypeRevTreeID: populatedDoc.CurrentRev}} default: base.AssertfCtx(ctx, "createChangeEntry called with an unsupported VersionType: %s", options.VersionType) } From 7a5e448302f2a317685d27a87633c4d425eca055 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 16:20:46 +0100 Subject: [PATCH 15/18] %s -> %q for logging version type in changes options --- db/changes.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/changes.go b/db/changes.go index ce171a1768..ebfc7b08a8 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1370,7 +1370,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio case "", ChangesVersionTypeRevTreeID: row.Changes = []ChangeByVersionType{{ChangesVersionTypeRevTreeID: populatedDoc.CurrentRev}} default: - base.AssertfCtx(ctx, "createChangeEntry called with an unsupported VersionType: %s", options.VersionType) + base.AssertfCtx(ctx, "createChangeEntry called with an unsupported VersionType: %q", options.VersionType) } row.Deleted = populatedDoc.Deleted @@ -1424,7 +1424,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio func (options ChangesOptions) String() string { return fmt.Sprintf( - `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, Revocations: %t, RequestPlusSeq: %d, VersionType: %s}`, + `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, Revocations: %t, RequestPlusSeq: %d, VersionType: %q}`, options.Since, options.Limit, options.Conflicts, From f2950cc7038363363522133d578036c6019e226d Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 16:22:07 +0100 Subject: [PATCH 16/18] godoc comment for ParseChangesVersionType --- db/changes.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/changes.go b/db/changes.go index ebfc7b08a8..f8ea5ad3e0 100644 --- a/db/changes.go +++ b/db/changes.go @@ -51,6 +51,8 @@ const ( ChangesVersionTypeCV ChangesVersionType = "cv" // Use current version in changes feed entries ) +// ParseChangesVersionType parses a string into a ChangesVersionType. +// If "" is passed, it defaults to ChangesVersionTypeRevTreeID as a default fallback. func ParseChangesVersionType(s string) (ChangesVersionType, error) { switch ChangesVersionType(s) { case "", ChangesVersionTypeRevTreeID: From ca9a072a41b2a01f88ac49fe6c83f2e962a3dee8 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 16:25:18 +0100 Subject: [PATCH 17/18] Assert for length check in ChangeVersionString --- db/changes.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/db/changes.go b/db/changes.go index f8ea5ad3e0..5f65ec5a8c 100644 --- a/db/changes.go +++ b/db/changes.go @@ -65,7 +65,11 @@ func ParseChangesVersionType(s string) (ChangesVersionType, error) { } // ChangeVersionString returns the first version string we found in the ChangeEntry. -func (ce *ChangeEntry) ChangeVersionString() string { +func (ce *ChangeEntry) ChangeVersionString(ctx context.Context) string { + if len(ce.Changes) == 0 { + base.AssertfCtx(ctx, "ChangesEntry has no changes: %s", ce.String()) + return "" + } // pick just the first entry in changes for _, changeVersionString := range ce.Changes[0] { // whichever version is present we'll return it - only expected to have one version type populated - since the feed is initialized with a preferred version type. @@ -163,7 +167,7 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e } else if options.IncludeDocs { // Retrieve document via rev cache - revID := entry.ChangeVersionString() + revID := entry.ChangeVersionString(ctx) err := db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID) if err != nil { base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err) @@ -187,7 +191,7 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co includeConflicts := options.Conflicts && entry.branched - revID := entry.ChangeVersionString() + revID := entry.ChangeVersionString(ctx) if includeConflicts { // should've been validated in the handler layer but be defensive if options.VersionType == ChangesVersionTypeCV { @@ -592,7 +596,7 @@ func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) { } base.Audit(ctx, base.AuditIDDocumentRead, base.AuditFields{ base.AuditFieldDocID: ce.ID, - base.AuditFieldDocVersion: ce.ChangeVersionString(), + base.AuditFieldDocVersion: ce.ChangeVersionString(ctx), }) } From ad063d944985e77df413c1105f8873f72036b7bc Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 15 Aug 2025 17:26:46 +0100 Subject: [PATCH 18/18] Stamp _cv in changes bodies instead of TODO --- db/changes.go | 1 - db/revision_cache_interface.go | 4 +++- rest/blip_channel_filter_test.go | 20 ++++++++++---------- rest/changestest/changes_api_test.go | 8 ++++++++ 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/db/changes.go b/db/changes.go index 5f65ec5a8c..a0ad2504e8 100644 --- a/db/changes.go +++ b/db/changes.go @@ -181,7 +181,6 @@ func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx conte if err != nil { return err } - // TODO: Stamp CV? entry.Doc, err = rev.As1xBytes(ctx, db, nil, nil, false) return err } diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index 316c2a3d4d..c8d6833d3c 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -241,7 +241,9 @@ func (rev *DocumentRevision) Inject1xBodyProperties(ctx context.Context, db *Dat {Key: BodyRev, Val: rev.RevID}, } - // TODO: Stamp CV if available? + if rev.CV != nil { + kvPairs = append(kvPairs, base.KVPair{Key: BodyCV, Val: rev.CV.String()}) + } if requestedHistory != nil { kvPairs = append(kvPairs, base.KVPair{Key: BodyRevisions, Val: requestedHistory}) diff --git a/rest/blip_channel_filter_test.go b/rest/blip_channel_filter_test.go index 077fba63a6..435b258196 100644 --- a/rest/blip_channel_filter_test.go +++ b/rest/blip_channel_filter_test.go @@ -46,7 +46,7 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { client := btcRunner.SingleCollection(btc.id) const docID = "doc1" - version1 := rt.PutDoc("doc1", `{"channels":["A"]}`) + version1 := rt.PutDocDirectly("doc1", JsonToMap(t, `{"channels":["A"]}`)) rt.WaitForPendingChanges() response := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?since=0&channels=A&include_docs=true", "", "alice") @@ -56,10 +56,10 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { { "results": [ {"seq":1, "id": "_user/alice", "changes":[]}, - {"seq":3, "id": "doc1", "doc": {"_id": "doc1", "_rev":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} + {"seq":3, "id": "doc1", "doc": {"_id": "doc1", "_rev":"%s", "_cv":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} ], "last_seq": "3" -}`, version1.RevTreeID, version1.RevTreeID) +}`, version1.RevTreeID, version1.CV.String(), version1.RevTreeID) require.JSONEq(t, expectedChanges1, string(response.BodyBytes())) client.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0", Channels: "A"}) @@ -67,9 +67,9 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { btcRunner.WaitForVersion(btc.id, docID, version1) // remove channel A from doc1 - version2 := rt.UpdateDoc(docID, version1, `{"channels":["B"]}`) + version2 := rt.UpdateDocDirectly(docID, version1, JsonToMap(t, `{"channels":["B"]}`)) markerDocID := "marker" - markerDocVersion := rt.PutDoc(markerDocID, `{"channels":["A"]}`) + markerDocVersion := rt.PutDocDirectly(markerDocID, JsonToMap(t, `{"channels":["A"]}`)) rt.WaitForPendingChanges() // alice will see doc1 rev2 with body @@ -79,11 +79,11 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { aliceExpectedChanges2 := fmt.Sprintf(` { "results": [ - {"seq":4, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "channels": ["B"]}, "changes": [{"rev":"%s"}]}, - {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} + {"seq":4, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "_cv":"%s", "channels": ["B"]}, "changes": [{"rev":"%s"}]}, + {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "_cv":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} ], "last_seq": "5" -}`, docID, docID, version2.RevTreeID, version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.RevTreeID) +}`, docID, docID, version2.RevTreeID, version2.CV.String(), version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.CV.String(), markerDocVersion.RevTreeID) require.JSONEq(t, aliceExpectedChanges2, string(response.BodyBytes())) client.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0", Channels: "A"}) @@ -105,10 +105,10 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { { "results": [ {"seq":4, "id": "doc1", "removed":["A"], "doc": {"_id": "doc1", "_rev":"%s", "_removed": true}, "changes": [{"rev":"%s"}]}, - {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} + {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "_cv": "%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} ], "last_seq": "5" -}`, version2.RevTreeID, version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.RevTreeID) +}`, version2.RevTreeID, version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.CV.String(), markerDocVersion.RevTreeID) require.JSONEq(t, bobExpectedChanges2, string(response.BodyBytes())) }) } diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index bf166bdb7e..3f2c3bac16 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -837,6 +837,10 @@ func assertChangeEntryMatches(t *testing.T, expectedChangeEntryString string, re var resultBody db.Body assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) assert.NoError(t, resultBody.Unmarshal(result.Doc)) + + // strip out _cv - it's not deterministic in tests like _rev is, but also tests really shouldn't be comparing full bodies! It's so brittle... + delete(resultBody, db.BodyCV) + db.AssertEqualBodies(t, expectedBody, resultBody) } else { assert.Equal(t, expectedChange.Doc, result.Doc) @@ -1893,6 +1897,10 @@ func TestChangesIncludeDocs(t *testing.T) { var resultBody db.Body assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) assert.NoError(t, resultBody.Unmarshal(result.Doc)) + + // strip out _cv - it's not deterministic in tests like _rev is, but also tests really shouldn't be comparing full bodies! It's so brittle... + delete(resultBody, db.BodyCV) + db.AssertEqualBodies(t, expectedBody, resultBody) } else { assert.Equal(t, expectedChange.Doc, result.Doc)