Skip to content

Commit 53ee3fc

Browse files
committed
Support sending changes for old rev-only docs on a cv changes feed
1 parent 80d40c1 commit 53ee3fc

File tree

5 files changed

+89
-17
lines changed

5 files changed

+89
-17
lines changed

db/changes.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type ChangesOptions struct {
4242
VersionType ChangesVersionType // The type of version to use for the changes feed. This is used to determine whether to use send revtree IDs or CV in the changes feed entries.
4343
}
4444

45+
// ChangesVersionType determines the preferred version type to use in changes feed entries.
46+
// If the requested version type is not available, the feed will attempt to fall back to ChangesVersionTypeRevTreeID which should always be available.
4547
type ChangesVersionType string
4648

4749
const (
@@ -60,6 +62,15 @@ func ParseChangesVersionType(s string) (ChangesVersionType, error) {
6062
}
6163
}
6264

65+
// ChangeVersionString attempts to return the version string for the preferred ChangesVersionType, but will fall back to rev if cv is not available when requested.
66+
func (ce *ChangeEntry) ChangeVersionString(versionType ChangesVersionType) string {
67+
if s, ok := ce.Changes[0][versionType]; ok {
68+
return s
69+
}
70+
// requested version type not found, return `rev` as a fallback.
71+
return ce.Changes[0][ChangesVersionTypeRevTreeID]
72+
}
73+
6374
// A changes entry; Database.GetChanges returns an array of these.
6475
// Marshals into the standard CouchDB _changes format.
6576
type ChangeEntry struct {
@@ -128,7 +139,10 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e
128139
base.WarnfCtx(ctx, "Changes feed: error getting doc %q: %v", base.UD(entry.ID), err)
129140
return
130141
}
131-
db.AddDocInstanceToChangeEntry(ctx, entry, doc, options)
142+
if err := db.AddDocInstanceToChangeEntry(ctx, entry, doc, options); err != nil {
143+
base.WarnfCtx(ctx, "Changes feed: error adding doc %q to change entry: %v", base.UD(entry.ID), err)
144+
return
145+
}
132146

133147
} else if includeConflicts {
134148
// Load doc metadata only
@@ -139,11 +153,14 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e
139153
base.WarnfCtx(ctx, "Changes feed: error getting doc sync data %q: %v", base.UD(entry.ID), err)
140154
return
141155
}
142-
db.AddDocInstanceToChangeEntry(ctx, entry, doc, options)
156+
if err := db.AddDocInstanceToChangeEntry(ctx, entry, doc, options); err != nil {
157+
base.WarnfCtx(ctx, "Changes feed: error adding doc %q to change entry: %v", base.UD(entry.ID), err)
158+
return
159+
}
143160

