Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit a7785a8

Browse files
authored
Metrics (#9)
* metrics * fixed imports * fixed imports * make the names prometheus friendly * cleanup, and refactoring * metrics * fixed imports * fixed imports * make the names prometheus friendly * cleanup, and refactoring
1 parent 81c93b4 commit a7785a8

File tree

21 files changed

+620
-167
lines changed

21 files changed

+620
-167
lines changed

build.gradle

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,22 @@ project(':metrics-idl') {
8383
apply from: file('../gradle/java.gradle')
8484
}
8585

86+
project(':metrics-micrometer') {
87+
description = 'Netifi Proteus Micrometer Support'
88+
ext.artifactName = 'metrics-prometheus'
89+
90+
apply from: file('../gradle/release.gradle')
91+
apply from: file('../gradle/java.gradle')
92+
}
93+
94+
project(':metrics-prometheus') {
95+
description = 'Netifi Proteus Prometheus Integration'
96+
ext.artifactName = 'metrics-prometheus'
97+
98+
apply from: file('../gradle/release.gradle')
99+
apply from: file('../gradle/java.gradle')
100+
}
101+
86102
project(':protobuf-rpc') {
87103
description = 'Netifi Proteus RPC'
88104
ext.artifactName = 'protobuf-rpc'
@@ -92,7 +108,7 @@ project(':protobuf-rpc') {
92108

93109

94110
project(':tracing-openzipkin') {
95-
description = 'Netifi Openzipkin Integration'
111+
description = 'Netifi Proteus Openzipkin Integration'
96112
ext.artifactName = 'tracing'
97113

98114
apply from: file('../gradle/release.gradle')
@@ -101,7 +117,7 @@ project(':tracing-openzipkin') {
101117

102118

103119
project(':tracing-idl') {
104-
description = 'Netifi Tracing IDL'
120+
description = 'Netifi Proteus Tracing IDL'
105121
ext.artifactName = 'tracing-idl'
106122

107123
apply from: file('../gradle/release.gradle')

client/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ dependencies {
1515
compile 'com.google.guava:guava:22.0'
1616

1717
testProtobuf project (':core-idl')
18-
18+
testProtobuf project (':protobuf-rpc')
1919
testCompile 'io.micrometer:micrometer-registry-atlas:1.0.3'
2020
}
2121

client/src/main/java/io/netifi/proteus/DefaultProteusBrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import org.slf4j.LoggerFactory;
2727
import reactor.core.Disposable;
2828
import reactor.core.publisher.MonoProcessor;
29+
import reactor.retry.Retry;
2930

3031
import java.net.InetSocketAddress;
3132
import java.net.SocketAddress;
33+
import java.time.Duration;
3234
import java.util.*;
3335
import java.util.function.Function;
3436
import java.util.function.Supplier;

client/src/main/java/io/netifi/proteus/Proteus.java

Lines changed: 11 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
package io.netifi.proteus;
22

33
import io.micrometer.core.instrument.MeterRegistry;
4-
import io.netifi.proteus.metrics.ProteusMetricsExporter;
5-
import io.netifi.proteus.metrics.ProteusOperatingSystemMetrics;
6-
import io.netifi.proteus.metrics.om.MetricsSnapshotHandler;
7-
import io.netifi.proteus.metrics.om.MetricsSnapshotHandlerClient;
84
import io.netifi.proteus.rsocket.ProteusSocket;
95
import io.netifi.proteus.rsocket.RequestHandlingRSocket;
106
import io.netty.buffer.ByteBuf;
@@ -20,7 +16,6 @@
2016

2117
import java.net.InetSocketAddress;
2218
import java.net.SocketAddress;
23-
import java.time.Duration;
2419
import java.util.*;
2520
import java.util.concurrent.ConcurrentHashMap;
2621
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +32,7 @@ public class Proteus implements Closeable {
3732
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
3833
}
3934

35+
private final long accesskey;
4036
private final String fromGroup;
4137
private final DestinationNameFactory destinationNameFactory;
4238
private final ProteusBrokerService brokerService;
@@ -56,6 +52,7 @@ private Proteus(
5652
Function<SocketAddress, ClientTransport> clientTransportFactory,
5753
int poolSize,
5854
Supplier<Tracer> tracerSupplier) {
55+
this.accesskey = accessKey;
5956
this.onClose = MonoProcessor.create();
6057
this.fromGroup = group;
6158
this.requestHandlingRSocket = new RequestHandlingRSocket();
@@ -109,15 +106,19 @@ public ProteusSocket destination(String destination, String group) {
109106
public ProteusSocket group(String group) {
110107
return brokerService.group(group);
111108
}
112-
109+
110+
public long getAccesskey() {
111+
return accesskey;
112+
}
113+
113114
public String getGroupName() {
114115
return fromGroup;
115116
}
116-
117+
117118
public String getDestination() {
118119
return destinationNameFactory.rootName();
119120
}
120-
121+
121122
public static class Builder {
122123
private String host = DefaultBuilderConfig.getHost();
123124
private Integer port = DefaultBuilderConfig.getPort();
@@ -134,10 +135,7 @@ public static class Builder {
134135
private DestinationNameFactory destinationNameFactory;
135136

136137
private MeterRegistry registry = null;
137-
private String metricHandlerGroup = DefaultBuilderConfig.getMetricHandlerGroup();
138138
private int batchSize = DefaultBuilderConfig.getBatchSize();
139-
private long exportFrequencySeconds = DefaultBuilderConfig.getExportFrequencySeconds();
140-
private boolean exportSystemMetrics = DefaultBuilderConfig.getExportSystemMetrics();
141139
private Function<SocketAddress, ClientTransport> clientTransportFactory =
142140
address -> TcpClientTransport.create((InetSocketAddress) address);
143141
private int poolSize = Runtime.getRuntime().availableProcessors();
@@ -154,31 +152,11 @@ public Builder poolSize(int poolSize) {
154152
return this;
155153
}
156154

157-
public Builder meterRegistry(MeterRegistry registry) {
158-
this.registry = registry;
159-
return this;
160-
}
161-
162-
public Builder metricHandlerGroup(String metricHandlerGroup) {
163-
this.metricHandlerGroup = metricHandlerGroup;
164-
return this;
165-
}
166-
167155
public Builder metricBatchSize(int batchSize) {
168156
this.batchSize = batchSize;
169157
return this;
170158
}
171159

172-
public Builder metricExportFrequencySeconds(long exportFrequencySeconds) {
173-
this.exportFrequencySeconds = exportFrequencySeconds;
174-
return this;
175-
}
176-
177-
public Builder exportSystemMetrics(boolean exportSystemMetrics) {
178-
this.exportSystemMetrics = exportSystemMetrics;
179-
return this;
180-
}
181-
182160
public Builder keepalive(boolean useKeepAlive) {
183161
this.keepalive = keepalive;
184162
return this;
@@ -208,12 +186,12 @@ public Builder port(int port) {
208186
this.port = port;
209187
return this;
210188
}
211-
189+
212190
public Builder tracerSupplier(Supplier<Tracer> tracerSupplier) {
213191
this.tracerSupplier = tracerSupplier;
214192
return this;
215193
}
216-
194+
217195
public Builder seedAddresses(Collection<SocketAddress> addresses) {
218196
if (addresses instanceof List) {
219197
this.seedAddresses = (List<SocketAddress>) addresses;
@@ -309,35 +287,6 @@ public Proteus build() {
309287
tracerSupplier);
310288
proteus.onClose.doFinally(s -> PROTEUS.remove(proteusKey)).subscribe();
311289

312-
if (registry != null) {
313-
registry
314-
.config()
315-
.commonTags(
316-
"accessKey",
317-
String.valueOf(accessKey),
318-
"group",
319-
group,
320-
"destination",
321-
destination);
322-
323-
ProteusOperatingSystemMetrics systemMetrics =
324-
new ProteusOperatingSystemMetrics(registry);
325-
326-
ProteusSocket socket = proteus.group(metricHandlerGroup);
327-
MetricsSnapshotHandler handler = new MetricsSnapshotHandlerClient(socket);
328-
ProteusMetricsExporter exporter =
329-
new ProteusMetricsExporter(
330-
null, registry, Duration.ofSeconds(exportFrequencySeconds), batchSize);
331-
exporter.run();
332-
proteus
333-
.onClose
334-
.doFinally(
335-
s -> {
336-
exporter.dispose();
337-
})
338-
.subscribe();
339-
}
340-
341290
return proteus;
342291
});
343292
}

client/src/main/java/io/netifi/proteus/presence/BrokerInfoPresenceNotifier.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@
1111
import reactor.core.publisher.DirectProcessor;
1212
import reactor.core.publisher.FluxProcessor;
1313
import reactor.core.publisher.Mono;
14+
import reactor.retry.Retry;
1415

15-
import java.util.*;
16+
import java.time.Duration;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.Objects;
1621
import java.util.concurrent.ConcurrentHashMap;
1722
import java.util.concurrent.ConcurrentMap;
1823

client/src/main/java/io/netifi/proteus/rsocket/WeightedReconnectingRSocket.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.slf4j.LoggerFactory;
1616
import reactor.core.Disposable;
1717
import reactor.core.publisher.*;
18+
import reactor.retry.Jitter;
19+
import reactor.retry.Retry;
1820

1921
import java.time.Duration;
2022
import java.util.concurrent.TimeUnit;
@@ -257,7 +259,8 @@ void connect() {
257259
.doOnNext(
258260
_rSocket -> {
259261
availability = 1.0;
260-
ErrorOnDisconnectRSocket rSocket = new ErrorOnDisconnectRSocket(_rSocket);
262+
ErrorOnDisconnectRSocket rSocket =
263+
new ErrorOnDisconnectRSocket(_rSocket);
261264
_rSocket
262265
.onClose()
263266
.doFinally(
@@ -277,9 +280,9 @@ void connect() {
277280
synchronized (WeightedReconnectingRSocket.this) {
278281
connecting = false;
279282
}
280-
283+
281284
rSocket.dispose();
282-
285+
283286
connect();
284287
})
285288
.subscribe();

core/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@ sourceCompatibility = 1.8
66
targetCompatibility = 1.8
77

88
dependencies {
9-
protobuf project (':metrics-idl')
109
compile project (':frames')
1110
compile 'io.opentracing:opentracing-api:0.31.0'
1211
compile 'javax.inject:javax.inject:1'
1312
compile 'com.google.protobuf:protobuf-java:3.6.0'
1413
compile 'io.rsocket:rsocket-core:0.11.3'
1514
compile 'io.rsocket:rsocket-transport-netty:0.11.3'
1615
compile 'io.micrometer:micrometer-core:1.0.3'
16+
17+
protobuf project(':metrics-idl')
18+
1719
testCompile 'io.opentracing.brave:brave-opentracing:0.31.2'
1820
testCompile 'io.zipkin.reporter2:zipkin-sender-okhttp3:2.7.6'
1921
}

core/src/main/java/io/netifi/proteus/metrics/ProteusMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class ProteusMetrics {
2727
.tags(tags)
2828
.register(registry);
2929
Timer timer =
30-
Timer.builder(name + ".latency").publishPercentiles(0.5, 0.9, 0.95, 0.99).tags(tags).register(registry);
30+
Timer.builder(name + ".latency").publishPercentiles(0.5, 0.9, 0.95, 0.99).tags(tags).register( registry);
3131
return Operators.lift(
3232
(scannable, subscriber) ->
3333
new ProteusMetricsSubscriber<>(subscriber, next, complete, error, cancelled, timer));

0 commit comments

Comments
 (0)