Skip to content

Commit 2ed30e1

Browse files
committed
tweak timestamp extraction
1 parent 0044884 commit 2ed30e1

File tree

1 file changed

+17
-14
lines changed

1 file changed

+17
-14
lines changed

internal/verifier/change_stream.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,12 @@ func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.Cha
8484
return nil
8585
}
8686

87-
return verifier.persistChangeStreamResumeToken(ctx, cs)
87+
err := verifier.persistChangeStreamResumeToken(ctx, cs)
88+
if err != nil {
89+
lastPersistedTime = time.Now()
90+
}
91+
92+
return err
8893
}
8994

9095
for {
@@ -198,13 +203,8 @@ func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
198203
return errors.Wrap(err, "failed to read cluster time from session")
199204
}
200205

201-
csTimestampNewerThanCluster := csTimestamp.T > clusterTime.T
202-
if !csTimestampNewerThanCluster && csTimestamp.T == clusterTime.T {
203-
csTimestampNewerThanCluster = csTimestamp.I > clusterTime.I
204-
}
205-
206206
verifier.srcStartAtTs = &csTimestamp
207-
if csTimestampNewerThanCluster {
207+
if csTimestamp.After(clusterTime) {
208208
verifier.srcStartAtTs = &clusterTime
209209
}
210210

@@ -259,21 +259,24 @@ func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs
259259
}
260260

261261
func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, error) {
262+
tokenStruct := struct {
263+
Data string `bson:"_data"`
264+
}{}
265+
262266
// Change stream token is always a V1 keystring in the _data field
263-
resumeTokenDataValue := resumeToken.Lookup("_data")
264-
resumeTokenData, ok := resumeTokenDataValue.StringValueOK()
265-
if !ok {
266-
return primitive.Timestamp{}, fmt.Errorf("Resume token _data is missing or the wrong type: %v",
267-
resumeTokenDataValue.Type)
267+
err := bson.Unmarshal(resumeToken, &tokenStruct)
268+
if err != nil {
269+
return primitive.Timestamp{}, errors.Wrapf(err, "failed to extract %#q from resume token (%v)", "_data", resumeToken)
268270
}
269-
resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, resumeTokenData)
271+
272+
resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenStruct.Data)
270273
if err != nil {
271274
return primitive.Timestamp{}, err
272275
}
273276
// First element is the cluster time we want
274277
resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp)
275278
if !ok {
276-
return primitive.Timestamp{}, errors.New("Resume token lacks a cluster time")
279+
return primitive.Timestamp{}, errors.Errorf("resume token data's (%+v) first element is of type %T, not a timestamp", resumeTokenBson, resumeTokenBson[0].Value)
277280
}
278281

279282
return resumeTokenTime, nil

0 commit comments

Comments
 (0)