144161
} else if options.IncludeDocs {
145162
// Retrieve document via rev cache
146-
revID := entry.Changes[0]["rev"]
163+
revID := entry.ChangeVersionString(options.VersionType)
147164
err := db.AddDocToChangeEntryUsingRevCache(ctx, entry, revID)
148165
if err != nil {
149166
base.WarnfCtx(ctx, "Changes feed: error getting revision body for %q (%s): %v", base.UD(entry.ID), revID, err)
@@ -162,12 +179,16 @@ func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx conte
162179
}
163180

164181
// Adds a document body and/or its conflicts to a ChangeEntry
165-
func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Context, entry *ChangeEntry, doc *Document, options ChangesOptions) {
182+
func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Context, entry *ChangeEntry, doc *Document, options ChangesOptions) error {
166183

167184
includeConflicts := options.Conflicts && entry.branched
168185

169-
revID := entry.Changes[0]["rev"]
186+
revID := entry.ChangeVersionString(options.VersionType)
170187
if includeConflicts {
188+
// should've been validated in the handler layer but be defensive
189+
if options.VersionType == ChangesVersionTypeCV {
190+
return fmt.Errorf("changes feed does not support showing in-conflict revisions when using version_type=cv")
191+
}
171192
doc.History.forEachLeaf(func(leaf *RevInfo) {
172193
if leaf.ID != revID {
173194
if !leaf.Deleted {
@@ -187,6 +208,8 @@ func (db *DatabaseCollectionWithUser) AddDocInstanceToChangeEntry(ctx context.Co
187208
base.WarnfCtx(ctx, "Changes feed: error getting doc %q/%q: %v", base.UD(doc.ID), revID, err)
188209
}
189210
}
211+
212+
return nil
190213
}
191214

192215
// Parameters
@@ -553,7 +576,7 @@ func makeRevocationChangeEntry(ctx context.Context, logEntry *LogEntry, seqID Se
553576
}
554577

555578
// AuditReadEvent issues a read event for this change entry. If there is no document body, there will be no event used.
556-
func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) {
579+
func (ce *ChangeEntry) AuditReadEvent(ctx context.Context, versionType ChangesVersionType) {
557580
if ce.Err != nil {
558581
return
559582
}
@@ -562,7 +585,7 @@ func (ce *ChangeEntry) AuditReadEvent(ctx context.Context) {
562585
}
563586
base.Audit(ctx, base.AuditIDDocumentRead, base.AuditFields{
564587
base.AuditFieldDocID: ce.ID,
565-
base.AuditFieldDocVersion: ce.Changes[0]["rev"],
588+
base.AuditFieldDocVersion: ce.ChangeVersionString(versionType),
566589
})
567590
}
568591

@@ -1387,7 +1410,10 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio
13871410

13881411
row.Removed = base.SetFromArray(removedChannels)
13891412
if options.IncludeDocs || options.Conflicts {
1390-
db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options)
1413+
if err := db.AddDocInstanceToChangeEntry(ctx, row, populatedDoc, options); err != nil {
1414+
base.WarnfCtx(ctx, "Unable to add doc instance to change entry for %s: %v", base.UD(docid), err)
1415+
return nil
1416+
}
13911417
}
13921418

13931419
return row

docs/api/components/schemas.yaml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,15 +549,18 @@ Changes-feed:
549549
type: array
550550
items:
551551
type: object
552+
minProperties: 1
553+
maxProperties: 1
552554
properties:
553555
rev:
554-
description: The new Revision ID that was caused by that change. This is omitted when the `version_type` parameter is `cv`.
556+
description: |
557+
The new Revision ID associated with the change.
558+
This is usually omitted in favour of the `cv` property when the `version_type` preference is set to `cv`,
559+
however data written prior to Sync Gateway 4.0 can still emit `rev` only change entries if we do not have a `cv` available for that document revision.
555560
type: string
556-
optional: true
557561
cv:
558-
description: The new Current Version that was caused by that change. This is present only when the `version_type` parameter is `cv`.
562+
description: The new Current Version associated with the change. This value requires the `version_type` preference set to `cv`.
559563
type: string
560-
optional: true
561564
uniqueItems: true
562565
uniqueItems: true
563566
last_seq:

docs/api/paths/admin/keyspace-_changes.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ get:
104104
default: false
105105
- name: version_type
106106
in: query
107-
description: The type of document versioning to use for the changes feed.
107+
description: The preferred type of document versioning to use for the changes feed.
108108
schema:
109109
type: string
110110
default: rev
@@ -176,7 +176,7 @@ post:
176176
type: boolean
177177
default: false
178178
version_type:
179-
description: The type of document versioning to use for the changes feed.
179+
description: The preferred type of document versioning to use for the changes feed.
180180
type: string
181181
default: rev
182182
enum:

rest/changes_api.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ func (h *handler) handleChanges() error {
190190
return base.HTTPErrorf(http.StatusBadRequest, "Invalid version_type: %v", err)
191191
}
192192

193+
if options.Conflicts && options.VersionType == db.ChangesVersionTypeCV {
194+
return base.HTTPErrorf(http.StatusBadRequest, "Cannot use 'style=all_docs' with 'version_type=cv'")
195+
}
196+
193197
useRequestPlus, _ := h.getOptBoolQuery("request_plus", h.db.Options.ChangesRequestPlus)
194198
if useRequestPlus && feed != feedTypeContinuous {
195199
var seqErr error
@@ -424,7 +428,7 @@ func (h *handler) sendSimpleChanges(channels base.Set, options db.ChangesOptions
424428
}
425429
_ = encoder.Encode(entry)
426430
lastSeq = entry.Seq
427-
entry.AuditReadEvent(h.ctx())
431+
entry.AuditReadEvent(h.ctx(), options.VersionType)
428432
}
429433

430434
case <-heartbeat:
@@ -497,7 +501,7 @@ func (h *handler) sendContinuousChangesByHTTP(inChannels base.Set, options db.Ch
497501
break
498502
}
499503

500-
change.AuditReadEvent(h.ctx())
504+
change.AuditReadEvent(h.ctx(), options.VersionType)
501505
}
502506
} else {
503507
_, err = h.response.Write([]byte("\n"))
@@ -574,7 +578,7 @@ func (h *handler) sendContinuousChangesByWebSocket(inChannels base.Set, options
574578
return err
575579
}
576580
for _, change := range changes {
577-
change.AuditReadEvent(h.ctx())
581+
change.AuditReadEvent(h.ctx(), options.VersionType)
578582
}
579583
return err
580584
})

rest/changes_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,3 +571,42 @@ func TestChangesVersionType(t *testing.T) {
571571
})
572572
}
573573
}
574+
575+
func TestChangesFeedCVWithOldRevOnlyData(t *testing.T) {
576+
rt := NewRestTester(t, nil)
577+
defer rt.Close()
578+
579+
seq, err := db.AllocateTestSequence(rt.GetDatabase())
580+
require.NoError(t, err)
581+
oldDoc := "oldDoc"
582+
oldDocBody := []byte(`{"body_field":"1234"}`)
583+
oldDocSyncData := []byte(fmt.Sprintf(`{"sequence":%d,"rev":{"rev": "1-abc"},"value_crc32c":"%s"}`, seq, base.Crc32cHashString(oldDocBody)))
584+
_, err = rt.GetSingleDataStore().WriteWithXattrs(t.Context(), oldDoc, 0, 0, oldDocBody, map[string][]byte{base.SyncXattrName: oldDocSyncData}, nil, nil)
585+
require.NoError(t, err)
586+
587+
rt.PutDoc("newDoc", `{"foo":"bar"}`)
588+
589+
rt.WaitForPendingChanges()
590+
591+
resp := rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_changes?version_type=cv", "")
592+
RequireStatus(t, resp, http.StatusOK)
593+
var changesResults ChangesResults
594+
require.NoError(t, base.JSONUnmarshal(resp.Body.Bytes(), &changesResults))
595+
require.Len(t, changesResults.Results, 2)
596+
for i, changeEntry := range changesResults.Results {
597+
for _, change := range changeEntry.Changes {
598+
require.Len(t, change, 1) // ensure only one version type is present
599+
// and that it was the expected one (and we have a value)
600+
var expectedType db.ChangesVersionType
601+
if i == 0 {
602+
// first doc was written with a RevID and no CV available
603+
expectedType = db.ChangesVersionTypeRevTreeID
604+
} else {
605+
expectedType = db.ChangesVersionTypeCV
606+
}
607+
versionValue, ok := change[expectedType]
608+
require.Truef(t, ok, "Expected version type %s, got %v", expectedType, change)
609+
require.NotEmpty(t, versionValue)
610+
}
611+
}
612+
}

0 commit comments

Comments
 (0)