1515 */
1616package com .arpnetworking .metrics .mad ;
1717
18+ import akka .actor .ActorRef ;
19+ import akka .actor .ActorSystem ;
20+ import akka .actor .PoisonPill ;
21+ import akka .actor .Props ;
1822import com .arpnetworking .commons .builder .OvalBuilder ;
1923import com .arpnetworking .commons .observer .Observable ;
2024import com .arpnetworking .commons .observer .Observer ;
3539import com .google .common .collect .ImmutableSet ;
3640import com .google .common .collect .Lists ;
3741import com .google .common .collect .Maps ;
38- import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
3942import net .sf .oval .constraint .NotNull ;
4043
4144import java .time .Duration ;
4447import java .util .Map ;
4548import java .util .Optional ;
4649import java .util .Set ;
47- import java .util .concurrent .ExecutorService ;
48- import java .util .concurrent .Executors ;
49- import java .util .concurrent .TimeUnit ;
5050import java .util .regex .Pattern ;
5151
5252/**
5656 * @author Ville Koskela (ville dot koskela at inscopemetrics dot io)
5757 * @author Ryan Ascheman (rascheman at groupon dot com)
5858 */
59- // NOTE: The _periodWorkerExecutor is accessed both in synchronized lifecycle methods like launch() and shutdown() but
60- // also non-synchronized methods like notify(). Access to _periodWorkerExecutor does not need to be synchronized.
61- @ SuppressFBWarnings ("IS2_INCONSISTENT_SYNC" )
6259public final class Aggregator implements Observer , Launchable {
6360
6461 @ Override
@@ -67,11 +64,6 @@ public synchronized void launch() {
6764 .setMessage ("Launching aggregator" )
6865 .addData ("aggregator" , this )
6966 .log ();
70-
71- _periodWorkers .clear ();
72- if (!_periods .isEmpty ()) {
73- _periodWorkerExecutor = Executors .newCachedThreadPool (r -> new Thread (r , "PeriodWorker" ));
74- }
7567 }
7668
7769 @ Override
@@ -81,18 +73,10 @@ public synchronized void shutdown() {
8173 .addData ("aggregator" , this )
8274 .log ();
8375
84- for (final List <PeriodWorker > periodCloserList : _periodWorkers .values ()) {
85- periodCloserList .forEach (com .arpnetworking .metrics .mad .PeriodWorker ::shutdown );
86- }
87- _periodWorkers .clear ();
88- if (_periodWorkerExecutor != null ) {
89- _periodWorkerExecutor .shutdown ();
90- try {
91- _periodWorkerExecutor .awaitTermination (10 , TimeUnit .SECONDS );
92- } catch (final InterruptedException e ) {
93- LOGGER .warn ("Unable to shutdown period worker executor" , e );
76+ for (final List <ActorRef > actorRefList : _periodWorkerActors .values ()) {
77+ for (final ActorRef actorRef : actorRefList ) {
78+ actorRef .tell (PoisonPill .getInstance (), ActorRef .noSender ());
9479 }
95- _periodWorkerExecutor = null ;
9680 }
9781 }
9882
@@ -109,12 +93,13 @@ public void notify(final Observable observable, final Object event) {
10993 final Record record = (Record ) event ;
11094 final Key key = new DefaultKey (record .getDimensions ());
11195 LOGGER .trace ()
112- .setMessage ("Processing record" )
96+ .setMessage ("Sending record to aggregation actor " )
11397 .addData ("record" , record )
11498 .addData ("key" , key )
11599 .log ();
116- for (final PeriodWorker periodWorker : _periodWorkers .computeIfAbsent (key , this ::createPeriodWorkers )) {
117- periodWorker .record (record );
100+
101+ for (final ActorRef periodWorkerActor : _periodWorkerActors .computeIfAbsent (key , this ::createActors )) {
102+ periodWorkerActor .tell (record , ActorRef .noSender ());
118103 }
119104 }
120105
@@ -130,7 +115,6 @@ public Object toLogValue() {
130115 .put ("timerStatistics" , _specifiedTimerStatistics )
131116 .put ("counterStatistics" , _specifiedCounterStatistics )
132117 .put ("gaugeStatistics" , _specifiedGaugeStatistics )
133- .put ("periodWorkers" , _periodWorkers )
134118 .build ();
135119 }
136120
@@ -139,30 +123,25 @@ public String toString() {
139123 return toLogValue ().toString ();
140124 }
141125
142- private List <PeriodWorker > createPeriodWorkers (final Key key ) {
143- final List <PeriodWorker > periodWorkerList = Lists .newArrayListWithExpectedSize (_periods .size ());
126+ private List <ActorRef > createActors (final Key key ) {
127+ final List <ActorRef > periodWorkerList = Lists .newArrayListWithExpectedSize (_periods .size ());
144128 for (final Duration period : _periods ) {
145- final PeriodWorker periodWorker = new PeriodWorker .Builder ()
129+ final Bucket .Builder bucketBuilder = new Bucket .Builder ()
130+ .setKey (key )
131+ .setSpecifiedCounterStatistics (_specifiedCounterStatistics )
132+ .setSpecifiedGaugeStatistics (_specifiedGaugeStatistics )
133+ .setSpecifiedTimerStatistics (_specifiedTimerStatistics )
134+ .setDependentCounterStatistics (_dependentCounterStatistics )
135+ .setDependentGaugeStatistics (_dependentGaugeStatistics )
136+ .setDependentTimerStatistics (_dependentTimerStatistics )
137+ .setSpecifiedStatistics (_cachedSpecifiedStatistics )
138+ .setDependentStatistics (_cachedDependentStatistics )
146139 .setPeriod (period )
147- .setBucketBuilder (
148- new Bucket .Builder ()
149- .setKey (key )
150- .setSpecifiedCounterStatistics (_specifiedCounterStatistics )
151- .setSpecifiedGaugeStatistics (_specifiedGaugeStatistics )
152- .setSpecifiedTimerStatistics (_specifiedTimerStatistics )
153- .setDependentCounterStatistics (_dependentCounterStatistics )
154- .setDependentGaugeStatistics (_dependentGaugeStatistics )
155- .setDependentTimerStatistics (_dependentTimerStatistics )
156- .setSpecifiedStatistics (_cachedSpecifiedStatistics )
157- .setDependentStatistics (_cachedDependentStatistics )
158- .setPeriod (period )
159- .setSink (_sink ))
160- .build ();
161- periodWorkerList .add (periodWorker );
162- _periodWorkerExecutor .execute (periodWorker );
140+ .setSink (_sink );
141+ periodWorkerList .add (_actorSystem .actorOf (Props .create (PeriodWorker .class , period , bucketBuilder )));
163142 }
164143 LOGGER .debug ()
165- .setMessage ("Created period workers " )
144+ .setMessage ("Created period worker actors " )
166145 .addData ("key" , key )
167146 .addData ("periodWorkersSize" , periodWorkerList .size ())
168147 .log ();
@@ -178,6 +157,7 @@ private ImmutableSet<Statistic> computeDependentStatistics(final ImmutableSet<St
178157 }
179158
180159 private Aggregator (final Builder builder ) {
160+ _actorSystem = builder ._actorSystem ;
181161 _periods = ImmutableSet .copyOf (builder ._periods );
182162 _sink = builder ._sink ;
183163 _specifiedCounterStatistics = ImmutableSet .copyOf (builder ._counterStatistics );
@@ -223,8 +203,9 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
223203 return statistics .map (statisticImmutableSet -> computeDependentStatistics (statisticImmutableSet ));
224204 }
225205 });
226- }
206+ }
227207
208+ private final ActorSystem _actorSystem ;
228209 private final ImmutableSet <Duration > _periods ;
229210 private final Sink _sink ;
230211 private final ImmutableSet <Statistic > _specifiedTimerStatistics ;
@@ -236,9 +217,7 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
236217 private final ImmutableMap <Pattern , ImmutableSet <Statistic >> _statistics ;
237218 private final LoadingCache <String , Optional <ImmutableSet <Statistic >>> _cachedSpecifiedStatistics ;
238219 private final LoadingCache <String , Optional <ImmutableSet <Statistic >>> _cachedDependentStatistics ;
239- private final Map <Key , List <PeriodWorker >> _periodWorkers = Maps .newConcurrentMap ();
240-
241- private ExecutorService _periodWorkerExecutor = null ;
220+ private final Map <Key , List <ActorRef >> _periodWorkerActors = Maps .newConcurrentMap ();
242221
243222 private static final Logger LOGGER = LoggerFactory .getLogger (Aggregator .class );
244223
@@ -254,6 +233,17 @@ public Builder() {
254233 super (Aggregator ::new );
255234 }
256235
236+ /**
237+ * Set the Akka {@link ActorSystem}. Cannot be null.
238+ *
239+ * @param value The Akka {@link ActorSystem}.
240+ * @return This <code>Builder</code> instance.
241+ */
242+ public Builder setActorSystem (final ActorSystem value ) {
243+ _actorSystem = value ;
244+ return this ;
245+ }
246+
257247 /**
258248 * Set the sink. Cannot be null or empty.
259249 *
@@ -321,6 +311,8 @@ public Builder setStatistics(final Map<String, Set<Statistic>> value) {
321311 return this ;
322312 }
323313
314+ @ NotNull
315+ private ActorSystem _actorSystem ;
324316 @ NotNull
325317 private Sink _sink ;
326318 @ NotNull
0 commit comments