From 7e979eb850c83ee9e7a78ece20c4870e642a82a4 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Wed, 18 Jun 2025 16:45:07 +0800 Subject: [PATCH 1/6] feat(v2): add request-level write path overrides --- pkg/distributor/distributor.go | 14 ++-- pkg/distributor/write_path/router.go | 40 ++++++---- pkg/distributor/write_path/write_path_test.go | 73 ++++++++----------- 3 files changed, 60 insertions(+), 67 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6cee42c250..3aeac63109 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -140,9 +140,9 @@ type Limits interface { EnforceLabelsOrder(tenantID string) bool IngestionRelabelingRules(tenantID string) []*relabel.Config DistributorUsageGroups(tenantID string) *validation.UsageGroupConfig + WritePathOverrides(tenantID string) writepath.Config validation.ProfileValidationLimits aggregator.Limits - writepath.Overrides } func New( @@ -184,11 +184,7 @@ func New( ingesterRoute := writepath.IngesterFunc(d.sendRequestsToIngester) segmentWriterRoute := writepath.IngesterFunc(d.sendRequestsToSegmentWriter) - d.router = writepath.NewRouter( - logger, reg, limits, - ingesterRoute, - segmentWriterRoute, - ) + d.router = writepath.NewRouter(logger, reg, ingesterRoute, segmentWriterRoute) var err error subservices := []services.Service(nil) @@ -433,7 +429,8 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push // functions to send the request to the appropriate service; these are // called independently, and may be called concurrently: the request is // cloned in this case – the callee may modify the request safely. - if err = d.router.Send(ctx, req); err != nil { + config := d.limits.WritePathOverrides(req.TenantID) + if err = d.router.Send(ctx, req, config); err != nil { return nil, err } @@ -518,7 +515,8 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.PushR Annotations: annotations, }}, } - return d.router.Send(localCtx, aggregated) + config := d.limits.WritePathOverrides(req.TenantID) + return d.router.Send(localCtx, aggregated, config) })() if sendErr != nil { _ = level.Error(d.logger).Log("msg", "failed to handle aggregation", "tenant", req.TenantID, "err", err) diff --git a/pkg/distributor/write_path/router.go b/pkg/distributor/write_path/router.go index 34e1f8f5eb..6b83b0c0f9 100644 --- a/pkg/distributor/write_path/router.go +++ b/pkg/distributor/write_path/router.go @@ -43,17 +43,12 @@ func (f IngesterFunc) Push( return f(ctx, req) } -type Overrides interface { - WritePathOverrides(tenantID string) Config -} - type Router struct { service services.Service inflight sync.WaitGroup - logger log.Logger - overrides Overrides - metrics *metrics + logger log.Logger + metrics *metrics ingester IngesterClient segwriter IngesterClient @@ -62,13 +57,11 @@ type Router struct { func NewRouter( logger log.Logger, registerer prometheus.Registerer, - overrides Overrides, ingester IngesterClient, segwriter IngesterClient, ) *Router { r := &Router{ logger: logger, - overrides: overrides, metrics: newMetrics(registerer), ingester: ingester, segwriter: segwriter, @@ -92,15 +85,14 @@ func (m *Router) running(ctx context.Context) error { return nil } -func (m *Router) Send(ctx context.Context, req *distributormodel.PushRequest) error { - config := m.overrides.WritePathOverrides(req.TenantID) +func (m *Router) Send(ctx context.Context, req *distributormodel.PushRequest, config Config) error { switch config.WritePath { case SegmentWriterPath: - return m.send(m.segwriterRoute(true))(ctx, req) + return m.sendToSegmentWriterOnly(ctx, req, &config) case CombinedPath: return m.sendToBoth(ctx, req, &config) default: - return m.send(m.ingesterRoute())(ctx, req) + return m.sendToIngesterOnly(ctx, req) } } @@ -160,7 +152,7 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque // The request is to be sent to both asynchronously, therefore we're // cloning it. We do not wait for the secondary request to complete. // On shutdown, however, we will wait for all inflight requests. - segwriter.client = m.sendClone(ctx, req.Clone(), segwriter.client, config) + segwriter.client = m.detachedClient(ctx, req.Clone(), segwriter.client, config) } if segwriter != nil { @@ -179,6 +171,22 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque return nil } +func (m *Router) sendToSegmentWriterOnly(ctx context.Context, req *distributormodel.PushRequest, config *Config) error { + r := m.segwriterRoute(true) + if !config.AsyncIngest { + return m.send(r)(ctx, req) + } + r.client = m.detachedClient(ctx, req, r.client, config) + m.sendAsync(ctx, req, r) + return nil +} + +func (m *Router) sendToIngesterOnly(ctx context.Context, req *distributormodel.PushRequest) error { + // NOTE(kolesnikovae): If we also want to support async requests to ingesters, + // we should implement it here and in sendToBoth. + return m.send(m.ingesterRoute())(ctx, req) +} + type sendFunc func(context.Context, *distributormodel.PushRequest) error type route struct { @@ -187,7 +195,9 @@ type route struct { primary bool } -func (m *Router) sendClone(ctx context.Context, req *distributormodel.PushRequest, client IngesterClient, config *Config) IngesterFunc { +// detachedClient creates a new IngesterFunc that wraps the call with a local context +// that has a timeout and tenant ID injected so it can be used for asynchronous requests. +func (m *Router) detachedClient(ctx context.Context, req *distributormodel.PushRequest, client IngesterClient, config *Config) IngesterFunc { return func(context.Context, *distributormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error) { localCtx, cancel := context.WithTimeout(context.Background(), config.SegmentWriterTimeout) localCtx = tenant.InjectTenantID(localCtx, req.TenantID) diff --git a/pkg/distributor/write_path/write_path_test.go b/pkg/distributor/write_path/write_path_test.go index bded0d5671..d2ffeffaf2 100644 --- a/pkg/distributor/write_path/write_path_test.go +++ b/pkg/distributor/write_path/write_path_test.go @@ -26,24 +26,15 @@ type routerTestSuite struct { router *Router logger log.Logger registry *prometheus.Registry - overrides *mockOverrides ingester *mockwritepath.MockIngesterClient segwriter *mockwritepath.MockIngesterClient request *distributormodel.PushRequest } -type mockOverrides struct{ mock.Mock } - -func (m *mockOverrides) WritePathOverrides(tenantID string) Config { - args := m.Called(tenantID) - return args.Get(0).(Config) -} - func (s *routerTestSuite) SetupTest() { s.logger = log.NewLogfmtLogger(io.Discard) s.registry = prometheus.NewRegistry() - s.overrides = new(mockOverrides) s.ingester = new(mockwritepath.MockIngesterClient) s.segwriter = new(mockwritepath.MockIngesterClient) @@ -67,7 +58,6 @@ func (s *routerTestSuite) SetupTest() { s.router = NewRouter( s.logger, s.registry, - s.overrides, s.ingester, s.segwriter, ) @@ -78,10 +68,6 @@ func (s *routerTestSuite) BeforeTest(_, _ string) { s.Require().NoError(svc.StartAsync(context.Background())) s.Require().NoError(svc.AwaitRunning(context.Background())) s.Require().Equal(services.Running, svc.State()) - - s.overrides.AssertExpectations(s.T()) - s.ingester.AssertExpectations(s.T()) - s.segwriter.AssertExpectations(s.T()) } func (s *routerTestSuite) AfterTest(_, _ string) { @@ -90,7 +76,6 @@ func (s *routerTestSuite) AfterTest(_, _ string) { s.Require().NoError(svc.AwaitTerminated(context.Background())) s.Require().Equal(services.Terminated, svc.State()) - s.overrides.AssertExpectations(s.T()) s.ingester.AssertExpectations(s.T()) s.segwriter.AssertExpectations(s.T()) } @@ -98,27 +83,27 @@ func (s *routerTestSuite) AfterTest(_, _ string) { func TestRouterSuite(t *testing.T) { suite.Run(t, new(routerTestSuite)) } func (s *routerTestSuite) Test_IngesterPath() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: IngesterPath, - }) + } s.ingester.On("Push", mock.Anything, s.request). Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_SegmentWriterPath() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: SegmentWriterPath, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath() { @@ -129,11 +114,11 @@ func (s *routerTestSuite) Test_CombinedPath() { d = 0.3 // Allowed delta: note that f is just a probability. ) - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, IngesterWeight: 1, SegmentWriterWeight: f, - }) + } var sentIngester atomic.Uint32 s.ingester.On("Push", mock.Anything, mock.Anything). @@ -157,7 +142,7 @@ func (s *routerTestSuite) Test_CombinedPath() { for i := 0; i < w; i++ { for j := 0; j < N; j++ { - s.Assert().NoError(s.router.Send(context.Background(), s.request.Clone())) + s.Assert().NoError(s.router.Send(context.Background(), s.request.Clone(), config)) } } @@ -170,30 +155,30 @@ func (s *routerTestSuite) Test_CombinedPath() { } func (s *routerTestSuite) Test_UnspecifiedWriterPath() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{}) + config := Config{} // Default should route to ingester s.ingester.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath_ZeroWeights() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, - }) + } - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath_IngesterError() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // We ensure that request is sent to both. IngesterWeight: 1, SegmentWriterWeight: 1, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), nil). @@ -203,16 +188,16 @@ func (s *routerTestSuite) Test_CombinedPath_IngesterError() { Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). Once() - s.Assert().Error(s.router.Send(context.Background(), s.request), context.Canceled) + s.Assert().Error(s.router.Send(context.Background(), s.request, config), context.Canceled) } func (s *routerTestSuite) Test_CombinedPath_SegmentWriterError() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // We ensure that request is sent to both. IngesterWeight: 1, SegmentWriterWeight: 1, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). @@ -222,45 +207,45 @@ func (s *routerTestSuite) Test_CombinedPath_SegmentWriterError() { Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath_Ingester_Exclusive_Error() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // The request is only sent to ingester. IngesterWeight: 1, SegmentWriterWeight: 0, - }) + } s.ingester.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). Once() - s.Assert().Error(s.router.Send(context.Background(), s.request), context.Canceled) + s.Assert().Error(s.router.Send(context.Background(), s.request, config), context.Canceled) } func (s *routerTestSuite) Test_CombinedPath_SegmentWriter_Exclusive_Error() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // The request is only sent to segment writer. IngesterWeight: 0, SegmentWriterWeight: 1, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). Once() - s.Assert().Error(s.router.Send(context.Background(), s.request), context.Canceled) + s.Assert().Error(s.router.Send(context.Background(), s.request, config), context.Canceled) } func (s *routerTestSuite) Test_SegmentWriter_MultipleProfiles() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: SegmentWriterPath, IngesterWeight: 0, SegmentWriterWeight: 1, - }) + } x := s.request.Series[0] x.Samples = append(x.Samples, &distributormodel.ProfileSample{Profile: &pprof.Profile{}}) @@ -269,5 +254,5 @@ func (s *routerTestSuite) Test_SegmentWriter_MultipleProfiles() { Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } From 22f10b8137011234017b4b251c3230e48897578d Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Wed, 18 Jun 2025 19:16:46 +0800 Subject: [PATCH 2/6] feat(v2): cancellable artificial delay --- pkg/util/delayhandler/connect.go | 14 ++++- pkg/util/delayhandler/connect_test.go | 89 +++++++++++++++++++++++++++ pkg/util/delayhandler/delay.go | 10 +++ pkg/util/delayhandler/http.go | 11 ++++ pkg/util/delayhandler/http_test.go | 28 +++++++-- 5 files changed, 145 insertions(+), 7 deletions(-) create mode 100644 pkg/util/delayhandler/connect_test.go diff --git a/pkg/util/delayhandler/connect.go b/pkg/util/delayhandler/connect.go index 1f0dd0842b..f4bf3f43a1 100644 --- a/pkg/util/delayhandler/connect.go +++ b/pkg/util/delayhandler/connect.go @@ -17,6 +17,14 @@ func NewConnect(limits Limits) connect.Interceptor { func (i *delayInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, req connect.AnyRequest) (resp connect.AnyResponse, err error) { start := timeNow() + delay := getDelay(ctx, i.limits) + delayCtx := context.Background() + if delay > 0 { + var cancel context.CancelFunc + delayCtx, cancel = context.WithCancel(context.Background()) + defer cancel() + ctx = context.WithValue(ctx, delayCancelCtxKey{}, cancel) + } // now run the chain after me resp, err = next(ctx, req) @@ -26,8 +34,10 @@ func (i *delayInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return resp, err } - // get delay from the context - delay := getDelay(ctx, i.limits) + // The delay has been cancelled down the chain. + if delayCtx.Err() != nil { + return resp, err + } // no delay, return immediately if delay <= 0 { diff --git a/pkg/util/delayhandler/connect_test.go b/pkg/util/delayhandler/connect_test.go new file mode 100644 index 0000000000..b89d89095a --- /dev/null +++ b/pkg/util/delayhandler/connect_test.go @@ -0,0 +1,89 @@ +package delayhandler + +import ( + "context" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/pyroscope/pkg/tenant" +) + +func TestConnectInterceptor(t *testing.T) { + now := time.Unix(1718211600, 0) + tenantID := "tenant" + + tests := []struct { + name string + configDelay time.Duration + cancelDelay bool + expectSleep bool + expectHeader bool + }{ + { + name: "no delay", + configDelay: 0, + expectSleep: false, + expectHeader: false, + }, + { + name: "with delay", + configDelay: 100 * time.Millisecond, + expectSleep: true, + expectHeader: true, + }, + { + name: "cancelled delay", + configDelay: 100 * time.Millisecond, + cancelDelay: true, + expectSleep: false, + expectHeader: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + timeNowMock(t, []time.Time{now, now.Add(5 * time.Millisecond)}) + sleeps, cleanUpSleep := timeAfterMock() + defer cleanUpSleep() + + limits := newMockLimits() + limits.setDelay(tenantID, tt.configDelay) + + interceptor := NewConnect(limits) + + handler := connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if tt.cancelDelay { + CancelDelay(ctx) + } + return connect.NewResponse(&struct{}{}), nil + }) + + wrappedHandler := interceptor.WrapUnary(handler) + req := connect.NewRequest(&struct{}{}) + ctx := tenant.InjectTenantID(context.Background(), tenantID) + + resp, err := wrappedHandler(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + + if tt.expectSleep { + require.Len(t, sleeps.values, 1) + assert.Greater(t, sleeps.values[0], 80*time.Millisecond) + assert.Less(t, sleeps.values[0], 120*time.Millisecond) + } else { + require.Len(t, sleeps.values, 0) + } + + if tt.expectHeader { + assert.Contains(t, resp.Header().Get("Server-Timing"), "artificial_delay") + } else { + assert.Empty(t, resp.Header().Get("Server-Timing")) + } + }) + } +} diff --git a/pkg/util/delayhandler/delay.go b/pkg/util/delayhandler/delay.go index 49bc911655..1002dca0e7 100644 --- a/pkg/util/delayhandler/delay.go +++ b/pkg/util/delayhandler/delay.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grafana/dskit/tenant" + "github.com/grafana/pyroscope/pkg/util" ) @@ -21,10 +22,19 @@ type Limits interface { IngestionArtificialDelay(tenantID string) time.Duration } +type delayCancelCtxKey struct{} + +func CancelDelay(ctx context.Context) { + if cancel, ok := ctx.Value(delayCancelCtxKey{}).(context.CancelFunc); ok && cancel != nil { + cancel() + } +} + func addDelayHeader(h http.Header, delay time.Duration) { durationInMs := strconv.FormatFloat(float64(delay)/float64(time.Millisecond), 'f', -1, 64) h.Add("Server-Timing", fmt.Sprintf("artificial_delay;dur=%s", durationInMs)) } + func getDelay(ctx context.Context, limits Limits) time.Duration { tenantID, err := tenant.TenantID(ctx) if err != nil { diff --git a/pkg/util/delayhandler/http.go b/pkg/util/delayhandler/http.go index 48ffe1fd6d..81403c09d8 100644 --- a/pkg/util/delayhandler/http.go +++ b/pkg/util/delayhandler/http.go @@ -1,6 +1,7 @@ package delayhandler import ( + "context" "net/http" "time" ) @@ -68,7 +69,12 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler { delay := getDelay(r.Context(), limits) var delayRw *delayedResponseWriter + delayCtx := context.Background() if delay > 0 { + var cancel context.CancelFunc + delayCtx, cancel = context.WithCancel(delayCtx) + defer cancel() + r = r.WithContext(context.WithValue(r.Context(), delayCancelCtxKey{}, cancel)) w, delayRw = wrapResponseWriter(w, start.Add(delay)) } @@ -85,6 +91,11 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler { return } + // The delay has been cancelled down the chain. + if delayCtx.Err() != nil { + return + } + delayLeft := delayRw.end.Sub(timeNow()) // nothing to do if we're past the end time if delayLeft <= 0 { diff --git a/pkg/util/delayhandler/http_test.go b/pkg/util/delayhandler/http_test.go index ec04eaae4d..43ae4cbffb 100644 --- a/pkg/util/delayhandler/http_test.go +++ b/pkg/util/delayhandler/http_test.go @@ -46,13 +46,17 @@ func (m *mockLimits) setDelay(tenantID string, delay time.Duration) { // Test handler that records what happened type testHandler struct { - statusCode int - body string - called bool + statusCode int + body string + called bool + cancelDelay bool } func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.called = true + if h.cancelDelay { + CancelDelay(r.Context()) + } if h.statusCode != 0 { w.WriteHeader(h.statusCode) } @@ -120,6 +124,7 @@ func TestNewHTTP(t *testing.T) { handlerBody string handlerDelay time.Duration // delay in handler middlewareDelay time.Duration // delay in other middlewares + cancelDelay bool expectDelay bool expectDelayHeader bool }{ @@ -181,6 +186,18 @@ func TestNewHTTP(t *testing.T) { handlerBody: "slow middlewares success", middlewareDelay: 200 * time.Millisecond, }, + { + name: "enabled/cancel delay", + configDelay: 100 * time.Millisecond, + handlerBody: "success", + cancelDelay: true, + }, + { + name: "disabled/cancel delay no effect", + configDelay: 0, + handlerBody: "success", + cancelDelay: true, + }, } for _, tt := range tests { @@ -218,8 +235,9 @@ func TestNewHTTP(t *testing.T) { middleware := NewHTTP(limits) handler := &testHandler{ - statusCode: tt.handlerStatusCode, - body: tt.handlerBody, + statusCode: tt.handlerStatusCode, + body: tt.handlerBody, + cancelDelay: tt.cancelDelay, } req := httptest.NewRequest(http.MethodPost, "/test", strings.NewReader("test")) From be8a17c99e186dea9110878e61983ce542507b8b Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Wed, 18 Jun 2025 19:59:41 +0800 Subject: [PATCH 3/6] feat(v2): add async ingest tests --- pkg/distributor/write_path/write_path_test.go | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/pkg/distributor/write_path/write_path_test.go b/pkg/distributor/write_path/write_path_test.go index d2ffeffaf2..fe9c09c191 100644 --- a/pkg/distributor/write_path/write_path_test.go +++ b/pkg/distributor/write_path/write_path_test.go @@ -256,3 +256,55 @@ func (s *routerTestSuite) Test_SegmentWriter_MultipleProfiles() { s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } + +func (s *routerTestSuite) Test_AsyncIngest_Synchronous() { + config := Config{ + WritePath: SegmentWriterPath, + AsyncIngest: false, + } + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + err := s.router.Send(context.Background(), s.request, config) + s.Assert().Error(err) +} + +func (s *routerTestSuite) Test_AsyncIngest_Asynchronous() { + config := Config{ + WritePath: SegmentWriterPath, + AsyncIngest: true, + } + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + err := s.router.Send(context.Background(), s.request, config) + s.Assert().NoError(err) + + s.router.inflight.Wait() +} + +func (s *routerTestSuite) Test_AsyncIngest_CombinedPath() { + config := Config{ + WritePath: CombinedPath, + IngesterWeight: 1, + SegmentWriterWeight: 1, + AsyncIngest: true, + } + + s.ingester.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + err := s.router.Send(context.Background(), s.request, config) + s.Assert().Error(err) + + s.router.inflight.Wait() +} From afce63c54aabce83627fd7c00e1410092a8cd992 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Wed, 18 Jun 2025 20:33:04 +0800 Subject: [PATCH 4/6] feat(v2): add example use case for request-level overrides --- pkg/distributor/distributor.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3aeac63109..72868ab5fb 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -11,6 +11,7 @@ import ( "math/rand" "net/http" "sort" + "strings" "sync" "time" @@ -52,6 +53,7 @@ import ( "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/usagestats" "github.com/grafana/pyroscope/pkg/util" + "github.com/grafana/pyroscope/pkg/util/delayhandler" "github.com/grafana/pyroscope/pkg/validation" ) @@ -430,6 +432,10 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push // called independently, and may be called concurrently: the request is // cloned in this case – the callee may modify the request safely. config := d.limits.WritePathOverrides(req.TenantID) + if isAlloyEBPFRequest(req) { + delayhandler.CancelDelay(ctx) + config.AsyncIngest = true + } if err = d.router.Send(ctx, req, config); err != nil { return nil, err } @@ -1109,3 +1115,13 @@ func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof. e.ExportSamples(n.Profile, samplesCopy) return n } + +func isAlloyEBPFRequest(series *distributormodel.PushRequest) bool { + for _, s := range series.Series { + serviceName := phlaremodel.Labels(s.Labels).Get(phlaremodel.LabelNameServiceName) + if strings.HasPrefix(serviceName, "ebpf/") { + return true + } + } + return false +} From 025ce86f554f93c1af328ba12332393f2ce89780 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 24 Jun 2025 12:50:18 +0800 Subject: [PATCH 5/6] Revert "feat(v2): add example use case for request-level overrides" This reverts commit afce63c54aabce83627fd7c00e1410092a8cd992. --- pkg/distributor/distributor.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 72868ab5fb..3aeac63109 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -11,7 +11,6 @@ import ( "math/rand" "net/http" "sort" - "strings" "sync" "time" @@ -53,7 +52,6 @@ import ( "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/usagestats" "github.com/grafana/pyroscope/pkg/util" - "github.com/grafana/pyroscope/pkg/util/delayhandler" "github.com/grafana/pyroscope/pkg/validation" ) @@ -432,10 +430,6 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push // called independently, and may be called concurrently: the request is // cloned in this case – the callee may modify the request safely. config := d.limits.WritePathOverrides(req.TenantID) - if isAlloyEBPFRequest(req) { - delayhandler.CancelDelay(ctx) - config.AsyncIngest = true - } if err = d.router.Send(ctx, req, config); err != nil { return nil, err } @@ -1115,13 +1109,3 @@ func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof. e.ExportSamples(n.Profile, samplesCopy) return n } - -func isAlloyEBPFRequest(series *distributormodel.PushRequest) bool { - for _, s := range series.Series { - serviceName := phlaremodel.Labels(s.Labels).Get(phlaremodel.LabelNameServiceName) - if strings.HasPrefix(serviceName, "ebpf/") { - return true - } - } - return false -} From 57c51085fbb3b8ca61e31118576b9e3d8d3cadcb Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Tue, 24 Jun 2025 12:50:42 +0800 Subject: [PATCH 6/6] cancel artificial delay on async ingest --- pkg/distributor/write_path/router.go | 4 +++ pkg/distributor/write_path/write_path_test.go | 28 +++++++++++++++++++ pkg/util/delayhandler/connect.go | 2 +- pkg/util/delayhandler/delay.go | 4 +++ pkg/util/delayhandler/http.go | 2 +- 5 files changed, 38 insertions(+), 2 deletions(-) diff --git a/pkg/distributor/write_path/router.go b/pkg/distributor/write_path/router.go index 6b83b0c0f9..85046485d1 100644 --- a/pkg/distributor/write_path/router.go +++ b/pkg/distributor/write_path/router.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/util" "github.com/grafana/pyroscope/pkg/util/connectgrpc" + "github.com/grafana/pyroscope/pkg/util/delayhandler" httputil "github.com/grafana/pyroscope/pkg/util/http" ) @@ -86,6 +87,9 @@ func (m *Router) running(ctx context.Context) error { } func (m *Router) Send(ctx context.Context, req *distributormodel.PushRequest, config Config) error { + if config.AsyncIngest { + delayhandler.CancelDelay(ctx) + } switch config.WritePath { case SegmentWriterPath: return m.sendToSegmentWriterOnly(ctx, req, &config) diff --git a/pkg/distributor/write_path/write_path_test.go b/pkg/distributor/write_path/write_path_test.go index fe9c09c191..fc0d7ddb00 100644 --- a/pkg/distributor/write_path/write_path_test.go +++ b/pkg/distributor/write_path/write_path_test.go @@ -18,6 +18,7 @@ import ( distributormodel "github.com/grafana/pyroscope/pkg/distributor/model" "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/test/mocks/mockwritepath" + "github.com/grafana/pyroscope/pkg/util/delayhandler" ) type routerTestSuite struct { @@ -308,3 +309,30 @@ func (s *routerTestSuite) Test_AsyncIngest_CombinedPath() { s.router.inflight.Wait() } + +func (s *routerTestSuite) Test_AsyncIngest_DelayCanceled() { + config := Config{ + WritePath: CombinedPath, + IngesterWeight: 1, + SegmentWriterWeight: 1, + AsyncIngest: true, + } + + s.ingester.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + var canceled atomic.Bool + ctx := delayhandler.WithDelayCancel(context.Background(), func() { + canceled.Store(true) + }) + + s.Assert().Error(s.router.Send(ctx, s.request, config)) + s.router.inflight.Wait() + + s.Assert().True(canceled.Load()) +} diff --git a/pkg/util/delayhandler/connect.go b/pkg/util/delayhandler/connect.go index f4bf3f43a1..59549fe04e 100644 --- a/pkg/util/delayhandler/connect.go +++ b/pkg/util/delayhandler/connect.go @@ -23,7 +23,7 @@ func (i *delayInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { var cancel context.CancelFunc delayCtx, cancel = context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, delayCancelCtxKey{}, cancel) + ctx = WithDelayCancel(ctx, cancel) } // now run the chain after me diff --git a/pkg/util/delayhandler/delay.go b/pkg/util/delayhandler/delay.go index 1002dca0e7..74083f768a 100644 --- a/pkg/util/delayhandler/delay.go +++ b/pkg/util/delayhandler/delay.go @@ -30,6 +30,10 @@ func CancelDelay(ctx context.Context) { } } +func WithDelayCancel(ctx context.Context, cancel context.CancelFunc) context.Context { + return context.WithValue(ctx, delayCancelCtxKey{}, cancel) +} + func addDelayHeader(h http.Header, delay time.Duration) { durationInMs := strconv.FormatFloat(float64(delay)/float64(time.Millisecond), 'f', -1, 64) h.Add("Server-Timing", fmt.Sprintf("artificial_delay;dur=%s", durationInMs)) diff --git a/pkg/util/delayhandler/http.go b/pkg/util/delayhandler/http.go index 81403c09d8..58b3a2259d 100644 --- a/pkg/util/delayhandler/http.go +++ b/pkg/util/delayhandler/http.go @@ -74,7 +74,7 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler { var cancel context.CancelFunc delayCtx, cancel = context.WithCancel(delayCtx) defer cancel() - r = r.WithContext(context.WithValue(r.Context(), delayCancelCtxKey{}, cancel)) + r = r.WithContext(WithDelayCancel(r.Context(), cancel)) w, delayRw = wrapResponseWriter(w, start.Add(delay)) }