From a5a1e97ab433cc52ac746988984817013dec71d0 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 26 Mar 2025 12:11:00 -0700 Subject: [PATCH 1/5] add metric Signed-off-by: Justin Jung --- pkg/distributor/distributor.go | 6 ++++++ pkg/distributor/query.go | 1 + 2 files changed, 7 insertions(+) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f9b347e56e4..a6a6efb8fec 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -123,6 +123,7 @@ type Distributor struct { ingesterAppendFailures *prometheus.CounterVec ingesterQueries *prometheus.CounterVec ingesterQueryFailures *prometheus.CounterVec + ingesterPartialDataQueries prometheus.Counter replicationFactor prometheus.Gauge latestSeenSampleTimestampPerUser *prometheus.GaugeVec @@ -375,6 +376,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "distributor_ingester_query_failures_total", Help: "The total number of failed queries sent to ingesters.", }, []string{"ingester"}), + ingesterPartialDataQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_ingester_partial_data_queries_total", + Help: "The total number of queries sent to ingesters that may have returned partial data.", + }), replicationFactor: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "distributor_replication_factor", diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 3733d9dc353..28c00be807d 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -331,6 +331,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri if partialdata.IsPartialDataError(err) { level.Info(d.log).Log("msg", "returning partial data") + d.ingesterPartialDataQueries.Inc() return resp, err } From d3018ca1bdeb33d650195ddef9a9b72f8fe16697 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 27 Mar 2025 09:54:54 -0700 Subject: [PATCH 2/5] improve log to include failed instance address Signed-off-by: Justin Jung --- pkg/distributor/query.go | 2 +- pkg/ring/replication_set.go | 3 ++- pkg/ring/replication_set_test.go | 9 +++++++-- pkg/ring/replication_set_tracker.go | 19 ++++++++++++++++++- pkg/ring/replication_set_tracker_test.go | 22 ++++++++++++++++++++++ 5 files changed, 50 insertions(+), 5 deletions(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 28c00be807d..8de7630e755 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -330,7 +330,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats.AddFetchedSamples(uint64(resp.SamplesCount())) if partialdata.IsPartialDataError(err) { - level.Info(d.log).Log("msg", "returning partial data") + level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error()) d.ingesterPartialDataQueries.Inc() return resp, err } diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 497e7287930..285192f6ab2 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -2,6 +2,7 @@ package ring import ( "context" + "fmt" "sort" "time" @@ -102,7 +103,7 @@ track: } if partialDataEnabled && trackerFailed { - return tracker.getResults(), partialdata.ErrPartialData + return tracker.getResults(), fmt.Errorf("failed to get data from %s: %w", tracker.failedInstances(), partialdata.ErrPartialData) } return tracker.getResults(), nil diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index b5764563797..dd1d276809e 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -124,6 +124,7 @@ func TestReplicationSet_Do(t *testing.T) { expectedError error zoneResultsQuorum bool queryPartialData bool + errStrContains []string }{ { name: "max errors = 0, no errors no delay", @@ -196,12 +197,13 @@ func TestReplicationSet_Do(t *testing.T) { }, { name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)", - instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}}, f: failingFunctionOnZones("zone1", "zone2"), maxUnavailableZones: 1, queryPartialData: true, want: []interface{}{1}, expectedError: partialdata.ErrPartialData, + errStrContains: []string{"failed to get data from", "10.0.0.1", "10.0.0.2"}, }, { name: "with partial data enabled, should fail on instances failing in all zones", @@ -264,7 +266,10 @@ func TestReplicationSet_Do(t *testing.T) { } got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f) if tt.expectedError != nil { - assert.Equal(t, tt.expectedError, err) + assert.ErrorIs(t, err, tt.expectedError) + for _, str := range tt.errStrContains { + assert.ErrorContains(t, err, str) + } } else { assert.NoError(t, err) } diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index 3d9d67901f9..278adf92d21 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -12,6 +12,9 @@ type replicationSetResultTracker interface { // Returns true if the maximum number of failed executions have been reached. failed() bool + // Returns list of instance addresses that failed + failedInstances() []string + // Returns true if executions failed in all zones. Only relevant for zoneAwareResultTracker. failedInAllZones() bool @@ -24,6 +27,7 @@ type defaultResultTracker struct { numSucceeded int numErrors int maxErrors int + failedInst []string results []interface{} } @@ -33,15 +37,17 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe numSucceeded: 0, numErrors: 0, maxErrors: maxErrors, + failedInst: make([]string, 0, len(instances)), results: make([]interface{}, 0, len(instances)), } } -func (t *defaultResultTracker) done(_ *InstanceDesc, result interface{}, err error) { +func (t *defaultResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err == nil { t.numSucceeded++ t.results = append(t.results, result) } else { + t.failedInst = append(t.failedInst, instance.GetAddr()) t.numErrors++ } } @@ -58,6 +64,10 @@ func (t *defaultResultTracker) failedInAllZones() bool { return false } +func (t *defaultResultTracker) failedInstances() []string { + return t.failedInst +} + func (t *defaultResultTracker) getResults() []interface{} { return t.results } @@ -72,6 +82,7 @@ type zoneAwareResultTracker struct { resultsPerZone map[string][]interface{} numInstances int zoneResultsQuorum bool + failedInst []string zoneCount int } @@ -82,6 +93,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int maxUnavailableZones: maxUnavailableZones, numInstances: len(instances), zoneResultsQuorum: zoneResultsQuorum, + failedInst: make([]string, 0, len(instances)), } for _, instance := range instances { @@ -97,6 +109,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ + t.failedInst = append(t.failedInst, instance.Addr) } else { if _, ok := t.resultsPerZone[instance.Zone]; !ok { // If it is the first result in the zone, then total number of instances @@ -133,6 +146,10 @@ func (t *zoneAwareResultTracker) failedInAllZones() bool { return failedZones == t.zoneCount } +func (t *zoneAwareResultTracker) failedInstances() []string { + return t.failedInst +} + func (t *zoneAwareResultTracker) getResults() []interface{} { results := make([]interface{}, 0, t.numInstances) if t.zoneResultsQuorum { diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index 1e22418ecbd..dd57ec30657 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -154,6 +154,17 @@ func TestDefaultResultTracker(t *testing.T) { assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults()) }, }, + "failedInstances should work": { + instances: []InstanceDesc{instance1, instance2}, + maxErrors: 2, + run: func(t *testing.T, tracker *defaultResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) + assert.ElementsMatch(t, []string{"127.0.0.1"}, tracker.failedInstances()) + + tracker.done(&instance2, nil, errors.New("test")) + assert.ElementsMatch(t, []string{"127.0.0.1", "127.0.0.2"}, tracker.failedInstances()) + }, + }, } for testName, testCase := range tests { @@ -422,6 +433,17 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.True(t, tracker.failedInAllZones()) }, }, + "failedInstances should work": { + instances: []InstanceDesc{instance1, instance2}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) + assert.ElementsMatch(t, []string{"127.0.0.1"}, tracker.failedInstances()) + + tracker.done(&instance2, nil, errors.New("test")) + assert.ElementsMatch(t, []string{"127.0.0.1", "127.0.0.2"}, tracker.failedInstances()) + }, + }, } for testName, testCase := range tests { From c56df2535da29b8223123f9aadc356a7e22ae5f9 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 27 Mar 2025 10:11:08 -0700 Subject: [PATCH 3/5] more tests Signed-off-by: Justin Jung --- pkg/ring/replication_set.go | 2 +- pkg/ring/replication_set_tracker.go | 12 +++++----- pkg/ring/replication_set_tracker_test.go | 28 ++++++++++++++++++++---- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 285192f6ab2..9bf0d073fc0 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -81,7 +81,7 @@ track: tracker.done(res.instance, res.res, res.err) if res.err != nil { if tracker.failed() { - if !partialDataEnabled || tracker.failedInAllZones() { + if !partialDataEnabled || tracker.failedCompletely() { return nil, res.err } trackerFailed = true diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index 278adf92d21..206b6623af3 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -15,8 +15,8 @@ type replicationSetResultTracker interface { // Returns list of instance addresses that failed failedInstances() []string - // Returns true if executions failed in all zones. Only relevant for zoneAwareResultTracker. - failedInAllZones() bool + // Returns true if executions failed in all instances or all zones. + failedCompletely() bool // Returns recorded results. getResults() []interface{} @@ -29,6 +29,7 @@ type defaultResultTracker struct { maxErrors int failedInst []string results []interface{} + numInstances int } func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker { @@ -39,6 +40,7 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe maxErrors: maxErrors, failedInst: make([]string, 0, len(instances)), results: make([]interface{}, 0, len(instances)), + numInstances: len(instances), } } @@ -60,8 +62,8 @@ func (t *defaultResultTracker) failed() bool { return t.numErrors > t.maxErrors } -func (t *defaultResultTracker) failedInAllZones() bool { - return false +func (t *defaultResultTracker) failedCompletely() bool { + return t.numInstances == t.numErrors } func (t *defaultResultTracker) failedInstances() []string { @@ -141,7 +143,7 @@ func (t *zoneAwareResultTracker) failed() bool { return failedZones > t.maxUnavailableZones } -func (t *zoneAwareResultTracker) failedInAllZones() bool { +func (t *zoneAwareResultTracker) failedCompletely() bool { failedZones := len(t.failuresByZone) return failedZones == t.zoneCount } diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index dd57ec30657..247a24892a0 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -154,6 +154,26 @@ func TestDefaultResultTracker(t *testing.T) { assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults()) }, }, + "failedCompletely should return true only if all instances have failed, regardless of max errors": { + instances: []InstanceDesc{instance1, instance2, instance3}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + assert.False(t, tracker.failedCompletely()) + + tracker.done(&instance2, nil, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + assert.False(t, tracker.failedCompletely()) + + tracker.done(&instance3, nil, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + assert.True(t, tracker.failedCompletely()) + }, + }, "failedInstances should work": { instances: []InstanceDesc{instance1, instance2}, maxErrors: 2, @@ -410,7 +430,7 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.failed()) }, }, - "failInAllZones should return true only if all zones have failed, regardless of max unavailable zones": { + "failedCompletely should return true only if all zones have failed, regardless of max unavailable zones": { instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, maxUnavailableZones: 1, run: func(t *testing.T, tracker *zoneAwareResultTracker) { @@ -418,19 +438,19 @@ func TestZoneAwareResultTracker(t *testing.T) { tracker.done(&instance1, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - assert.False(t, tracker.failedInAllZones()) + assert.False(t, tracker.failedCompletely()) // Zone-b tracker.done(&instance3, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) - assert.False(t, tracker.failedInAllZones()) + assert.False(t, tracker.failedCompletely()) // Zone-c tracker.done(&instance5, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) - assert.True(t, tracker.failedInAllZones()) + assert.True(t, tracker.failedCompletely()) }, }, "failedInstances should work": { From 6f08c10c530160c1f00a94e0bc1399206fc9da73 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 27 Mar 2025 10:12:18 -0700 Subject: [PATCH 4/5] changelog Signed-off-by: Justin Jung --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b6c3743ce5..4cf877a357a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [ENHANCEMENT] Query Frontend: Add a `-frontend.enabled-ruler-query-stats` flag to configure whether to report the query stats log for queries coming from the Ruler. #6504 * [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617 * [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628 +* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 From 36b6fccbbd95aea29f724af8699d4419b6fd774b Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 27 Mar 2025 15:26:26 -0700 Subject: [PATCH 5/5] include all partial errors in log Signed-off-by: Justin Jung --- pkg/ring/replication_set.go | 25 +++++------- pkg/ring/replication_set_test.go | 2 +- pkg/ring/replication_set_tracker.go | 49 ++++++++++++++++-------- pkg/ring/replication_set_tracker_test.go | 45 +++++++++++++++++----- 4 files changed, 78 insertions(+), 43 deletions(-) diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 9bf0d073fc0..745c742f990 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -71,20 +71,13 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults }(i, &r.Instances[i]) } - trackerFailed := false - cnt := 0 - -track: - for !tracker.succeeded() { + for !tracker.succeeded() && !tracker.finished() { select { case res := <-ch: tracker.done(res.instance, res.res, res.err) if res.err != nil { - if tracker.failed() { - if !partialDataEnabled || tracker.failedCompletely() { - return nil, res.err - } - trackerFailed = true + if tracker.failed() && (!partialDataEnabled || tracker.failedCompletely()) { + return nil, res.err } // force one of the delayed requests to start @@ -92,18 +85,18 @@ track: forceStart <- struct{}{} } } - cnt++ - if cnt == len(r.Instances) { - break track - } case <-ctx.Done(): return nil, ctx.Err() } } - if partialDataEnabled && trackerFailed { - return tracker.getResults(), fmt.Errorf("failed to get data from %s: %w", tracker.failedInstances(), partialdata.ErrPartialData) + if partialDataEnabled && tracker.failed() { + finalErr := partialdata.ErrPartialData + for _, partialErr := range tracker.getErrors() { + finalErr = fmt.Errorf("%w: %w", finalErr, partialErr) + } + return tracker.getResults(), finalErr } return tracker.getResults(), nil diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index dd1d276809e..d72e4cd5257 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -203,7 +203,7 @@ func TestReplicationSet_Do(t *testing.T) { queryPartialData: true, want: []interface{}{1}, expectedError: partialdata.ErrPartialData, - errStrContains: []string{"failed to get data from", "10.0.0.1", "10.0.0.2"}, + errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"}, }, { name: "with partial data enabled, should fail on instances failing in all zones", diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index 206b6623af3..a0f594b442c 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -1,25 +1,30 @@ package ring +import "fmt" + type replicationSetResultTracker interface { // Signals an instance has done the execution, either successful (no error) // or failed (with error). If successful, result will be recorded and can // be accessed via getResults. done(instance *InstanceDesc, result interface{}, err error) + // Returns true if all instances are done executing + finished() bool + // Returns true if the minimum number of successful results have been received. succeeded() bool // Returns true if the maximum number of failed executions have been reached. failed() bool - // Returns list of instance addresses that failed - failedInstances() []string - // Returns true if executions failed in all instances or all zones. failedCompletely() bool // Returns recorded results. getResults() []interface{} + + // Returns errors + getErrors() []error } type defaultResultTracker struct { @@ -27,9 +32,9 @@ type defaultResultTracker struct { numSucceeded int numErrors int maxErrors int - failedInst []string results []interface{} numInstances int + errors []error } func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker { @@ -38,7 +43,7 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe numSucceeded: 0, numErrors: 0, maxErrors: maxErrors, - failedInst: make([]string, 0, len(instances)), + errors: make([]error, 0, len(instances)), results: make([]interface{}, 0, len(instances)), numInstances: len(instances), } @@ -49,11 +54,15 @@ func (t *defaultResultTracker) done(instance *InstanceDesc, result interface{}, t.numSucceeded++ t.results = append(t.results, result) } else { - t.failedInst = append(t.failedInst, instance.GetAddr()) + t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err)) t.numErrors++ } } +func (t *defaultResultTracker) finished() bool { + return t.numSucceeded+t.numErrors == t.numInstances +} + func (t *defaultResultTracker) succeeded() bool { return t.numSucceeded >= t.minSucceeded } @@ -66,14 +75,14 @@ func (t *defaultResultTracker) failedCompletely() bool { return t.numInstances == t.numErrors } -func (t *defaultResultTracker) failedInstances() []string { - return t.failedInst -} - func (t *defaultResultTracker) getResults() []interface{} { return t.results } +func (t *defaultResultTracker) getErrors() []error { + return t.errors +} + // zoneAwareResultTracker tracks the results per zone. // All instances in a zone must succeed in order for the zone to succeed. type zoneAwareResultTracker struct { @@ -84,8 +93,9 @@ type zoneAwareResultTracker struct { resultsPerZone map[string][]interface{} numInstances int zoneResultsQuorum bool - failedInst []string zoneCount int + doneCount int + errors []error } func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareResultTracker { @@ -95,7 +105,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int maxUnavailableZones: maxUnavailableZones, numInstances: len(instances), zoneResultsQuorum: zoneResultsQuorum, - failedInst: make([]string, 0, len(instances)), + errors: make([]error, 0, len(instances)), } for _, instance := range instances { @@ -111,7 +121,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ - t.failedInst = append(t.failedInst, instance.Addr) + t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err)) } else { if _, ok := t.resultsPerZone[instance.Zone]; !ok { // If it is the first result in the zone, then total number of instances @@ -122,6 +132,11 @@ func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{} } t.waitingByZone[instance.Zone]-- + t.doneCount++ +} + +func (t *zoneAwareResultTracker) finished() bool { + return t.doneCount == t.numInstances } func (t *zoneAwareResultTracker) succeeded() bool { @@ -148,10 +163,6 @@ func (t *zoneAwareResultTracker) failedCompletely() bool { return failedZones == t.zoneCount } -func (t *zoneAwareResultTracker) failedInstances() []string { - return t.failedInst -} - func (t *zoneAwareResultTracker) getResults() []interface{} { results := make([]interface{}, 0, t.numInstances) if t.zoneResultsQuorum { @@ -169,3 +180,7 @@ func (t *zoneAwareResultTracker) getResults() []interface{} { } return results } + +func (t *zoneAwareResultTracker) getErrors() []error { + return t.errors +} diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index 247a24892a0..ed782739c77 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -2,6 +2,7 @@ package ring import ( "errors" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -154,7 +155,7 @@ func TestDefaultResultTracker(t *testing.T) { assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults()) }, }, - "failedCompletely should return true only if all instances have failed, regardless of max errors": { + "failedCompletely() should return true only if all instances have failed, regardless of max errors": { instances: []InstanceDesc{instance1, instance2, instance3}, maxErrors: 1, run: func(t *testing.T, tracker *defaultResultTracker) { @@ -174,15 +175,28 @@ func TestDefaultResultTracker(t *testing.T) { assert.True(t, tracker.failedCompletely()) }, }, - "failedInstances should work": { + "finished() should return true only if all instances are done": { instances: []InstanceDesc{instance1, instance2}, - maxErrors: 2, + maxErrors: 1, run: func(t *testing.T, tracker *defaultResultTracker) { tracker.done(&instance1, nil, errors.New("test")) - assert.ElementsMatch(t, []string{"127.0.0.1"}, tracker.failedInstances()) + assert.False(t, tracker.finished()) tracker.done(&instance2, nil, errors.New("test")) - assert.ElementsMatch(t, []string{"127.0.0.1", "127.0.0.2"}, tracker.failedInstances()) + assert.True(t, tracker.finished()) + }, + }, + "getErrors() should return list of all errors": { + instances: []InstanceDesc{instance1, instance2}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + tracker.done(&instance1, nil, errors.New("test1")) + err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1")) + assert.ElementsMatch(t, []error{err1}, tracker.getErrors()) + + tracker.done(&instance2, nil, errors.New("test2")) + err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2")) + assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors()) }, }, } @@ -430,7 +444,7 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.failed()) }, }, - "failedCompletely should return true only if all zones have failed, regardless of max unavailable zones": { + "failedCompletely() should return true only if all zones have failed, regardless of max unavailable zones": { instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, maxUnavailableZones: 1, run: func(t *testing.T, tracker *zoneAwareResultTracker) { @@ -453,15 +467,28 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.True(t, tracker.failedCompletely()) }, }, - "failedInstances should work": { + "finished() should return true only if all instances are done": { instances: []InstanceDesc{instance1, instance2}, maxUnavailableZones: 1, run: func(t *testing.T, tracker *zoneAwareResultTracker) { tracker.done(&instance1, nil, errors.New("test")) - assert.ElementsMatch(t, []string{"127.0.0.1"}, tracker.failedInstances()) + assert.False(t, tracker.finished()) tracker.done(&instance2, nil, errors.New("test")) - assert.ElementsMatch(t, []string{"127.0.0.1", "127.0.0.2"}, tracker.failedInstances()) + assert.True(t, tracker.finished()) + }, + }, + "getErrors() should return list of all errors": { + instances: []InstanceDesc{instance1, instance2}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + tracker.done(&instance1, nil, errors.New("test1")) + err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1")) + assert.ElementsMatch(t, []error{err1}, tracker.getErrors()) + + tracker.done(&instance2, nil, errors.New("test2")) + err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2")) + assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors()) }, }, }