Skip to content

Commit 158884e

Browse files
authored
Fix automatic reload. (#198)
* Fix automatic reload. * Converted to PatternsCS. * Restored future get timeouts.
1 parent d166507 commit 158884e

File tree

9 files changed

+260
-18
lines changed

9 files changed

+260
-18
lines changed

src/main/java/com/arpnetworking/metrics/common/sources/ActorSource.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,17 @@
1919
import akka.actor.ActorSystem;
2020
import akka.actor.PoisonPill;
2121
import akka.actor.Props;
22+
import akka.pattern.PatternsCS;
23+
import com.arpnetworking.steno.Logger;
24+
import com.arpnetworking.steno.LoggerFactory;
2225
import com.fasterxml.jackson.annotation.JacksonInject;
2326
import net.sf.oval.constraint.NotEmpty;
2427
import net.sf.oval.constraint.NotNull;
28+
import scala.concurrent.duration.FiniteDuration;
2529

30+
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.TimeoutException;
2633
import java.util.function.Function;
2734

2835
/**
@@ -41,11 +48,52 @@ public void start() {
4148
@Override
4249
public void stop() {
4350
if (_actor != null) {
44-
_actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
51+
try {
52+
PatternsCS.gracefulStop(
53+
_actor,
54+
SHUTDOWN_TIMEOUT,
55+
PoisonPill.getInstance()).toCompletableFuture().get(
56+
SHUTDOWN_TIMEOUT.toMillis(),
57+
TimeUnit.MILLISECONDS);
58+
} catch (final InterruptedException e) {
59+
LOGGER.warn()
60+
.setMessage("Interrupted stopping actor source")
61+
.addData("name", getName())
62+
.addData("actor", _actor)
63+
.addData("actorName", _actorName)
64+
.log();
65+
} catch (final TimeoutException | ExecutionException e) {
66+
LOGGER.error()
67+
.setMessage("Actor source stop timed out or failed")
68+
.addData("name", getName())
69+
.addData("actor", _actor)
70+
.addData("actorName", _actorName)
71+
.addData("timeout", SHUTDOWN_TIMEOUT)
72+
.setThrowable(e)
73+
.log();
74+
}
4575
_actor = null;
4676
}
4777
}
4878

79+
/**
80+
* Return the {@link ActorSystem} used by this source.
81+
*
82+
* @return The {@link ActorSystem} used by this source.
83+
*/
84+
protected ActorSystem getActorSystem() {
85+
return _actorSystem;
86+
}
87+
88+
/**
89+
* Return an {@link ActorRef} to this source's Akka actor.
90+
*
91+
* @return An {@link ActorRef} to this source's Akka actor.
92+
*/
93+
protected ActorRef getActor() {
94+
return _actor;
95+
}
96+
4997
/**
5098
* Create a props for the actor to be created at the provided path.
5199
*
@@ -69,6 +117,9 @@ protected ActorSource(final Builder<?, ? extends ActorSource> builder) {
69117
private final String _actorName;
70118
private final ActorSystem _actorSystem;
71119

120+
private static final FiniteDuration SHUTDOWN_TIMEOUT = FiniteDuration.apply(1, TimeUnit.SECONDS);
121+
private static final Logger LOGGER = LoggerFactory.getLogger(ActorSource.class);
122+
72123
/**
73124
* ActorSource {@link BaseSource.Builder} implementation.
74125
*

src/main/java/com/arpnetworking/metrics/common/sources/BaseTcpSource.java

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import akka.actor.Props;
2121
import akka.io.Tcp;
2222
import akka.io.TcpMessage;
23+
import akka.pattern.PatternsCS;
24+
import akka.util.Timeout;
25+
import com.arpnetworking.steno.LogBuilder;
2326
import com.arpnetworking.steno.Logger;
2427
import com.arpnetworking.steno.LoggerFactory;
2528
import net.sf.oval.constraint.Min;
@@ -28,7 +31,11 @@
2831
import net.sf.oval.constraint.Range;
2932

3033
import java.net.InetSocketAddress;
34+
import java.util.concurrent.ExecutionException;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.TimeoutException;
3137
import java.util.function.Function;
38+
import javax.annotation.Nullable;
3239

3340
/**
3441
* Base source that listens on a tcp port. Subclasses should set appropriate
@@ -50,10 +57,38 @@ protected BaseTcpSource(final Builder<?, ?> builder) {
5057
_acceptQueue = builder._acceptQueue;
5158
}
5259

60+
@Override
61+
public void stop() {
62+
final ActorRef tcpManager = Tcp.get(getActorSystem()).manager();
63+
try {
64+
PatternsCS.ask(
65+
getActor(),
66+
BaseTcpListenerActor.UNBIND,
67+
UNBIND_TIMEOUT).toCompletableFuture().get(
68+
UNBIND_TIMEOUT.duration().toMillis(),
69+
TimeUnit.MILLISECONDS);
70+
} catch (final InterruptedException e) {
71+
LOGGER.warn()
72+
.setMessage("Interrupted unbinding tcp source")
73+
.addData("name", getName())
74+
.addData("tcpManager", tcpManager)
75+
.log();
76+
} catch (final TimeoutException | ExecutionException e) {
77+
LOGGER.error()
78+
.setMessage("Tcp source unbind on close timed out or failed")
79+
.addData("name", getName())
80+
.addData("tcpManager", tcpManager)
81+
.setThrowable(e)
82+
.log();
83+
}
84+
super.stop();
85+
}
86+
5387
private final String _host;
5488
private final int _port;
5589
private final int _acceptQueue;
5690

91+
private static final Timeout UNBIND_TIMEOUT = Timeout.apply(1, TimeUnit.SECONDS);
5792
private static final Logger LOGGER = LoggerFactory.getLogger(BaseTcpSource.class);
5893

5994
/**
@@ -84,13 +119,54 @@ public void preStart() {
84119
@Override
85120
public Receive createReceive() {
86121
return receiveBuilder()
87-
.matchEquals(IS_READY, message -> {
88-
getSender().tell(_isReady, getSelf());
122+
.matchEquals(IS_READY, message -> getSender().tell(_isReady, getSelf()))
123+
.matchEquals(UNBIND, unbindRequest -> {
124+
final ActorRef requester = getSender();
125+
if (_connectionActor != null) {
126+
LOGGER.debug()
127+
.setMessage("Tcp server starting unbinding...")
128+
.addData("requester", requester)
129+
.addData("connectionActor", _connectionActor)
130+
.addData("name", _sink.getName())
131+
.log();
132+
PatternsCS.ask(
133+
_connectionActor,
134+
TcpMessage.unbind(),
135+
UNBIND_TIMEOUT)
136+
.whenComplete(
137+
(response, throwable) -> {
138+
final LogBuilder logBuilder;
139+
if (throwable != null) {
140+
logBuilder = LOGGER.warn()
141+
.setThrowable(throwable);
142+
} else {
143+
logBuilder = LOGGER.info();
144+
}
145+
logBuilder
146+
.setMessage("Tcp server completed unbinding")
147+
.addData("requester", requester)
148+
.addData("connectionActor", _connectionActor)
149+
.addData("name", _sink.getName())
150+
.addData("success", throwable == null)
151+
.log();
152+
153+
requester.tell(response, getSelf());
154+
});
155+
} else {
156+
LOGGER.warn()
157+
.setMessage("Tcp server cannot unbind; no connection")
158+
.addData("sender", getSender())
159+
.addData("name", _sink.getName())
160+
.log();
161+
requester.tell(new Tcp.Unbound$(), getSelf());
162+
}
89163
})
90164
.match(Tcp.Bound.class, tcpBound -> {
165+
_connectionActor = getSender();
91166
LOGGER.info()
92167
.setMessage("Tcp server binding complete")
93168
.addData("name", _sink.getName())
169+
.addData("connectionActor", _connectionActor)
94170
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
95171
.addData("port", tcpBound.localAddress().getPort())
96172
.log();
@@ -145,12 +221,14 @@ protected BaseTcpListenerActor(final BaseTcpSource source) {
145221
}
146222

147223
private boolean _isReady = false;
224+
@Nullable private ActorRef _connectionActor = null;
148225
private final BaseTcpSource _sink;
149226
private final String _host;
150227
private final int _port;
151228
private final int _acceptQueue;
152229

153230
private static final String IS_READY = "IsReady";
231+
private static final String UNBIND = "Unbind";
154232
}
155233

156234
/**

src/main/java/com/arpnetworking/metrics/mad/Aggregator.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import akka.actor.ActorSystem;
2020
import akka.actor.PoisonPill;
2121
import akka.actor.Props;
22+
import akka.pattern.PatternsCS;
23+
import akka.util.Timeout;
2224
import com.arpnetworking.commons.builder.OvalBuilder;
25+
import com.arpnetworking.commons.java.util.concurrent.CompletableFutures;
2326
import com.arpnetworking.commons.observer.Observable;
2427
import com.arpnetworking.commons.observer.Observer;
2528
import com.arpnetworking.logback.annotations.LogValue;
@@ -42,11 +45,16 @@
4245
import net.sf.oval.constraint.NotNull;
4346

4447
import java.time.Duration;
48+
import java.util.ArrayList;
4549
import java.util.Collections;
4650
import java.util.List;
4751
import java.util.Map;
4852
import java.util.Optional;
4953
import java.util.Set;
54+
import java.util.concurrent.CompletableFuture;
55+
import java.util.concurrent.ExecutionException;
56+
import java.util.concurrent.TimeUnit;
57+
import java.util.concurrent.TimeoutException;
5058
import java.util.regex.Pattern;
5159

5260
/**
@@ -73,11 +81,32 @@ public synchronized void shutdown() {
7381
.addData("aggregator", this)
7482
.log();
7583

84+
final List<CompletableFuture<Object>> shutdownStages = new ArrayList<>();
7685
for (final List<ActorRef> actorRefList : _periodWorkerActors.values()) {
7786
for (final ActorRef actorRef : actorRefList) {
78-
actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
87+
shutdownStages.add(
88+
PatternsCS.ask(
89+
actorRef,
90+
PoisonPill.getInstance(),
91+
SHUTDOWN_TIMEOUT).toCompletableFuture());
92+
93+
7994
}
8095
}
96+
try {
97+
CompletableFutures.allOf(shutdownStages).get(
98+
SHUTDOWN_TIMEOUT.duration().toMillis(),
99+
TimeUnit.MILLISECONDS);
100+
} catch (final InterruptedException e) {
101+
LOGGER.warn()
102+
.setMessage("Interrupted waiting for actors to shutdown")
103+
.log();
104+
} catch (final TimeoutException | ExecutionException e) {
105+
LOGGER.error()
106+
.setMessage("Waiting for actors to shutdown timed out or failed")
107+
.setThrowable(e)
108+
.log();
109+
}
81110
}
82111

83112
@Override
@@ -219,6 +248,7 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
219248
private final LoadingCache<String, Optional<ImmutableSet<Statistic>>> _cachedDependentStatistics;
220249
private final Map<Key, List<ActorRef>> _periodWorkerActors = Maps.newConcurrentMap();
221250

251+
private static final Timeout SHUTDOWN_TIMEOUT = Timeout.apply(1, TimeUnit.SECONDS);
222252
private static final Logger LOGGER = LoggerFactory.getLogger(Aggregator.class);
223253

224254
/**

src/main/java/com/arpnetworking/metrics/mad/Main.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package com.arpnetworking.metrics.mad;
1717

18+
import akka.actor.ActorRef;
1819
import akka.actor.ActorSystem;
20+
import akka.actor.DeadLetter;
1921
import akka.actor.Props;
2022
import akka.actor.Terminated;
2123
import akka.dispatch.Dispatcher;
@@ -44,6 +46,7 @@
4446
import com.arpnetworking.metrics.incubator.impl.TsdPeriodicMetrics;
4547
import com.arpnetworking.metrics.jvm.ExecutorServiceMetricsRunnable;
4648
import com.arpnetworking.metrics.jvm.JvmMetricsRunnable;
49+
import com.arpnetworking.metrics.mad.actors.DeadLetterLogger;
4750
import com.arpnetworking.metrics.mad.actors.Status;
4851
import com.arpnetworking.metrics.mad.configuration.AggregatorConfiguration;
4952
import com.arpnetworking.metrics.mad.configuration.PipelineConfiguration;
@@ -225,6 +228,12 @@ private void launchActors(final Injector injector) {
225228
// Retrieve the actor system
226229
final ActorSystem actorSystem = injector.getInstance(ActorSystem.class);
227230

231+
// Create the dead letter logger
232+
if (_configuration.getLogDeadLetters()) {
233+
final ActorRef deadMailMan = actorSystem.actorOf(Props.create(DeadLetterLogger.class), "deadmailman");
234+
actorSystem.eventStream().subscribe(deadMailMan, DeadLetter.class);
235+
}
236+
228237
// Create the status actor
229238
actorSystem.actorOf(Props.create(Status.class), "status");
230239

src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.arpnetworking.metrics.mad;
1717

1818
import akka.actor.AbstractActor;
19+
import akka.actor.Cancellable;
1920
import com.arpnetworking.logback.annotations.LogValue;
2021
import com.arpnetworking.metrics.mad.model.Record;
2122
import com.arpnetworking.steno.LogValueMapFactory;
@@ -32,6 +33,7 @@
3233
import java.util.NavigableMap;
3334
import java.util.Optional;
3435
import java.util.TreeMap;
36+
import javax.annotation.Nullable;
3537

3638
/**
3739
* Actor that aggregates a particular slice of the data set over time and metric.
@@ -60,6 +62,19 @@ public void preStart() {
6062
// problem at a time).
6163
}
6264

65+
@Override
66+
public void postStop() {
67+
if (_nextScheduledRotation != null) {
68+
final boolean cancelResult = _nextScheduledRotation.cancel();
69+
_nextScheduledRotation = null;
70+
71+
LOGGER.trace()
72+
.setMessage("Shutdown canceled next scheduled rotation")
73+
.addData("cancelResult", cancelResult)
74+
.log();
75+
}
76+
}
77+
6378
@Override
6479
public AbstractActor.Receive createReceive() {
6580
return receiveBuilder()
@@ -91,7 +106,7 @@ private void scheduleRotation(final ZonedDateTime now) {
91106
timeToRotate = MINIMUM_ROTATION_CHECK_INTERVAL;
92107
}
93108

94-
context().system().scheduler().scheduleOnce(
109+
_nextScheduledRotation = context().system().scheduler().scheduleOnce(
95110
timeToRotate,
96111
self(),
97112
ROTATE_MESSAGE,
@@ -238,6 +253,7 @@ private void processRecord(final Record record) {
238253
}
239254

240255
private boolean _hasRotateScheduled;
256+
@Nullable private Cancellable _nextScheduledRotation;
241257

242258
private final Duration _period;
243259
private final Bucket.Builder _bucketBuilder;

0 commit comments

Comments
 (0)