Commit 6ebf030
Fixes to HTTP logging, HOCON configuration parsing, and Kryo serialization. (#93)
1 parent a59fc26 commit 6ebf030

9 files changed: +321 -93 lines changed

README.md

Lines changed: 16 additions & 23 deletions
@@ -90,22 +90,32 @@ For example:
     "stdout-loglevel": "DEBUG",
     "logging-filter": "akka.event.slf4j.Slf4jLoggingFilter",
     "actor": {
+      "provider": "akka.cluster.ClusterActorRefProvider",
       "debug": {
         "unhandled": "on"
       },
-      "provider": "akka.cluster.ClusterActorRefProvider",
       "serializers": {
-        "loggingJava": "com.arpnetworking.akka.LoggingSerializer"
+        "kryo": "com.romix.akka.serialization.kryo.KryoSerializer"
       },
-      "serialization-bindings" : {
-        "\"com.arpnetworking.tsdcore.model.AggregatedData\"": "loggingJava"
+      "serialization-bindings": {
+        "java.lang.Object": "kryo",
+        "java.io.Serializable": "none"
+      },
+      "kryo": {
+        "type": "graph",
+        "idstrategy": "default",
+        "buffer-size": 4096,
+        "max-buffer-size": -1,
+        "kryo-custom-serializer-init": "com.arpnetworking.clusteraggregator.kryo.KryoInitialization"
       }
     },
     "cluster": {
+      "sharding": {
+        "state-store-mode": "persistence"
+      },
       "seed-nodes": [
         "akka.tcp://[email protected]:2551"
-      ],
-      "auto-down-unreachable-after": "300s"
+      ]
     },
     "remote": {
       "log-remote-lifecycle-events": "on",
@@ -115,23 +125,6 @@ For example:
           "port": 2551
         }
       }
-    },
-    "contrib": {
-      "cluster": {
-        "sharding": {
-          "guardian-name": "sharding",
-          "role": "",
-          "retry-interval": "2 s",
-          "buffer-size": 100000,
-          "handoff-timeout": "60 s",
-          "rebalance-interval": "10 s",
-          "snapshot-interval": "720 s",
-          "least-shard-allocation-strategy": {
-            "rebalance-threshold": 10,
-            "max-simultaneous-rebalance": 3
-          }
-        }
-      }
     }
   }
 }
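
Note: the new kryo-custom-serializer-init setting points at com.arpnetworking.clusteraggregator.kryo.KryoInitialization, a class that is not shown in this diff. As a rough, hypothetical sketch only (assuming the romix akka-kryo-serialization extension instantiates the configured class and calls a customize(Kryo) hook on it), such an initializer might register the extra serializers pulled in via de.javakaffee:kryo-serializers, added in pom.xml below:

package com.arpnetworking.clusteraggregator.kryo;

import com.esotericsoftware.kryo.Kryo;
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer;
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;

// Hypothetical sketch; the real KryoInitialization is not part of this diff.
public class KryoInitialization {

    // Assumption: the serialization extension invokes a customize(Kryo)
    // method on the class named by kryo-custom-serializer-init.
    public void customize(final Kryo kryo) {
        // Register serializers for JDK collection wrappers (unmodifiable and
        // synchronized views) that plain Kryo cannot handle reliably.
        UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        SynchronizedCollectionsSerializer.registerSerializers(kryo);
    }
}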

config/config.conf

Lines changed: 7 additions & 40 deletions
@@ -16,30 +16,6 @@ rebalanceConfiguration {
   maxParallel=100
   threshold=500
 }
-//databaseConfigurations {
-// metrics_clusteragg {
-// jdbcUrl="jdbc:h2:/opt/cluster-aggregator/data/metrics:clusteragg;AUTO_SERVER=TRUE;AUTO_SERVER_PORT=7067;MODE=PostgreSQL;INIT=create schema if not exists clusteragg;DB_CLOSE_DELAY=-1"
-// driverName="org.h2.Driver"
-// username="sa"
-// password="secret"
-// maximumPoolSize=2
-// minimumIdle=2
-// idleTimeout=0
-// modelPackages=[ "com.arpnetworking.clusteraggregator.models.ebean" ]
-// }
-// metrics_clusteragg_ddl {
-// jdbcUrl="jdbc:h2:/opt/cluster-aggregator/data/metrics:clusteragg;AUTO_SERVER=TRUE;AUTO_SERVER_PORT=7067;MODE=PostgreSQL;INIT=create schema if not exists clusteragg;DB_CLOSE_DELAY=-1"
-// driverName="org.h2.Driver"
-// username="sa"
-// password="secret"
-// migrationLocations=["db/migration/metrics_clusteragg/common"]
-// migrationSchemas=["clusteragg"]
-// maximumPoolSize=2
-// minimumIdle=0
-// idleTimeout=10000
-// modelPackages=[ "com.arpnetworking.clusteraggregator.models.ebean" ]
-// }
-//}
 akkaConfiguration {
   akka {
     extensions=["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
@@ -48,6 +24,7 @@ akkaConfiguration {
     stdout-loglevel="DEBUG"
     logging-filter="akka.event.slf4j.Slf4jLoggingFilter"
     actor {
+      provider="akka.cluster.ClusterActorRefProvider"
      serializers {
        kryo="com.romix.akka.serialization.kryo.KryoSerializer"
      }
@@ -60,33 +37,23 @@ akkaConfiguration {
        idstrategy="default"
        buffer-size=4096
        max-buffer-size=-1
+        kryo-custom-serializer-init="com.arpnetworking.clusteraggregator.kryo.KryoInitialization"
      }
      debug {
        unhandled="on"
      }
-      provider="akka.cluster.ClusterActorRefProvider"
    }
    cluster {
      seed-nodes=["akka.tcp://[email protected]:2551"]
-      auto-down-unreachable-after="300s"
      sharding {
-        guardian-name="sharding"
-        role=""
-        retry-interval="2 s"
-        buffer-size=100000
-        handoff-timeout="60 s"
-        rebalance-interval="10 s"
-        snapshot-interval="720 s"
        state-store-mode="persistence"
-        least-shard-allocation-strategy {
-          rebalance-threshold=10
-          max-simultaneous-rebalance=3
-        }
      }
    }
-    remote.log-remote-lifecycle-events="on"
-    remote.netty.tcp.hostname="127.0.0.1"
-    remote.netty.tcp.port=2551
+    remote {
+      log-remote-lifecycle-events="on"
+      netty.tcp.hostname="127.0.0.1"
+      netty.tcp.port=2551
+    }
    persistence {
      journal {
        plugin = "akka.persistence.journal.leveldb"
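
Note: the dotted remote.* keys are rewritten here as a nested remote { } block. Under standard Typesafe Config (HOCON) semantics the two spellings parse to the same tree; the short Java sketch below (illustrative only, not code from this commit) demonstrates the equivalence:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

// Illustrative: dotted keys and nested blocks resolve to the same path
// under standard Typesafe Config parsing.
public final class HoconPathEquivalence {
    public static void main(final String[] args) {
        final Config dotted = ConfigFactory.parseString(
                "akka.remote.netty.tcp.hostname=\"127.0.0.1\"");
        final Config nested = ConfigFactory.parseString(
                "akka { remote { netty.tcp { hostname=\"127.0.0.1\" } } }");
        // Both print 127.0.0.1.
        System.out.println(dotted.getString("akka.remote.netty.tcp.hostname"));
        System.out.println(nested.getString("akka.remote.netty.tcp.hostname"));
    }
}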

pom.xml

Lines changed: 15 additions & 3 deletions
@@ -101,10 +101,10 @@
 
     <properties>
         <!--Dependency versions-->
-        <akka.version>2.5.13</akka.version>
-        <akka.http.version>10.0.6</akka.http.version>
+        <akka.version>2.5.16</akka.version>
+        <akka.http.version>10.1.5</akka.http.version>
         <akka.persistence.cassandra.version>0.89</akka.persistence.cassandra.version>
-        <akka.kryo.version>0.5.1</akka.kryo.version>
+        <akka.kryo.version>0.5.2</akka.kryo.version>
         <apache.httpclient.version>4.5.2</apache.httpclient.version>
         <apache.httpcore.version>4.4.5</apache.httpcore.version>
         <arpnetworking.commons.version>1.13.3</arpnetworking.commons.version>
@@ -134,6 +134,8 @@
         <logback.steno.version>1.18.0</logback.steno.version>
         <log4j.over.slf4j.version>1.7.22</log4j.over.slf4j.version>
         <luaj.version>3.0.1</luaj.version>
+        <kryo.version>4.0.0</kryo.version>
+        <kryo.serializers.version>0.42</kryo.serializers.version>
         <metrics.client.version>0.10.0</metrics.client.version>
         <metrics.jvm.extra.version>0.9.0</metrics.jvm.extra.version>
         <metrics.http.extra.version>0.9.1</metrics.http.extra.version>
@@ -675,6 +677,16 @@
             <version>${akka.kryo.version}</version>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>de.javakaffee</groupId>
+            <artifactId>kryo-serializers</artifactId>
+            <version>${kryo.serializers.version}</version>
+        </dependency>
         <!-- Database -->
         <dependency>
             <groupId>javax.persistence</groupId>
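
Note: the build now pins com.esotericsoftware:kryo 4.0.0 directly and adds de.javakaffee:kryo-serializers 0.42. A plain round trip through Kryo is a quick way to sanity-check serializer behavior; the snippet below is an illustrative sketch against the Kryo 4 API, not code from this commit:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

// Illustrative Kryo 4 round trip: serialize a value and read it back.
public final class KryoRoundTrip {
    public static void main(final String[] args) {
        final Kryo kryo = new Kryo();
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (Output output = new Output(bytes)) {
            kryo.writeClassAndObject(output, "hello");
        }
        try (Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
            System.out.println(kryo.readClassAndObject(input));
        }
    }
}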

src/main/java/com/arpnetworking/clusteraggregator/Status.java

Lines changed: 5 additions & 6 deletions
@@ -23,18 +23,17 @@
 import akka.cluster.MemberStatus;
 import akka.pattern.PatternsCS;
 import akka.remote.AssociationErrorEvent;
-import akka.util.Timeout;
 import com.arpnetworking.clusteraggregator.models.BookkeeperData;
 import com.arpnetworking.clusteraggregator.models.MetricsRequest;
 import com.arpnetworking.clusteraggregator.models.PeriodMetrics;
 import com.arpnetworking.clusteraggregator.models.StatusResponse;
 import com.arpnetworking.utility.CastMapper;
 import org.joda.time.Period;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 /**
@@ -104,7 +103,7 @@ public Receive createReceive() {
                 .ask(
                     _clusterStatusCache,
                     new ClusterStatusCache.GetRequest(),
-                    Timeout.apply(3, TimeUnit.SECONDS))
+                    Duration.ofSeconds(3))
                 .thenApply(CAST_MAPPER);
             PatternsCS.pipe(stateFuture, context().dispatcher()).to(self(), sender());
         })
@@ -120,15 +119,15 @@ private void processStatusRequest() {
         final CompletableFuture<BookkeeperData> bookkeeperFuture = PatternsCS.ask(
                 _metricsBookkeeper,
                 new MetricsRequest(),
-                Timeout.apply(3, TimeUnit.SECONDS))
+                Duration.ofSeconds(3))
                 .<BookkeeperData>thenApply(new CastMapper<>())
                 .exceptionally(new AsNullRecovery<>())
                 .toCompletableFuture();
         final CompletableFuture<ClusterStatusCache.StatusResponse> clusterStateFuture =
                 PatternsCS.ask(
                         _clusterStatusCache,
                         new ClusterStatusCache.GetRequest(),
-                        Timeout.apply(3, TimeUnit.SECONDS))
+                        Duration.ofSeconds(3))
                 .thenApply(CAST_MAPPER)
                 .exceptionally(new AsNullRecovery<>())
                 .toCompletableFuture();
@@ -137,7 +136,7 @@ private void processStatusRequest() {
                 PatternsCS.ask(
                         _localMetrics,
                         new MetricsRequest(),
-                        Timeout.apply(3, TimeUnit.SECONDS))
+                        Duration.ofSeconds(3))
                 .<Map<Period, PeriodMetrics>>thenApply(new CastMapper<>())
                 .exceptionally(new AsNullRecovery<>())
                 .toCompletableFuture();
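
Note: the change throughout this file replaces akka.util.Timeout with java.time.Duration in PatternsCS.ask, an overload available with the Akka 2.5.16 upgrade above. The self-contained sketch below isolates the pattern; the Echo actor and names are placeholders, not code from this repository:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.PatternsCS;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

// Illustrative: ask with java.time.Duration instead of akka.util.Timeout.
public final class AskWithDuration {

    // Trivial actor that replies to any message with the same message.
    public static final class Echo extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchAny(message -> sender().tell(message, self()))
                    .build();
        }
    }

    public static void main(final String[] args) {
        final ActorSystem system = ActorSystem.create("example");
        final ActorRef echo = system.actorOf(Props.create(Echo.class), "echo");
        // Previously expressed as Timeout.apply(3, TimeUnit.SECONDS).
        final CompletionStage<Object> reply =
                PatternsCS.ask(echo, "ping", Duration.ofSeconds(3));
        reply.thenAccept(System.out::println)
                .thenRun(system::terminate);
    }
}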

src/main/java/com/arpnetworking/clusteraggregator/http/Routes.java

Lines changed: 49 additions & 19 deletions
@@ -29,7 +29,6 @@
 import akka.japi.function.Function;
 import akka.pattern.PatternsCS;
 import akka.util.ByteString;
-import akka.util.Timeout;
 import com.arpnetworking.clusteraggregator.Status;
 import com.arpnetworking.clusteraggregator.models.StatusResponse;
 import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
@@ -53,9 +52,12 @@
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * Http server routes.
@@ -99,32 +101,60 @@ public CompletionStage<HttpResponse> apply(final HttpRequest request) {
                 createMetricName(request, BODY_SIZE_METRIC),
                 request.entity().getContentLengthOption().orElse(0L),
                 Units.BYTE);
-        LOGGER.trace()
-                .setEvent("http.in.start")
-                .addData("method", request.method())
-                .addData("url", request.getUri())
-                .addData("headers", request.getHeaders())
-                .log();
+        final UUID requestId = UUID.randomUUID();
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace()
+                    .setEvent("http.in.start")
+                    .addContext("requestId", requestId)
+                    .addData("method", request.method().toString())
+                    .addData("url", request.getUri().toString())
+                    .addData(
+                            "headers",
+                            StreamSupport.stream(request.getHeaders().spliterator(), false)
+                                    .map(h -> h.name() + "=" + h.value())
+                                    .collect(Collectors.toList()))
+                    .log();
+        }
         return process(request).<HttpResponse>whenComplete(
                 (response, failure) -> {
                     timer.close();
-                    final int responseStatusClass = response.status().intValue() / 100;
+
+                    final int responseStatus;
+                    if (response != null) {
+                        responseStatus = response.status().intValue();
+                    } else {
+                        // TODO(ville): Figure out how to intercept post-exception mapping.
+                        responseStatus = 599;
+                    }
+                    final int responseStatusClass = responseStatus / 100;
                     for (final int i : STATUS_CLASSES) {
                         metrics.incrementCounter(
                                 createMetricName(request, String.format("%s/%dxx", STATUS_METRIC, i)),
                                 responseStatusClass == i ? 1 : 0);
                     }
                     metrics.close();
-                    final LogBuilder log = LOGGER.trace()
-                            .setEvent("http.in")
-                            .addData("method", request.method())
-                            .addData("url", request.getUri())
-                            .addData("status", response.status().intValue())
-                            .addData("headers", request.getHeaders());
-                    if (failure != null) {
-                        log.setEvent("http.in.error").addData("exception", failure);
+
+                    final LogBuilder log;
+                    if (failure != null || responseStatusClass == 5) {
+                        log = LOGGER.info().setEvent("http.in.failure");
+                        if (failure != null) {
+                            log.setThrowable(failure);
+                        }
+                        if (!LOGGER.isTraceEnabled() && LOGGER.isInfoEnabled()) {
+                            log.addData("method", request.method().toString())
+                                    .addData("url", request.getUri().toString())
+                                    .addData(
+                                            "headers",
+                                            StreamSupport.stream(request.getHeaders().spliterator(), false)
+                                                    .map(h -> h.name() + "=" + h.value())
+                                                    .collect(Collectors.toList()));
+                        }
+                    } else {
+                        log = LOGGER.trace().setEvent("http.in.complete");
                     }
-                    log.log();
+                    log.addContext("requestId", requestId)
+                            .addData("status", responseStatus)
+                            .log();
                 });
     }
 
@@ -176,7 +206,7 @@ private <T> CompletionStage<T> ask(final String actorPath, final Object request,
         PatternsCS.ask(
                 _actorSystem.actorSelection(actorPath),
                 request,
-                Timeout.apply(5, TimeUnit.SECONDS))
+                Duration.ofSeconds(5))
                 .thenApply(o -> (T) o)
                 .exceptionally(throwable -> {
                     LOGGER.error()
