@@ -110,6 +110,8 @@ public HttpSinkActor(
110
110
_evictedRequestsName = "sinks/http_post/" + _sink .getMetricSafeName () + "/evicted_requests" ;
111
111
_requestLatencyName = "sinks/http_post/" + _sink .getMetricSafeName () + "/request_latency" ;
112
112
_inQueueLatencyName = "sinks/http_post/" + _sink .getMetricSafeName () + "/queue_time" ;
113
+ _pendingRequestsQueueSizeName = "sinks/http_post/" + _sink .getMetricSafeName () + "/queue_size" ;
114
+ _inflightRequestsCountName = "sinks/http_post/" + _sink .getMetricSafeName () + "/inflight_count" ;
113
115
_requestSuccessName = "sinks/http_post/" + _sink .getMetricSafeName () + "/success" ;
114
116
_responseStatusName = "sinks/http_post/" + _sink .getMetricSafeName () + "/status" ;
115
117
_httpSinkAttemptsName = "sinks/http_post/" + _sink .getMetricSafeName () + "/attempts" ;
@@ -309,6 +311,7 @@ private void processEmitAggregation(final EmitAggregation emitMessage) {
309
311
.addData ("conditionsSize" , periodicData .getConditions ().size ())
310
312
.addContext ("actor" , self ())
311
313
.log ();
314
+ _periodicMetrics .recordGauge (_inflightRequestsCountName , _inflightRequestsCount );
312
315
313
316
if (!periodicData .getData ().isEmpty () || !periodicData .getConditions ().isEmpty ()) {
314
317
final Collection <RequestEntry .Builder > requestEntryBuilders = _sink .createRequests (_client , periodicData );
@@ -331,6 +334,8 @@ private void processEmitAggregation(final EmitAggregation emitMessage) {
331
334
.log ();
332
335
}
333
336
337
+ _periodicMetrics .recordGauge (_pendingRequestsQueueSizeName , _pendingRequests .size ());
338
+
334
339
if (_spreadingDelayMillis > 0 ) {
335
340
// If we don't currently have anything in-flight, we'll need to wait the spreading duration.
336
341
if (!_waiting && pendingWasEmpty ) {
@@ -442,6 +447,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) {
442
447
private final String _evictedRequestsName ;
443
448
private final String _requestLatencyName ;
444
449
private final String _inQueueLatencyName ;
450
+ private final String _pendingRequestsQueueSizeName ;
451
+ private final String _inflightRequestsCountName ;
445
452
private final String _requestSuccessName ;
446
453
private final String _responseStatusName ;
447
454
private final String _httpSinkAttemptsName ;
0 commit comments