Skip to content

Commit 5d8cdc4

Browse files
authored
PBM-1309: PBM can't be started if RS doesn't have a majority (#938)
* Add read/write concerns options for commands * Set read/write concern for command type
1 parent f29d521 commit 5d8cdc4

File tree

2 files changed

+68
-14
lines changed

2 files changed

+68
-14
lines changed

pbm/connect/connect.go

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"strings"
77
"time"
88

9+
"go.mongodb.org/mongo-driver/bson"
910
"go.mongodb.org/mongo-driver/mongo"
1011
"go.mongodb.org/mongo-driver/mongo/options"
1112
"go.mongodb.org/mongo-driver/mongo/readconcern"
@@ -83,7 +84,10 @@ func ServerSelectionTimeout(d time.Duration) MongoOption {
8384
}
8485
}
8586

86-
func MongoConnect(ctx context.Context, uri string, mongoOptions ...MongoOption) (*mongo.Client, error) {
87+
func MongoConnectWithOpts(ctx context.Context,
88+
uri string,
89+
mongoOptions ...MongoOption,
90+
) (*mongo.Client, *options.ClientOptions, error) {
8791
if !strings.HasPrefix(uri, "mongodb://") {
8892
uri = "mongodb://" + uri
8993
}
@@ -103,37 +107,50 @@ func MongoConnect(ctx context.Context, uri string, mongoOptions ...MongoOption)
103107
for _, opt := range mongoOptions {
104108
if opt != nil {
105109
if err := opt(mopts); err != nil {
106-
return nil, errors.Wrap(err, "invalid mongo option")
110+
return nil, nil, errors.Wrap(err, "invalid mongo option")
107111
}
108112
}
109113
}
110114

111115
conn, err := mongo.Connect(ctx, mopts)
112116
if err != nil {
113-
return nil, errors.Wrap(err, "connect")
117+
return nil, nil, errors.Wrap(err, "connect")
114118
}
115119

116120
err = conn.Ping(ctx, nil)
117121
if err != nil {
118-
return nil, errors.Wrap(err, "ping")
122+
return nil, nil, errors.Wrap(err, "ping")
119123
}
120124

121-
return conn, nil
125+
return conn, mopts, nil
126+
}
127+
128+
func MongoConnect(
129+
ctx context.Context,
130+
uri string,
131+
mongoOptions ...MongoOption,
132+
) (*mongo.Client, error) {
133+
client, _, err := MongoConnectWithOpts(ctx, uri, mongoOptions...)
134+
return client, err
122135
}
123136

124137
type clientImpl struct {
125-
client *mongo.Client
138+
client *mongo.Client
139+
options *options.ClientOptions
126140
}
127141

128142
func UnsafeClient(m *mongo.Client) Client {
129-
return &clientImpl{m}
143+
return &clientImpl{
144+
client: m,
145+
options: options.Client(),
146+
}
130147
}
131148

132149
// Connect resolves MongoDB connection to Primary member and wraps it within Client object.
133150
// In case of replica set it returns connection to Primary member,
134151
// while in case of sharded cluster it returns connection to Config RS Primary member.
135152
func Connect(ctx context.Context, uri, appName string) (Client, error) {
136-
client, err := MongoConnect(ctx, uri, AppName(appName))
153+
client, opts, err := MongoConnectWithOpts(ctx, uri, AppName(appName))
137154
if err != nil {
138155
return nil, errors.Wrap(err, "create mongo connection")
139156
}
@@ -143,7 +160,10 @@ func Connect(ctx context.Context, uri, appName string) (Client, error) {
143160
return nil, errors.Wrap(err, "get NodeInfo")
144161
}
145162
if inf.isMongos() {
146-
return &clientImpl{client: client}, nil
163+
return &clientImpl{
164+
client: client,
165+
options: opts,
166+
}, nil
147167
}
148168

149169
inf.Opts, err = getMongodOpts(ctx, client, nil)
@@ -152,7 +172,10 @@ func Connect(ctx context.Context, uri, appName string) (Client, error) {
152172
}
153173

154174
if inf.isClusterLeader() {
155-
return &clientImpl{client: client}, nil
175+
return &clientImpl{
176+
client: client,
177+
options: opts,
178+
}, nil
156179
}
157180

158181
csvr, err := getConfigsvrURI(ctx, client)
@@ -183,7 +206,10 @@ func Connect(ctx context.Context, uri, appName string) (Client, error) {
183206
return nil, errors.Wrapf(err, "create mongo connection to configsvr with connection string '%s'", curi)
184207
}
185208

186-
return &clientImpl{client: client}, nil
209+
return &clientImpl{
210+
client: client,
211+
options: opts,
212+
}, nil
187213
}
188214

189215
func (l *clientImpl) HasValidConnection(ctx context.Context) error {
@@ -212,11 +238,16 @@ func (l *clientImpl) MongoClient() *mongo.Client {
212238
return l.client
213239
}
214240

241+
func (l *clientImpl) MongoOptions() *options.ClientOptions {
242+
return l.options
243+
}
244+
215245
func (l *clientImpl) ConfigDatabase() *mongo.Database {
216246
return l.client.Database("config")
217247
}
218248

219-
func (l *clientImpl) AdminCommand(ctx context.Context, cmd any, opts ...*options.RunCmdOptions) *mongo.SingleResult {
249+
func (l *clientImpl) AdminCommand(ctx context.Context, cmd bson.D, opts ...*options.RunCmdOptions) *mongo.SingleResult {
250+
cmd = l.applyOptonsFromConnString(cmd)
220251
return l.client.Database(defs.DB).RunCommand(ctx, cmd, opts...)
221252
}
222253

@@ -260,15 +291,38 @@ func (l *clientImpl) AgentsStatusCollection() *mongo.Collection {
260291
return l.client.Database(defs.DB).Collection(defs.AgentsStatusCollection)
261292
}
262293

294+
func (l *clientImpl) applyOptonsFromConnString(cmd bson.D) bson.D {
295+
if len(cmd) == 0 {
296+
return cmd
297+
}
298+
299+
cmdName := cmd[0].Key
300+
switch cmdName {
301+
case "create":
302+
if l.options.WriteConcern != nil {
303+
cmd = append(cmd, bson.E{"writeConcern", l.options.WriteConcern})
304+
}
305+
default:
306+
// do nothing for all other commands:
307+
// flushRouterConfig
308+
// _configsvrBalancerStart
309+
// _configsvrBalancerStop
310+
// _configsvrBalancerStatus
311+
}
312+
313+
return cmd
314+
}
315+
263316
var ErrInvalidConnection = errors.New("invalid mongo connection")
264317

265318
type Client interface {
266319
Disconnect(ctx context.Context) error
267320

268321
MongoClient() *mongo.Client
322+
MongoOptions() *options.ClientOptions
269323

270324
ConfigDatabase() *mongo.Database
271-
AdminCommand(ctx context.Context, cmd any, opts ...*options.RunCmdOptions) *mongo.SingleResult
325+
AdminCommand(ctx context.Context, cmd bson.D, opts ...*options.RunCmdOptions) *mongo.SingleResult
272326

273327
LogCollection() *mongo.Collection
274328
ConfigCollection() *mongo.Collection

pbm/restore/logical.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,7 @@ func (r *Restore) updateRouterConfig(ctx context.Context) error {
909909
}
910910
}
911911

912-
res := r.leadConn.AdminCommand(ctx, primitive.M{"flushRouterConfig": 1})
912+
res := r.leadConn.AdminCommand(ctx, bson.D{{"flushRouterConfig", 1}})
913913
return errors.Wrap(res.Err(), "flushRouterConfig")
914914
}
915915

0 commit comments

Comments
 (0)