Skip to content

Commit 3d4821a

Browse files
committed
fix change stream time
1 parent 1d05e22 commit 3d4821a

File tree

7 files changed

+621
-16
lines changed

7 files changed

+621
-16
lines changed

internal/verifier/change_stream_test.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -290,14 +290,6 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
290290
var ops []bson.Raw
291291
suite.Require().NoError(cursor.All(ctx, &ops))
292292

293-
/*
294-
_, err = coll.InsertOne(
295-
ctx,
296-
bson.D{{"_id", "before kill"}},
297-
)
298-
suite.Require().NoError(err)
299-
*/
300-
301293
for _, cursorRaw := range ops {
302294
opId, err := cursorRaw.LookupErr("opid")
303295
suite.Require().NoError(err, "should get opid from op")
@@ -315,16 +307,12 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
315307
)
316308
}
317309

318-
sess, err := suite.srcMongoClient.StartSession()
319-
suite.Require().NoError(err)
320310
_, err = coll.InsertOne(
321-
mongo.NewSessionContext(ctx, sess),
311+
ctx,
322312
bson.D{{"_id", "after kill"}},
323313
)
324314
suite.Require().NoError(err)
325315

326-
suite.T().Logf("cluster time after insert: %+v", sess.ClusterTime())
327-
328316
suite.Require().NoError(verifier.WritesOff(ctx))
329317

330318
suite.Require().NoError(verifierRunner.Await())
@@ -339,8 +327,6 @@ func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
339327

340328
suite.Assert().Zero(incompleteTasks, "no incomplete tasks")
341329
suite.Require().Len(failedTasks, 1, "expect one failed task")
342-
343-
suite.T().Logf("task: %+v", failedTasks[0])
344330
}
345331

346332
func (suite *IntegrationTestSuite) TestEventBeforeWritesOff() {

internal/verifier/clustertime.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/10gen/migration-verifier/internal/retry"
88
"github.com/10gen/migration-verifier/internal/util"
99
"github.com/10gen/migration-verifier/mbson"
10+
"github.com/10gen/migration-verifier/option"
1011
"github.com/pkg/errors"
1112
"go.mongodb.org/mongo-driver/bson"
1213
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -98,7 +99,7 @@ func syncClusterTimeAcrossShards(
9899
ctx,
99100
client,
100101
"syncing cluster time",
101-
maxTime,
102+
option.Some(maxTime),
102103
)
103104

104105
// If any shard’s cluster time >= maxTime, the mongos will return a

option/bson.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package option
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"go.mongodb.org/mongo-driver/bson"
6+
"go.mongodb.org/mongo-driver/bson/bsontype"
7+
"go.mongodb.org/mongo-driver/bson/primitive"
8+
)
9+
10+
// MarshalBSONValue implements bson.ValueMarshaler.
11+
func (o Option[T]) MarshalBSONValue() (bsontype.Type, []byte, error) {
12+
val, exists := o.Get()
13+
if !exists {
14+
return bson.MarshalValue(primitive.Null{})
15+
}
16+
17+
return bson.MarshalValue(val)
18+
}
19+
20+
// UnmarshalBSONValue implements bson.ValueUnmarshaler.
21+
func (o *Option[T]) UnmarshalBSONValue(bType bsontype.Type, raw []byte) error {
22+
23+
switch bType {
24+
case bson.TypeNull:
25+
o.val = nil
26+
27+
default:
28+
valPtr := new(T)
29+
30+
err := bson.UnmarshalValue(bType, raw, &valPtr)
31+
if err != nil {
32+
return errors.Wrapf(err, "failed to unmarshal %T", *o)
33+
}
34+
35+
// This may not even be possible, but we should still check.
36+
if isNil(*valPtr) {
37+
return errors.Wrapf(err, "refuse to unmarshal nil %T value", *o)
38+
}
39+
40+
o.val = valPtr
41+
}
42+
43+
return nil
44+
}
45+
46+
// IsZero implements bsoncodec.Zeroer.
47+
func (o Option[T]) IsZero() bool {
48+
return o.IsNone()
49+
}

option/json.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package option
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
)
7+
8+
var _ json.Marshaler = &Option[int]{}
9+
var _ json.Unmarshaler = &Option[int]{}
10+
11+
// MarshalJSON encodes Option into json.
12+
func (o Option[T]) MarshalJSON() ([]byte, error) {
13+
val, exists := o.Get()
14+
if exists {
15+
return json.Marshal(val)
16+
}
17+
18+
return json.Marshal(nil)
19+
}
20+
21+
// UnmarshalJSON decodes Option from json.
22+
func (o *Option[T]) UnmarshalJSON(b []byte) error {
23+
if bytes.Equal(b, []byte("null")) {
24+
o.val = nil
25+
} else {
26+
val := *new(T)
27+
28+
err := json.Unmarshal(b, &val)
29+
if err != nil {
30+
return err
31+
}
32+
33+
o.val = &val
34+
}
35+
36+
return nil
37+
}

