Skip to content

Commit a9dfbf3

Browse files
authored
GODRIVER-2220 Ignore readpref for output aggs if any servers are pre-5.0 (#804)
1 parent 5cab184 commit a9dfbf3

File tree

2 files changed

+20
-25
lines changed

2 files changed

+20
-25
lines changed

mongo/description/server_selector.go

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -185,47 +185,48 @@ func selectForReplicaSet(rp *readpref.ReadPref, isOutputAggregate bool, t Topolo
185185
return nil, err
186186
}
187187

188+
// If underlying operation is an aggregate with an output stage, only apply read preference
189+
// if all candidates are 5.0+. Otherwise, operate under primary read preference.
190+
if isOutputAggregate {
191+
for _, s := range candidates {
192+
if s.WireVersion.Max < 13 {
193+
return selectByKind(candidates, RSPrimary), nil
194+
}
195+
}
196+
}
197+
188198
switch rp.Mode() {
189199
case readpref.PrimaryMode:
190200
return selectByKind(candidates, RSPrimary), nil
191201
case readpref.PrimaryPreferredMode:
192202
selected := selectByKind(candidates, RSPrimary)
193203

194204
if len(selected) == 0 {
195-
selected = selectSecondaries(rp, isOutputAggregate, candidates)
205+
selected = selectSecondaries(rp, candidates)
196206
return selectByTagSet(selected, rp.TagSets()), nil
197207
}
198208

199209
return selected, nil
200210
case readpref.SecondaryPreferredMode:
201-
selected := selectSecondaries(rp, isOutputAggregate, candidates)
211+
selected := selectSecondaries(rp, candidates)
202212
selected = selectByTagSet(selected, rp.TagSets())
203213
if len(selected) > 0 {
204214
return selected, nil
205215
}
206216
return selectByKind(candidates, RSPrimary), nil
207217
case readpref.SecondaryMode:
208-
selected := selectSecondaries(rp, isOutputAggregate, candidates)
209-
selected = selectByTagSet(selected, rp.TagSets())
210-
211-
// When selecting a server for an aggregate with an output stage,
212-
// Secondary read preference functions identically to SecondaryPreferred:
213-
// if no secondaries are available (none >= wire version 13), we fall
214-
// back to primary.
215-
if !isOutputAggregate || len(selected) > 0 {
216-
return selected, nil
217-
}
218-
return selectByKind(candidates, RSPrimary), nil
218+
selected := selectSecondaries(rp, candidates)
219+
return selectByTagSet(selected, rp.TagSets()), nil
219220
case readpref.NearestMode:
220221
selected := selectByKind(candidates, RSPrimary)
221-
selected = append(selected, selectSecondaries(rp, isOutputAggregate, candidates)...)
222+
selected = append(selected, selectSecondaries(rp, candidates)...)
222223
return selectByTagSet(selected, rp.TagSets()), nil
223224
}
224225

225226
return nil, fmt.Errorf("unsupported mode: %d", rp.Mode())
226227
}
227228

228-
func selectSecondaries(rp *readpref.ReadPref, isOutputAggregate bool, candidates []Server) []Server {
229+
func selectSecondaries(rp *readpref.ReadPref, candidates []Server) []Server {
229230
secondaries := selectByKind(candidates, RSSecondary)
230231
if len(secondaries) == 0 {
231232
return secondaries
@@ -244,11 +245,7 @@ func selectSecondaries(rp *readpref.ReadPref, isOutputAggregate bool, candidates
244245
for _, secondary := range secondaries {
245246
estimatedStaleness := baseTime.Sub(secondary.LastWriteTime) + secondary.HeartbeatInterval
246247
if estimatedStaleness <= maxStaleness {
247-
// If underlying operation is an aggregate with an output stage, only select secondaries with
248-
// wire version 13 or greater.
249-
if !isOutputAggregate || secondary.WireVersion.Max >= 13 {
250-
selected = append(selected, secondary)
251-
}
248+
selected = append(selected, secondary)
252249
}
253250
}
254251

@@ -261,11 +258,7 @@ func selectSecondaries(rp *readpref.ReadPref, isOutputAggregate bool, candidates
261258
for _, secondary := range secondaries {
262259
estimatedStaleness := secondary.LastUpdateTime.Sub(secondary.LastWriteTime) - primary.LastUpdateTime.Sub(primary.LastWriteTime) + secondary.HeartbeatInterval
263260
if estimatedStaleness <= maxStaleness {
264-
// If underlying operation is an aggregate with an output stage, only select secondaries with
265-
// wire version 13 or greater.
266-
if !isOutputAggregate || secondary.WireVersion.Max >= 13 {
267-
selected = append(selected, secondary)
268-
}
261+
selected = append(selected, secondary)
269262
}
270263
}
271264
return selected

x/mongo/driver/operation.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,8 @@ func (op Operation) getReadPrefBasedOnTransaction() (*readpref.ReadPref, error)
12041204
}
12051205

12061206
func (op Operation) createReadPref(desc description.SelectedServer, isOpQuery bool) (bsoncore.Document, error) {
1207+
// TODO(GODRIVER-2231): Instead of checking if isOutputAggregate and desc.Server.WireVersion.Max < 13,
1208+
// somehow check if supplied readPreference was "overwritten" with primary in description.selectForReplicaSet.
12071209
if desc.Server.Kind == description.Standalone || (isOpQuery && desc.Server.Kind != description.Mongos) ||
12081210
op.Type == Write || (op.IsOutputAggregate && desc.Server.WireVersion.Max < 13) {
12091211
// Don't send read preference for:

0 commit comments

Comments
 (0)