32
32
import com .arpnetworking .tsdcore .model .FQDSN ;
33
33
import com .arpnetworking .tsdcore .model .PeriodicData ;
34
34
import com .arpnetworking .tsdcore .statistics .Statistic ;
35
- import com .arpnetworking .tsdcore .statistics .StatisticFactory ;
36
35
import com .google .common .base .Optional ;
37
36
import com .google .common .collect .ImmutableList ;
38
37
import com .google .common .collect .ImmutableMap ;
42
41
import java .net .InetAddress ;
43
42
import java .net .InetSocketAddress ;
44
43
import java .net .UnknownHostException ;
44
+ import java .time .Duration ;
45
45
import java .util .Collections ;
46
46
import java .util .Map ;
47
47
import java .util .regex .Pattern ;
@@ -161,8 +161,10 @@ private void processMessages() {
161
161
.log ();
162
162
getContext ().parent ().tell (setRecord , getSelf ());
163
163
if (setRecord .getStatisticsCount () > 0 ) {
164
- final PeriodicData periodicData = buildPeriodicData (setRecord );
165
- getContext ().parent ().tell (periodicData , self ());
164
+ final Optional <PeriodicData > periodicData = buildPeriodicData (setRecord );
165
+ if (periodicData .isPresent ()) {
166
+ getContext ().parent ().tell (periodicData .get (), self ());
167
+ }
166
168
}
167
169
} else if (gm instanceof Messages .HeartbeatRecord ) {
168
170
LOGGER .debug ()
@@ -182,7 +184,7 @@ private void processMessages() {
182
184
if (!messageOptional .isPresent () && current .length () > 4 ) {
183
185
LOGGER .debug ()
184
186
.setMessage ("buffer did not deserialize completely" )
185
- .addData ("remainingBytes" , ( Integer ) current .length ())
187
+ .addData ("remainingBytes" , current .length ())
186
188
.addContext ("actor" , self ())
187
189
.log ();
188
190
}
@@ -191,9 +193,57 @@ private void processMessages() {
191
193
_buffer = current ;
192
194
}
193
195
194
- private PeriodicData buildPeriodicData (final Messages .StatisticSetRecord setRecord ) {
196
+ private Optional < PeriodicData > buildPeriodicData (final Messages .StatisticSetRecord setRecord ) {
195
197
final CombinedMetricData combinedMetricData = CombinedMetricData .Builder .fromStatisticSetRecord (setRecord ).build ();
196
198
final ImmutableList .Builder <AggregatedData > builder = ImmutableList .builder ();
199
+ final ImmutableMap .Builder <String , String > dimensionBuilder = ImmutableMap .builder ();
200
+
201
+ Optional <String > host = Optional .absent ();
202
+ Optional <String > service = Optional .absent ();
203
+ Optional <String > cluster = Optional .absent ();
204
+ for (final Messages .DimensionEntry dimensionEntry : setRecord .getDimensionsList ()) {
205
+ if (HOST_KEY .equals (dimensionEntry .getKey ())) {
206
+ host = Optional .fromNullable (dimensionEntry .getValue ());
207
+ } else if (SERVICE_KEY .equals (dimensionEntry .getKey ())) {
208
+ service = Optional .fromNullable (dimensionEntry .getValue ());
209
+ } else if (CLUSTER_KEY .equals (dimensionEntry .getKey ())) {
210
+ cluster = Optional .fromNullable (dimensionEntry .getValue ());
211
+ } else {
212
+ dimensionBuilder .put (dimensionEntry .getKey (), dimensionEntry .getValue ());
213
+ }
214
+ }
215
+
216
+ if (!service .isPresent ()) {
217
+ service = Optional .fromNullable (setRecord .getService ());
218
+ }
219
+
220
+ if (!cluster .isPresent ()) {
221
+ cluster = Optional .fromNullable (setRecord .getCluster ());
222
+ if (!cluster .isPresent ()) {
223
+ cluster = _clusterName ;
224
+ }
225
+ }
226
+
227
+ if (!host .isPresent ()) {
228
+ host = _hostName ;
229
+ }
230
+
231
+ dimensionBuilder .put (HOST_KEY , host .or ("" ));
232
+ dimensionBuilder .put (SERVICE_KEY , service .or ("" ));
233
+ dimensionBuilder .put (CLUSTER_KEY , cluster .or ("" ));
234
+
235
+ if (!(host .isPresent () && service .isPresent () && cluster .isPresent ())) {
236
+ INCOMPLETE_RECORD_LOGGER .warn ()
237
+ .setMessage ("Cannot process StatisticSet record, missing required fields." )
238
+ .addData ("host" , host )
239
+ .addData ("service" , service )
240
+ .addData ("cluster" , cluster )
241
+ .log ();
242
+ return Optional .absent ();
243
+ }
244
+
245
+ final ImmutableMap <String , String > dimensions = dimensionBuilder .build ();
246
+
197
247
for (final Map .Entry <Statistic , CombinedMetricData .StatisticValue > record
198
248
: combinedMetricData .getCalculatedValues ().entrySet ()) {
199
249
final AggregatedData aggregatedData = new AggregatedData .Builder ()
@@ -203,7 +253,7 @@ private PeriodicData buildPeriodicData(final Messages.StatisticSetRecord setReco
203
253
.setService (setRecord .getService ())
204
254
.setStatistic (record .getKey ())
205
255
.build ())
206
- .setHost (_hostName . or ( "" ))
256
+ .setHost (host . get ( ))
207
257
.setIsSpecified (record .getValue ().getUserSpecified ())
208
258
.setPeriod (combinedMetricData .getPeriod ())
209
259
.setPopulationSize (1L )
@@ -214,24 +264,29 @@ private PeriodicData buildPeriodicData(final Messages.StatisticSetRecord setReco
214
264
.build ();
215
265
builder .add (aggregatedData );
216
266
}
217
- return new PeriodicData .Builder ()
267
+ return Optional . of ( new PeriodicData .Builder ()
218
268
.setData (builder .build ())
219
269
.setConditions (ImmutableList .of ())
220
- .setDimensions (ImmutableMap . of ( "host" , _hostName . or ( "" )) )
270
+ .setDimensions (dimensions )
221
271
.setPeriod (combinedMetricData .getPeriod ())
222
272
.setStart (combinedMetricData .getPeriodStart ())
223
- .build ();
273
+ .build ()) ;
224
274
}
225
275
226
276
private Optional <String > _hostName = Optional .absent ();
227
277
private Optional <String > _clusterName = Optional .absent ();
228
278
private ByteString _buffer = ByteString .empty ();
229
279
private final ActorRef _connection ;
230
280
private final InetSocketAddress _remoteAddress ;
231
- private final StatisticFactory _statisticFactory = new StatisticFactory ();
232
281
private static final Logger LOGGER = LoggerFactory .getLogger (AggClientConnection .class );
233
-
282
+ private static final Logger INCOMPLETE_RECORD_LOGGER = LoggerFactory .getRateLimitLogger (
283
+ AggClientConnection .class ,
284
+ Duration .ofSeconds (30 ));
234
285
private static final boolean IS_ENABLED ;
286
+ private static final String HOST_KEY = "host" ;
287
+ private static final String SERVICE_KEY = "service" ;
288
+ private static final String CLUSTER_KEY = "cluster" ;
289
+
235
290
236
291
static {
237
292
// Determine the local host name
0 commit comments