33import java .io .OutputStream ;
44import java .text .MessageFormat ;
55import java .util .Arrays ;
6+ import java .util .Collection ;
67import java .util .Map ;
78import java .util .TreeMap ;
89import java .util .regex .Matcher ;
@@ -25,6 +26,9 @@ public abstract class CollectDPluginParent {
2526 private static final String GAUGE = "GAUGE" ;
2627 private static final String DERIVE = "DERIVE" ;
2728 private static final String ABSOLUTE = "ABSOLUTE" ;
29+ private static final Collection <String > HADOOP_CONTEXTS = Arrays .asList ("NameNode" , "DataNode" , "JobManager" , "NodeManager" , "JobHistoryServer" ,
30+ "ResourceManager" , "MRAppMaster" );
31+ private static final Pattern HADOOP_PATTERN_4_GROUPS = Pattern .compile ("([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)" );
2832 private static final Pattern STATSD_PATTERN_3_GROUPS = Pattern .compile ("([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)" );
2933 private static final Pattern STATSD_PATTERN_4_GROUPS = Pattern .compile ("([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)" );
3034 private static final Pattern STATSD_PATTERN_6_GROUPS = Pattern .compile ("([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_]+)\\ .([\\ w-_#]+)\\ .([\\ w-_]+)" );
@@ -64,15 +68,17 @@ public int config(OConfigItem config) {
6468
6569 protected void addTag (Map <String ,String > m , String k , String v ) {
6670 if (k != null && !k .isBlank () && v != null && !v .isBlank ()) {
71+ k = k .replaceAll ("=" , ":" );
72+ v = v .replaceAll ("=" , ":" );
6773 m .put (k .trim (), v .trim ());
6874 }
6975 }
7076
7177 protected void addTag (Map <String ,String > m , String kv ) {
7278 if (kv != null && !kv .isBlank ()) {
7379 String [] parts = kv .split ("=" );
74- if (parts .length == 2 && ! parts [ 0 ]. isBlank () && ! parts [ 1 ]. isBlank () ) {
75- m . put ( parts [0 ]. trim () , parts [1 ]. trim () );
80+ if (parts .length == 2 ) {
81+ addTag ( m , parts [0 ], parts [1 ]);
7682 }
7783 }
7884 }
@@ -99,9 +105,10 @@ private void processInternal(ValueList vl, OutputStream out) {
99105 } else {
100106 addTag (tagMap , "host" , host );
101107 }
102- int nIdx = host .lastIndexOf ('n' );
108+ String [] hostSplit = host .split ("\\ ." );
109+ int nIdx = hostSplit [0 ].lastIndexOf ('n' );
103110 if (-1 != nIdx ) {
104- addTag (tagMap , "rack" , host .substring (0 , nIdx ));
111+ addTag (tagMap , "rack" , hostSplit [ 0 ] .substring (0 , nIdx ));
105112 }
106113 tagMap .putAll (addlTags );
107114
@@ -112,23 +119,53 @@ private void processInternal(ValueList vl, OutputStream out) {
112119
113120 switch (plugin ) {
114121 case STATSD_PREFIX :
122+ Matcher hadoop_4_groups = HADOOP_PATTERN_4_GROUPS .matcher (typeInstance );
115123 Matcher statsd_3_groups = STATSD_PATTERN_3_GROUPS .matcher (typeInstance );
116124 Matcher statsd_4_groups = STATSD_PATTERN_4_GROUPS .matcher (typeInstance );
117125 Matcher statsd_6_groups = STATSD_PATTERN_6_GROUPS .matcher (typeInstance );
118126 String instance = null ;
119127 if (!typeInstance .startsWith ("nsq" )) {
120128 String [] parts = typeInstance .split ("\\ ." );
121- if (parts .length % 2 == 1 ) {
122- if (parts [0 ].equals ("dwquery" ) && parts .length >= 4 ) {
123- metric .append (STATSD_PREFIX ).append (PERIOD ).append (parts [1 ]).append (PERIOD ).append (parts [3 ]);
124- addTag (tagMap , "queryId" , parts [0 ]);
125- } else {
126- // EtsyStatsD format -- metric.(tagName.tagValue)*
127- metric .append (STATSD_PREFIX ).append (PERIOD ).append (parts [0 ]);
128- for (int x = 1 ; x < parts .length ; x += 2 ) {
129- addTag (tagMap , parts [x ], parts [x + 1 ]);
129+ if (parts .length >= 4 && (HADOOP_CONTEXTS .contains (parts [0 ]) || hadoop_4_groups .matches ())) {
130+ // Here we are processing the statsd metrics coming from the Hadoop Metrics2 StatsDSink without the host name.
131+ // The format of metric is: serviceName.contextName.recordName.metricName
132+ // The serviceName is used as the instance.
133+ instance = parts [0 ];
134+ String contextName = parts [1 ];
135+ int firstPeriod = typeInstance .indexOf ("." );
136+ int lastPeriod = typeInstance .lastIndexOf ("." );
137+ String metricName = typeInstance .substring (lastPeriod + 1 );
138+ String recordName = typeInstance .substring (firstPeriod + 1 , lastPeriod );
139+ // the recordName may contain useful tags such as op=<operation> and user<user>
140+ // in addition to a possibly duplicative recordName
141+ String [] recordNameSplits = recordName .split ("\\ ." );
142+ metric .append (STATSD_PREFIX ).append (PERIOD ).append (contextName ).append (PERIOD );
143+ for (String split : recordNameSplits ) {
144+ // avoid duplication with either the contextName or instance
145+ if (!split .equalsIgnoreCase (contextName ) && !split .equalsIgnoreCase (instance )) {
146+ if (split .startsWith ("NNTopUserOpCounts" ) && split .contains ("windowMs=" )) {
147+ metric .append ("NNTopUserOpCounts" ).append (PERIOD );
148+ addTag (tagMap , split .substring (split .indexOf ("windowMs=" )));
149+ } else if (split .contains ("=" )) {
150+ addTag (tagMap , split );
151+ } else {
152+ addTag (tagMap , "record" , split );
153+ }
130154 }
131155 }
156+ metric .append (metricName );
157+ } else if (parts .length >= 2 && parts [1 ].equals ("dwquery" )) {
158+ // These come from Datawave's QueryIterator metrics - the first split part is the queryId
159+ // Use statsd plus the rest of the typeInstance after the first period as the metric name
160+ int x = typeInstance .indexOf ("." );
161+ metric .append (STATSD_PREFIX ).append (PERIOD ).append (typeInstance .substring (x + 1 ));
162+ addTag (tagMap , "queryId" , parts [0 ]);
163+ } else if (parts .length % 2 == 1 ) {
164+ // EtsyStatsD format -- metric.(tagName.tagValue)*
165+ metric .append (STATSD_PREFIX ).append (PERIOD ).append (parts [0 ]);
166+ for (int x = 1 ; x < parts .length ; x += 2 ) {
167+ addTag (tagMap , parts [x ], parts [x + 1 ]);
168+ }
132169 } else if (statsd_4_groups .matches ()) {
133170 // Here we are processing the statsd metrics coming from the Hadoop Metrics2 StatsDSink without the host name.
134171 // The format of metric is: serviceName.contextName.recordName.metricName. The recordName is typically duplicative
@@ -218,6 +255,12 @@ private void processInternal(ValueList vl, OutputStream out) {
218255 metric .append ("sys." ).append (plugin ).append (PERIOD ).append (type );
219256 addTag (tagMap , INSTANCE , typeInstance .replaceAll (" " , "_" ));
220257 break ;
258+ case "load" :
259+ metric .append ("sys.load." ).append (plugin ).append (PERIOD ).append (type );
260+ // plugin returns 1 min, 5 min, 15 min averages, but Timely can
261+ // downsample on its own so we only use the first (1 min) value
262+ vl .setValues (Arrays .asList (vl .getValues ().get (0 )));
263+ break ;
221264 case "GenericJMX" :
222265 metric .append ("sys." ).append (plugin ).append (PERIOD ).append (type ).append (PERIOD ).append (typeInstance );
223266 String [] pluginInstanceSplit = pluginInstance .split ("-" );
@@ -227,6 +270,7 @@ private void processInternal(ValueList vl, OutputStream out) {
227270 if (pluginInstanceSplit .length > 1 ) {
228271 addTag (tagMap , NAME , pluginInstanceSplit [1 ].replaceAll (" " , "_" ));
229272 }
273+ break ;
230274 default :
231275 if (notEmpty (type ) && notEmpty (typeInstance ) && notEmpty (plugin ) && notEmpty (pluginInstance )) {
232276 metric .append ("sys." ).append (plugin ).append (PERIOD ).append (type ).append (PERIOD ).append (typeInstance );
@@ -253,9 +297,15 @@ private void processInternal(ValueList vl, OutputStream out) {
253297 }
254298 Double value = vl .getValues ().get (i ).doubleValue ();
255299 String tagString = tagMap .entrySet ().stream ().map (e -> e .getKey () + "=" + e .getValue ()).collect (Collectors .joining (" " ));
256- String datapoint = MessageFormat .format (PUT , metricName , timestamp .toString (), value .toString (), tagString );
257- logDebug (String .format ("Writing: %s" , datapoint ));
258- write (datapoint + "\n " , out );
300+
301+ if (metricName .isBlank () || timestamp == null || value == null ) {
302+ logDebug (String .format ("Not writing unhandled metric: plugin:%s pluginInstance:%s type:%s typeInstance:%s" ,
303+ plugin , pluginInstance , type , typeInstance ));
304+ } else {
305+ String datapoint = MessageFormat .format (PUT , metricName , timestamp .toString (), value .toString (), tagString );
306+ logDebug (String .format ("Writing: %s" , datapoint ));
307+ write (datapoint + "\n " , out );
308+ }
259309 }
260310 }
261311
@@ -306,5 +356,4 @@ private String convertType(int type) {
306356 private boolean notEmpty (String arg ) {
307357 return (null != arg ) && !(arg .equals ("" ));
308358 }
309-
310359}
0 commit comments