Skip to content

Commit ff91465

Browse files
authored
Do not return partial data if 422 or half of fleet is down (#6765)
* Short-circuit replication set if 422 is returned
  Signed-off-by: Justin Jung <[email protected]>
* Do not return partial data if at least half of fleet is down
  Signed-off-by: Justin Jung <[email protected]>
* Add user and request info in partial data log
  Signed-off-by: Justin Jung <[email protected]>
* No need to return all errors from ingesters to user
  Signed-off-by: Justin Jung <[email protected]>
* Fix test
  Signed-off-by: Justin Jung <[email protected]>
* Add zone info in the log
  Signed-off-by: Justin Jung <[email protected]>
* Change condition to check validation.LimitError
  Signed-off-by: Justin Jung <[email protected]>
* Nit
  Signed-off-by: Justin Jung <[email protected]>
---------
Signed-off-by: Justin Jung <[email protected]>
1 parent 89d3895 commit ff91465

File tree

7 files changed

+64
-12
lines changed

7 files changed

+64
-12
lines changed

pkg/distributor/query.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cortexproject/cortex/pkg/util/extract"
2323
"github.com/cortexproject/cortex/pkg/util/grpcutil"
2424
"github.com/cortexproject/cortex/pkg/util/limiter"
25+
util_log "github.com/cortexproject/cortex/pkg/util/log"
2526
"github.com/cortexproject/cortex/pkg/util/validation"
2627
)
2728

@@ -330,9 +331,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
330331
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))
331332

332333
if partialdata.IsPartialDataError(err) {
333-
level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error())
334+
level.Warn(util_log.WithContext(ctx, d.log)).Log("msg", "returning partial data", "err", err.Error())
334335
d.ingesterPartialDataQueries.Inc()
335-
return resp, err
336+
return resp, partialdata.ErrPartialData
336337
}
337338

338339
return resp, nil

pkg/ring/replication_set.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/cortexproject/cortex/pkg/querier/partialdata"
10+
"github.com/cortexproject/cortex/pkg/util/validation"
1011
)
1112

1213
// ReplicationSet describes the instances to talk to for a given key, and how
@@ -80,6 +81,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
8081
return nil, res.err
8182
}
8283

84+
if validation.IsLimitError(res.err) {
85+
return nil, res.err
86+
}
87+
8388
// force one of the delayed requests to start
8489
if delay > 0 && r.MaxUnavailableZones == 0 {
8590
forceStart <- struct{}{}

pkg/ring/replication_set_test.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/atomic"
1212

1313
"github.com/cortexproject/cortex/pkg/querier/partialdata"
14+
"github.com/cortexproject/cortex/pkg/util/validation"
1415
)
1516

1617
func TestReplicationSet_GetAddresses(t *testing.T) {
@@ -196,12 +197,17 @@ func TestReplicationSet_Do(t *testing.T) {
196197
expectedError: errZoneFailure,
197198
},
198199
{
199-
name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)",
200-
instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}},
201-
f: failingFunctionOnZones("zone1", "zone2"),
200+
name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (6 instances)",
201+
instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}, {Addr: "10.0.0.4", Zone: "zone1"}, {Addr: "10.0.0.5", Zone: "zone2"}, {Addr: "10.0.0.6", Zone: "zone3"}},
202+
f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) {
203+
if ing.Addr == "10.0.0.1" || ing.Addr == "10.0.0.2" {
204+
return nil, errZoneFailure
205+
}
206+
return 1, nil
207+
},
202208
maxUnavailableZones: 1,
203209
queryPartialData: true,
204-
want: []interface{}{1},
210+
want: []interface{}{1, 1, 1, 1},
205211
expectedError: partialdata.ErrPartialData,
206212
errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"},
207213
},
@@ -213,6 +219,19 @@ func TestReplicationSet_Do(t *testing.T) {
213219
expectedError: errZoneFailure,
214220
queryPartialData: true,
215221
},
222+
{
223+
name: "with partial data enabled, should fail on instances returning 422",
224+
instances: []InstanceDesc{{Addr: "1", Zone: "zone1"}, {Addr: "2", Zone: "zone2"}, {Addr: "3", Zone: "zone3"}, {Addr: "4", Zone: "zone1"}, {Addr: "5", Zone: "zone2"}, {Addr: "6", Zone: "zone3"}},
225+
f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) {
226+
if ing.Addr == "1" || ing.Addr == "2" {
227+
return nil, validation.LimitError("limit breached")
228+
}
229+
return 1, nil
230+
},
231+
maxUnavailableZones: 1,
232+
expectedError: validation.LimitError("limit breached"),
233+
queryPartialData: true,
234+
},
216235
{
217236
name: "max unavailable zones = 1, should succeed on instances failing in 1 out of 3 zones (6 instances)",
218237
instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone3"}},
@@ -266,7 +285,7 @@ func TestReplicationSet_Do(t *testing.T) {
266285
}
267286
got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f)
268287
if tt.expectedError != nil {
269-
assert.ErrorIs(t, err, tt.expectedError)
288+
assert.ErrorContains(t, err, tt.expectedError.Error())
270289
for _, str := range tt.errStrContains {
271290
assert.ErrorContains(t, err, str)
272291
}

pkg/ring/replication_set_tracker.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ring
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
)
46

