Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
* [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
* [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
* [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052
* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003
* [ENHANCEMENT] Modernizes the entire codebase by using go modernize tool. #7005
* [ENHANCEMENT] Overrides Exporter: Expose all fields that can be converted to float64. Also, the label value `max_local_series_per_metric` got renamed to `max_series_per_metric`, and `max_local_series_per_user` got renamed to `max_series_per_user`. #6979
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,10 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
// check each sample and discard if outside limits.
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
for _, ts := range req.Timeseries {
if len(ts.Labels) == 0 {
return nil, nil, nil, nil, 0, 0, 0, nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", "empty labels found")
}

if limits.AcceptHASamples && limits.AcceptMixedHASamples {
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, ts.Labels)
if cluster != "" && replica != "" {
Expand Down
63 changes: 63 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,69 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {

}

func TestPush_EmptyLabels(t *testing.T) {
t.Parallel()

var limits validation.Limits
flagext.DefaultValues(&limits)

limits.IngestionRate = math.MaxFloat64

dists, _, _, _ := prepare(t, prepConfig{
numDistributors: 1,
numIngesters: 3,
happyIngesters: 3,
})

ctx := user.InjectOrgID(context.Background(), "user")

d := dists[0]
ts := time.Now().UnixMilli()

tests := []struct {
desc string
request *cortexpb.WriteRequest
isErr bool
}{
{
desc: "1 series, a series has empty labels",
request: &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
makeWriteRequestTimeseries(
[]cortexpb.LabelAdapter{}, ts, 3, false),
},
},
isErr: true,
},
{
desc: "2 series, one series has empty labels",
request: &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
makeWriteRequestTimeseries(
[]cortexpb.LabelAdapter{}, ts, 3, false),
makeWriteRequestTimeseries(
[]cortexpb.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
}, ts, 3, false),
},
},
isErr: true,
},
}

for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
_, err := d.Push(ctx, test.request)
require.Error(t, err)
s, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, codes.Code(400), s.Code())
require.Equal(t, "empty labels found", s.Message())
})
}
}

func TestPush_QuorumError(t *testing.T) {
t.Parallel()

Expand Down
Loading