Skip to content

Commit 2c21a4c

Browse files
authored
Fix Actor Reaping Bottleneck (#242)
* Refactor sample metrics. * Minor refactoring. * Refactored more actor message handlers. * Disable idle check in period worker. * Key not serializable. * Move and retype _periodWorkers map. * Remove key from props. * Undo aggregator as actor. * partial restore. * Multiple aggregator actors. * Hash the work between the aggregator actors. * Per aggregator actor metrics. * Debug logging. * Fixed! * Remove detailed metrics. * Code review feedback.
1 parent e157cf7 commit 2c21a4c

File tree

11 files changed

+445
-341
lines changed

11 files changed

+445
-341
lines changed

src/main/java/com/arpnetworking/metrics/common/sources/BaseTcpSource.java

Lines changed: 78 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -120,79 +120,88 @@ public void preStart() {
120120
public Receive createReceive() {
121121
return receiveBuilder()
122122
.matchEquals(IS_READY, message -> getSender().tell(_isReady, getSelf()))
123-
.matchEquals(UNBIND, unbindRequest -> {
124-
final ActorRef requester = getSender();
125-
if (_connectionActor != null) {
126-
LOGGER.debug()
127-
.setMessage("Tcp server starting unbinding...")
128-
.addData("requester", requester)
129-
.addData("connectionActor", _connectionActor)
130-
.addData("name", _sink.getName())
131-
.log();
132-
PatternsCS.ask(
133-
_connectionActor,
134-
TcpMessage.unbind(),
135-
UNBIND_TIMEOUT)
136-
.whenComplete(
137-
(response, throwable) -> {
138-
final LogBuilder logBuilder;
139-
if (throwable != null) {
140-
logBuilder = LOGGER.warn()
141-
.setThrowable(throwable);
142-
} else {
143-
logBuilder = LOGGER.info();
144-
}
145-
logBuilder
146-
.setMessage("Tcp server completed unbinding")
147-
.addData("requester", requester)
148-
.addData("connectionActor", _connectionActor)
149-
.addData("name", _sink.getName())
150-
.addData("success", throwable == null)
151-
.log();
123+
.matchEquals(UNBIND, unbindRequest -> socketUnbind())
124+
.match(Tcp.Bound.class, this::socketBound)
125+
.match(Tcp.CommandFailed.class, this::commandFailed)
126+
.match(Tcp.Connected.class, this::connected)
127+
.build();
128+
}
152129

153-
requester.tell(response, getSelf());
154-
});
155-
} else {
156-
LOGGER.warn()
157-
.setMessage("Tcp server cannot unbind; no connection")
158-
.addData("sender", getSender())
159-
.addData("name", _sink.getName())
160-
.log();
161-
requester.tell(new Tcp.Unbound$(), getSelf());
162-
}
163-
})
164-
.match(Tcp.Bound.class, tcpBound -> {
165-
_connectionActor = getSender();
166-
LOGGER.info()
167-
.setMessage("Tcp server binding complete")
168-
.addData("name", _sink.getName())
169-
.addData("connectionActor", _connectionActor)
170-
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
171-
.addData("port", tcpBound.localAddress().getPort())
172-
.log();
130+
private void socketUnbind() {
131+
final ActorRef requester = getSender();
132+
if (_connectionActor != null) {
133+
LOGGER.debug()
134+
.setMessage("Tcp server starting unbinding...")
135+
.addData("requester", requester)
136+
.addData("connectionActor", _connectionActor)
137+
.addData("name", _sink.getName())
138+
.log();
139+
PatternsCS.ask(
140+
_connectionActor,
141+
TcpMessage.unbind(),
142+
UNBIND_TIMEOUT)
143+
.whenComplete(
144+
(response, throwable) -> {
145+
final LogBuilder logBuilder;
146+
if (throwable != null) {
147+
logBuilder = LOGGER.warn()
148+
.setThrowable(throwable);
149+
} else {
150+
logBuilder = LOGGER.info();
151+
}
152+
logBuilder
153+
.setMessage("Tcp server completed unbinding")
154+
.addData("requester", requester)
155+
.addData("connectionActor", _connectionActor)
156+
.addData("name", _sink.getName())
157+
.addData("success", throwable == null)
158+
.log();
173159

174-
_isReady = true;
175-
})
176-
.match(Tcp.CommandFailed.class, failed -> {
177-
LOGGER.warn()
178-
.setMessage("Tcp server bad command")
179-
.addData("name", _sink.getName())
180-
.log();
160+
requester.tell(response, getSelf());
161+
});
162+
} else {
163+
LOGGER.warn()
164+
.setMessage("Tcp server cannot unbind; no connection")
165+
.addData("sender", getSender())
166+
.addData("name", _sink.getName())
167+
.log();
168+
requester.tell(new Tcp.Unbound$(), getSelf());
169+
}
170+
}
181171

182-
getContext().stop(getSelf());
183-
})
184-
.match(Tcp.Connected.class, tcpConnected -> {
185-
LOGGER.debug()
186-
.setMessage("Tcp connection established")
187-
.addData("name", _sink.getName())
188-
.addData("remoteAddress", tcpConnected.remoteAddress().getAddress().getHostAddress())
189-
.addData("remotePort", tcpConnected.remoteAddress().getPort())
190-
.log();
172+
private void socketBound(final Tcp.Bound tcpBound) {
173+
_connectionActor = getSender();
174+
LOGGER.info()
175+
.setMessage("Tcp server binding complete")
176+
.addData("name", _sink.getName())
177+
.addData("connectionActor", _connectionActor)
178+
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
179+
.addData("port", tcpBound.localAddress().getPort())
180+
.log();
191181

192-
final ActorRef handler = createHandler(_sink, tcpConnected);
193-
getSender().tell(TcpMessage.register(handler), getSelf());
194-
})
195-
.build();
182+
_isReady = true;
183+
}
184+
185+
private void commandFailed(final Tcp.CommandFailed failure) {
186+
LOGGER.warn()
187+
.setMessage("Tcp server bad command")
188+
.addData("name", _sink.getName())
189+
.setThrowable(failure.cause().getOrElse(null))
190+
.log();
191+
192+
getContext().stop(getSelf());
193+
}
194+
195+
private void connected(final Tcp.Connected tcpConnected) {
196+
LOGGER.debug()
197+
.setMessage("Tcp connection established")
198+
.addData("name", _sink.getName())
199+
.addData("remoteAddress", tcpConnected.remoteAddress().getAddress().getHostAddress())
200+
.addData("remotePort", tcpConnected.remoteAddress().getPort())
201+
.log();
202+
203+
final ActorRef handler = createHandler(_sink, tcpConnected);
204+
getSender().tell(TcpMessage.register(handler), getSelf());
196205
}
197206

198207
/**

src/main/java/com/arpnetworking/metrics/common/sources/HttpSource.java

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.List;
5858
import java.util.concurrent.CompletableFuture;
5959
import java.util.concurrent.CompletionStage;
60+
import java.util.concurrent.atomic.AtomicLong;
6061
import java.util.function.Function;
6162

6263
/**
@@ -102,33 +103,35 @@ protected HttpSource(final Builder<?, ? extends HttpSource> builder) {
102103
@Override
103104
public Receive createReceive() {
104105
return receiveBuilder()
105-
.match(RequestReply.class, requestReply -> {
106-
// TODO(barp): Fix the ugly HttpRequest cast here due to java vs scala dsl
107-
akka.stream.javadsl.Source.single(requestReply.getRequest())
108-
.log("http source stream failure")
109-
.via(_processGraph)
110-
.toMat(_sink, Keep.right())
111-
.run(_materializer)
112-
.whenComplete((done, err) -> {
113-
final CompletableFuture<HttpResponse> responseFuture = requestReply.getResponse();
114-
if (err == null) {
115-
responseFuture.complete(HttpResponse.create().withStatus(200));
116-
} else {
117-
BAD_REQUEST_LOGGER.warn()
118-
.setMessage("Error handling http post")
119-
.setThrowable(err)
120-
.log();
121-
if (err instanceof ParsingException) {
122-
responseFuture.complete(HttpResponse.create().withStatus(400));
123-
} else {
124-
responseFuture.complete(HttpResponse.create().withStatus(500));
125-
}
126-
}
127-
});
128-
})
106+
.match(RequestReply.class, this::requestReply)
129107
.build();
130108
}
131109

110+
private void requestReply(final RequestReply requestReply) {
111+
// TODO(barp): Fix the ugly HttpRequest cast here due to java vs scala dsl
112+
akka.stream.javadsl.Source.single(requestReply.getRequest())
113+
.log("http source stream failure")
114+
.via(_processGraph)
115+
.toMat(_sink, Keep.right())
116+
.run(_materializer)
117+
.whenComplete((done, err) -> {
118+
final CompletableFuture<HttpResponse> responseFuture = requestReply.getResponse();
119+
if (err == null) {
120+
responseFuture.complete(HttpResponse.create().withStatus(200));
121+
} else {
122+
BAD_REQUEST_LOGGER.warn()
123+
.setMessage("Error handling http post")
124+
.setThrowable(err)
125+
.log();
126+
if (err instanceof ParsingException) {
127+
responseFuture.complete(HttpResponse.create().withStatus(400));
128+
} else {
129+
responseFuture.complete(HttpResponse.create().withStatus(500));
130+
}
131+
}
132+
});
133+
}
134+
132135
/**
133136
* Constructor.
134137
*
@@ -144,6 +147,17 @@ public Receive createReceive() {
144147
.withSupervisionStrategy(Supervision.stoppingDecider()),
145148
context());
146149

150+
_periodicMetrics.registerPolledMetric(m -> {
151+
// TODO(vkoskela): There needs to be a way to deregister these callbacks
152+
// This is not an immediate issue since new Aggregator instances are
153+
// only created when pipelines are reloaded. To avoid recording values
154+
// for dead pipelines this explicitly avoids recording zeroes.
155+
final long samples = _receivedSamples.getAndSet(0);
156+
if (samples > 0) {
157+
m.recordGauge(String.format("sources/http/%s/metric_samples", _metricSafeName), samples);
158+
}
159+
});
160+
147161
_processGraph = GraphDSL.create(builder -> {
148162

149163
// Flows
@@ -218,19 +232,22 @@ private List<Record> parseRecords(final com.arpnetworking.metrics.mad.model.Http
218232
}
219233
}
220234
}
221-
_periodicMetrics.recordGauge(
222-
String.format("sources/http/%s/metric_samples", _metricSafeName),
223-
samples);
235+
_receivedSamples.addAndGet(samples);
224236

225237
return records;
226238
}
227239

240+
// WARNING: Consider carefully the volume of samples recorded.
241+
// PeriodicMetrics reduces the number of scopes creates, but each sample is
242+
// still stored in-memory until it is flushed.
228243
private final PeriodicMetrics _periodicMetrics;
244+
229245
private final String _metricSafeName;
230246
private final Sink<Record, CompletionStage<Done>> _sink;
231247
private final Parser<List<Record>, com.arpnetworking.metrics.mad.model.HttpRequest> _parser;
232248
private final Materializer _materializer;
233249
private final Graph<FlowShape<HttpRequest, Record>, NotUsed> _processGraph;
250+
private final AtomicLong _receivedSamples = new AtomicLong(0);
234251

235252
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
236253
private static final Logger BAD_REQUEST_LOGGER =

src/main/java/com/arpnetworking/metrics/common/sources/StatsdSource.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -99,26 +99,7 @@ public Receive createReceive() {
9999
.addData("socket", _socket)
100100
.log();
101101
})
102-
.match(Udp.Received.class, updReceived -> {
103-
LOGGER.trace()
104-
.setMessage("Statsd received datagram")
105-
.addData("bytes", updReceived.data().size())
106-
.addData("socket", _socket)
107-
.log();
108-
109-
try {
110-
// NOTE: The parsing occurs in the actor itself which can become a bottleneck
111-
// if there are more records to be parsed then a single thread can handle.
112-
final List<Record> records = PARSER.parse(updReceived.data().toByteBuffer());
113-
records.forEach(_sink::notify);
114-
} catch (final ParsingException e) {
115-
BAD_REQUEST_LOGGER.warn()
116-
.setMessage("Error handling statsd datagram")
117-
.addData("socket", _socket)
118-
.setThrowable(e)
119-
.log();
120-
}
121-
})
102+
.match(Udp.Received.class, this::updReceived)
122103
.matchEquals(UdpMessage.unbind(), message -> {
123104
LOGGER.debug()
124105
.setMessage("Statsd unbind")
@@ -136,6 +117,27 @@ public Receive createReceive() {
136117
.build();
137118
}
138119

120+
private void updReceived(final Udp.Received updReceived) {
121+
LOGGER.trace()
122+
.setMessage("Statsd received datagram")
123+
.addData("bytes", updReceived.data().size())
124+
.addData("socket", _socket)
125+
.log();
126+
127+
try {
128+
// NOTE: The parsing occurs in the actor itself which can become a bottleneck
129+
// if there are more records to be parsed then a single thread can handle.
130+
final List<Record> records = PARSER.parse(updReceived.data().toByteBuffer());
131+
records.forEach(_sink::notify);
132+
} catch (final ParsingException e) {
133+
BAD_REQUEST_LOGGER.warn()
134+
.setMessage("Error handling statsd datagram")
135+
.addData("socket", _socket)
136+
.setThrowable(e)
137+
.log();
138+
}
139+
}
140+
139141
/**
140142
* Constructor.
141143
*

0 commit comments

Comments
 (0)