57
type replicationSetResultTracker interface {
68
// Signals an instance has done the execution, either successful (no error)
@@ -121,7 +123,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
121123
func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) {
122124
if err != nil {
123125
t.failuresByZone[instance.Zone]++
124-
t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err))
126+
t.errors = append(t.errors, fmt.Errorf("(%s, %s) %w", instance.GetAddr(), instance.GetZone(), err))
125127
} else {
126128
if _, ok := t.resultsPerZone[instance.Zone]; !ok {
127129
// If it is the first result in the zone, then total number of instances
@@ -160,7 +162,9 @@ func (t *zoneAwareResultTracker) failed() bool {
160162

161163
func (t *zoneAwareResultTracker) failedCompletely() bool {
162164
failedZones := len(t.failuresByZone)
163-
return failedZones == t.zoneCount
165+
allZonesFailed := failedZones == t.zoneCount
166+
atLeastHalfOfFleetFailed := len(t.errors) >= t.numInstances/2
167+
return allZonesFailed || (t.failed() && atLeastHalfOfFleetFailed)
164168
}
165169

166170
func (t *zoneAwareResultTracker) getResults() []interface{} {

pkg/ring/replication_set_tracker_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,18 @@ func TestZoneAwareResultTracker(t *testing.T) {
467467
assert.True(t, tracker.failedCompletely())
468468
},
469469
},
470+
"failedCompletely() should return true if failed() is true and half of the fleet are unavailable": {
471+
instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6},
472+
maxUnavailableZones: 1,
473+
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
474+
tracker.done(&instance1, nil, errors.New("test")) // Zone-a
475+
tracker.done(&instance2, nil, errors.New("test")) // Zone-a
476+
tracker.done(&instance3, nil, errors.New("test")) // Zone-b
477+
478+
assert.True(t, tracker.failed())
479+
assert.True(t, tracker.failedCompletely())
480+
},
481+
},
470482
"finished() should return true only if all instances are done": {
471483
instances: []InstanceDesc{instance1, instance2},
472484
maxUnavailableZones: 1,
@@ -483,11 +495,11 @@ func TestZoneAwareResultTracker(t *testing.T) {
483495
maxUnavailableZones: 1,
484496
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
	// Each recorded error is expected to be wrapped as
	// "(<addr>, <zone>) <err>" using the address and zone of the
	// instance that actually failed.
	tracker.done(&instance1, nil, errors.New("test1"))
	err1 := fmt.Errorf("(%s, %s) %w", instance1.GetAddr(), instance1.GetZone(), errors.New("test1"))
	assert.ElementsMatch(t, []error{err1}, tracker.getErrors())

	tracker.done(&instance2, nil, errors.New("test2"))
	err2 := fmt.Errorf("(%s, %s) %w", instance2.GetAddr(), instance2.GetZone(), errors.New("test2"))
	assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors())
},
493505
},

pkg/util/validation/limits.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ func (e LimitError) Error() string {
5050
return string(e)
5151
}
5252

53+
func IsLimitError(e error) bool {
54+
var limitError LimitError
55+
return errors.As(e, &limitError)
56+
}
57+
5358
type DisabledRuleGroup struct {
5459
Namespace string `yaml:"namespace" doc:"nocli|description=namespace in which the rule group belongs"`
5560
Name string `yaml:"name" doc:"nocli|description=name of the rule group"`

pkg/util/validation/limits_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package validation
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"reflect"
67
"regexp"
78
"strings"
@@ -885,3 +886,8 @@ func TestLimitsPerLabelSetsForSeries(t *testing.T) {
885886
})
886887
}
887888
}
889+
890+
func TestIsLimitError(t *testing.T) {
891+
assert.False(t, IsLimitError(fmt.Errorf("test error")))
892+
assert.True(t, IsLimitError(LimitError("test error")))
893+
}

0 commit comments

Comments
 (0)