@@ -42,12 +42,12 @@ public class FeedRefreshEngine {
4242
4343 private final BlockingDeque <Feed > queue ;
4444
45- private final ExecutorService feedProcessingLoopExecutor ;
46- private final ExecutorService refillLoopExecutor ;
47- private final ExecutorService refillExecutor ;
48- private final ThreadPoolExecutor workerExecutor ;
49- private final ThreadPoolExecutor databaseUpdaterExecutor ;
50- private final ThreadPoolExecutor notifierExecutor ;
45+ private ExecutorService feedProcessingLoopExecutor ;
46+ private ExecutorService refillLoopExecutor ;
47+ private ThreadPoolExecutor refillExecutor ;
48+ private ThreadPoolExecutor workerExecutor ;
49+ private ThreadPoolExecutor databaseUpdaterExecutor ;
50+ private ThreadPoolExecutor notifierExecutor ;
5151
5252 public FeedRefreshEngine (UnitOfWork unitOfWork , FeedDAO feedDAO , FeedRefreshWorker worker , FeedRefreshUpdater updater ,
5353 FeedUpdateNotifier notifier , CommaFeedConfiguration config , MetricRegistry metrics ) {
@@ -61,22 +61,26 @@ public FeedRefreshEngine(UnitOfWork unitOfWork, FeedDAO feedDAO, FeedRefreshWork
6161
6262 this .queue = new LinkedBlockingDeque <>();
6363
64+ metrics .register (MetricRegistry .name (getClass (), "queue" , "size" ), (Gauge <Integer >) queue ::size );
65+ metrics .register (MetricRegistry .name (getClass (), "worker" , "active" ), (Gauge <Integer >) () -> workerExecutor .getActiveCount ());
66+ metrics .register (MetricRegistry .name (getClass (), "updater" , "active" ),
67+ (Gauge <Integer >) () -> databaseUpdaterExecutor .getActiveCount ());
68+ metrics .register (MetricRegistry .name (getClass (), "notifier" , "active" ), (Gauge <Integer >) () -> notifierExecutor .getActiveCount ());
69+ metrics .register (MetricRegistry .name (getClass (), "notifier" , "queue" ), (Gauge <Integer >) () -> notifierExecutor .getQueue ().size ());
70+ }
71+
72+ private void createExecutors () {
6473 this .feedProcessingLoopExecutor = Executors .newSingleThreadExecutor ();
6574 this .refillLoopExecutor = Executors .newSingleThreadExecutor ();
6675 this .refillExecutor = newDiscardingSingleThreadExecutorService ();
6776 this .workerExecutor = newBlockingExecutorService (config .feedRefresh ().httpThreads ());
6877 this .databaseUpdaterExecutor = newBlockingExecutorService (config .feedRefresh ().databaseThreads ());
6978 this .notifierExecutor = newDiscardingExecutorService (config .pushNotifications ().threads (),
7079 config .pushNotifications ().queueCapacity ());
71-
72- metrics .register (MetricRegistry .name (getClass (), "queue" , "size" ), (Gauge <Integer >) queue ::size );
73- metrics .register (MetricRegistry .name (getClass (), "worker" , "active" ), (Gauge <Integer >) workerExecutor ::getActiveCount );
74- metrics .register (MetricRegistry .name (getClass (), "updater" , "active" ), (Gauge <Integer >) databaseUpdaterExecutor ::getActiveCount );
75- metrics .register (MetricRegistry .name (getClass (), "notifier" , "active" ), (Gauge <Integer >) notifierExecutor ::getActiveCount );
76- metrics .register (MetricRegistry .name (getClass (), "notifier" , "queue" ), (Gauge <Integer >) () -> notifierExecutor .getQueue ().size ());
7780 }
7881
7982 public void start () {
83+ createExecutors ();
8084 startFeedProcessingLoop ();
8185 startRefillLoop ();
8286 }
@@ -204,6 +208,8 @@ public void stop() {
204208 MoreExecutors .shutdownAndAwaitTermination (this .workerExecutor , config .shutdownTimeout ());
205209 MoreExecutors .shutdownAndAwaitTermination (this .databaseUpdaterExecutor , config .shutdownTimeout ());
206210 MoreExecutors .shutdownAndAwaitTermination (this .notifierExecutor , config .shutdownTimeout ());
211+
212+ queue .clear ();
207213 }
208214
209215 /**
0 commit comments