@@ -8,6 +8,7 @@ package worker
88import (
99 "context"
1010 "fmt"
11+ "math/rand"
1112 "net/http"
1213 "strings"
1314 "time"
@@ -37,18 +38,23 @@ import (
3738 "github.com/grafana/phlare/pkg/util/httpgrpcutil"
3839)
3940
41+ func init () {
42+ rand .Seed (time .Now ().UnixNano ())
43+ }
44+
4045var processorBackoffConfig = backoff.Config {
4146 MinBackoff : 250 * time .Millisecond ,
4247 MaxBackoff : 2 * time .Second ,
4348}
4449
4550func newSchedulerProcessor (cfg Config , handler RequestHandler , log log.Logger , reg prometheus.Registerer ) (* schedulerProcessor , []services.Service ) {
4651 p := & schedulerProcessor {
47- log : log ,
48- handler : handler ,
49- maxMessageSize : cfg .GRPCClientConfig .MaxSendMsgSize ,
50- querierID : cfg .QuerierID ,
51- grpcConfig : cfg .GRPCClientConfig ,
52+ log : log ,
53+ handler : handler ,
54+ maxMessageSize : cfg .GRPCClientConfig .MaxSendMsgSize ,
55+ querierID : cfg .QuerierID ,
56+ grpcConfig : cfg .GRPCClientConfig ,
57+ maxLoopDuration : cfg .MaxLoopDuration ,
5258
5359 schedulerClientFactory : func (conn * grpc.ClientConn ) schedulerpb.SchedulerForQuerierClient {
5460 return schedulerpb .NewSchedulerForQuerierClient (conn )
@@ -78,11 +84,12 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
7884
7985// Handles incoming queries from query-scheduler.
8086type schedulerProcessor struct {
81- log log.Logger
82- handler RequestHandler
83- grpcConfig grpcclient.Config
84- maxMessageSize int
85- querierID string
87+ log log.Logger
88+ handler RequestHandler
89+ grpcConfig grpcclient.Config
90+ maxMessageSize int
91+ querierID string
92+ maxLoopDuration time.Duration
8693
8794 frontendPool * client.Pool
8895 frontendClientRequestDuration * prometheus.HistogramVec
@@ -111,42 +118,68 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
111118
112119 backoff := backoff .New (execCtx , processorBackoffConfig )
113120 for backoff .Ongoing () {
114- c , err := schedulerClient .QuerierLoop (execCtx )
115- if err == nil {
116- err = c .Send (& schedulerpb.QuerierToScheduler {QuerierID : sp .querierID })
117- }
118-
119- if err != nil {
120- level .Warn (sp .log ).Log ("msg" , "error contacting scheduler" , "err" , err , "addr" , address )
121- backoff .Wait ()
122- continue
123- }
124-
125- if err := sp .querierLoop (c , address , inflightQuery ); err != nil {
126- // Do not log an error is the query-scheduler is shutting down.
127- if s , ok := status .FromError (err ); ! ok ||
128- (! strings .Contains (s .Message (), schedulerpb .ErrSchedulerIsNotRunning .Error ()) &&
129- ! strings .Contains (s .Message (), context .DeadlineExceeded .Error ()) &&
130- ! strings .Contains (s .Message (), "stream terminated" )) {
131- level .Error (sp .log ).Log ("msg" , "error processing requests from scheduler" , "err" , err , "addr" , address )
132- }
133- if strings .Contains (err .Error (), context .DeadlineExceeded .Error ()) || strings .Contains (err .Error (), "stream terminated" ) {
134- backoff .Reset ()
135- continue
121+ func () {
122+ if err := sp .querierLoop (execCtx , schedulerClient , address , inflightQuery ); err != nil {
123+ // Do not log an error is the query-scheduler is shutting down.
124+ if s , ok := status .FromError (err ); ! ok ||
125+ (! strings .Contains (s .Message (), schedulerpb .ErrSchedulerIsNotRunning .Error ()) &&
126+ ! strings .Contains (s .Message (), context .Canceled .Error ()) &&
127+ ! strings .Contains (s .Message (), "stream terminated" )) {
128+ level .Error (sp .log ).Log ("msg" , "error processing requests from scheduler" , "err" , err , "addr" , address )
129+ }
130+ if strings .Contains (err .Error (), context .Canceled .Error ()) || strings .Contains (err .Error (), "stream terminated" ) {
131+ backoff .Reset ()
132+ return
133+ }
134+ backoff .Wait ()
135+ return
136136 }
137- backoff .Wait ()
138- continue
139- }
140137
141- backoff .Reset ()
138+ backoff .Reset ()
139+ }()
142140 }
143141}
144142
145143// process loops processing requests on an established stream.
146- func (sp * schedulerProcessor ) querierLoop (c schedulerpb.SchedulerForQuerier_QuerierLoopClient , address string , inflightQuery * atomic.Bool ) error {
147- // Build a child context so we can cancel a query when the stream is closed.
148- ctx , cancel := context .WithCancel (c .Context ())
149- defer cancel ()
144+ func (sp * schedulerProcessor ) querierLoop (parentCtx context.Context , schedulerClient schedulerpb.SchedulerForQuerierClient , address string , inflightQuery * atomic.Bool ) error {
145+ loopCtx , loopCancel := context .WithCancel (parentCtx )
146+ defer loopCancel ()
147+
148+ if sp .maxLoopDuration > 0 {
149+ go func () {
150+ timer := time .NewTimer (jitter (sp .maxLoopDuration , 0.3 ))
151+ defer timer .Stop ()
152+
153+ select {
154+ case <- timer .C :
155+ level .Debug (sp .log ).Log ("msg" , "waiting for inflight queries to complete" )
156+ for inflightQuery .Load () {
157+ select {
158+ case <- parentCtx .Done ():
159+ // In the meanwhile, the execution context has been explicitly canceled, so we should just terminate.
160+ return
161+ default :
162+ // Wait and check again inflight queries.
163+ time .Sleep (100 * time .Millisecond )
164+ }
165+ }
166+ level .Debug (sp .log ).Log ("msg" , "refreshing scheduler connection" )
167+ loopCancel ()
168+ case <- parentCtx .Done ():
169+ return
170+ }
171+ }()
172+ }
173+
174+ c , err := schedulerClient .QuerierLoop (loopCtx )
175+ if err == nil {
176+ err = c .Send (& schedulerpb.QuerierToScheduler {QuerierID : sp .querierID })
177+ }
178+
179+ if err != nil {
180+ level .Warn (sp .log ).Log ("msg" , "error contacting scheduler" , "err" , err , "addr" , address )
181+ return err
182+ }
150183
151184 for {
152185 request , err := c .Recv ()
@@ -165,7 +198,7 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
165198 defer inflightQuery .Store (false )
166199
167200 // We need to inject user into context for sending response back.
168- ctx := user .InjectOrgID (ctx , request .UserID )
201+ ctx := user .InjectOrgID (c . Context () , request .UserID )
169202
170203 tracer := opentracing .GlobalTracer ()
171204 // Ignore errors here. If we cannot get parent span, we just don't create new one.
@@ -188,6 +221,11 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
188221 }
189222}
190223
224+ func jitter (d time.Duration , factor float64 ) time.Duration {
225+ maxJitter := time .Duration (float64 (d ) * factor )
226+ return d - time .Duration (rand .Int63n (int64 (maxJitter )))
227+ }
228+
191229func (sp * schedulerProcessor ) runRequest (ctx context.Context , logger log.Logger , queryID uint64 , frontendAddress string , statsEnabled bool , request * httpgrpc.HTTPRequest ) {
192230 var stats * querier_stats.Stats
193231 if statsEnabled {
0 commit comments