Skip to content

Commit 2e89aaa

Browse files
BrandonArpvjkoskela
authored andcommitted
add a configuration flag to skip cluster aggregations (#70)
1 parent d9d3539 commit 2e89aaa

File tree

3 files changed

+46
-5
lines changed

3 files changed

+46
-5
lines changed

src/main/java/com/arpnetworking/clusteraggregator/client/AggClientConnection.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,17 @@ public class AggClientConnection extends AbstractActor {
5555
* @param connection Reference to the client connection actor.
5656
* @param remote The address of the client socket.
5757
* @param maxConnectionAge The maximum duration to keep a connection open before cycling it.
58+
* @param calculateAggregates True to compute cluster aggregations, false to only publish host aggregations
5859
* @return A new <code>Props</code>.
5960
*/
60-
public static Props props(final ActorRef connection, final InetSocketAddress remote, final FiniteDuration maxConnectionAge) {
61-
return Props.create(AggClientConnection.class, connection, remote, maxConnectionAge);
61+
public static Props props(
62+
final ActorRef connection,
63+
final InetSocketAddress remote,
64+
final FiniteDuration maxConnectionAge,
65+
final boolean calculateAggregates) {
66+
return Props.create(
67+
AggClientConnection.class,
68+
() -> new AggClientConnection(connection, remote, maxConnectionAge, calculateAggregates));
6269
}
6370

6471
/**
@@ -67,13 +74,16 @@ public static Props props(final ActorRef connection, final InetSocketAddress rem
6774
* @param connection Reference to the client connection actor.
6875
* @param remote The address of the client socket.
6976
* @param maxConnectionAge The maximum duration to keep a connection open before cycling it.
77+
* @param calculateAggregates True to compute cluster aggregations, false to only publish host aggregations
7078
*/
7179
public AggClientConnection(
7280
final ActorRef connection,
7381
final InetSocketAddress remote,
74-
final FiniteDuration maxConnectionAge) {
82+
final FiniteDuration maxConnectionAge,
83+
final boolean calculateAggregates) {
7584
_connection = connection;
7685
_remoteAddress = remote;
86+
_calculateAggregates = calculateAggregates;
7787

7888
getContext().watch(connection);
7989

@@ -150,7 +160,11 @@ private void processMessages() {
150160
.addData("aggregation", setRecord)
151161
.addContext("actor", self())
152162
.log();
153-
getContext().parent().tell(setRecord, getSelf());
163+
// StatisticSetRecords get forwarded to the parent, who then forwards them to the shard for cluster aggregating
164+
// If we aren't doing shard aggregating, don't forward it
165+
if (_calculateAggregates) {
166+
getContext().parent().tell(setRecord, getSelf());
167+
}
154168
if (setRecord.getStatisticsCount() > 0) {
155169
final Optional<PeriodicData> periodicData = buildPeriodicData(setRecord);
156170
if (periodicData.isPresent()) {
@@ -268,6 +282,7 @@ private Optional<PeriodicData> buildPeriodicData(final Messages.StatisticSetReco
268282
private ByteString _buffer = ByteString.empty();
269283
private final ActorRef _connection;
270284
private final InetSocketAddress _remoteAddress;
285+
private final boolean _calculateAggregates;
271286
private static final Logger LOGGER = LoggerFactory.getLogger(AggClientConnection.class);
272287
private static final Logger INCOMPLETE_RECORD_LOGGER = LoggerFactory.getRateLimitLogger(
273288
AggClientConnection.class,

src/main/java/com/arpnetworking/clusteraggregator/client/AggClientSupervisor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public AggClientSupervisor(
6161
_emitter = emitter;
6262
_minConnectionTimeout = configuration.getMinConnectionTimeout();
6363
_maxConnectionTimeout = configuration.getMaxConnectionTimeout();
64+
_calculateClusterAggregates = configuration.getCalculateClusterAggregations();
6465
}
6566

6667
@Override
@@ -75,7 +76,11 @@ public Receive createReceive() {
7576
final ActorRef connection = getSender();
7677

7778
final ActorRef handler = getContext().actorOf(
78-
AggClientConnection.props(connection, connected.remoteAddress(), getRandomConnectionTime()),
79+
AggClientConnection.props(
80+
connection,
81+
connected.remoteAddress(),
82+
getRandomConnectionTime(),
83+
_calculateClusterAggregates),
7984
"dataHandler");
8085
connection.tell(TcpMessage.register(handler, true, true), getSelf());
8186
getContext().watch(handler);
@@ -124,6 +129,7 @@ public SupervisorStrategy supervisorStrategy() {
124129
private final ActorRef _emitter;
125130
private final Period _minConnectionTimeout;
126131
private final Period _maxConnectionTimeout;
132+
private final Boolean _calculateClusterAggregates;
127133
private final Random _random = new Random();
128134
private static final Logger LOGGER = LoggerFactory.getLogger(AggClientSupervisor.class);
129135
}

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ public String getClusterHostSuffix() {
114114
return _clusterHostSuffix;
115115
}
116116

117+
public boolean getCalculateClusterAggregations() {
118+
return _calculateClusterAggregations;
119+
}
120+
117121
@Override
118122
public String toString() {
119123
return MoreObjects.toStringHelper(this)
@@ -155,6 +159,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
155159
_jvmMetricsCollectionInterval = builder._jvmMetricsCollectionInterval;
156160
_rebalanceConfiguration = builder._rebalanceConfiguration;
157161
_clusterHostSuffix = builder._clusterHostSuffix;
162+
_calculateClusterAggregations = builder._calculateClusterAggregations;
158163
_databaseConfigurations = Maps.newHashMap(builder._databaseConfigurations);
159164
}
160165

@@ -174,6 +179,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
174179
private final Period _jvmMetricsCollectionInterval;
175180
private final RebalanceConfiguration _rebalanceConfiguration;
176181
private final String _clusterHostSuffix;
182+
private final boolean _calculateClusterAggregations;
177183
private final Map<String, DatabaseConfiguration> _databaseConfigurations;
178184

179185
private static final InterfaceDatabase INTERFACE_DATABASE = ReflectionsDatabase.newInstance();
@@ -387,6 +393,18 @@ public Builder setDatabaseConfigurations(final Map<String, DatabaseConfiguration
387393
return this;
388394
}
389395

396+
/**
397+
* Whether or not to perform cluster-level aggregations. When using a datasource that supports
398+
* native histograms, turning this off will reduce cpu cost. Optional. Defaults to true.
399+
*
400+
* @param value true to perform cluster aggregations, false to just forward host data.
401+
* @return This instance of <code>Builder</code>.
402+
*/
403+
public Builder setCalculateClusterAggregations(final Boolean value) {
404+
_calculateClusterAggregations = value;
405+
return this;
406+
}
407+
390408
@NotNull
391409
@NotEmpty
392410
private String _monitoringCluster;
@@ -426,6 +444,8 @@ public Builder setDatabaseConfigurations(final Map<String, DatabaseConfiguration
426444
private RebalanceConfiguration _rebalanceConfiguration;
427445
@NotNull
428446
private String _clusterHostSuffix = "";
447+
@NotNull
448+
private Boolean _calculateClusterAggregations = true;
429449
private Map<String, DatabaseConfiguration> _databaseConfigurations;
430450
}
431451
}

0 commit comments

Comments
 (0)