diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 14879b7e6f..efb283e42e 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -534,7 +534,7 @@ func TestChannelCacheBackfill(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1, TriggeredBy: 0, LowSeq: 2}, ID: "doc-1", - Changes: []ChangeRev{{"rev": "1-a"}}, + Changes: []ChangeByVersionType{{"rev": "1-a"}}, collectionID: collectionID}, changes[0]) lastSeq := changes[len(changes)-1].Seq @@ -568,7 +568,7 @@ func TestChannelCacheBackfill(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 3, LowSeq: 3}, ID: "doc-3", - Changes: []ChangeRev{{"rev": "1-a"}}, + Changes: []ChangeByVersionType{{"rev": "1-a"}}, collectionID: collectionID, }, changes[0]) @@ -722,7 +722,7 @@ func TestLowSequenceHandling(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1, TriggeredBy: 0, LowSeq: 2}, ID: "doc-1", - Changes: []ChangeRev{{"rev": "1-a"}}, + Changes: []ChangeByVersionType{{"rev": "1-a"}}, collectionID: collectionID}, changes[0]) // Test backfill clear - sequence numbers go back to standard handling diff --git a/db/changes.go b/db/changes.go index 974905dfed..a0ad2504e8 100644 --- a/db/changes.go +++ b/db/changes.go @@ -26,32 +26,69 @@ import ( // Options for changes-feeds. ChangesOptions must not contain any mutable pointer references, as // changes processing currently assumes a deep copy when doing chanOpts := changesOptions. type ChangesOptions struct { - Since SequenceID // sequence # to start _after_ - Limit int // Max number of changes to return, if nonzero - Conflicts bool // Show all conflicting revision IDs, not just winning one? - IncludeDocs bool // Include doc body of each change? - Wait bool // Wait for results, instead of immediately returning empty result? - Continuous bool // Run continuously until terminated? - RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq - HeartbeatMs uint64 // How often to send a heartbeat to the client - TimeoutMs uint64 // After this amount of time, close the longpoll connection - ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions - Revocations bool // Specifies whether revocation messages should be sent on the changes feed - clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client - ChangesCtx context.Context // Used for cancelling checking the changes feed should stop + Since SequenceID // sequence # to start _after_ + Limit int // Max number of changes to return, if nonzero + Conflicts bool // Show all conflicting revision IDs, not just winning one? + IncludeDocs bool // Include doc body of each change? + Wait bool // Wait for results, instead of immediately returning empty result? + Continuous bool // Run continuously until terminated? + RequestPlusSeq uint64 // Do not stop changes before cached sequence catches up with requestPlusSeq + HeartbeatMs uint64 // How often to send a heartbeat to the client + TimeoutMs uint64 // After this amount of time, close the longpoll connection + ActiveOnly bool // If true, only return information on non-deleted, non-removed revisions + Revocations bool // Specifies whether revocation messages should be sent on the changes feed + clientType clientType // Can be used to determine if the replication is being started from a CBL 2.x or SGR2 client + ChangesCtx context.Context // Used for cancelling checking the changes feed should stop + VersionType ChangesVersionType // The type of version to use for the changes feed. This is used to determine whether to use send revtree IDs or CV in the changes feed entries. +} + +// ChangesVersionType determines the preferred version type to use in changes feed entries. +// If the requested version type is not available, the feed will attempt to fall back to ChangesVersionTypeRevTreeID which should always be available. +type ChangesVersionType string + +const ( + ChangesVersionTypeRevTreeID ChangesVersionType = "rev" // Use revtree IDs in changes feed entries + ChangesVersionTypeCV ChangesVersionType = "cv" // Use current version in changes feed entries +) + +// ParseChangesVersionType parses a string into a ChangesVersionType. +// If "" is passed, it defaults to ChangesVersionTypeRevTreeID as a default fallback. +func ParseChangesVersionType(s string) (ChangesVersionType, error) { + switch ChangesVersionType(s) { + case "", ChangesVersionTypeRevTreeID: + return ChangesVersionTypeRevTreeID, nil + case ChangesVersionTypeCV: + return ChangesVersionTypeCV, nil + default: + return "", fmt.Errorf("unknown changes version type: %q", s) + } +} + +// ChangeVersionString returns the first version string we found in the ChangeEntry. +func (ce *ChangeEntry) ChangeVersionString(ctx context.Context) string { + if len(ce.Changes) == 0 { + base.AssertfCtx(ctx, "ChangesEntry has no changes: %s", ce.String()) + return "" + } + // pick just the first entry in changes + for _, changeVersionString := range ce.Changes[0] { + // whichever version is present we'll return it - only expected to have one version type populated - since the feed is initialized with a preferred version type. + return changeVersionString + } + return "" } // A changes entry; Database.GetChanges returns an array of these. // Marshals into the standard CouchDB _changes format. type ChangeEntry struct { - Seq SequenceID `json:"seq"` - ID string `json:"id"` - Deleted bool `json:"deleted,omitempty"` - Removed base.Set `json:"removed,omitempty"` - Doc json.RawMessage `json:"doc,omitempty"` - Changes []ChangeRev `json:"changes"` - Err error `json:"err,omitempty"` // Used to notify feed consumer of errors - allRemoved bool // Flag to track whether an entry is a removal in all channels visible to the user. + Seq SequenceID `json:"seq"` + ID string `json:"id"` + Deleted bool `json:"deleted,omitempty"` + Removed base.Set `json:"removed,omitempty"` + Doc json.RawMessage `json:"doc,omitempty"` + Changes []ChangeByVersionType `json:"changes"` + Err error `json:"err,omitempty"` // Used to notify feed consumer of errors + allRemoved bool // Flag to track whether an entry is a removal in all channels visible to the user. branched bool backfill backfillFlag // Flag used to identify non-client entries used for backfill synchronization (di only) principalDoc bool // Used to indicate _user/_role docs @@ -74,16 +111,12 @@ const ( BackfillFlag_Complete ) -type ChangeRev map[string]string // Key is always "rev", value is rev ID +type ChangeByVersionType map[ChangesVersionType]string // Keyed by the type of version in the value type ViewDoc struct { Json json.RawMessage // should be type 'document', but that fails to unmarshal correctly } -func (db *DatabaseCollectionWithUser) AddDocToChangeEntry(ctx context.Context, entry *ChangeEntry, options ChangesOptions) { - db.addDocToChangeEntry(ctx, entry, options) -} - // Adds a document body and/or its conflicts to a ChangeEntry func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, entry *ChangeEntry, options ChangesOptions) { @@ -113,7 +146,10 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e base.WarnfCtx(ctx, "Changes feed: error getting doc %q: %v", base.UD(entry.ID), err) return } - db.AddDocInstanceToChangeEntry(ctx, entry, doc, options) + if err := db.AddDocInstanceToChangeEntry(ctx, entry, doc, options); err != nil { + base.WarnfCtx(ctx, "Changes feed: error adding doc %q to change entry: %v", base.UD(entry.ID), err) + return + } } else if includeConflicts { // Load doc metadata only @@ -124,11 +160,14 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e base.WarnfCtx(ctx, "Changes feed: error getting doc sync data %q: %v", base.UD(entry.ID), err) return } - db.AddDocInstanceToChangeEntry(ctx, entry, doc, options) + if err := db.AddDocInstanceToChangeEntry(ctx, entry, doc, options); err != nil { + base.WarnfCtx(ctx, "Changes feed: error adding doc %q to change entry: %v", base.UD(entry.ID), err) + return + } } else if options.IncludeDocs { // Retrieve document via rev cache - revID := entry.Changes[0]["rev"] + revID := entry.ChangeVersionString(ctx) err := db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID) if err != nil { base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err) @@ -147,31 +186,39 @@ func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx conte } // Adds a document body and/or its conflicts to a ChangeEntry -func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Context, entry *ChangeEntry, doc *Document, options ChangesOptions) { +func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Context, entry *ChangeEntry, doc *Document, options ChangesOptions) error { includeConflicts := options.Conflicts && entry.branched - revID := entry.Changes[0]["rev"] + revID := entry.ChangeVersionString(ctx) if includeConflicts { + // should've been validated in the handler layer but be defensive + if options.VersionType == ChangesVersionTypeCV { + return fmt.Errorf("changes feed does not support showing in-conflict revisions when using version_type=cv") + } doc.History.forEachLeaf(func(leaf *RevInfo) { if leaf.ID != revID { if !leaf.Deleted { entry.Deleted = false } if !(options.ActiveOnly && leaf.Deleted) { - entry.Changes = append(entry.Changes, ChangeRev{"rev": leaf.ID}) + entry.Changes = append(entry.Changes, ChangeByVersionType{"rev": leaf.ID}) } } }) } if options.IncludeDocs { var err error - entry.Doc, _, err = db.get1xRevFromDoc(ctx, doc, revID, false) - db.dbStats().Database().NumDocReadsRest.Add(1) + // TODO: CBG-4776 - fetch by CV with sane APIs + err = db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID) if err != nil { - base.WarnfCtx(ctx, "Changes feed: error getting doc %q/%q: %v", base.UD(doc.ID), revID, err) + base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err) + return err } + db.dbStats().Database().NumDocReadsRest.Add(1) } + + return nil } // Parameters @@ -300,7 +347,7 @@ func (db *DatabaseCollectionWithUser) buildRevokedFeed(ctx context.Context, ch c continue } - change := makeRevocationChangeEntry(logEntry, seqID, singleChannelCache.ChannelID()) + change := makeRevocationChangeEntry(ctx, logEntry, seqID, singleChannelCache.ChannelID(), options.VersionType) base.DebugfCtx(ctx, base.KeyChanges, "Channel feed processing revocation seq: %v in channel %s ", seqID, base.UD(singleChannelCache.ChannelID().Name)) @@ -459,7 +506,7 @@ func (db *DatabaseCollectionWithUser) changesFeed(ctx context.Context, singleCha TriggeredBy: options.Since.TriggeredBy, } - change := makeChangeEntry(logEntry, seqID, singleChannelCache.ChannelID()) + change := makeChangeEntry(ctx, logEntry, seqID, singleChannelCache.ChannelID(), options.VersionType) lastSeq = logEntry.Sequence // Don't include deletes or removals during initial channel backfill @@ -494,20 +541,34 @@ func (db *DatabaseCollectionWithUser) changesFeed(ctx context.Context, singleCha return feed } -func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) ChangeEntry { +func makeChangeEntry(ctx context.Context, logEntry *LogEntry, seqID SequenceID, channel channels.ID, versionType ChangesVersionType) ChangeEntry { change := ChangeEntry{ Seq: seqID, ID: logEntry.DocID, Deleted: (logEntry.Flags & channels.Deleted) != 0, - Changes: []ChangeRev{{"rev": logEntry.RevID}}, + Changes: []ChangeByVersionType{{ChangesVersionTypeRevTreeID: logEntry.RevID}}, branched: (logEntry.Flags & channels.Branched) != 0, principalDoc: logEntry.IsPrincipal, collectionID: logEntry.CollectionID, } + + switch versionType { + case ChangesVersionTypeCV: + if logEntry.SourceID != "" { + change.Changes[0] = ChangeByVersionType{versionType: Version{SourceID: logEntry.SourceID, Value: logEntry.Version}.String()} + } + case ChangesVersionTypeRevTreeID: + fallthrough + default: + // already initialized with a 'rev' change entry + } + // populate CurrentVersion entry if log entry has sourceID and Version populated // This allows current version to be nil in event of CV not being populated on log entry // allowing omitempty to work as expected if logEntry.SourceID != "" { + // TODO: CBG-4804: Remove this if we change BLIP generateBlipSyncChanges and tests to use versionType and read the value from the normal Changes array... no reason to store this info in two places. + // this also allows us to do a revtree ID fallback in cases where the CV is not available on an old rev, since the type is chosen inside when producing logentry and building the change row. change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version} } if logEntry.Flags&channels.Removed != 0 { @@ -517,8 +578,8 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) return change } -func makeRevocationChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID) ChangeEntry { - entry := makeChangeEntry(logEntry, seqID, channel) +func makeRevocationChangeEntry(ctx context.Context, logEntry *LogEntry, seqID SequenceID, channel channels.ID, versionType ChangesVersionType) ChangeEntry { + entry := makeChangeEntry(ctx, logEntry, seqID, channel, versionType) entry.Revoked = true return entry @@ -534,7 +595,7 @@ func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) { } base.Audit(ctx, base.AuditIDDocumentRead, base.AuditFields{ base.AuditFieldDocID: ce.ID, - base.AuditFieldDocVersion: ce.Changes[0]["rev"], + base.AuditFieldDocVersion: ce.ChangeVersionString(ctx), }) } @@ -603,7 +664,7 @@ func (db *DatabaseCollectionWithUser) appendUserFeed(feeds []<-chan *ChangeEntry entry := ChangeEntry{ Seq: userSeq, ID: "_user/" + name, - Changes: []ChangeRev{}, + Changes: []ChangeByVersionType{}, principalDoc: true, } userFeed := make(chan *ChangeEntry, 1) @@ -799,7 +860,7 @@ func (col *DatabaseCollectionWithUser) SimpleMultiChangesFeed(ctx context.Contex if useLateSequenceFeeds { lateSequenceFeedHandler := lateSequenceFeeds[chanID] if lateSequenceFeedHandler != nil { - latefeed, err := col.getLateFeed(options.ChangesCtx, lateSequenceFeedHandler, singleChannelCache) + latefeed, err := col.getLateFeed(options.ChangesCtx, lateSequenceFeedHandler, singleChannelCache, options.VersionType) if err != nil { base.WarnfCtx(ctx, "MultiChangesFeed got error reading late sequence feed %q, rolling back channel changes feed to last sent low sequence #%d.", base.UD(chanName), lastSentLowSeq) chanOpts.Since.LowSeq = lastSentLowSeq @@ -1188,7 +1249,7 @@ func (db *DatabaseCollectionWithUser) newLateSequenceFeed(singleChannelCache Sin // Feed to process late sequences for the channel. Updates lastSequence as it works the feed. Error indicates // previous position in late sequence feed isn't available, and caller should reset to low sequence. -func (db *DatabaseCollectionWithUser) getLateFeed(ctx context.Context, feedHandler *lateSequenceFeed, singleChannelCache SingleChannelCache) (<-chan *ChangeEntry, error) { +func (db *DatabaseCollectionWithUser) getLateFeed(ctx context.Context, feedHandler *lateSequenceFeed, singleChannelCache SingleChannelCache, versionType ChangesVersionType) (<-chan *ChangeEntry, error) { if !singleChannelCache.SupportsLateFeed() { return nil, errors.New("Cache doesn't support late feeds") @@ -1227,7 +1288,7 @@ func (db *DatabaseCollectionWithUser) getLateFeed(ctx context.Context, feedHandl seqID := SequenceID{ Seq: logEntry.Sequence, } - change := makeChangeEntry(logEntry, seqID, singleChannelCache.ChannelID()) + change := makeChangeEntry(ctx, logEntry, seqID, singleChannelCache.ChannelID(), versionType) select { case <-ctx.Done(): return @@ -1308,9 +1369,15 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio return nil } - changes := make([]ChangeRev, 1) - changes[0] = ChangeRev{"rev": populatedDoc.CurrentRev} - row.Changes = changes + switch options.VersionType { + case ChangesVersionTypeCV: + row.Changes = []ChangeByVersionType{{ChangesVersionTypeCV: populatedDoc.HLV.GetCurrentVersionString()}} + case "", ChangesVersionTypeRevTreeID: + row.Changes = []ChangeByVersionType{{ChangesVersionTypeRevTreeID: populatedDoc.CurrentRev}} + default: + base.AssertfCtx(ctx, "createChangeEntry called with an unsupported VersionType: %q", options.VersionType) + } + row.Deleted = populatedDoc.Deleted row.Seq = SequenceID{Seq: populatedDoc.Sequence} row.SetBranched((populatedDoc.Flags & channels.Branched) != 0) @@ -1352,7 +1419,9 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio row.Removed = base.SetFromArray(removedChannels) if options.IncludeDocs || options.Conflicts { - db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options) + if err := db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options); err != nil { + base.WarnfCtx(ctx, "Unable to add doc instance to change entry for %s: %v - will return metadata only", base.UD(docid), err) + } } return row @@ -1360,7 +1429,7 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio func (options ChangesOptions) String() string { return fmt.Sprintf( - `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, Revocations: %t, RequestPlusSeq: %d}`, + `{Since: %s, Limit: %d, Conflicts: %t, IncludeDocs: %t, Wait: %t, Continuous: %t, HeartbeatMs: %d, TimeoutMs: %d, ActiveOnly: %t, Revocations: %t, RequestPlusSeq: %d, VersionType: %q}`, options.Since, options.Limit, options.Conflicts, @@ -1372,6 +1441,7 @@ func (options ChangesOptions) String() string { options.ActiveOnly, options.Revocations, options.RequestPlusSeq, + options.VersionType, ) } diff --git a/db/changes_test.go b/db/changes_test.go index 0447893c0f..b2bc8d2d19 100644 --- a/db/changes_test.go +++ b/db/changes_test.go @@ -162,7 +162,7 @@ func TestChangesAfterChannelAdded(t *testing.T) { require.Len(t, changes, 1) assert.Equal(t, "doc2", changes[0].ID) - assert.Equal(t, []ChangeRev{{"rev": revid}}, changes[0].Changes) + assert.Equal(t, []ChangeByVersionType{{"rev": revid}}, changes[0].Changes) // validate from zero changes = getChanges(t, collection, base.SetOf("*"), getChangesOptionsWithZeroSeq(t)) @@ -234,7 +234,7 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1}, ID: "alpha", - Changes: []ChangeRev{{"rev": revid}}, + Changes: []ChangeByVersionType{{"rev": revid}}, collectionID: collectionID}, changes[0]) lastSeq := getLastSeq(changes) @@ -279,7 +279,7 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) { ID: "alpha", Removed: base.SetOf("A"), allRemoved: true, - Changes: []ChangeRev{{"rev": "2-e99405a23fa102238fa8c3fd499b15bc"}}, + Changes: []ChangeByVersionType{{"rev": "2-e99405a23fa102238fa8c3fd499b15bc"}}, collectionID: collectionID}, changes[0]) printChanges(changes) @@ -352,7 +352,7 @@ func TestDocDeletionFromChannelCoalesced(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 1}, ID: "alpha", - Changes: []ChangeRev{{"rev": revid}}, + Changes: []ChangeByVersionType{{"rev": revid}}, collectionID: collectionID}, changes[0]) lastSeq := getLastSeq(changes) @@ -392,7 +392,7 @@ func TestDocDeletionFromChannelCoalesced(t *testing.T) { require.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 3}, ID: "alpha", - Changes: []ChangeRev{{"rev": "3-e99405a23fa102238fa8c3fd499b15bc"}}, + Changes: []ChangeByVersionType{{"rev": "3-e99405a23fa102238fa8c3fd499b15bc"}}, collectionID: collectionID}, changes[0]) printChanges(changes) diff --git a/db/crud.go b/db/crud.go index 171f2f5406..c493c92aa4 100644 --- a/db/crud.go +++ b/db/crud.go @@ -354,27 +354,43 @@ func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Contex // Underlying revision retrieval used by Get1xRevBody, Get1xRevBodyWithHistory, GetRevCopy. // Returns the revision of a document using the revision cache. -// - revid may be "", meaning the current revision. +// - revOrCV may be "", meaning the current revision. It can be a RevTree ID or a HLV CV. // - maxHistory is >0 if the caller wants a revision history; it's the max length of the history. // - historyFrom is an optional list of revIDs the client already has. If any of these are found // in the revision's history, it will be trimmed after that revID. // - attachmentsSince is nil to return no attachment bodies, otherwise a (possibly empty) list of // revisions for which the client already has attachments and doesn't need bodies. Any attachment // that hasn't changed since one of those revisions will be returned as a stub. -func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string) (revision DocumentRevision, err error) { - if revid != "" { +func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revOrCV string, maxHistory int, historyFrom []string) (DocumentRevision, error) { + var ( + revID *string + cv *Version + ) + + var ( + revision DocumentRevision + getErr error + ) + if revOrCV != "" { // Get a specific revision body and history from the revision cache // (which will load them if necessary, by calling revCacheLoader, above) - revision, err = db.revisionCache.GetWithRev(ctx, docid, revid, RevCacheOmitDelta) + if currentVersion, parseErr := ParseVersion(revOrCV); parseErr != nil { + // try as a rev ID + revID = &revOrCV + revision, getErr = db.revisionCache.GetWithRev(ctx, docid, *revID, RevCacheOmitDelta) + } else { + cv = ¤tVersion + revision, getErr = db.revisionCache.GetWithCV(ctx, docid, cv, RevCacheOmitDelta) + } } else { - // No rev ID given, so load active revision - revision, err = db.revisionCache.GetActive(ctx, docid) + // No rev given, so load active revision + revision, getErr = db.revisionCache.GetActive(ctx, docid) } - if err != nil { - return DocumentRevision{}, err + if getErr != nil { + return DocumentRevision{}, getErr } - return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom) + return db.documentRevisionForRequest(ctx, docid, revision, revID, cv, maxHistory, historyFrom) } // documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc. diff --git a/db/database_test.go b/db/database_test.go index ff955263e2..59fdbf0473 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -1718,7 +1718,7 @@ func TestConflicts(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 3}, ID: "doc", - Changes: []ChangeRev{{"rev": "2-b"}, {"rev": "2-a"}}, + Changes: []ChangeByVersionType{{"rev": "2-b"}, {"rev": "2-a"}}, branched: true, collectionID: collectionID, CurrentVersion: &Version{SourceID: source, Value: version}, @@ -1753,7 +1753,7 @@ func TestConflicts(t *testing.T) { assert.Equal(t, &ChangeEntry{ Seq: SequenceID{Seq: 4}, ID: "doc", - Changes: []ChangeRev{{"rev": "2-a"}, {"rev": rev3}}, + Changes: []ChangeByVersionType{{"rev": "2-a"}, {"rev": rev3}}, branched: true, collectionID: collectionID, CurrentVersion: &Version{SourceID: bucketUUID, Value: doc.Cas}, diff --git a/db/revision_cache_interface.go b/db/revision_cache_interface.go index b3fd5b215d..c8d6833d3c 100644 --- a/db/revision_cache_interface.go +++ b/db/revision_cache_interface.go @@ -241,6 +241,10 @@ func (rev *DocumentRevision) Inject1xBodyProperties(ctx context.Context, db *Dat {Key: BodyRev, Val: rev.RevID}, } + if rev.CV != nil { + kvPairs = append(kvPairs, base.KVPair{Key: BodyCV, Val: rev.CV.String()}) + } + if requestedHistory != nil { kvPairs = append(kvPairs, base.KVPair{Key: BodyRevisions, Val: requestedHistory}) } diff --git a/db/sequence_id.go b/db/sequence_id.go index 3afeabea97..c7ef78984b 100644 --- a/db/sequence_id.go +++ b/db/sequence_id.go @@ -120,11 +120,11 @@ func parseIntegerSequenceID(str string) (SequenceID, error) { return SequenceID{}, err } } else { - return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence") + return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence: %q", str) } if err != nil { - return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence") + return SequenceID{}, base.HTTPErrorf(400, "Invalid sequence: %q", str) } return s, nil } diff --git a/docs/api/components/schemas.yaml b/docs/api/components/schemas.yaml index 090b33d8d5..194904f03d 100644 --- a/docs/api/components/schemas.yaml +++ b/docs/api/components/schemas.yaml @@ -545,14 +545,22 @@ Changes-feed: description: The document ID the change happened on. type: string changes: - description: List of document leafs with each leaf containing only a `rev` field. + description: List of document leafs with each leaf containing only a `rev` or `cv` field. type: array items: - type: object - properties: - rev: - description: The new revision that was caused by that change. - type: string + oneOf: + - type: object + properties: + rev: + description: The Revision Tree ID associated with the change. This may be returned even when the `version_type` preference is set to `cv` if this document was an old revision written prior to upgrading to SG 4.0+ + type: string + title: Revision Tree ID + - type: object + properties: + cv: + description: The HLV Current Version associated with the change. This value requires the `version_type` preference set to `cv` and this document revision being written through SG 4.0+ + type: string + title: HLV Current Version uniqueItems: true uniqueItems: true last_seq: diff --git a/docs/api/paths/admin/keyspace-_changes.yaml b/docs/api/paths/admin/keyspace-_changes.yaml index 91ae88ca11..9f328fb4ac 100644 --- a/docs/api/paths/admin/keyspace-_changes.yaml +++ b/docs/api/paths/admin/keyspace-_changes.yaml @@ -102,6 +102,18 @@ get: schema: type: boolean default: false + - name: version_type + in: query + description: The preferred type of document versioning to use for the changes feed. + schema: + type: string + default: rev + enum: + - rev + - cv + x-enumDescriptions: + rev: 'Revision Tree IDs. For example: 1-293a80ce8f4874724732f27d35b3959a13cd96e0' + cv: 'HLV Current Version. For example: 1854e4e557cc0000@zTWkmBiYZgNQo7BHVZrB/Q' responses: '200': $ref: ../../components/responses.yaml#/changes-feed @@ -166,6 +178,16 @@ post: description: 'When true, ensures all valid documents written prior to the request being issued are included in the response. This is only applicable for non-continuous feeds.' type: boolean default: false + version_type: + description: The preferred type of document versioning to use for the changes feed. + type: string + default: rev + enum: + - rev + - cv + x-enumDescriptions: + rev: 'Revision Tree IDs. For example: 1-293a80ce8f4874724732f27d35b3959a13cd96e0' + cv: 'HLV Current Version. For example: 1854e4e557cc0000@zTWkmBiYZgNQo7BHVZrB/Q' responses: '200': $ref: ../../components/responses.yaml#/changes-feed diff --git a/docs/api/paths/public/keyspace-_changes.yaml b/docs/api/paths/public/keyspace-_changes.yaml index b2a631d724..019f52f7ba 100644 --- a/docs/api/paths/public/keyspace-_changes.yaml +++ b/docs/api/paths/public/keyspace-_changes.yaml @@ -81,7 +81,7 @@ get: minimum: 0 - name: feed in: query - description: 'The type of changes feed to use. ' + description: 'The type of changes feed to use.' schema: type: string default: normal @@ -90,6 +90,18 @@ get: - longpoll - continuous - websocket + - name: version_type + in: query + description: 'The preferred type of document versioning to use for the changes feed.' + schema: + type: string + default: rev + enum: + - rev + - cv + x-enumDescriptions: + rev: 'Revision Tree IDs. For example: 1-293a80ce8f4874724732f27d35b3959a13cd96e0' + cv: 'HLV Current Version. For example: 1854e4e557cc0000@zTWkmBiYZgNQo7BHVZrB/Q' responses: '200': $ref: ../../components/responses.yaml#/changes-feed diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 9f60fbbeb6..a00769d0ff 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -2342,7 +2342,7 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0&revocations=true", "user", true) assert.Equal(t, "doc", changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2351,7 +2351,7 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes = rt.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%s&revocations=true", changes.Last_Seq), "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2362,11 +2362,11 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) { changes = rt.WaitForChanges(2, fmt.Sprintf("/{{.keyspace}}/_changes?since=%s&revocations=true", changes.Last_Seq), "user", true) assert.Equal(t, "doc", changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) assert.Equal(t, "3-1bc9dd04c8a257ba28a41eaad90d32de", changes.Results[0].Changes[0]["rev"]) assert.False(t, changes.Results[0].Revoked) assert.Equal(t, "docmarker", changes.Results[1].ID) - RequireChangeRevVersion(t, docMarkerVersion, changes.Results[1].Changes[0]) + RequireChangeRev(t, docMarkerVersion, changes.Results[1].Changes[0], db.ChangesVersionTypeRevTreeID) assert.Equal(t, "1-999bcad4aab47f0a8a24bd9d3598060c", changes.Results[1].Changes[0]["rev"]) assert.False(t, changes.Results[1].Revoked) @@ -2427,7 +2427,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi changes := rt.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0&revocations=true", "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartOneshotPull(btc.id) _ = btcRunner.WaitForVersion(btc.id, docID, version) @@ -2438,7 +2438,7 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi // At this point changes should send revocation, as document isn't in any of the user's channels changes = rt.WaitForChanges(1, "/{{.keyspace}}/_changes?filter=sync_gateway/bychannel&channels=A&since=0&revocations=true", "user", true) assert.Equal(t, docID, changes.Results[0].ID) - RequireChangeRevVersion(t, version, changes.Results[0].Changes[0]) + RequireChangeRev(t, version, changes.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false}) _ = btcRunner.WaitForVersion(btc.id, docID, version) diff --git a/rest/blip_channel_filter_test.go b/rest/blip_channel_filter_test.go index 077fba63a6..435b258196 100644 --- a/rest/blip_channel_filter_test.go +++ b/rest/blip_channel_filter_test.go @@ -46,7 +46,7 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { client := btcRunner.SingleCollection(btc.id) const docID = "doc1" - version1 := rt.PutDoc("doc1", `{"channels":["A"]}`) + version1 := rt.PutDocDirectly("doc1", JsonToMap(t, `{"channels":["A"]}`)) rt.WaitForPendingChanges() response := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?since=0&channels=A&include_docs=true", "", "alice") @@ -56,10 +56,10 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { { "results": [ {"seq":1, "id": "_user/alice", "changes":[]}, - {"seq":3, "id": "doc1", "doc": {"_id": "doc1", "_rev":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} + {"seq":3, "id": "doc1", "doc": {"_id": "doc1", "_rev":"%s", "_cv":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} ], "last_seq": "3" -}`, version1.RevTreeID, version1.RevTreeID) +}`, version1.RevTreeID, version1.CV.String(), version1.RevTreeID) require.JSONEq(t, expectedChanges1, string(response.BodyBytes())) client.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0", Channels: "A"}) @@ -67,9 +67,9 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { btcRunner.WaitForVersion(btc.id, docID, version1) // remove channel A from doc1 - version2 := rt.UpdateDoc(docID, version1, `{"channels":["B"]}`) + version2 := rt.UpdateDocDirectly(docID, version1, JsonToMap(t, `{"channels":["B"]}`)) markerDocID := "marker" - markerDocVersion := rt.PutDoc(markerDocID, `{"channels":["A"]}`) + markerDocVersion := rt.PutDocDirectly(markerDocID, JsonToMap(t, `{"channels":["A"]}`)) rt.WaitForPendingChanges() // alice will see doc1 rev2 with body @@ -79,11 +79,11 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { aliceExpectedChanges2 := fmt.Sprintf(` { "results": [ - {"seq":4, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "channels": ["B"]}, "changes": [{"rev":"%s"}]}, - {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} + {"seq":4, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "_cv":"%s", "channels": ["B"]}, "changes": [{"rev":"%s"}]}, + {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "_cv":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} ], "last_seq": "5" -}`, docID, docID, version2.RevTreeID, version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.RevTreeID) +}`, docID, docID, version2.RevTreeID, version2.CV.String(), version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.CV.String(), markerDocVersion.RevTreeID) require.JSONEq(t, aliceExpectedChanges2, string(response.BodyBytes())) client.StartPullSince(BlipTesterPullOptions{Continuous: false, Since: "0", Channels: "A"}) @@ -105,10 +105,10 @@ func TestChannelFilterRemovalFromChannel(t *testing.T) { { "results": [ {"seq":4, "id": "doc1", "removed":["A"], "doc": {"_id": "doc1", "_rev":"%s", "_removed": true}, "changes": [{"rev":"%s"}]}, - {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} + {"seq":5, "id": "%s", "doc": {"_id": "%s", "_rev":"%s", "_cv": "%s", "channels": ["A"]}, "changes": [{"rev":"%s"}]} ], "last_seq": "5" -}`, version2.RevTreeID, version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.RevTreeID) +}`, version2.RevTreeID, version2.RevTreeID, markerDocID, markerDocID, markerDocVersion.RevTreeID, markerDocVersion.CV.String(), markerDocVersion.RevTreeID) require.JSONEq(t, bobExpectedChanges2, string(response.BodyBytes())) }) } diff --git a/rest/changes_api.go b/rest/changes_api.go index 78ce4a03fb..a7c84f02d2 100644 --- a/rest/changes_api.go +++ b/rest/changes_api.go @@ -185,6 +185,14 @@ func (h *handler) handleChanges() error { options.ActiveOnly = h.getBoolQuery("active_only") options.IncludeDocs = h.getBoolQuery("include_docs") options.Revocations = h.getBoolQuery("revocations") + options.VersionType, err = db.ParseChangesVersionType(h.getQuery("version_type")) + if err != nil { + return base.HTTPErrorf(http.StatusBadRequest, "Invalid version_type: %v", err) + } + + if options.Conflicts && options.VersionType == db.ChangesVersionTypeCV { + return base.HTTPErrorf(http.StatusBadRequest, "Cannot use 'style=all_docs' with 'version_type=cv'") + } useRequestPlus, _ := h.getOptBoolQuery("request_plus", h.db.Options.ChangesRequestPlus) if useRequestPlus && feed != feedTypeContinuous { @@ -602,6 +610,7 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti AcceptEncoding string `json:"accept_encoding"` ActiveOnly bool `json:"active_only"` // Return active revisions only RequestPlus *bool `json:"request_plus"` // Wait for sequence buffering to catch up to database seq value at time request was issued + VersionType string `json:"version_type"` // Version type to use for changes feed } // Initialize since clock and hasher ahead of unmarshalling sequence @@ -643,6 +652,11 @@ func (h *handler) readChangesOptionsFromJSON(jsonData []byte) (feed string, opti true, ) + options.VersionType, err = db.ParseChangesVersionType(input.VersionType) + if err != nil { + return "", options, "", nil, nil, false, base.HTTPErrorf(http.StatusBadRequest, "Invalid version_type: %v", err) + } + compress = (input.AcceptEncoding == "gzip") if h.db != nil && feed != feedTypeContinuous { diff --git a/rest/changes_test.go b/rest/changes_test.go index 2b1c716c17..ca98bb2f71 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -13,6 +13,7 @@ import ( "log" "net/http" "net/http/httptest" + "strings" "sync" "sync/atomic" "testing" @@ -449,3 +450,188 @@ func TestCVPopulationOnDocIDChanges(t *testing.T) { assert.Equal(t, bucketUUID, changes.Results[0].CurrentVersion.SourceID) assert.Equal(t, fetchedDoc.Cas, changes.Results[0].CurrentVersion.Value) } + +// TestChangesVersionType tests the /_changes REST endpoint with different version_type parameters for each possible underlying feed type and HTTP method. +func TestChangesVersionType(t *testing.T) { + rt := NewRestTester(t, nil) + defer rt.Close() + + doc1 := "doc1" + doc1Body := `{"foo":"bar"}` + rt.PutDoc(doc1, doc1Body) + doc2 := "doc2" + doc2Body := `{"buzz":"quux"}` + rt.PutDoc(doc2, doc2Body) + + rt.WaitForPendingChanges() + + tests := []struct { + name string + changesRequestMethod string + changesRequestQueryParams string + changesRequestBody string + expectedStatus int + expectedVersionType db.ChangesVersionType + expectedDocs int + }{ + { + name: "invalid version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=invalid", + expectedStatus: http.StatusBadRequest, + }, + { + name: "empty version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "", + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeRevTreeID, + expectedDocs: 2, + }, + { + name: "rev version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=rev&include_docs=true", + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeRevTreeID, + expectedDocs: 2, + }, + { + name: "cv version_type", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=cv&include_docs=true", + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeCV, + expectedDocs: 2, + }, + { + name: "rev docid filter", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=rev&filter=_doc_ids&doc_ids=doc1&include_docs=true", + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeRevTreeID, + expectedDocs: 1, + }, + { + name: "cv docid filter", + changesRequestMethod: http.MethodGet, + changesRequestQueryParams: "?version_type=cv&filter=_doc_ids&doc_ids=doc1&include_docs=true", + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeCV, + expectedDocs: 1, + }, + { + name: "rev post", + changesRequestMethod: http.MethodPost, + changesRequestQueryParams: "", + changesRequestBody: `{"version_type":"rev"}`, + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeRevTreeID, + expectedDocs: 2, + }, + { + name: "cv post", + changesRequestMethod: http.MethodPost, + changesRequestQueryParams: "", + changesRequestBody: `{"version_type":"cv"}`, + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeCV, + expectedDocs: 2, + }, + { + name: "cv docid filter post", + changesRequestMethod: http.MethodPost, + changesRequestQueryParams: "", + changesRequestBody: `{"version_type":"cv", "filter":"_doc_ids", "doc_ids":["doc1"]}`, + expectedStatus: http.StatusOK, + expectedVersionType: db.ChangesVersionTypeCV, + expectedDocs: 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require.NotEmptyf(t, test.changesRequestMethod, "Test case %q requires a changesRequestMethod to be set", test.name) + + if test.expectedStatus != http.StatusOK { + resp := rt.SendAdminRequest(test.changesRequestMethod, fmt.Sprintf("/{{.keyspace}}/_changes%s", test.changesRequestQueryParams), test.changesRequestBody) + RequireStatus(t, resp, test.expectedStatus) + return + } + + resp := rt.SendAdminRequest(test.changesRequestMethod, fmt.Sprintf("/{{.keyspace}}/_changes%s", test.changesRequestQueryParams), test.changesRequestBody) + RequireStatus(t, resp, test.expectedStatus) + var changesResults ChangesResults + require.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &changesResults)) + require.Len(t, changesResults.Results, test.expectedDocs) + for _, changeEntry := range changesResults.Results { + for _, change := range changeEntry.Changes { + require.Len(t, change, 1) // ensure only one version type is present + // and that it was the expected one (and we have a value) + versionValue, ok := change[test.expectedVersionType] + require.Truef(t, ok, "Expected version type %s, got %v", test.expectedVersionType, change) + require.NotEmpty(t, versionValue) + } + if strings.Contains(test.changesRequestQueryParams, "include_docs=true") { + var expectedBody string + switch changeEntry.ID { + case doc1: + expectedBody = doc1Body + case doc2: + expectedBody = doc2Body + } + require.Contains(t, string(changeEntry.Doc), expectedBody[1:len(expectedBody)-1]) // strip {}s from doc body - 1.x API stamps additional properties so accommodate + } + } + }) + } +} + +func TestChangesFeedCVWithOldRevOnlyData(t *testing.T) { + rt := NewRestTester(t, nil) + defer rt.Close() + + seq, err := db.AllocateTestSequence(rt.GetDatabase()) + require.NoError(t, err) + oldDoc := "oldDoc" + oldDocBody := `{"body_field":"1234"}` + oldDocSyncData := []byte(fmt.Sprintf(`{"sequence":%d,"rev":{"rev": "1-abc"},"history":{"revs":["1-abc"],"parents":[-1],"channels":[null]},"value_crc32c":"%s"}`, seq, base.Crc32cHashString([]byte(oldDocBody)))) + _, err = rt.GetSingleDataStore().WriteWithXattrs(t.Context(), oldDoc, 0, 0, []byte(oldDocBody), map[string][]byte{base.SyncXattrName: oldDocSyncData}, nil, nil) + require.NoError(t, err) + + newDoc := "newDoc" + newDocBody := `{"foo":"bar"}` + rt.PutDoc(newDoc, newDocBody) + + rt.WaitForPendingChanges() + + resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_changes?version_type=cv&include_docs=true", "") + RequireStatus(t, resp, http.StatusOK) + var changesResults ChangesResults + require.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &changesResults)) + require.Len(t, changesResults.Results, 2) + for i, changeEntry := range changesResults.Results { + for _, change := range changeEntry.Changes { + require.Len(t, change, 1) // ensure only one version type is present + // and that it was the expected one (and we have a value) + var expectedType db.ChangesVersionType + if i == 0 { + // first doc was written with a RevID and no CV available + expectedType = db.ChangesVersionTypeRevTreeID + } else { + expectedType = db.ChangesVersionTypeCV + } + versionValue, ok := change[expectedType] + require.Truef(t, ok, "Expected version type %s, got %v", expectedType, change) + require.NotEmpty(t, versionValue) + } + var expectedBody string + switch changeEntry.ID { + case oldDoc: + expectedBody = oldDocBody + case newDoc: + expectedBody = newDocBody + } + require.Contains(t, string(changeEntry.Doc), expectedBody[1:len(expectedBody)-1]) // strip {}s from doc body - 1.x API stamps additional properties so accommodate + } +} diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index bf166bdb7e..3f2c3bac16 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -837,6 +837,10 @@ func assertChangeEntryMatches(t *testing.T, expectedChangeEntryString string, re var resultBody db.Body assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) assert.NoError(t, resultBody.Unmarshal(result.Doc)) + + // strip out _cv - it's not deterministic in tests like _rev is, but also tests really shouldn't be comparing full bodies! It's so brittle... + delete(resultBody, db.BodyCV) + db.AssertEqualBodies(t, expectedBody, resultBody) } else { assert.Equal(t, expectedChange.Doc, result.Doc) @@ -1893,6 +1897,10 @@ func TestChangesIncludeDocs(t *testing.T) { var resultBody db.Body assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) assert.NoError(t, resultBody.Unmarshal(result.Doc)) + + // strip out _cv - it's not deterministic in tests like _rev is, but also tests really shouldn't be comparing full bodies! It's so brittle... + delete(resultBody, db.BodyCV) + db.AssertEqualBodies(t, expectedBody, resultBody) } else { assert.Equal(t, expectedChange.Doc, result.Doc) diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 5f1b9e6867..3c7372be63 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -4088,7 +4088,7 @@ func TestActiveReplicatorPullConflict(t *testing.T) { changesResults := rt1.WaitForChanges(1, "/{{.keyspace}}/_changes?since=0", "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() @@ -4307,7 +4307,7 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { // Validate results on the local (rt1) changesResults := rt1.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d", localDoc.Sequence), "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) rawDocResponse := rt1.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_raw/"+docID, "") @@ -4351,7 +4351,7 @@ func TestActiveReplicatorPushAndPullConflict(t *testing.T) { } changesResults = rt2.WaitForChanges(1, fmt.Sprintf("/{{.keyspace}}/_changes?since=%d", rt2Since), "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) doc, err = rt2collection.GetDocument(rt2ctx, docID, db.DocUnmarshalAll) @@ -5922,7 +5922,7 @@ func TestActiveReplicatorPullConflictReadWriteIntlProps(t *testing.T) { // Should end up as winner under default conflict resolution. changesResults := rt1.WaitForChanges(1, "/{{.keyspace}}/_changes?&since=0", "", true) assert.Equal(t, docID, changesResults.Results[0].ID) - rest.RequireChangeRevVersion(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0]) + rest.RequireChangeRev(t, test.expectedLocalVersion, changesResults.Results[0].Changes[0], db.ChangesVersionTypeRevTreeID) t.Logf("Changes response is %+v", changesResults) rt1collection, rt1ctx := rt1.GetSingleTestDatabaseCollection() diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 02a3c01625..ba29af3c66 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -883,10 +883,19 @@ func (cr ChangesResults) Summary() string { return strings.Join(revs, ", ") } -// RequireChangeRevVersion asserts that the given ChangeRev has the expected version for a given entry returned by _changes feed -func RequireChangeRevVersion(t *testing.T, expected DocVersion, changeRev db.ChangeRev) { - // CV will only be populated if changes requests it - require.Equal(t, expected.RevTreeID, changeRev["rev"], "Expected rev %s, got %s", expected.RevTreeID, changeRev["rev"]) +// RequireChangeRev asserts that the given db.ChangeByVersionType returned a /_changes feed has the expected DocVersion entry, for a given versionType (rev or cv) +func RequireChangeRev(t *testing.T, expected DocVersion, changeRev db.ChangeByVersionType, versionType db.ChangesVersionType) { + // Only one version type will be populated on a changes feed, based on what the original request demanded and what version types are available on that particular revision. + var expectedStr string + switch versionType { + case db.ChangesVersionTypeRevTreeID: + expectedStr = expected.RevTreeID + case db.ChangesVersionTypeCV: + expectedStr = expected.CV.String() + default: + t.Fatalf("Unexpected version type: %q", versionType) + } + require.Equalf(t, expectedStr, changeRev[versionType], "Expected changeRev[%q]==%s, got %s", versionType, expected.RevTreeID, changeRev[versionType]) } func (rt *RestTester) WaitForChanges(numChangesExpected int, changesURL, username string, useAdminPort bool) ChangesResults {