@@ -100,6 +100,10 @@ public HttpPostSinkActor(
100100 } else {
101101 _spreadingDelayMillis = new Random ().nextInt ((int ) spreadPeriod .toMillis ());
102102 }
103+ LOGGER .info ()
104+ .setMessage ("Http post sink actor spread period" )
105+ .addData ("spreadPeriodMillis" , _spreadingDelayMillis )
106+ .log ();
103107 _periodicMetrics = periodicMetrics ;
104108 _evictedRequestsName = "sinks/http_post/" + sink .getMetricSafeName () + "/evicted_requests" ;
105109 _requestLatencyName = "sinks/http_post/" + sink .getMetricSafeName () + "/request_latency" ;
@@ -227,22 +231,28 @@ private void processEmitAggregation(final EmitAggregation emitMessage) {
227231 .log ();
228232 }
229233
230- // If we don't currently have anything in-flight, we'll need to wait the spreading duration.
231- // If we're already waiting, these requests will be sent after the waiting is over, no need to do anything else.
232- if (pendingWasEmpty && !_waiting && _spreadingDelayMillis > 0 ) {
233- _waiting = true ;
234- LOGGER .debug ()
235- .setMessage ("Scheduling http requests for later transmission" )
236- .addData ("delayMs" , _spreadingDelayMillis )
237- .addContext ("actor" , self ())
238- .log ();
239- context ().system ().scheduler ().scheduleOnce (
240- FiniteDuration .apply (_spreadingDelayMillis , TimeUnit .MILLISECONDS ),
241- self (),
242- new WaitTimeExpired (),
243- context ().dispatcher (),
244- self ());
234+ if (_spreadingDelayMillis > 0 ) {
235+ // If we don't currently have anything in-flight, we'll need to wait the spreading duration.
236+ if (!_waiting && pendingWasEmpty ) {
237+ _waiting = true ;
238+ LOGGER .debug ()
239+ .setMessage ("Scheduling http requests for later transmission" )
240+ .addData ("delayMs" , _spreadingDelayMillis )
241+ .addContext ("actor" , self ())
242+ .log ();
243+ context ().system ().scheduler ().scheduleOnce (
244+ FiniteDuration .apply (_spreadingDelayMillis , TimeUnit .MILLISECONDS ),
245+ self (),
246+ WaitTimeExpired .getInstance (),
247+ context ().dispatcher (),
248+ self ());
249+ } else if (!_waiting ) {
250+ // If we have something in-flight continue to send without waiting
251+ dispatchPending ();
252+ }
253+ // Otherwise we're already waiting, these requests will be sent after the waiting is over, no need to do anything else.
245254 } else {
255+ // Spreading is disable, just keep dispatching the work
246256 dispatchPending ();
247257 }
248258 }
@@ -333,7 +343,7 @@ public void postStop() throws Exception {
333343 private final String _samplesDroppedName ;
334344 private final String _samplesSentName ;
335345
336- private static final Logger LOGGER = LoggerFactory .getLogger (HttpPostSink .class );
346+ private static final Logger LOGGER = LoggerFactory .getLogger (HttpPostSinkActor .class );
337347 private static final Logger POST_ERROR_LOGGER = LoggerFactory .getRateLimitLogger (HttpPostSink .class , Duration .ofSeconds (30 ));
338348 private static final Logger EVICTED_LOGGER = LoggerFactory .getRateLimitLogger (HttpPostSink .class , Duration .ofSeconds (30 ));
339349 private static final com .google .common .collect .ImmutableList <Integer > STATUS_CLASSES = ImmutableList .of (2 , 3 , 4 , 5 );
@@ -419,7 +429,15 @@ public Response getResponse() {
419429 /**
420430 * Message class to indicate that we are now able to send data.
421431 */
422- private static final class WaitTimeExpired { }
432+ private static final class WaitTimeExpired {
433+ private static final WaitTimeExpired INSTANCE = new WaitTimeExpired ();
434+
435+ public static WaitTimeExpired getInstance () {
436+ return INSTANCE ;
437+ }
438+
439+ private WaitTimeExpired () {}
440+ }
423441
424442 private static final class ResponseAsyncCompletionHandler extends AsyncCompletionHandler <Response > {
425443
0 commit comments