diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b6c3743ce5..4cf877a357a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [ENHANCEMENT] Query Frontend: Add a `-frontend.enabled-ruler-query-stats` flag to configure whether to report the query stats log for queries coming from the Ruler. #6504 * [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617 * [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628 +* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f9b347e56e4..a6a6efb8fec 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -123,6 +123,7 @@ type Distributor struct { ingesterAppendFailures *prometheus.CounterVec ingesterQueries *prometheus.CounterVec ingesterQueryFailures *prometheus.CounterVec + ingesterPartialDataQueries prometheus.Counter replicationFactor prometheus.Gauge latestSeenSampleTimestampPerUser *prometheus.GaugeVec @@ -375,6 +376,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "distributor_ingester_query_failures_total", Help: "The total number of failed queries sent to ingesters.", }, []string{"ingester"}), + ingesterPartialDataQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_ingester_partial_data_queries_total", + Help: "The total number of queries sent to ingesters that may have returned partial data.", + }), replicationFactor: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "distributor_replication_factor", diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 3733d9dc353..8de7630e755 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -330,7 +330,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats.AddFetchedSamples(uint64(resp.SamplesCount())) if partialdata.IsPartialDataError(err) { - level.Info(d.log).Log("msg", "returning partial data") + level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error()) + d.ingesterPartialDataQueries.Inc() return resp, err } diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 497e7287930..745c742f990 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -2,6 +2,7 @@ package ring import ( "context" + "fmt" "sort" "time" @@ -70,20 +71,13 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults }(i, &r.Instances[i]) } - trackerFailed := false - cnt := 0 - -track: - for !tracker.succeeded() { + for !tracker.succeeded() && !tracker.finished() { select { case res := <-ch: tracker.done(res.instance, res.res, res.err) if res.err != nil { - if tracker.failed() { - if !partialDataEnabled || tracker.failedInAllZones() { - return nil, res.err - } - trackerFailed = true + if tracker.failed() && (!partialDataEnabled || tracker.failedCompletely()) { + return nil, res.err } // force one of the delayed requests to start @@ -91,18 +85,18 @@ track: forceStart <- struct{}{} } } - cnt++ - if cnt == len(r.Instances) { - break track - } case <-ctx.Done(): return nil, ctx.Err() } } - if partialDataEnabled && trackerFailed { - return tracker.getResults(), partialdata.ErrPartialData + if partialDataEnabled && tracker.failed() { + finalErr := partialdata.ErrPartialData + for _, partialErr := range tracker.getErrors() { + finalErr = fmt.Errorf("%w: %w", finalErr, partialErr) + } + return tracker.getResults(), finalErr } return tracker.getResults(), nil diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index b5764563797..d72e4cd5257 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -124,6 +124,7 @@ func TestReplicationSet_Do(t *testing.T) { expectedError error zoneResultsQuorum bool queryPartialData bool + errStrContains []string }{ { name: "max errors = 0, no errors no delay", @@ -196,12 +197,13 @@ func TestReplicationSet_Do(t *testing.T) { }, { name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)", - instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}}, f: failingFunctionOnZones("zone1", "zone2"), maxUnavailableZones: 1, queryPartialData: true, want: []interface{}{1}, expectedError: partialdata.ErrPartialData, + errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"}, }, { name: "with partial data enabled, should fail on instances failing in all zones", @@ -264,7 +266,10 @@ func TestReplicationSet_Do(t *testing.T) { } got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f) if tt.expectedError != nil { - assert.Equal(t, tt.expectedError, err) + assert.ErrorIs(t, err, tt.expectedError) + for _, str := range tt.errStrContains { + assert.ErrorContains(t, err, str) + } } else { assert.NoError(t, err) } diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index 3d9d67901f9..a0f594b442c 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -1,22 +1,30 @@ package ring +import "fmt" + type replicationSetResultTracker interface { // Signals an instance has done the execution, either successful (no error) // or failed (with error). If successful, result will be recorded and can // be accessed via getResults. done(instance *InstanceDesc, result interface{}, err error) + // Returns true if all instances are done executing + finished() bool + // Returns true if the minimum number of successful results have been received. succeeded() bool // Returns true if the maximum number of failed executions have been reached. failed() bool - // Returns true if executions failed in all zones. Only relevant for zoneAwareResultTracker. - failedInAllZones() bool + // Returns true if executions failed in all instances or all zones. + failedCompletely() bool // Returns recorded results. getResults() []interface{} + + // Returns errors + getErrors() []error } type defaultResultTracker struct { @@ -25,6 +33,8 @@ type defaultResultTracker struct { numErrors int maxErrors int results []interface{} + numInstances int + errors []error } func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker { @@ -33,19 +43,26 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe numSucceeded: 0, numErrors: 0, maxErrors: maxErrors, + errors: make([]error, 0, len(instances)), results: make([]interface{}, 0, len(instances)), + numInstances: len(instances), } } -func (t *defaultResultTracker) done(_ *InstanceDesc, result interface{}, err error) { +func (t *defaultResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err == nil { t.numSucceeded++ t.results = append(t.results, result) } else { + t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err)) t.numErrors++ } } +func (t *defaultResultTracker) finished() bool { + return t.numSucceeded+t.numErrors == t.numInstances +} + func (t *defaultResultTracker) succeeded() bool { return t.numSucceeded >= t.minSucceeded } @@ -54,14 +71,18 @@ func (t *defaultResultTracker) failed() bool { return t.numErrors > t.maxErrors } -func (t *defaultResultTracker) failedInAllZones() bool { - return false +func (t *defaultResultTracker) failedCompletely() bool { + return t.numInstances == t.numErrors } func (t *defaultResultTracker) getResults() []interface{} { return t.results } +func (t *defaultResultTracker) getErrors() []error { + return t.errors +} + // zoneAwareResultTracker tracks the results per zone. // All instances in a zone must succeed in order for the zone to succeed. type zoneAwareResultTracker struct { @@ -73,6 +94,8 @@ type zoneAwareResultTracker struct { numInstances int zoneResultsQuorum bool zoneCount int + doneCount int + errors []error } func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareResultTracker { @@ -82,6 +105,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int maxUnavailableZones: maxUnavailableZones, numInstances: len(instances), zoneResultsQuorum: zoneResultsQuorum, + errors: make([]error, 0, len(instances)), } for _, instance := range instances { @@ -97,6 +121,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ + t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err)) } else { if _, ok := t.resultsPerZone[instance.Zone]; !ok { // If it is the first result in the zone, then total number of instances @@ -107,6 +132,11 @@ func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{} } t.waitingByZone[instance.Zone]-- + t.doneCount++ +} + +func (t *zoneAwareResultTracker) finished() bool { + return t.doneCount == t.numInstances } func (t *zoneAwareResultTracker) succeeded() bool { @@ -128,7 +158,7 @@ func (t *zoneAwareResultTracker) failed() bool { return failedZones > t.maxUnavailableZones } -func (t *zoneAwareResultTracker) failedInAllZones() bool { +func (t *zoneAwareResultTracker) failedCompletely() bool { failedZones := len(t.failuresByZone) return failedZones == t.zoneCount } @@ -150,3 +180,7 @@ func (t *zoneAwareResultTracker) getResults() []interface{} { } return results } + +func (t *zoneAwareResultTracker) getErrors() []error { + return t.errors +} diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index 1e22418ecbd..ed782739c77 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -2,6 +2,7 @@ package ring import ( "errors" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -154,6 +155,50 @@ func TestDefaultResultTracker(t *testing.T) { assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults()) }, }, + "failedCompletely() should return true only if all instances have failed, regardless of max errors": { + instances: []InstanceDesc{instance1, instance2, instance3}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + assert.False(t, tracker.failedCompletely()) + + tracker.done(&instance2, nil, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + assert.False(t, tracker.failedCompletely()) + + tracker.done(&instance3, nil, errors.New("test")) + assert.False(t, tracker.succeeded()) + assert.True(t, tracker.failed()) + assert.True(t, tracker.failedCompletely()) + }, + }, + "finished() should return true only if all instances are done": { + instances: []InstanceDesc{instance1, instance2}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) + assert.False(t, tracker.finished()) + + tracker.done(&instance2, nil, errors.New("test")) + assert.True(t, tracker.finished()) + }, + }, + "getErrors() should return list of all errors": { + instances: []InstanceDesc{instance1, instance2}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + tracker.done(&instance1, nil, errors.New("test1")) + err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1")) + assert.ElementsMatch(t, []error{err1}, tracker.getErrors()) + + tracker.done(&instance2, nil, errors.New("test2")) + err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2")) + assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors()) + }, + }, } for testName, testCase := range tests { @@ -399,7 +444,7 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.failed()) }, }, - "failInAllZones should return true only if all zones have failed, regardless of max unavailable zones": { + "failedCompletely() should return true only if all zones have failed, regardless of max unavailable zones": { instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, maxUnavailableZones: 1, run: func(t *testing.T, tracker *zoneAwareResultTracker) { @@ -407,19 +452,43 @@ func TestZoneAwareResultTracker(t *testing.T) { tracker.done(&instance1, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - assert.False(t, tracker.failedInAllZones()) + assert.False(t, tracker.failedCompletely()) // Zone-b tracker.done(&instance3, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) - assert.False(t, tracker.failedInAllZones()) + assert.False(t, tracker.failedCompletely()) // Zone-c tracker.done(&instance5, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) - assert.True(t, tracker.failedInAllZones()) + assert.True(t, tracker.failedCompletely()) + }, + }, + "finished() should return true only if all instances are done": { + instances: []InstanceDesc{instance1, instance2}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) + assert.False(t, tracker.finished()) + + tracker.done(&instance2, nil, errors.New("test")) + assert.True(t, tracker.finished()) + }, + }, + "getErrors() should return list of all errors": { + instances: []InstanceDesc{instance1, instance2}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + tracker.done(&instance1, nil, errors.New("test1")) + err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1")) + assert.ElementsMatch(t, []error{err1}, tracker.getErrors()) + + tracker.done(&instance2, nil, errors.New("test2")) + err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2")) + assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors()) }, }, }