diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6cee42c250..3aeac63109 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -140,9 +140,9 @@ type Limits interface { EnforceLabelsOrder(tenantID string) bool IngestionRelabelingRules(tenantID string) []*relabel.Config DistributorUsageGroups(tenantID string) *validation.UsageGroupConfig + WritePathOverrides(tenantID string) writepath.Config validation.ProfileValidationLimits aggregator.Limits - writepath.Overrides } func New( @@ -184,11 +184,7 @@ func New( ingesterRoute := writepath.IngesterFunc(d.sendRequestsToIngester) segmentWriterRoute := writepath.IngesterFunc(d.sendRequestsToSegmentWriter) - d.router = writepath.NewRouter( - logger, reg, limits, - ingesterRoute, - segmentWriterRoute, - ) + d.router = writepath.NewRouter(logger, reg, ingesterRoute, segmentWriterRoute) var err error subservices := []services.Service(nil) @@ -433,7 +429,8 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push // functions to send the request to the appropriate service; these are // called independently, and may be called concurrently: the request is // cloned in this case – the callee may modify the request safely. - if err = d.router.Send(ctx, req); err != nil { + config := d.limits.WritePathOverrides(req.TenantID) + if err = d.router.Send(ctx, req, config); err != nil { return nil, err } @@ -518,7 +515,8 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.PushR Annotations: annotations, }}, } - return d.router.Send(localCtx, aggregated) + config := d.limits.WritePathOverrides(req.TenantID) + return d.router.Send(localCtx, aggregated, config) })() if sendErr != nil { _ = level.Error(d.logger).Log("msg", "failed to handle aggregation", "tenant", req.TenantID, "err", err) diff --git a/pkg/distributor/write_path/router.go b/pkg/distributor/write_path/router.go index 34e1f8f5eb..85046485d1 100644 --- a/pkg/distributor/write_path/router.go +++ b/pkg/distributor/write_path/router.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/util" "github.com/grafana/pyroscope/pkg/util/connectgrpc" + "github.com/grafana/pyroscope/pkg/util/delayhandler" httputil "github.com/grafana/pyroscope/pkg/util/http" ) @@ -43,17 +44,12 @@ func (f IngesterFunc) Push( return f(ctx, req) } -type Overrides interface { - WritePathOverrides(tenantID string) Config -} - type Router struct { service services.Service inflight sync.WaitGroup - logger log.Logger - overrides Overrides - metrics *metrics + logger log.Logger + metrics *metrics ingester IngesterClient segwriter IngesterClient @@ -62,13 +58,11 @@ type Router struct { func NewRouter( logger log.Logger, registerer prometheus.Registerer, - overrides Overrides, ingester IngesterClient, segwriter IngesterClient, ) *Router { r := &Router{ logger: logger, - overrides: overrides, metrics: newMetrics(registerer), ingester: ingester, segwriter: segwriter, @@ -92,15 +86,17 @@ func (m *Router) running(ctx context.Context) error { return nil } -func (m *Router) Send(ctx context.Context, req *distributormodel.PushRequest) error { - config := m.overrides.WritePathOverrides(req.TenantID) +func (m *Router) Send(ctx context.Context, req *distributormodel.PushRequest, config Config) error { + if config.AsyncIngest { + delayhandler.CancelDelay(ctx) + } switch config.WritePath { case SegmentWriterPath: - return m.send(m.segwriterRoute(true))(ctx, req) + return m.sendToSegmentWriterOnly(ctx, req, &config) case CombinedPath: return m.sendToBoth(ctx, req, &config) default: - return m.send(m.ingesterRoute())(ctx, req) + return m.sendToIngesterOnly(ctx, req) } } @@ -160,7 +156,7 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque // The request is to be sent to both asynchronously, therefore we're // cloning it. We do not wait for the secondary request to complete. // On shutdown, however, we will wait for all inflight requests. - segwriter.client = m.sendClone(ctx, req.Clone(), segwriter.client, config) + segwriter.client = m.detachedClient(ctx, req.Clone(), segwriter.client, config) } if segwriter != nil { @@ -179,6 +175,22 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque return nil } +func (m *Router) sendToSegmentWriterOnly(ctx context.Context, req *distributormodel.PushRequest, config *Config) error { + r := m.segwriterRoute(true) + if !config.AsyncIngest { + return m.send(r)(ctx, req) + } + r.client = m.detachedClient(ctx, req, r.client, config) + m.sendAsync(ctx, req, r) + return nil +} + +func (m *Router) sendToIngesterOnly(ctx context.Context, req *distributormodel.PushRequest) error { + // NOTE(kolesnikovae): If we also want to support async requests to ingesters, + // we should implement it here and in sendToBoth. + return m.send(m.ingesterRoute())(ctx, req) +} + type sendFunc func(context.Context, *distributormodel.PushRequest) error type route struct { @@ -187,7 +199,9 @@ type route struct { primary bool } -func (m *Router) sendClone(ctx context.Context, req *distributormodel.PushRequest, client IngesterClient, config *Config) IngesterFunc { +// detachedClient creates a new IngesterFunc that wraps the call with a local context +// that has a timeout and tenant ID injected so it can be used for asynchronous requests. +func (m *Router) detachedClient(ctx context.Context, req *distributormodel.PushRequest, client IngesterClient, config *Config) IngesterFunc { return func(context.Context, *distributormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error) { localCtx, cancel := context.WithTimeout(context.Background(), config.SegmentWriterTimeout) localCtx = tenant.InjectTenantID(localCtx, req.TenantID) diff --git a/pkg/distributor/write_path/write_path_test.go b/pkg/distributor/write_path/write_path_test.go index bded0d5671..fc0d7ddb00 100644 --- a/pkg/distributor/write_path/write_path_test.go +++ b/pkg/distributor/write_path/write_path_test.go @@ -18,6 +18,7 @@ import ( distributormodel "github.com/grafana/pyroscope/pkg/distributor/model" "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/test/mocks/mockwritepath" + "github.com/grafana/pyroscope/pkg/util/delayhandler" ) type routerTestSuite struct { @@ -26,24 +27,15 @@ type routerTestSuite struct { router *Router logger log.Logger registry *prometheus.Registry - overrides *mockOverrides ingester *mockwritepath.MockIngesterClient segwriter *mockwritepath.MockIngesterClient request *distributormodel.PushRequest } -type mockOverrides struct{ mock.Mock } - -func (m *mockOverrides) WritePathOverrides(tenantID string) Config { - args := m.Called(tenantID) - return args.Get(0).(Config) -} - func (s *routerTestSuite) SetupTest() { s.logger = log.NewLogfmtLogger(io.Discard) s.registry = prometheus.NewRegistry() - s.overrides = new(mockOverrides) s.ingester = new(mockwritepath.MockIngesterClient) s.segwriter = new(mockwritepath.MockIngesterClient) @@ -67,7 +59,6 @@ func (s *routerTestSuite) SetupTest() { s.router = NewRouter( s.logger, s.registry, - s.overrides, s.ingester, s.segwriter, ) @@ -78,10 +69,6 @@ func (s *routerTestSuite) BeforeTest(_, _ string) { s.Require().NoError(svc.StartAsync(context.Background())) s.Require().NoError(svc.AwaitRunning(context.Background())) s.Require().Equal(services.Running, svc.State()) - - s.overrides.AssertExpectations(s.T()) - s.ingester.AssertExpectations(s.T()) - s.segwriter.AssertExpectations(s.T()) } func (s *routerTestSuite) AfterTest(_, _ string) { @@ -90,7 +77,6 @@ func (s *routerTestSuite) AfterTest(_, _ string) { s.Require().NoError(svc.AwaitTerminated(context.Background())) s.Require().Equal(services.Terminated, svc.State()) - s.overrides.AssertExpectations(s.T()) s.ingester.AssertExpectations(s.T()) s.segwriter.AssertExpectations(s.T()) } @@ -98,27 +84,27 @@ func (s *routerTestSuite) AfterTest(_, _ string) { func TestRouterSuite(t *testing.T) { suite.Run(t, new(routerTestSuite)) } func (s *routerTestSuite) Test_IngesterPath() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: IngesterPath, - }) + } s.ingester.On("Push", mock.Anything, s.request). Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_SegmentWriterPath() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: SegmentWriterPath, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath() { @@ -129,11 +115,11 @@ func (s *routerTestSuite) Test_CombinedPath() { d = 0.3 // Allowed delta: note that f is just a probability. ) - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, IngesterWeight: 1, SegmentWriterWeight: f, - }) + } var sentIngester atomic.Uint32 s.ingester.On("Push", mock.Anything, mock.Anything). @@ -157,7 +143,7 @@ func (s *routerTestSuite) Test_CombinedPath() { for i := 0; i < w; i++ { for j := 0; j < N; j++ { - s.Assert().NoError(s.router.Send(context.Background(), s.request.Clone())) + s.Assert().NoError(s.router.Send(context.Background(), s.request.Clone(), config)) } } @@ -170,30 +156,30 @@ func (s *routerTestSuite) Test_CombinedPath() { } func (s *routerTestSuite) Test_UnspecifiedWriterPath() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{}) + config := Config{} // Default should route to ingester s.ingester.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath_ZeroWeights() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, - }) + } - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath_IngesterError() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // We ensure that request is sent to both. IngesterWeight: 1, SegmentWriterWeight: 1, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), nil). @@ -203,16 +189,16 @@ func (s *routerTestSuite) Test_CombinedPath_IngesterError() { Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). Once() - s.Assert().Error(s.router.Send(context.Background(), s.request), context.Canceled) + s.Assert().Error(s.router.Send(context.Background(), s.request, config), context.Canceled) } func (s *routerTestSuite) Test_CombinedPath_SegmentWriterError() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // We ensure that request is sent to both. IngesterWeight: 1, SegmentWriterWeight: 1, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). @@ -222,45 +208,45 @@ func (s *routerTestSuite) Test_CombinedPath_SegmentWriterError() { Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) } func (s *routerTestSuite) Test_CombinedPath_Ingester_Exclusive_Error() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // The request is only sent to ingester. IngesterWeight: 1, SegmentWriterWeight: 0, - }) + } s.ingester.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). Once() - s.Assert().Error(s.router.Send(context.Background(), s.request), context.Canceled) + s.Assert().Error(s.router.Send(context.Background(), s.request, config), context.Canceled) } func (s *routerTestSuite) Test_CombinedPath_SegmentWriter_Exclusive_Error() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: CombinedPath, // The request is only sent to segment writer. IngesterWeight: 0, SegmentWriterWeight: 1, - }) + } s.segwriter.On("Push", mock.Anything, mock.Anything). Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). Once() - s.Assert().Error(s.router.Send(context.Background(), s.request), context.Canceled) + s.Assert().Error(s.router.Send(context.Background(), s.request, config), context.Canceled) } func (s *routerTestSuite) Test_SegmentWriter_MultipleProfiles() { - s.overrides.On("WritePathOverrides", "tenant-a").Return(Config{ + config := Config{ WritePath: SegmentWriterPath, IngesterWeight: 0, SegmentWriterWeight: 1, - }) + } x := s.request.Series[0] x.Samples = append(x.Samples, &distributormodel.ProfileSample{Profile: &pprof.Profile{}}) @@ -269,5 +255,84 @@ func (s *routerTestSuite) Test_SegmentWriter_MultipleProfiles() { Return(new(connect.Response[pushv1.PushResponse]), nil). Once() - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + s.Assert().NoError(s.router.Send(context.Background(), s.request, config)) +} + +func (s *routerTestSuite) Test_AsyncIngest_Synchronous() { + config := Config{ + WritePath: SegmentWriterPath, + AsyncIngest: false, + } + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + err := s.router.Send(context.Background(), s.request, config) + s.Assert().Error(err) +} + +func (s *routerTestSuite) Test_AsyncIngest_Asynchronous() { + config := Config{ + WritePath: SegmentWriterPath, + AsyncIngest: true, + } + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + err := s.router.Send(context.Background(), s.request, config) + s.Assert().NoError(err) + + s.router.inflight.Wait() +} + +func (s *routerTestSuite) Test_AsyncIngest_CombinedPath() { + config := Config{ + WritePath: CombinedPath, + IngesterWeight: 1, + SegmentWriterWeight: 1, + AsyncIngest: true, + } + + s.ingester.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + err := s.router.Send(context.Background(), s.request, config) + s.Assert().Error(err) + + s.router.inflight.Wait() +} + +func (s *routerTestSuite) Test_AsyncIngest_DelayCanceled() { + config := Config{ + WritePath: CombinedPath, + IngesterWeight: 1, + SegmentWriterWeight: 1, + AsyncIngest: true, + } + + s.ingester.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + s.segwriter.On("Push", mock.Anything, mock.Anything). + Return(new(connect.Response[pushv1.PushResponse]), context.Canceled). + Once() + + var canceled atomic.Bool + ctx := delayhandler.WithDelayCancel(context.Background(), func() { + canceled.Store(true) + }) + + s.Assert().Error(s.router.Send(ctx, s.request, config)) + s.router.inflight.Wait() + + s.Assert().True(canceled.Load()) } diff --git a/pkg/util/delayhandler/connect.go b/pkg/util/delayhandler/connect.go index 1f0dd0842b..59549fe04e 100644 --- a/pkg/util/delayhandler/connect.go +++ b/pkg/util/delayhandler/connect.go @@ -17,6 +17,14 @@ func NewConnect(limits Limits) connect.Interceptor { func (i *delayInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, req connect.AnyRequest) (resp connect.AnyResponse, err error) { start := timeNow() + delay := getDelay(ctx, i.limits) + delayCtx := context.Background() + if delay > 0 { + var cancel context.CancelFunc + delayCtx, cancel = context.WithCancel(context.Background()) + defer cancel() + ctx = WithDelayCancel(ctx, cancel) + } // now run the chain after me resp, err = next(ctx, req) @@ -26,8 +34,10 @@ func (i *delayInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return resp, err } - // get delay from the context - delay := getDelay(ctx, i.limits) + // The delay has been cancelled down the chain. + if delayCtx.Err() != nil { + return resp, err + } // no delay, return immediately if delay <= 0 { diff --git a/pkg/util/delayhandler/connect_test.go b/pkg/util/delayhandler/connect_test.go new file mode 100644 index 0000000000..b89d89095a --- /dev/null +++ b/pkg/util/delayhandler/connect_test.go @@ -0,0 +1,89 @@ +package delayhandler + +import ( + "context" + "testing" + "time" + + "connectrpc.com/connect" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/pyroscope/pkg/tenant" +) + +func TestConnectInterceptor(t *testing.T) { + now := time.Unix(1718211600, 0) + tenantID := "tenant" + + tests := []struct { + name string + configDelay time.Duration + cancelDelay bool + expectSleep bool + expectHeader bool + }{ + { + name: "no delay", + configDelay: 0, + expectSleep: false, + expectHeader: false, + }, + { + name: "with delay", + configDelay: 100 * time.Millisecond, + expectSleep: true, + expectHeader: true, + }, + { + name: "cancelled delay", + configDelay: 100 * time.Millisecond, + cancelDelay: true, + expectSleep: false, + expectHeader: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + timeNowMock(t, []time.Time{now, now.Add(5 * time.Millisecond)}) + sleeps, cleanUpSleep := timeAfterMock() + defer cleanUpSleep() + + limits := newMockLimits() + limits.setDelay(tenantID, tt.configDelay) + + interceptor := NewConnect(limits) + + handler := connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if tt.cancelDelay { + CancelDelay(ctx) + } + return connect.NewResponse(&struct{}{}), nil + }) + + wrappedHandler := interceptor.WrapUnary(handler) + req := connect.NewRequest(&struct{}{}) + ctx := tenant.InjectTenantID(context.Background(), tenantID) + + resp, err := wrappedHandler(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + + if tt.expectSleep { + require.Len(t, sleeps.values, 1) + assert.Greater(t, sleeps.values[0], 80*time.Millisecond) + assert.Less(t, sleeps.values[0], 120*time.Millisecond) + } else { + require.Len(t, sleeps.values, 0) + } + + if tt.expectHeader { + assert.Contains(t, resp.Header().Get("Server-Timing"), "artificial_delay") + } else { + assert.Empty(t, resp.Header().Get("Server-Timing")) + } + }) + } +} diff --git a/pkg/util/delayhandler/delay.go b/pkg/util/delayhandler/delay.go index 49bc911655..74083f768a 100644 --- a/pkg/util/delayhandler/delay.go +++ b/pkg/util/delayhandler/delay.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grafana/dskit/tenant" + "github.com/grafana/pyroscope/pkg/util" ) @@ -21,10 +22,23 @@ type Limits interface { IngestionArtificialDelay(tenantID string) time.Duration } +type delayCancelCtxKey struct{} + +func CancelDelay(ctx context.Context) { + if cancel, ok := ctx.Value(delayCancelCtxKey{}).(context.CancelFunc); ok && cancel != nil { + cancel() + } +} + +func WithDelayCancel(ctx context.Context, cancel context.CancelFunc) context.Context { + return context.WithValue(ctx, delayCancelCtxKey{}, cancel) +} + func addDelayHeader(h http.Header, delay time.Duration) { durationInMs := strconv.FormatFloat(float64(delay)/float64(time.Millisecond), 'f', -1, 64) h.Add("Server-Timing", fmt.Sprintf("artificial_delay;dur=%s", durationInMs)) } + func getDelay(ctx context.Context, limits Limits) time.Duration { tenantID, err := tenant.TenantID(ctx) if err != nil { diff --git a/pkg/util/delayhandler/http.go b/pkg/util/delayhandler/http.go index 48ffe1fd6d..58b3a2259d 100644 --- a/pkg/util/delayhandler/http.go +++ b/pkg/util/delayhandler/http.go @@ -1,6 +1,7 @@ package delayhandler import ( + "context" "net/http" "time" ) @@ -68,7 +69,12 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler { delay := getDelay(r.Context(), limits) var delayRw *delayedResponseWriter + delayCtx := context.Background() if delay > 0 { + var cancel context.CancelFunc + delayCtx, cancel = context.WithCancel(delayCtx) + defer cancel() + r = r.WithContext(WithDelayCancel(r.Context(), cancel)) w, delayRw = wrapResponseWriter(w, start.Add(delay)) } @@ -85,6 +91,11 @@ func NewHTTP(limits Limits) func(h http.Handler) http.Handler { return } + // The delay has been cancelled down the chain. + if delayCtx.Err() != nil { + return + } + delayLeft := delayRw.end.Sub(timeNow()) // nothing to do if we're past the end time if delayLeft <= 0 { diff --git a/pkg/util/delayhandler/http_test.go b/pkg/util/delayhandler/http_test.go index ec04eaae4d..43ae4cbffb 100644 --- a/pkg/util/delayhandler/http_test.go +++ b/pkg/util/delayhandler/http_test.go @@ -46,13 +46,17 @@ func (m *mockLimits) setDelay(tenantID string, delay time.Duration) { // Test handler that records what happened type testHandler struct { - statusCode int - body string - called bool + statusCode int + body string + called bool + cancelDelay bool } func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.called = true + if h.cancelDelay { + CancelDelay(r.Context()) + } if h.statusCode != 0 { w.WriteHeader(h.statusCode) } @@ -120,6 +124,7 @@ func TestNewHTTP(t *testing.T) { handlerBody string handlerDelay time.Duration // delay in handler middlewareDelay time.Duration // delay in other middlewares + cancelDelay bool expectDelay bool expectDelayHeader bool }{ @@ -181,6 +186,18 @@ func TestNewHTTP(t *testing.T) { handlerBody: "slow middlewares success", middlewareDelay: 200 * time.Millisecond, }, + { + name: "enabled/cancel delay", + configDelay: 100 * time.Millisecond, + handlerBody: "success", + cancelDelay: true, + }, + { + name: "disabled/cancel delay no effect", + configDelay: 0, + handlerBody: "success", + cancelDelay: true, + }, } for _, tt := range tests { @@ -218,8 +235,9 @@ func TestNewHTTP(t *testing.T) { middleware := NewHTTP(limits) handler := &testHandler{ - statusCode: tt.handlerStatusCode, - body: tt.handlerBody, + statusCode: tt.handlerStatusCode, + body: tt.handlerBody, + cancelDelay: tt.cancelDelay, } req := httptest.NewRequest(http.MethodPost, "/test", strings.NewReader("test"))