Skip to content

Commit a33ddfa

Browse files
authored
Push request should fail when label set is out of order (#6746)
* Push request should fail when label set is out of order Signed-off-by: Alex Le <[email protected]> * updated changelog Signed-off-by: Alex Le <[email protected]> * fix test Signed-off-by: Alex Le <[email protected]> * add metric Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent 444c8e8 commit a33ddfa

File tree

4 files changed

+71
-13
lines changed

4 files changed

+71
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617
3030
* [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628
3131
* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676
32+
* [ENHANCEMENT] Ingester: Push request should fail when label set is out of order #6746
3233
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
3334
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
3435
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/ingester/ingester.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,17 @@ type extendedAppender interface {
11371137
storage.GetRef
11381138
}
11391139

1140+
func (i *Ingester) isLabelSetOutOfOrder(labels labels.Labels) bool {
1141+
last := ""
1142+
for _, l := range labels {
1143+
if strings.Compare(last, l.Name) > 0 {
1144+
return true
1145+
}
1146+
last = l.Name
1147+
}
1148+
return false
1149+
}
1150+
11401151
// Push adds metrics to a block
11411152
func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
11421153
if err := i.checkRunning(); err != nil {
@@ -1302,6 +1313,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
13021313

13031314
// Look up a reference for this series.
13041315
tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels)
1316+
if i.isLabelSetOutOfOrder(tsLabels) {
1317+
i.metrics.oooLabelsTotal.WithLabelValues(userID).Inc()
1318+
return nil, wrapWithUser(errors.Errorf("out-of-order label set found when push: %s", tsLabels), userID)
1319+
}
13051320
tsLabelsHash := tsLabels.Hash()
13061321
ref, copiedLabels := app.GetRef(tsLabels, tsLabelsHash)
13071322

pkg/ingester/ingester_test.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2365,6 +2365,42 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) {
23652365
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
23662366
}
23672367

2368+
func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
2369+
// Create ingester
2370+
cfg := defaultIngesterTestConfig(t)
2371+
r := prometheus.NewRegistry()
2372+
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
2373+
require.NoError(t, err)
2374+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
2375+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
2376+
2377+
// Wait until it's ACTIVE
2378+
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
2379+
return i.lifecycler.GetState()
2380+
})
2381+
2382+
ctx := user.InjectOrgID(context.Background(), "test-user")
2383+
2384+
outOfOrderLabels := labels.Labels{
2385+
{Name: labels.MetricName, Value: "test_metric"},
2386+
{Name: "c", Value: "3"},
2387+
{Name: "a", Value: "1"}, // Out of order (a comes before c)
2388+
}
2389+
2390+
req, _ := mockWriteRequest(t, outOfOrderLabels, 1, 2)
2391+
_, err = i.Push(ctx, req)
2392+
require.Error(t, err)
2393+
require.Contains(t, err.Error(), "out-of-order label set found")
2394+
2395+
metric := `
2396+
# HELP cortex_ingester_out_of_order_labels_total The total number of out of order label found per user.
2397+
# TYPE cortex_ingester_out_of_order_labels_total counter
2398+
cortex_ingester_out_of_order_labels_total{user="test-user"} 1
2399+
`
2400+
err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_out_of_order_labels_total")
2401+
require.NoError(t, err)
2402+
}
2403+
23682404
func BenchmarkIngesterPush(b *testing.B) {
23692405
limits := defaultLimitsTestConfig()
23702406
benchmarkIngesterPush(b, limits, false)
@@ -2730,8 +2766,8 @@ func Test_Ingester_LabelNames(t *testing.T) {
27302766
value float64
27312767
timestamp int64
27322768
}{
2733-
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
2734-
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
2769+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
2770+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
27352771
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
27362772
}
27372773

@@ -2786,8 +2822,8 @@ func Test_Ingester_LabelValues(t *testing.T) {
27862822
value float64
27872823
timestamp int64
27882824
}{
2789-
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
2790-
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
2825+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
2826+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
27912827
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
27922828
}
27932829

@@ -2864,7 +2900,7 @@ func Test_Ingester_LabelValue_MaxInflightQueryRequest(t *testing.T) {
28642900
// Mock request
28652901
ctx := user.InjectOrgID(context.Background(), "test")
28662902

2867-
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
2903+
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000)
28682904
_, err = i.Push(ctx, wreq)
28692905
require.NoError(t, err)
28702906

@@ -3023,7 +3059,7 @@ func Test_Ingester_Query_MaxInflightQueryRequest(t *testing.T) {
30233059
// Mock request
30243060
ctx := user.InjectOrgID(context.Background(), "test")
30253061

3026-
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
3062+
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000)
30273063
_, err = i.Push(ctx, wreq)
30283064
require.NoError(t, err)
30293065

@@ -4904,8 +4940,8 @@ func Test_Ingester_UserStats(t *testing.T) {
49044940
value float64
49054941
timestamp int64
49064942
}{
4907-
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
4908-
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
4943+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
4944+
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
49094945
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
49104946
}
49114947

@@ -4950,8 +4986,8 @@ func Test_Ingester_AllUserStats(t *testing.T) {
49504986
value float64
49514987
timestamp int64
49524988
}{
4953-
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
4954-
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
4989+
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
4990+
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
49554991
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000},
49564992
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000},
49574993
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000},
@@ -5018,8 +5054,8 @@ func Test_Ingester_AllUserStatsHandler(t *testing.T) {
50185054
value float64
50195055
timestamp int64
50205056
}{
5021-
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
5022-
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
5057+
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
5058+
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
50235059
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000},
50245060
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000},
50255061
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000},
@@ -6476,7 +6512,7 @@ func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) {
64766512
// Mock request
64776513
ctx := user.InjectOrgID(context.Background(), "test")
64786514

6479-
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
6515+
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000)
64806516
_, err = i.Push(ctx, wreq)
64816517
require.NoError(t, err)
64826518

pkg/ingester/metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type ingesterMetrics struct {
3131
ingestedHistogramsFail prometheus.Counter
3232
ingestedExemplarsFail prometheus.Counter
3333
ingestedMetadataFail prometheus.Counter
34+
oooLabelsTotal *prometheus.CounterVec
3435
queries prometheus.Counter
3536
queriedSamples prometheus.Histogram
3637
queriedExemplars prometheus.Histogram
@@ -112,6 +113,10 @@ func newIngesterMetrics(r prometheus.Registerer,
112113
Name: "cortex_ingester_ingested_metadata_failures_total",
113114
Help: "The total number of metadata that errored on ingestion.",
114115
}),
116+
oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
117+
Name: "cortex_ingester_out_of_order_labels_total",
118+
Help: "The total number of out of order label found per user.",
119+
}, []string{"user"}),
115120
queries: promauto.With(r).NewCounter(prometheus.CounterOpts{
116121
Name: "cortex_ingester_queries_total",
117122
Help: "The total number of queries the ingester has handled.",
@@ -283,6 +288,7 @@ func newIngesterMetrics(r prometheus.Registerer,
283288
}
284289

285290
func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
291+
m.oooLabelsTotal.DeleteLabelValues(userID)
286292
m.memMetadataCreatedTotal.DeleteLabelValues(userID)
287293
m.memMetadataRemovedTotal.DeleteLabelValues(userID)
288294
m.activeSeriesPerUser.DeleteLabelValues(userID)

0 commit comments

Comments
 (0)