Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit c4dc7a8

Browse files
OlegDokukarobertroeser
authored andcommitted
update to latest rsocket-rpc-java and reactor versions (#18)
1 parent 42a5128 commit c4dc7a8

File tree

8 files changed

+49
-52
lines changed

8 files changed

+49
-52
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.netifi.proteus
2-
version=0.9.4
2+
version=0.9.5

gradle/java.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ task ciVersion {
3535
build.dependsOn ciVersion
3636

3737
ext {
38-
rsocketRpcVersion = '0.2.2'
38+
rsocketRpcVersion = '0.2.3'
3939
protobufVersion = '3.6.1'
4040
}
4141

@@ -88,14 +88,14 @@ dependencies {
8888
testCompile 'junit:junit:4.12'
8989

9090
testCompile 'javax.inject:javax.inject:1'
91-
testCompile 'io.projectreactor:reactor-test:3.1.9.RELEASE'
91+
testCompile 'io.projectreactor:reactor-test:3.2.0.RELEASE'
9292
testCompile "com.google.protobuf:protobuf-java:$protobufVersion"
9393
testCompile 'org.hdrhistogram:HdrHistogram:2.1.10'
9494
testCompile 'org.apache.logging.log4j:log4j-api:2.9.0'
9595
testCompile 'org.apache.logging.log4j:log4j-core:2.9.0'
9696
testCompile 'org.apache.logging.log4j:log4j-slf4j-impl:2.9.0'
97-
testCompile 'io.rsocket:rsocket-transport-netty:0.11.7'
98-
testCompile 'io.rsocket:rsocket-transport-local:0.11.7'
97+
testCompile 'io.rsocket:rsocket-transport-netty:0.11.8'
98+
testCompile 'io.rsocket:rsocket-transport-local:0.11.8'
9999
testCompile 'org.mockito:mockito-all:1.10.19'
100100
}
101101

proteus-client/src/main/java/io/netifi/proteus/Proteus.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import reactor.core.Exceptions;
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.publisher.MonoProcessor;
29-
import reactor.ipc.netty.tcp.TcpClient;
29+
import reactor.netty.tcp.TcpClient;
3030

3131
/** This is where the magic happens */
3232
public class Proteus implements Closeable {
@@ -256,7 +256,7 @@ public Proteus build() {
256256
if (sslDisabled) {
257257
clientTransportFactory =
258258
address -> {
259-
TcpClient client = TcpClient.create(opts -> opts.connectAddress(() -> address));
259+
TcpClient client = TcpClient.create().addressSupplier(() -> address);
260260
return TcpClientTransport.create(client);
261261
};
262262
} else {
@@ -277,8 +277,7 @@ public Proteus build() {
277277
clientTransportFactory =
278278
address -> {
279279
TcpClient client =
280-
TcpClient.create(
281-
opts -> opts.connectAddress(() -> address).sslContext(sslContext));
280+
TcpClient.create().addressSupplier(() -> address).secure(sslContext);
282281
return TcpClientTransport.create(client);
283282
};
284283
} catch (Exception sslException) {

proteus-metrics-prometheus/src/main/java/io/netifi/proteus/prometheus/ProteusPrometheusBridge.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import reactor.core.publisher.DirectProcessor;
2727
import reactor.core.publisher.Flux;
2828
import reactor.core.publisher.Mono;
29-
import reactor.ipc.netty.http.server.HttpServer;
30-
import reactor.ipc.netty.http.server.HttpServerRequest;
31-
import reactor.ipc.netty.http.server.HttpServerResponse;
29+
import reactor.netty.http.server.HttpServer;
30+
import reactor.netty.http.server.HttpServerRequest;
31+
import reactor.netty.http.server.HttpServerResponse;
3232

3333
@Named("ProteusPrometheusBridge")
3434
public class ProteusPrometheusBridge implements MetricsSnapshotHandler {
@@ -99,8 +99,11 @@ public static void main(String... args) {
9999
}
100100

101101
private void init() {
102-
HttpServer.create(bindAddress, bindPort)
103-
.newRouter(routes -> routes.post(metricsUrl, this::handle).get(metricsUrl, this::handle))
102+
HttpServer.create()
103+
.host(bindAddress)
104+
.port(bindPort)
105+
.route(routes -> routes.post(metricsUrl, this::handle).get(metricsUrl, this::handle))
106+
.bind()
104107
.subscribe();
105108
}
106109

proteus-tracing-openzipkin/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ dependencies {
1010
protobuf project (':proteus-tracing-idl')
1111
compile project (':proteus-client')
1212

13-
compile 'io.projectreactor.addons:reactor-adapter:3.1.7.RELEASE'
13+
compile 'io.projectreactor.addons:reactor-adapter:3.2.0.RELEASE'
1414
compile "com.google.protobuf:protobuf-java-util:$protobufVersion"
1515

1616
compile 'io.opentracing:opentracing-api:0.31.0'

proteus-tracing-openzipkin/src/main/java/io/netifi/proteus/tracing/ProteusZipkinHttpBridge.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
import reactor.core.Exceptions;
1414
import reactor.core.publisher.Flux;
1515
import reactor.core.publisher.Mono;
16-
import reactor.ipc.netty.http.client.HttpClient;
17-
import reactor.ipc.netty.resources.PoolResources;
16+
import reactor.netty.ByteBufFlux;
17+
import reactor.netty.http.client.HttpClient;
18+
import reactor.netty.resources.ConnectionProvider;
1819
import zipkin2.proto3.Span;
1920

2021
public class ProteusZipkinHttpBridge implements ProteusTracingService {
@@ -82,17 +83,15 @@ public static void main(String... args) {
8283
private synchronized HttpClient getClient() {
8384
if (httpClient == null) {
8485
this.httpClient =
85-
HttpClient.builder()
86-
.options(
87-
builder ->
88-
builder
89-
.compression(true)
90-
.poolResources(PoolResources.fixed("proteusZipkinBridge"))
91-
.option(ChannelOption.SO_KEEPALIVE, true)
92-
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30_000)
86+
HttpClient.create(ConnectionProvider.fixed("proteusZipkinBridge"))
87+
.compress(true)
88+
.port(port)
89+
.tcpConfiguration(
90+
tcpClient ->
91+
tcpClient
9392
.host(host)
94-
.port(port))
95-
.build();
93+
.option(ChannelOption.SO_KEEPALIVE, true)
94+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30_000));
9695
}
9796
return httpClient;
9897
}
@@ -127,12 +126,11 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
127126
.concatMap(
128127
spans ->
129128
getClient()
130-
.post(
131-
zipkinSpansUrl,
132-
request -> {
133-
request.addHeader("Content-Type", "application/json");
134-
return request.sendString(Mono.just(spans));
135-
})
129+
.headers(hh -> hh.add("Content-Type", "application/json"))
130+
.post()
131+
.uri(zipkinSpansUrl)
132+
.send(ByteBufFlux.fromString(Mono.just(spans)))
133+
.response()
136134
.timeout(Duration.ofSeconds(30))
137135
.doOnError(throwable -> resetHttpClient()),
138136
8)

proteus-tracing-openzipkin/src/main/java/io/netifi/proteus/tracing/TracesStreamer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import reactor.core.Exceptions;
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20-
import reactor.ipc.netty.http.client.HttpClient;
20+
import reactor.netty.http.client.HttpClient;
2121
import zipkin2.proto3.Span;
2222

2323
public class TracesStreamer {
@@ -66,13 +66,12 @@ private static Function<Integer, Publisher<InputStream>> zipkinServerStream(
6666
return lookbackSeconds ->
6767
client.flatMapMany(
6868
c ->
69-
c.get(
70-
zipkinQuery(zipkinUrl, lookbackSeconds),
71-
req -> {
72-
req.context().addHandler(new JsonObjectDecoder(true));
73-
return Mono.empty();
74-
})
75-
.flatMapMany(resp -> resp.receive().asInputStream()));
69+
c.doOnRequest(
70+
(__, connection) -> connection.addHandler(new JsonObjectDecoder(true)))
71+
.get()
72+
.uri(zipkinQuery(zipkinUrl, lookbackSeconds))
73+
.responseContent()
74+
.asInputStream());
7675
}
7776

7877
private static String zipkinQuery(String zipkinUrl, int lookbackSeconds) {

proteus-tracing-openzipkin/src/test/java/io/netifi/proteus/tracing/ZipkinTracesStreamerTest.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import org.junit.Test;
1515
import reactor.core.publisher.Flux;
1616
import reactor.core.publisher.Mono;
17-
import reactor.ipc.netty.http.client.HttpClient;
18-
import reactor.ipc.netty.resources.PoolResources;
17+
import reactor.netty.http.client.HttpClient;
18+
import reactor.netty.resources.ConnectionProvider;
1919

2020
public class ZipkinTracesStreamerTest {
2121

@@ -70,16 +70,14 @@ public void streamerIntegrationTest() {
7070

7171
private Mono<HttpClient> client() {
7272
return Mono.just(
73-
HttpClient.builder()
74-
.options(
75-
builder ->
76-
builder
77-
.compression(true)
78-
.poolResources(PoolResources.fixed("proteusZipkinBridge"))
73+
HttpClient.create(ConnectionProvider.fixed("proteusZipkinBridge"))
74+
.compress(true)
75+
.port(9411)
76+
.tcpConfiguration(
77+
tcpClient ->
78+
tcpClient
7979
.option(ChannelOption.SO_KEEPALIVE, true)
8080
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30_000)
81-
.host("127.0.0.1")
82-
.port(9411))
83-
.build());
81+
.host("127.0.0.1")));
8482
}
8583
}

0 commit comments

Comments
 (0)