@@ -303,15 +303,18 @@ static class QueueEntry implements Runnable {
303303 WriteCallback cb ;
304304 Object ctx ;
305305 long enqueueTime ;
306+ long enqueueCbThreadPooleQueueTime ;
306307 boolean ackBeforeSync ;
307308
308309 OpStatsLogger journalAddEntryStats ;
310+ OpStatsLogger journalCbQueuedLatency ;
309311 Counter journalCbQueueSize ;
310312 Counter callbackTime ;
311313
312314 static QueueEntry create (ByteBuf entry , boolean ackBeforeSync , long ledgerId , long entryId ,
313315 WriteCallback cb , Object ctx , long enqueueTime , OpStatsLogger journalAddEntryStats ,
314- Counter journalCbQueueSize , Counter callbackTime ) {
316+ Counter journalCbQueueSize , OpStatsLogger journalCbQueuedLatency ,
317+ Counter callbackTime ) {
315318 QueueEntry qe = RECYCLER .get ();
316319 qe .entry = entry ;
317320 qe .ackBeforeSync = ackBeforeSync ;
@@ -321,13 +324,20 @@ static QueueEntry create(ByteBuf entry, boolean ackBeforeSync, long ledgerId, lo
321324 qe .entryId = entryId ;
322325 qe .enqueueTime = enqueueTime ;
323326 qe .journalAddEntryStats = journalAddEntryStats ;
327+ qe .journalCbQueuedLatency = journalCbQueuedLatency ;
324328 qe .journalCbQueueSize = journalCbQueueSize ;
325329 qe .callbackTime = callbackTime ;
326330 return qe ;
327331 }
328332
333+ public void setEnqueueCbThreadPooleQueueTime (long enqueueCbThreadPooleQueueTime ) {
334+ this .enqueueCbThreadPooleQueueTime = enqueueCbThreadPooleQueueTime ;
335+ }
336+
329337 @ Override
330338 public void run () {
339+ journalCbQueuedLatency .registerSuccessfulEvent (
340+ MathUtils .elapsedNanos (enqueueCbThreadPooleQueueTime ), TimeUnit .NANOSECONDS );
331341 long startTime = System .nanoTime ();
332342 if (LOG .isDebugEnabled ()) {
333343 LOG .debug ("Acknowledge Ledger: {}, Entry: {}" , ledgerId , entryId );
@@ -392,6 +402,7 @@ public int process(boolean shouldForceWrite) throws IOException {
392402 for (int i = 0 ; i < forceWriteWaiters .size (); i ++) {
393403 QueueEntry qe = forceWriteWaiters .get (i );
394404 if (qe != null ) {
405+ qe .setEnqueueCbThreadPooleQueueTime (MathUtils .nowInNano ());
395406 cbThreadPool .execute (qe );
396407 }
397408 }
@@ -943,6 +954,7 @@ public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
943954 entry , ackBeforeSync , ledgerId , entryId , cb , ctx , MathUtils .nowInNano (),
944955 journalStats .getJournalAddEntryStats (),
945956 journalStats .getJournalCbQueueSize (),
957+ journalStats .getJournalCbQueuedLatency (),
946958 callbackTime ));
947959 }
948960
@@ -952,6 +964,7 @@ void forceLedger(long ledgerId, WriteCallback cb, Object ctx) {
952964 BookieImpl .METAENTRY_ID_FORCE_LEDGER , cb , ctx , MathUtils .nowInNano (),
953965 journalStats .getJournalForceLedgerStats (),
954966 journalStats .getJournalCbQueueSize (),
967+ journalStats .getJournalCbQueuedLatency (),
955968 callbackTime ));
956969 // Increment afterwards because the add operation could fail.
957970 journalStats .getJournalQueueSize ().inc ();
@@ -1118,6 +1131,7 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
11181131 if (entry != null && (!syncData || entry .ackBeforeSync )) {
11191132 toFlush .set (i , null );
11201133 numEntriesToFlush --;
1134+ entry .setEnqueueCbThreadPooleQueueTime (MathUtils .nowInNano ());
11211135 cbThreadPool .execute (entry );
11221136 }
11231137 }
0 commit comments