Skip to content

Commit 6291bfb

Browse files
vjkoskelaBrandonArp
authored andcommitted
Convert to akka 2.5 (#68)
Convert to akka 2.5
1 parent 94d8ead commit 6291bfb

File tree

13 files changed

+350
-370
lines changed

13 files changed

+350
-370
lines changed

pom.xml

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,46 +71,46 @@
7171

7272
<properties>
7373
<!--Dependency versions-->
74-
<akka.version>2.4.17</akka.version>
75-
<akka.http.version>2.4.11.1</akka.http.version>
74+
<akka.version>2.5.6</akka.version>
75+
<akka.http.version>10.0.11</akka.http.version>
7676
<apache.httpclient.version>4.5.1</apache.httpclient.version>
7777
<apache.httpcore.version>4.4.3</apache.httpcore.version>
7878
<arpnetworking.commons.version>1.9.0</arpnetworking.commons.version>
7979
<aspectjrt.version>1.8.9</aspectjrt.version>
80-
<cglib.version>3.2.1</cglib.version>
80+
<cglib.version>3.2.5</cglib.version>
8181
<client.protocol.version>0.9.3</client.protocol.version>
8282
<commons.codec.version>1.10</commons.codec.version>
8383
<commons.io.version>2.4</commons.io.version>
8484
<commons.math3.version>3.3</commons.math3.version>
8585
<findbugs.annotations.version>3.0.1</findbugs.annotations.version>
86-
<guava.version>21.0</guava.version>
87-
<guice.version>4.0</guice.version>
86+
<guava.version>23.4-jre</guava.version>
87+
<guice.version>4.1.0</guice.version>
8888
<hamcrest.version>2.0.0.0</hamcrest.version>
89-
<jackson.version>2.7.3</jackson.version>
90-
<javassist.version>3.20.0-GA</javassist.version>
91-
<javassist.maven.core.version>0.1.2</javassist.maven.core.version>
92-
<joda.time.version>2.8.2</joda.time.version>
93-
<jsr305.version>3.0.0</jsr305.version>
89+
<jackson.version>2.9.2</jackson.version>
90+
<javassist.version>3.22.0-GA</javassist.version>
91+
<javassist.maven.core.version>0.2.1</javassist.maven.core.version>
92+
<joda.time.version>2.9.9</joda.time.version>
93+
<jsr305.version>3.0.2</jsr305.version>
9494
<junit.benchmarks.version>0.7.2</junit.benchmarks.version>
9595
<junit.version>4.12</junit.version>
9696
<logback.version>1.2.3</logback.version>
97-
<logback.steno.version>1.17.0</logback.steno.version>
97+
<logback.steno.version>1.18.0</logback.steno.version>
9898
<log4j.over.slf4j.version>1.7.12</log4j.over.slf4j.version>
9999
<metrics.aggregator.protocol.version>1.0.3</metrics.aggregator.protocol.version>
100-
<metrics.client.version>0.6.0</metrics.client.version>
101-
<metrics.client.http.version>0.6.0</metrics.client.http.version>
100+
<metrics.client.version>0.10.0</metrics.client.version>
101+
<metrics.client.http.version>0.8.2</metrics.client.http.version>
102102
<metrics.client.incubator.version>0.6.0</metrics.client.incubator.version>
103103
<metrics.generator.version>1.1.0</metrics.generator.version>
104-
<metrics.jvm.extra.version>0.4.2</metrics.jvm.extra.version>
105-
<mockito.version>1.10.19</mockito.version>
106-
<oval.version>1.86</oval.version>
104+
<metrics.jvm.extra.version>0.7.0</metrics.jvm.extra.version>
105+
<mockito.version>2.12.0</mockito.version>
106+
<oval.version>1.90</oval.version>
107107
<performance.test.version>1.1.0</performance.test.version>
108108
<protobuf.version>3.0.2</protobuf.version>
109109
<reflections.version>0.9.11</reflections.version>
110110
<scala.version>2.11</scala.version>
111111
<scala.java.compat.version>0.7.0</scala.java.compat.version>
112112
<scala.library.version>2.11.7</scala.library.version>
113-
<slf4j.version>1.7.12</slf4j.version>
113+
<slf4j.version>1.7.25</slf4j.version>
114114
<statsd.client.timgroup>3.0.1</statsd.client.timgroup>
115115
<typesafe.config.version>1.3.1</typesafe.config.version>
116116
<vertx.core.version>2.1.6</vertx.core.version>
@@ -195,7 +195,6 @@
195195
<artifactId>wrapper-maven-plugin</artifactId>
196196
<inherited>false</inherited>
197197
</plugin>
198-
199198
<!-- Project Specific Plugins -->
200199
<plugin>
201200
<groupId>org.codehaus.mojo</groupId>
@@ -527,7 +526,7 @@
527526
<dependency>
528527
<groupId>com.typesafe.akka</groupId>
529528
<artifactId>akka-stream_${scala.version}</artifactId>
530-
<version>${akka.http.version}</version>
529+
<version>${akka.version}</version>
531530
</dependency>
532531
<dependency>
533532
<groupId>com.typesafe</groupId>

src/main/java/com/arpnetworking/metrics/common/sources/BaseTcpSource.java

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package com.arpnetworking.metrics.common.sources;
1717

18+
import akka.actor.AbstractActor;
1819
import akka.actor.ActorRef;
1920
import akka.actor.Props;
20-
import akka.actor.UntypedActor;
2121
import akka.io.Tcp;
2222
import akka.io.TcpMessage;
2323
import com.arpnetworking.steno.Logger;
@@ -28,7 +28,6 @@
2828
import net.sf.oval.constraint.Range;
2929

3030
import java.net.InetSocketAddress;
31-
import java.util.Objects;
3231
import java.util.function.Function;
3332

3433
/**
@@ -60,7 +59,7 @@ protected BaseTcpSource(final Builder<?, ?> builder) {
6059
/**
6160
* Internal actor to process requests.
6261
*/
63-
/* package private */ abstract static class BaseTcpListenerActor extends UntypedActor {
62+
/* package private */ abstract static class BaseTcpListenerActor extends AbstractActor {
6463
/**
6564
* Creates a {@link Props} for this actor.
6665
*
@@ -82,40 +81,41 @@ public void preStart() {
8281
}
8382

8483
@Override
85-
public void onReceive(final Object message) throws Exception {
86-
if (Objects.equals(IS_READY, message)) {
87-
getSender().tell(_isReady, getSelf());
88-
} else if (message instanceof Tcp.Bound) {
89-
final Tcp.Bound tcpBound = (Tcp.Bound) message;
90-
_isReady = true;
91-
_tcpManager.tell(message, getSelf());
92-
LOGGER.info()
93-
.setMessage("Tcp server binding complete")
94-
.addData("name", _sink.getName())
95-
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
96-
.addData("port", tcpBound.localAddress().getPort())
97-
.log();
98-
} else if (message instanceof Tcp.CommandFailed) {
99-
getContext().stop(getSelf());
100-
LOGGER.warn()
101-
.setMessage("Tcp server bad command")
102-
.addData("name", _sink.getName())
103-
.log();
104-
} else if (message instanceof Tcp.Connected) {
105-
final Tcp.Connected tcpConnected = (Tcp.Connected) message;
106-
_tcpManager.tell(message, getSelf());
107-
LOGGER.debug()
108-
.setMessage("Tcp connection established")
109-
.addData("name", _sink.getName())
110-
.addData("remoteAddress", tcpConnected.remoteAddress().getAddress().getHostAddress())
111-
.addData("remotePort", tcpConnected.remoteAddress().getPort())
112-
.log();
113-
114-
final ActorRef handler = createHandler(_sink, tcpConnected);
115-
getSender().tell(TcpMessage.register(handler), getSelf());
116-
} else {
117-
unhandled(message);
118-
}
84+
public Receive createReceive() {
85+
return receiveBuilder()
86+
.matchEquals(IS_READY, message -> {
87+
getSender().tell(_isReady, getSelf());
88+
})
89+
.match(Tcp.Bound.class, tcpBound -> {
90+
_isReady = true;
91+
_tcpManager.tell(tcpBound, getSelf());
92+
LOGGER.info()
93+
.setMessage("Tcp server binding complete")
94+
.addData("name", _sink.getName())
95+
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
96+
.addData("port", tcpBound.localAddress().getPort())
97+
.log();
98+
})
99+
.match(Tcp.CommandFailed.class, failed -> {
100+
getContext().stop(getSelf());
101+
LOGGER.warn()
102+
.setMessage("Tcp server bad command")
103+
.addData("name", _sink.getName())
104+
.log();
105+
})
106+
.match(Tcp.Connected.class, tcpConnected -> {
107+
_tcpManager.tell(tcpConnected, getSelf());
108+
LOGGER.debug()
109+
.setMessage("Tcp connection established")
110+
.addData("name", _sink.getName())
111+
.addData("remoteAddress", tcpConnected.remoteAddress().getAddress().getHostAddress())
112+
.addData("remotePort", tcpConnected.remoteAddress().getPort())
113+
.log();
114+
115+
final ActorRef handler = createHandler(_sink, tcpConnected);
116+
getSender().tell(TcpMessage.register(handler), getSelf());
117+
})
118+
.build();
119119
}
120120

121121
/**

src/main/java/com/arpnetworking/metrics/common/sources/HttpSource.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
import akka.Done;
1919
import akka.NotUsed;
20+
import akka.actor.AbstractActor;
2021
import akka.actor.Props;
21-
import akka.actor.UntypedActor;
2222
import akka.http.javadsl.model.HttpHeader;
2323
import akka.http.javadsl.model.HttpRequest;
2424
import akka.http.javadsl.model.HttpResponse;
@@ -82,7 +82,7 @@ protected HttpSource(final Builder<?, ? extends HttpSource> builder) {
8282
/**
8383
* Internal actor to process requests.
8484
*/
85-
/* package private */ static final class Actor extends UntypedActor {
85+
/* package private */ static final class Actor extends AbstractActor {
8686
/**
8787
* Creates a {@link Props} for this actor.
8888
*
@@ -94,33 +94,32 @@ protected HttpSource(final Builder<?, ? extends HttpSource> builder) {
9494
}
9595

9696
@Override
97-
public void onReceive(final Object message) throws Exception {
98-
if (message instanceof RequestReply) {
99-
final RequestReply requestReply = (RequestReply) message;
100-
// TODO(barp): Fix the ugly HttpRequest cast here due to java vs scala dsl
101-
akka.stream.javadsl.Source.single(requestReply.getRequest())
102-
.via(_processGraph)
103-
.toMat(_sink, Keep.right())
104-
.run(_materializer)
105-
.whenComplete((done, err) -> {
106-
final CompletableFuture<HttpResponse> responseFuture = requestReply.getResponse();
107-
if (err == null) {
108-
responseFuture.complete(HttpResponse.create().withStatus(200));
109-
} else {
110-
BAD_REQUEST_LOGGER.warn()
111-
.setMessage("Error handling http post")
112-
.setThrowable(err)
113-
.log();
114-
if (err instanceof ParsingException) {
115-
responseFuture.complete(HttpResponse.create().withStatus(400));
116-
} else {
117-
responseFuture.complete(HttpResponse.create().withStatus(500));
118-
}
119-
}
120-
});
121-
} else {
122-
unhandled(message);
123-
}
97+
public Receive createReceive() {
98+
return receiveBuilder()
99+
.match(RequestReply.class, requestReply -> {
100+
// TODO(barp): Fix the ugly HttpRequest cast here due to java vs scala dsl
101+
akka.stream.javadsl.Source.single(requestReply.getRequest())
102+
.via(_processGraph)
103+
.toMat(_sink, Keep.right())
104+
.run(_materializer)
105+
.whenComplete((done, err) -> {
106+
final CompletableFuture<HttpResponse> responseFuture = requestReply.getResponse();
107+
if (err == null) {
108+
responseFuture.complete(HttpResponse.create().withStatus(200));
109+
} else {
110+
BAD_REQUEST_LOGGER.warn()
111+
.setMessage("Error handling http post")
112+
.setThrowable(err)
113+
.log();
114+
if (err instanceof ParsingException) {
115+
responseFuture.complete(HttpResponse.create().withStatus(400));
116+
} else {
117+
responseFuture.complete(HttpResponse.create().withStatus(500));
118+
}
119+
}
120+
});
121+
})
122+
.build();
124123
}
125124

126125
/**

src/main/java/com/arpnetworking/metrics/common/sources/StatsdSource.java

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package com.arpnetworking.metrics.common.sources;
1717

18+
import akka.actor.AbstractActor;
1819
import akka.actor.ActorRef;
1920
import akka.actor.Props;
20-
import akka.actor.UntypedActor;
2121
import akka.io.Udp;
2222
import akka.io.UdpMessage;
2323
import com.arpnetworking.metrics.common.parsers.Parser;
@@ -34,7 +34,6 @@
3434
import java.nio.ByteBuffer;
3535
import java.time.Duration;
3636
import java.util.List;
37-
import java.util.Objects;
3837

3938
/**
4039
* Source that uses Statsd as input.
@@ -73,7 +72,7 @@ private StatsdSource(final Builder builder) {
7372
/**
7473
* Internal actor to process requests.
7574
*/
76-
/* package private */ static final class Actor extends UntypedActor {
75+
/* package private */ static final class Actor extends AbstractActor {
7776
/**
7877
* Creates a {@link Props} for this actor.
7978
*
@@ -85,57 +84,56 @@ private StatsdSource(final Builder builder) {
8584
}
8685

8786
@Override
88-
public void onReceive(final Object message) throws Exception {
89-
if (Objects.equals(IS_READY, message)) {
90-
getSender().tell(_isReady, getSelf());
91-
} else if (message instanceof Udp.Bound) {
92-
final Udp.Bound updBound = (Udp.Bound) message;
93-
_socket = getSender();
94-
_isReady = true;
95-
LOGGER.info()
96-
.setMessage("Statsd server binding complete")
97-
.addData("address", updBound.localAddress().getAddress().getHostAddress())
98-
.addData("port", updBound.localAddress().getPort())
99-
.addData("socket", _socket)
100-
.log();
101-
} else if (message instanceof Udp.Received) {
102-
final Udp.Received updReceived = (Udp.Received) message;
103-
LOGGER.trace()
104-
.setMessage("Statsd received datagram")
105-
.addData("bytes", updReceived.data().size())
106-
.addData("socket", _socket)
107-
.log();
108-
109-
try {
110-
// NOTE: The parsing occurs in the actor itself which can become a bottleneck
111-
// if there are more records to be parsed then a single thread can handle.
112-
final List<Record> records = PARSER.parse(updReceived.data().toByteBuffer());
113-
records.forEach(_sink::notify);
114-
} catch (final ParsingException e) {
115-
BAD_REQUEST_LOGGER.warn()
116-
.setMessage("Error handling statsd datagram")
117-
.addData("socket", _socket)
118-
.setThrowable(e)
119-
.log();
120-
}
121-
122-
} else if (Objects.equals(message, UdpMessage.unbind())) {
123-
LOGGER.debug()
124-
.setMessage("Statsd unbind")
125-
.addData("socket", _socket)
126-
.log();
127-
_socket.tell(message, getSelf());
128-
129-
} else if (message instanceof Udp.Unbound) {
130-
LOGGER.debug()
131-
.setMessage("Statsd unbound")
132-
.addData("socket", _socket)
133-
.log();
134-
getContext().stop(getSelf());
135-
136-
} else {
137-
unhandled(message);
138-
}
87+
public Receive createReceive() {
88+
return receiveBuilder()
89+
.matchEquals(IS_READY, message -> {
90+
getSender().tell(_isReady, getSelf());
91+
})
92+
.match(Udp.Bound.class, updBound -> {
93+
_socket = getSender();
94+
_isReady = true;
95+
LOGGER.info()
96+
.setMessage("Statsd server binding complete")
97+
.addData("address", updBound.localAddress().getAddress().getHostAddress())
98+
.addData("port", updBound.localAddress().getPort())
99+
.addData("socket", _socket)
100+
.log();
101+
})
102+
.match(Udp.Received.class, updReceived -> {
103+
LOGGER.trace()
104+
.setMessage("Statsd received datagram")
105+
.addData("bytes", updReceived.data().size())
106+
.addData("socket", _socket)
107+
.log();
108+
109+
try {
110+
// NOTE: The parsing occurs in the actor itself which can become a bottleneck
111+
// if there are more records to be parsed then a single thread can handle.
112+
final List<Record> records = PARSER.parse(updReceived.data().toByteBuffer());
113+
records.forEach(_sink::notify);
114+
} catch (final ParsingException e) {
115+
BAD_REQUEST_LOGGER.warn()
116+
.setMessage("Error handling statsd datagram")
117+
.addData("socket", _socket)
118+
.setThrowable(e)
119+
.log();
120+
}
121+
})
122+
.matchEquals(UdpMessage.unbind(), message -> {
123+
LOGGER.debug()
124+
.setMessage("Statsd unbind")
125+
.addData("socket", _socket)
126+
.log();
127+
_socket.tell(message, getSelf());
128+
})
129+
.match(Udp.Unbound.class, message -> {
130+
LOGGER.debug()
131+
.setMessage("Statsd unbound")
132+
.addData("socket", _socket)
133+
.log();
134+
getContext().stop(getSelf());
135+
})
136+
.build();
139137
}
140138

141139
/**

0 commit comments

Comments
 (0)