
Commit 434b11b

REP-6804 Refactor change stream reader to generic “change reader” (#171)
This refactors the change stream reader to be an implementation of a generic “change reader”. Some renames are made here (e.g., “handler” to “persistor”), which hopefully will lend greater clarity long-term. This anticipates creation of an “oplog reader” that will replace the change stream reader in some use cases.
1 parent: 1c17571 · commit: 434b11b

13 files changed: +604 -466 lines changed

internal/util/clusterinfo.go

Lines changed: 12 additions & 0 deletions
@@ -19,6 +19,18 @@ type ClusterInfo struct {
 	Topology ClusterTopology
 }
 
+// ClusterHasBSONSize indicates whether a cluster with the given
+// major & minor version numbers supports the $bsonSize aggregation operator.
+func ClusterHasBSONSize(va [2]int) bool {
+	major := va[0]
+
+	if major == 4 {
+		return va[1] >= 4
+	}
+
+	return major > 4
+}
+
 const (
 	TopologySharded ClusterTopology = "sharded"
 	TopologyReplset ClusterTopology = "replset"
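
For illustration, a minimal sketch of how the new helper behaves. The main package below is hypothetical (and internal/util is only importable from within the migration-verifier module itself); it simply exercises ClusterHasBSONSize against a few version pairs.

package main

import (
	"fmt"

	"github.com/10gen/migration-verifier/internal/util"
)

func main() {
	// Hypothetical caller: in the verifier, the version array would come from
	// parsing the server's reported version, not from a hard-coded list.
	for _, version := range [][2]int{{4, 2}, {4, 4}, {6, 0}} {
		fmt.Printf(
			"%d.%d supports $bsonSize: %v\n",
			version[0], version[1],
			util.ClusterHasBSONSize(version),
		)
	}
}

This prints false for 4.2 and true for 4.4 and later, matching the 4.4+ availability of $bsonSize.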

internal/verifier/change_reader.go

Lines changed: 238 additions & 0 deletions
@@ -0,0 +1,238 @@
package verifier

import (
	"context"
	"time"

	"github.com/10gen/migration-verifier/history"
	"github.com/10gen/migration-verifier/internal/logger"
	"github.com/10gen/migration-verifier/internal/util"
	"github.com/10gen/migration-verifier/msync"
	"github.com/10gen/migration-verifier/option"
	"github.com/pkg/errors"
	"github.com/rs/zerolog"
	"github.com/samber/lo"
	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
)

type ddlEventHandling string

const (
	fauxDocSizeForDeleteEvents = 1024

	// The number of batches we’ll hold in memory at once.
	batchChanBufferSize = 100

	onDDLEventAllow ddlEventHandling = "allow"

	changeReaderCollectionName = "changeReader"
)

type changeReader interface {
	getWhichCluster() whichCluster
	getReadChannel() <-chan changeEventBatch
	getError() *util.Eventual[error]
	getStartTimestamp() option.Option[bson.Timestamp]
	getEventsPerSecond() option.Option[float64]
	getLag() option.Option[time.Duration]
	getBufferSaturation() float64
	setWritesOff(bson.Timestamp)
	setPersistorError(error)
	start(context.Context) error
	done() <-chan struct{}
	persistResumeToken(context.Context, bson.Raw) error
	isRunning() bool
	String() string
}

type ChangeReaderCommon struct {
	readerType whichCluster

	lastChangeEventTime *bson.Timestamp
	logger              *logger.Logger
	namespaces          []string

	metaDB        *mongo.Database
	watcherClient *mongo.Client
	clusterInfo   util.ClusterInfo

	resumeTokenTSExtractor func(bson.Raw) (bson.Timestamp, error)

	running              bool
	changeEventBatchChan chan changeEventBatch
	writesOffTs          *util.Eventual[bson.Timestamp]
	readerError          *util.Eventual[error]
	persistorError       *util.Eventual[error]
	doneChan             chan struct{}

	startAtTs *bson.Timestamp

	lag              *msync.TypedAtomic[option.Option[time.Duration]]
	batchSizeHistory *history.History[int]

	onDDLEvent ddlEventHandling
}

func (rc *ChangeReaderCommon) getWhichCluster() whichCluster {
	return rc.readerType
}

func (rc *ChangeReaderCommon) setPersistorError(err error) {
	rc.persistorError.Set(err)
}

func (rc *ChangeReaderCommon) getError() *util.Eventual[error] {
	return rc.readerError
}

func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] {
	return option.FromPointer(rc.startAtTs)
}

func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) {
	rc.writesOffTs.Set(ts)
}

func (rc *ChangeReaderCommon) isRunning() bool {
	return rc.running
}

func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch {
	return rc.changeEventBatchChan
}

func (rc *ChangeReaderCommon) done() <-chan struct{} {
	return rc.doneChan
}

// getBufferSaturation returns the reader’s internal buffer’s saturation level
// as a fraction. If saturation rises, that means we’re reading events faster
// than we can persist them.
func (rc *ChangeReaderCommon) getBufferSaturation() float64 {
	return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan))
}

// getLag returns the observed change stream lag (i.e., the delta between
// cluster time and the most-recently-seen change event).
func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] {
	return rc.lag.Load()
}

// getEventsPerSecond returns the number of change events per second we’ve been
// seeing “recently”. (See implementation for the actual period over which we
// compile this metric.)
func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] {
	logs := rc.batchSizeHistory.Get()
	lastLog, hasLogs := lo.Last(logs)

	if hasLogs && lastLog.At != logs[0].At {
		span := lastLog.At.Sub(logs[0].At)

		// Each log contains a time and a # of events that happened since
		// the prior log. Thus, each log’s Datum is a count of events that
		// happened before the timestamp. Since we want the # of events that
		// happened between the first & last times, we only want events *after*
		// the first time. Thus, we skip the first log entry here.
		totalEvents := 0
		for _, log := range logs[1:] {
			totalEvents += log.Datum
		}

		return option.Some(util.DivideToF64(totalEvents, span.Seconds()))
	}

	return option.None[float64]()
}

func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error {
	coll := rc.metaDB.Collection(changeReaderCollectionName)
	_, err := coll.ReplaceOne(
		ctx,
		bson.D{{"_id", rc.resumeTokenDocID()}},
		token,
		options.Replace().SetUpsert(true),
	)

	if err == nil {
		ts, err := rc.resumeTokenTSExtractor(token)

		logEvent := rc.logger.Debug()

		if err == nil {
			logEvent = addTimestampToLogEvent(ts, logEvent)
		} else {
			rc.logger.Warn().Err(err).
				Msg("failed to extract resume token timestamp")
		}

		logEvent.Msgf("Persisted %s's resume token.", rc.readerType)

		return nil
	}

	return errors.Wrapf(err, "failed to persist %s resume token (%v)", rc.readerType, token)
}

func (rc *ChangeReaderCommon) resumeTokenDocID() string {
	switch rc.readerType {
	case src:
		return "srcResumeToken"
	case dst:
		return "dstResumeToken"
	default:
		panic("unknown readerType: " + rc.readerType)
	}
}

func (rc *ChangeReaderCommon) getMetadataCollection() *mongo.Collection {
	return rc.metaDB.Collection(changeReaderCollectionName)
}

func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Option[bson.Raw], error) {
	coll := rc.getMetadataCollection()

	token, err := coll.FindOne(
		ctx,
		bson.D{{"_id", rc.resumeTokenDocID()}},
	).Raw()

	if errors.Is(err, mongo.ErrNoDocuments) {
		return option.None[bson.Raw](), nil
	}

	return option.Some(token), err
}

func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) {
	tokenTs, err := rc.resumeTokenTSExtractor(token)
	if err == nil {
		lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T)
		rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
	} else {
		rc.logger.Warn().
			Err(err).
			Msgf("Failed to extract timestamp from %s's resume token to compute lag.", rc.readerType)
	}
}

func (rc *ChangeReaderCommon) logIgnoredDDL(rawEvent bson.Raw) {
	rc.logger.Info().
		Str("reader", string(rc.readerType)).
		Stringer("event", rawEvent).
		Msg("Ignoring event with unrecognized type on destination. (It’s assumedly internal to the migration.)")
}

func (rc *ChangeReaderCommon) wrapPersistorErrorForReader() error {
	return errors.Wrap(
		rc.persistorError.Get(),
		"event persistor failed, so no more events can be processed",
	)
}

func addTimestampToLogEvent(ts bson.Timestamp, event *zerolog.Event) *zerolog.Event {
	return event.
		Any("timestamp", ts).
		Time("time", time.Unix(int64(ts.T), int64(0)))
}
