2626import com .arpnetworking .metrics .incubator .PeriodicMetrics ;
2727import com .arpnetworking .metrics .proxy .models .messages .Command ;
2828import com .arpnetworking .metrics .proxy .models .messages .Connect ;
29+ import com .arpnetworking .metrics .proxy .models .messages .MetricReport ;
2930import com .arpnetworking .metrics .proxy .models .protocol .MessageProcessorsFactory ;
3031import com .arpnetworking .metrics .proxy .models .protocol .MessagesProcessor ;
3132import com .arpnetworking .steno .LogValueMapFactory ;
3233import com .arpnetworking .steno .Logger ;
3334import com .arpnetworking .steno .LoggerFactory ;
35+ import com .arpnetworking .tsdcore .model .AggregatedData ;
36+ import com .arpnetworking .tsdcore .model .Key ;
37+ import com .arpnetworking .tsdcore .model .PeriodicData ;
3438import com .fasterxml .jackson .core .JsonProcessingException ;
3539import com .fasterxml .jackson .databind .ObjectMapper ;
3640import com .fasterxml .jackson .databind .node .JsonNodeFactory ;
3741import com .fasterxml .jackson .databind .node .ObjectNode ;
42+ import com .google .common .collect .Maps ;
43+ import com .google .common .collect .Sets ;
3844
3945import java .util .List ;
46+ import java .util .Map ;
47+ import java .util .Set ;
4048
4149/**
4250 * Actor class to hold the state for a single connection.
@@ -96,6 +104,9 @@ public Receive createReceive() {
96104 .log ();
97105 getSelf ().tell (PoisonPill .getInstance (), getSelf ());
98106 })
107+ .match (PeriodicData .class , message -> {
108+ processPeriodicData (message );
109+ })
99110 .matchAny (message -> {
100111 if (_channel == null ) {
101112 LOGGER .warn ()
@@ -179,6 +190,52 @@ public void sendCommand(final String command, final ObjectNode data) {
179190 send (message );
180191 }
181192
193+ /**
194+ * Subscribe the connection to a stream.
195+ *
196+ * @param service The service to subscribe to.
197+ * @param metric The metric to subscribe to.
198+ * @param statistic The statistic to subscribe to.
199+ */
200+ public void subscribe (final String service , final String metric , final String statistic ) {
201+ if (!_subscriptions .containsKey (service )) {
202+ _subscriptions .put (service , Maps .newHashMap ());
203+ }
204+
205+ final Map <String , Set <String >> metrics = _subscriptions .get (service );
206+ if (!metrics .containsKey (metric )) {
207+ metrics .put (metric , Sets .newHashSet ());
208+ }
209+
210+ final Set <String > statistics = metrics .get (metric );
211+ if (!statistics .contains (statistic )) {
212+ statistics .add (statistic );
213+ }
214+ }
215+
216+ /**
217+ * Unsubscribe the connection from a stream.
218+ *
219+ * @param service The service to unsubscribe from.
220+ * @param metric The metric to unsubscribe from.
221+ * @param statistic The statistic to unsubscribe from.
222+ */
223+ public void unsubscribe (final String service , final String metric , final String statistic ) {
224+ if (!_subscriptions .containsKey (service )) {
225+ return ;
226+ }
227+
228+ final Map <String , Set <String >> metrics = _subscriptions .get (service );
229+ if (!metrics .containsKey (metric )) {
230+ return ;
231+ }
232+
233+ final Set <String > statistics = metrics .get (metric );
234+ if (statistics .contains (statistic )) {
235+ statistics .remove (statistic );
236+ }
237+ }
238+
182239 /**
183240 * Accessor to this Connection's Telemetry actor.
184241 *
@@ -198,6 +255,7 @@ public Object toLogValue() {
198255 return LogValueMapFactory .builder (this )
199256 .put ("connection" , _channel )
200257 .put ("messageProcessors" , _messageProcessors )
258+ .put ("subscriptions" , _subscriptions )
201259 .build ();
202260 }
203261
@@ -206,11 +264,66 @@ public String toString() {
206264 return toLogValue ().toString ();
207265 }
208266
267+ private void processPeriodicData (final PeriodicData message ) {
268+ final Key dimensions = message .getDimensions ();
269+ final String host = dimensions .getHost ();
270+ final String service = dimensions .getService ();
271+ final Map <String , Set <String >> metrics = _subscriptions .get (service );
272+ if (metrics == null ) {
273+ LOGGER .trace ()
274+ .setMessage ("Not sending MetricReport" )
275+ .addData ("reason" , "service not found in subscriptions" )
276+ .addData ("service" , service )
277+ .log ();
278+ return ;
279+ }
280+
281+ for (final Map .Entry <String , AggregatedData > entry : message .getData ().entries ()) {
282+ final String metric = entry .getKey ();
283+ final AggregatedData datum = entry .getValue ();
284+
285+ final Set <String > stats = metrics .get (metric );
286+ if (stats == null ) {
287+ LOGGER .trace ()
288+ .setMessage ("Not sending MetricReport" )
289+ .addData ("reason" , "metric not found in subscriptions" )
290+ .addData ("metric" , metric )
291+ .log ();
292+ continue ;
293+ }
294+
295+ final String statisticName = datum .getStatistic ().getName ();
296+ if (!stats .contains (statisticName )) {
297+ LOGGER .trace ()
298+ .setMessage ("Not sending MetricReport" )
299+ .addData ("reason" , "statistic not found in subscriptions" )
300+ .addData ("statistic" , statisticName )
301+ .log ();
302+ continue ;
303+ }
304+
305+ final MetricReport metricReport = new MetricReport (
306+ service ,
307+ host ,
308+ statisticName ,
309+ metric ,
310+ datum .getValue ().getValue (),
311+ datum .getValue ().getUnit (),
312+ message .getStart ());
313+ for (final MessagesProcessor messagesProcessor : _messageProcessors ) {
314+ if (messagesProcessor .handleMessage (metricReport )) {
315+ break ;
316+ }
317+ }
318+ }
319+ }
320+
209321 private ActorRef _telemetry ;
210322 private ActorRef _channel ;
211323
212324 private final PeriodicMetrics _metrics ;
213325 private final List <MessagesProcessor > _messageProcessors ;
326+ private final Map <String , Map <String , Set <String >>> _subscriptions = Maps .newHashMap ();
214327
215328 private static final String METRICS_PREFIX = "actors/connection/" ;
216329 private static final String UNKNOWN_COMMAND_COUNTER = METRICS_PREFIX + "command/UNKNOWN" ;
0 commit comments