From 6af9b134f506dac217a5e68bdae704911eeda228 Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 10 Dec 2024 23:07:53 -0800 Subject: [PATCH 1/3] [ExpandedPostingsCache] Querying and adding series concurrently can cache wrong results Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 63 +++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f64368877fa..d1f10cd2748 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5273,6 +5273,69 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { wg.Wait() } +func TestExpandedCachePostings_Race(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + + wg := sync.WaitGroup{} + + for j := 0; j < 100; j++ { + metricName := fmt.Sprintf("test_metric_%d", j) + wg.Add(200) + for k := 0; k < 100; k++ { + + go func() { + defer wg.Done() + _, err := i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, metricName, "k", strconv.Itoa(k))}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + }() + + go func() { + defer wg.Done() + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: 
[]*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, + }, &mockQueryStreamServer{ctx: ctx}) + require.NoError(t, err) + }() + } + + wg.Wait() + + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, + }, s) + require.NoError(t, err) + + set, err := seriesSetFromResponseStream(s) + require.NoError(t, err) + res, err := client.MatrixFromSeriesSet(set) + require.NoError(t, err) + require.Equal(t, 100, res.Len()) + } +} + func TestExpendedPostingsCache(t *testing.T) { cfg := defaultIngesterTestConfig(t) cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} From d725e9830fb78547f73df2d2422feedcff0d9766 Mon Sep 17 00:00:00 2001 From: alanprot Date: Wed, 11 Dec 2024 12:06:42 -0800 Subject: [PATCH 2/3] Expiring the series after commit call Signed-off-by: alanprot --- pkg/ingester/ingester.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d22e1f6dc26..27d094c4416 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1204,6 +1204,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Walk the samples, appending them to the users database app := db.Appender(ctx).(extendedAppender) + var newSeries []labels.Labels + for _, ts := range req.Timeseries { // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). @@ -1233,6 +1235,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels) // Retain the reference in case there are multiple samples for the series. 
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil { + // Keep track of what series needs to be expired on the postings cache + if db.postingCache != nil { + newSeries = append(newSeries, copiedLabels) + } succeededSamplesCount++ continue } @@ -1274,6 +1280,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Copy the label set because both TSDB and the active series tracker may retain it. copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels) if ref, err = app.AppendHistogram(0, copiedLabels, hp.TimestampMs, h, fh); err == nil { + // Keep track of what series needs to be expired on the postings cache + if db.postingCache != nil { + newSeries = append(newSeries, copiedLabels) + } succeededHistogramsCount++ continue } @@ -1342,6 +1352,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if err := app.Commit(); err != nil { return nil, wrapWithUser(err, userID) } + + // This is a workaround of https://github.com/prometheus/prometheus/pull/15579 + // Calling expire here may result in the series names being expired multiple times, + // as there may be multiple Push operations concurrently for the same new timeseries. + // TODO: alanprot remove this when/if the PR is merged + if db.postingCache != nil { + for _, s := range newSeries { + db.postingCache.ExpireSeries(s) + } + } + i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds()) // If only invalid samples or histograms are pushed, don't change "last update", as TSDB was not modified. 
From 1cbcc8b65781d30d19c11f34b8b895db6ad25397 Mon Sep 17 00:00:00 2001 From: alanprot Date: Wed, 11 Dec 2024 13:01:20 -0800 Subject: [PATCH 3/3] Adding option to run tests with no-race check Signed-off-by: alanprot --- .github/workflows/test-build-deploy.yml | 18 +++++ Makefile | 3 + pkg/ingester/ingester_no_race_test.go | 90 +++++++++++++++++++++++++ pkg/ingester/ingester_test.go | 63 ----------------- 4 files changed, 111 insertions(+), 63 deletions(-) create mode 100644 pkg/ingester/ingester_no_race_test.go diff --git a/.github/workflows/test-build-deploy.yml b/.github/workflows/test-build-deploy.yml index ac250b513fd..d68a32d6a04 100644 --- a/.github/workflows/test-build-deploy.yml +++ b/.github/workflows/test-build-deploy.yml @@ -61,6 +61,24 @@ jobs: ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex - name: Run Tests run: make BUILD_IN_CONTAINER=false test + test-no-race: + runs-on: ubuntu-20.04 + container: + image: quay.io/cortexproject/build-image:master-0ddced051 + steps: + - name: Checkout Repo + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Setup Git safe.directory + run: | + echo "this step is needed because when running in container, actions/checkout does not set safe.directory effectively." + echo "See https://github.com/actions/runner/issues/2033. We should use --system instead of --global" + git config --system --add safe.directory $GITHUB_WORKSPACE + - name: Sym Link Expected Path to Workspace + run: | + mkdir -p /go/src/github.com/cortexproject/cortex + ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex + - name: Run Tests + run: make BUILD_IN_CONTAINER=false test-no-race security: name: CodeQL diff --git a/Makefile b/Makefile index f19e7e03405..1a145e3f726 100644 --- a/Makefile +++ b/Makefile @@ -218,6 +218,9 @@ lint: test: go test -tags netgo -timeout 30m -race -count 1 ./... +test-no-race: + go test -tags netgo -timeout 30m -count 1 ./... 
+ cover: $(eval COVERDIR := $(shell mktemp -d coverage.XXXXXXXXXX)) $(eval COVERFILE := $(shell mktemp $(COVERDIR)/unit.XXXXXXXXXX)) diff --git a/pkg/ingester/ingester_no_race_test.go b/pkg/ingester/ingester_no_race_test.go new file mode 100644 index 00000000000..656a7ab28c4 --- /dev/null +++ b/pkg/ingester/ingester_no_race_test.go @@ -0,0 +1,90 @@ +//go:build !race + +package ingester + +import ( + "context" + "fmt" + "math" + "strconv" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" +) + +// Running this test without race check as there is a known prometheus race condition. 
+// See https://github.com/prometheus/prometheus/pull/15141 and https://github.com/prometheus/prometheus/pull/15316 +func TestExpandedCachePostings_Race(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + + wg := sync.WaitGroup{} + labelNames := 100 + seriesPerLabelName := 200 + + for j := 0; j < labelNames; j++ { + metricName := fmt.Sprintf("test_metric_%d", j) + wg.Add(seriesPerLabelName * 2) + for k := 0; k < seriesPerLabelName; k++ { + go func() { + defer wg.Done() + _, err := i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, metricName, "k", strconv.Itoa(k))}, + []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + }() + + go func() { + defer wg.Done() + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, + }, &mockQueryStreamServer{ctx: ctx}) + require.NoError(t, err) + }() + } + + wg.Wait() + + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, + }, s) + require.NoError(t, err) + + set, 
err := seriesSetFromResponseStream(s) + require.NoError(t, err) + res, err := client.MatrixFromSeriesSet(set) + require.NoError(t, err) + require.Equal(t, seriesPerLabelName, res.Len()) + } +} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index d1f10cd2748..f64368877fa 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5273,69 +5273,6 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) { wg.Wait() } -func TestExpandedCachePostings_Race(t *testing.T) { - cfg := defaultIngesterTestConfig(t) - cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} - cfg.LifecyclerConfig.JoinAfter = 0 - cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true - - r := prometheus.NewRegistry() - i, err := prepareIngesterWithBlocksStorage(t, cfg, r) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - - // Wait until the ingester is ACTIVE - test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { - return i.lifecycler.GetState() - }) - - ctx := user.InjectOrgID(context.Background(), "test") - - wg := sync.WaitGroup{} - - for j := 0; j < 100; j++ { - metricName := fmt.Sprintf("test_metric_%d", j) - wg.Add(200) - for k := 0; k < 100; k++ { - - go func() { - defer wg.Done() - _, err := i.Push(ctx, cortexpb.ToWriteRequest( - []labels.Labels{labels.FromStrings(labels.MetricName, metricName, "k", strconv.Itoa(k))}, - []cortexpb.Sample{{Value: 1, TimestampMs: 9}}, nil, nil, cortexpb.API)) - require.NoError(t, err) - }() - - go func() { - defer wg.Done() - err := i.QueryStream(&client.QueryRequest{ - StartTimestampMs: 0, - EndTimestampMs: math.MaxInt64, - Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, - }, &mockQueryStreamServer{ctx: ctx}) - require.NoError(t, err) - }() - } - - wg.Wait() - - s := 
&mockQueryStreamServer{ctx: ctx} - err = i.QueryStream(&client.QueryRequest{ - StartTimestampMs: 0, - EndTimestampMs: math.MaxInt64, - Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}}, - }, s) - require.NoError(t, err) - - set, err := seriesSetFromResponseStream(s) - require.NoError(t, err) - res, err := client.MatrixFromSeriesSet(set) - require.NoError(t, err) - require.Equal(t, 100, res.Len()) - } -} - func TestExpendedPostingsCache(t *testing.T) { cfg := defaultIngesterTestConfig(t) cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}