option/option.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Package option implements [option types] in Go.
2+
// It takes inspiration from [samber/mo] but also works with BSON and exposes
3+
// a (hopefully) more refined interface.
4+
//
5+
// Option types facilitate avoidance of nil-dereference bugs, at the cost of a
6+
// bit more overhead.
7+
//
8+
// A couple special notes:
9+
// - nil values inside the Option, like `Some([]int(nil))`, are forbidden.
10+
// - Option’s BSON marshaling/unmarshaling interoperates with the [bson]
11+
// package’s handling of nilable pointers. So any code that uses nilable
12+
// pointers to represent optional values can switch to Option and
13+
// should continue working with existing persisted data.
14+
// - Because encoding/json provides no equivalent to bsoncodec.Zeroer,
15+
// Option always marshals to JSON null if empty.
16+
//
17+
// Prefer Option to nilable pointers in all new code, and consider
18+
// changing existing code to use it.
19+
//
20+
// [option types]: https://en.wikipedia.org/wiki/Option_type
21+
package option
22+
23+
import (
24+
"fmt"
25+
"reflect"
26+
27+
"go.mongodb.org/mongo-driver/bson"
28+
"go.mongodb.org/mongo-driver/bson/bsoncodec"
29+
)
30+
31+
var _ bson.ValueMarshaler = &Option[int]{}
32+
var _ bson.ValueUnmarshaler = &Option[int]{}
33+
var _ bsoncodec.Zeroer = &Option[int]{}
34+
35+
// Option represents a possibly-empty value.
36+
// Its zero value is the empty case.
37+
type Option[T any] struct {
38+
val *T
39+
}
40+
41+
// Some creates an Option with a value.
42+
func Some[T any](value T) Option[T] {
43+
if isNil(value) {
44+
panic(fmt.Sprintf("Option forbids nil value (%T).", value))
45+
}
46+
47+
return Option[T]{&value}
48+
}
49+
50+
// None creates an Option with no value.
51+
//
52+
// Note that `None[T]()` is interchangeable with `Option[T]{}`.
53+
func None[T any]() Option[T] {
54+
return Option[T]{}
55+
}
56+
57+
// FromPointer will convert a nilable pointer into its
58+
// equivalent Option.
59+
func FromPointer[T any](valPtr *T) Option[T] {
60+
if valPtr == nil {
61+
return None[T]()
62+
}
63+
64+
if isNil(*valPtr) {
65+
panic(fmt.Sprintf("Given pointer (%T) refers to nil, which is forbidden.", valPtr))
66+
}
67+
68+
myCopy := *valPtr
69+
70+
return Option[T]{&myCopy}
71+
}
72+
73+
// IfNotZero returns an Option that’s populated if & only if
74+
// the given value is a non-zero value. (NB: The zero value
75+
// for slices & maps is nil, not empty!)
76+
//
77+
// This is useful, e.g., to interface with code that uses
78+
// nil to indicate a missing slice or map.
79+
func IfNotZero[T any](value T) Option[T] {
80+
81+
// copied from samber/mo.EmptyableToOption:
82+
if reflect.ValueOf(&value).Elem().IsZero() {
83+
return Option[T]{}
84+
}
85+
86+
return Option[T]{&value}
87+
}
88+
89+
// Get “unboxes” the Option’s internal value.
90+
// The boolean indicates whether the value exists.
91+
func (o Option[T]) Get() (T, bool) {
92+
if o.val == nil {
93+
return *new(T), false
94+
}
95+
96+
return *o.val, true
97+
}
98+
99+
// MustGet is like Get but panics if the Option is empty.
100+
func (o Option[T]) MustGet() T {
101+
val, exists := o.Get()
102+
if !exists {
103+
panic(fmt.Sprintf("MustGet() called on empty %T", o))
104+
}
105+
106+
return val
107+
}
108+
109+
// OrZero returns either the Option’s internal value or
110+
// the type’s zero value.
111+
func (o Option[T]) OrZero() T {
112+
val, exists := o.Get()
113+
if exists {
114+
return val
115+
}
116+
117+
return *new(T)
118+
}
119+
120+
// OrElse returns either the Option’s internal value or
121+
// the given `fallback`.
122+
func (o Option[T]) OrElse(fallback T) T {
123+
val, exists := o.Get()
124+
if exists {
125+
return val
126+
}
127+
128+
return fallback
129+
}
130+
131+
// ToPointer converts the Option to a nilable pointer.
132+
// The internal value (if it exists) is (shallow-)copied.
133+
func (o Option[T]) ToPointer() *T {
134+
val, exists := o.Get()
135+
if exists {
136+
theCopy := val
137+
return &theCopy
138+
}
139+
140+
return nil
141+
}
142+
143+
// IsNone returns a boolean indicating whether or not the option is a None
144+
// value.
145+
func (o Option[T]) IsNone() bool {
146+
return o.val == nil
147+
}
148+
149+
// IsSome returns a boolean indicating whether or not the option is a Some
150+
// value.
151+
func (o Option[T]) IsSome() bool {
152+
return o.val != nil
153+
}

0 commit comments

Comments
 (0)