4242import com .ibm .streams .operator .model .Parameter ;
4343import com .ibm .streams .operator .model .PrimitiveOperator ;
4444import com .ibm .streams .operator .samples .patterns .TupleConsumer ;
45+ import com .ibm .streams .operator .metrics .Metric ;
46+ import com .ibm .streams .operator .model .CustomMetric ;
4547
4648@ PrimitiveOperator (
4749 name ="SendSlackMessage" ,
6668 })
6769public class SendSlackMessage extends TupleConsumer {
6870
71+ /**
72+ * Metrics
73+ */
74+ private Metric nFailedRequests ;
75+ private Metric nInsertedMessages ;
76+ private Metric rateLimitExceeded ;
77+
6978 // ------------------------------------------------------------------------
7079 // Documentation.
7180 // Attention: To add a newline, use \\n instead of \n.
@@ -83,6 +92,34 @@ public class SendSlackMessage extends TupleConsumer {
8392
8493
8594 ;
95+
96+ @ CustomMetric (name = "rateLimitExceeded" , kind = Metric .Kind .GAUGE ,
97+ description = "Describes whether we exceed a rate limit when sending messages to Slack server. "
98+ + "This is set to 1 after response 429 is received. Otherwise the value is 0. "
99+ )
100+ public void setRateLimitExceeded (Metric rateLimitExceeded ) {
101+ this .rateLimitExceeded = rateLimitExceeded ;
102+ }
103+
104+ /**
105+ * nFailedRequests describes the number of failed requests over the lifetime of the operator.
106+ * @param nFailedRequests
107+ */
108+ @ CustomMetric (name = "nFailedRequests" , kind = Metric .Kind .COUNTER ,
109+ description = "The number of failed messages over the lifetime of the operator." )
110+ public void setnFailedRequests (Metric nFailedRequests ) {
111+ this .nFailedRequests = nFailedRequests ;
112+ }
113+
114+ /**
115+ * nInsertedMessages describes the number of successful requests over the lifetime of the operator.
116+ * @param nInsertedMessages
117+ */
118+ @ CustomMetric (name = "nInsertedMessages" , kind = Metric .Kind .COUNTER ,
119+ description = "The number of successful messages over the lifetime of the operator." )
120+ public void setnInsertedMessages (Metric nInsertedMessages ) {
121+ this .nInsertedMessages = nInsertedMessages ;
122+ }
86123
87124 @ ContextCheck (runtime =false )
88125 public static void validateSchema (OperatorContextChecker checker ) {
@@ -137,7 +174,9 @@ private static boolean isStringAttribute(StreamSchema schema) {
137174 description ="Specifies the Slack incoming web hook URL to send messages to."
138175 )
139176 public void setSlackUrl (String slackUrl ) throws IOException {
140- this .slackUrl = slackUrl ;
177+ if (!("" .equals (slackUrl ))) {
178+ this .slackUrl = slackUrl ;
179+ }
141180 }
142181
143182 @ Parameter (
@@ -268,15 +307,27 @@ protected boolean processBatch(Queue<BatchedTuple> batch) throws Exception {
268307 // Send successful - remove message from batch queue.
269308 if (responseCode == HttpStatus .SC_OK ) {
270309 batch .remove ();
271-
310+ this .nInsertedMessages .increment ();
311+ this .rateLimitExceeded .setValue (0 );
272312 // Can only send 1 message to Slack, per second.
273313 Thread .sleep (1000 );
274314 } else {
315+ this .nFailedRequests .increment ();
275316 _trace .error (responseCode + response .toString ());
276317
277318 // With a 404 maybe a URL has been updated in the application config.
278- if (responseCode == HttpStatus .SC_NOT_FOUND )
319+ if (responseCode == HttpStatus .SC_NOT_FOUND ) {
279320 fetchApplicationProperties ();
321+ }
322+ else if (responseCode == 429 ) {
323+ // HTTP 429 Too Many Requests
324+ this .rateLimitExceeded .setValue (1 );
325+ // TODO check Retry-After HTTP header
326+ Thread .sleep (10000 );
327+ }
328+ else {
329+ this .rateLimitExceeded .setValue (0 );
330+ }
280331 }
281332
282333 return true ;
@@ -335,3 +386,4 @@ protected void fetchApplicationProperties() {
335386 }
336387}
337388
389+
0 commit comments