99/*
1010==================================================================
1111First created on: Mar/15/2020
12- Last modified on: Oct/04 /2023
12+ Last modified on: Oct/18 /2023
1313
1414This Java operator is an utility operator available in the
1515streamsx.websocket toolkit. It can be used to do HTTP(S) post of
@@ -56,9 +56,11 @@ TLS trust store and keystore creation (Java related) links:
5656import java .security .cert .CertificateException ;
5757import java .util .ArrayList ;
5858import java .util .HashMap ;
59+ import java .util .HashSet ;
5960import java .util .List ;
6061import java .util .Map ;
6162import java .util .Set ;
63+ import java .util .StringTokenizer ;
6264
6365import javax .net .ssl .SSLContext ;
6466import org .apache .http .conn .ssl .NoopHostnameVerifier ;
@@ -185,6 +187,9 @@ public class HttpPost extends AbstractOperator {
185187 private int maxRetryAttempts = 0 ;
186188 // Wait time is specified in Milliseconds
187189 private int waitTimeBetweenRetry = 2000 ;
190+ // Following instance variable will hold the HTTP status codes as
191+ // configured by the user for which we should trigger retry PUT, POST or GET.
192+ private Set <Integer > httpStatusCodesSet = new HashSet <Integer >();
188193
189194 // Create a HTTP put or post object.
190195 // As of May/24/2020, this operator supports the posting of
@@ -452,6 +457,9 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
452457 // No retry attempt count was configured for this operator instance.
453458 // So, no retry attempt needed and we can exit after just making the
454459 // intended PUT, POST or GET operation once.
460+ //
461+ // Set this operator metrics.
462+ nHttpPostFailed .increment ();
455463 break ;
456464 }
457465
@@ -464,6 +472,9 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
464472 ". Giving up the HTTP " + httpMethod +
465473 " operation for the current incoming tuple after making " +
466474 maxRetryAttempts + " retry attempts." );
475+
476+ // Set this operator metrics.
477+ nHttpPostFailed .increment ();
467478 break ;
468479 }
469480
@@ -687,14 +698,6 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
687698
688699 int responseStatusCode = response .getStatusLine ().getStatusCode ();
689700
690- // Update the operator metrics.
691- if (responseStatusCode < 200 || responseStatusCode > 299 ) {
692- nHttpPostFailed .increment ();
693- } else {
694- nDataItemsSent .increment ();
695- nDataBytesSent .setValue (dataBytesSent );
696- }
697-
698701 String responseStatusReason = response .getStatusLine ().getReasonPhrase ();
699702 // Let us now parse all the response headers and populate them in
700703 // a map to be sent in the output tuple later in the code below.
@@ -764,6 +767,28 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
764767 httpPost .releaseConnection ();
765768 }
766769
770+ // Before sending the output tuple, check if the HTTP status code that we received
771+ // now is configured by the user to trigger a retry of PUT or POST operation.
772+ if (httpStatusCodesSet .contains (new Integer (responseStatusCode ))) {
773+ Logger .getLogger (this .getClass ()).error ("Operator=" + context .getName () + ", PE=" +
774+ context .getPE ().getPEId () + ", Job=" + context .getPE ().getJobId () +
775+ ", Incoming tuple number=" + httpPostCnt +
776+ ". HTTP " + httpMethod +
777+ " operation for the current incoming tuple received a HTTP status code " +
778+ responseStatusCode + " that must trigger a retry as configured by the user." );
779+ // Continue the while loop to decide if we must make a
780+ // retry attempt after this executed PUT or POST operation.
781+ continue ;
782+ }
783+
784+ // Update the operator metrics.
785+ if (responseStatusCode < 200 || responseStatusCode > 299 ) {
786+ nHttpPostFailed .increment ();
787+ } else {
788+ nDataItemsSent .increment ();
789+ nDataBytesSent .setValue (dataBytesSent );
790+ }
791+
767792 outTuple .setInt ("statusCode" , responseStatusCode );
768793 outTuple .setString ("statusMessage" , responseStatusReason );
769794 outTuple .setMap ("responseHeaders" , responseHeadersMap );
@@ -914,14 +939,6 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
914939
915940 int responseStatusCode = response .getStatusLine ().getStatusCode ();
916941
917- // Update the operator metrics.
918- if (responseStatusCode < 200 || responseStatusCode > 299 ) {
919- nHttpPostFailed .increment ();
920- } else {
921- nDataItemsSent .increment ();
922- nDataBytesSent .setValue (dataBytesSent );
923- }
924-
925942 String responseStatusReason = response .getStatusLine ().getReasonPhrase ();
926943 // Let us now parse all the response headers and populate them in
927944 // a map to be sent in the output tuple later in the code below.
@@ -986,7 +1003,29 @@ public final void process(StreamingInput<Tuple> inputStream, Tuple tuple) throws
9861003 // If we don't do this, it will start hanging when doing the
9871004 // next HTTP GET/PUT/POST for the next incoming tuple.
9881005 httpGet .releaseConnection ();
989-
1006+
1007+ // Before sending the output tuple, check if the HTTP status code that we received
1008+ // now is configured by the user to trigger a retry of GET operation.
1009+ if (httpStatusCodesSet .contains (new Integer (responseStatusCode ))) {
1010+ Logger .getLogger (this .getClass ()).error ("Operator=" + context .getName () + ", PE=" +
1011+ context .getPE ().getPEId () + ", Job=" + context .getPE ().getJobId () +
1012+ ", Incoming tuple number=" + httpPostCnt +
1013+ ". HTTP " + httpMethod +
1014+ " operation for the current incoming tuple received a HTTP status code " +
1015+ responseStatusCode + " that must trigger a retry as configured by the user." );
1016+ // Continue the while loop to decide if we must make a
1017+ // retry attempt after this executed GET operation.
1018+ continue ;
1019+ }
1020+
1021+ // Update the operator metrics.
1022+ if (responseStatusCode < 200 || responseStatusCode > 299 ) {
1023+ nHttpPostFailed .increment ();
1024+ } else {
1025+ nDataItemsSent .increment ();
1026+ nDataBytesSent .setValue (dataBytesSent );
1027+ }
1028+
9901029 outTuple .setInt ("statusCode" , responseStatusCode );
9911030 outTuple .setString ("statusMessage" , responseStatusReason );
9921031 outTuple .setMap ("responseHeaders" , responseHeadersMap );
@@ -1154,7 +1193,7 @@ public void setMaxRetryAttempts(int val) {
11541193 if (val >= 0 ) {
11551194 maxRetryAttempts = val ;
11561195 }
1157- }
1196+ }
11581197
11591198 @ Parameter (name ="waitTimeBetweenRetry" ,
11601199 description ="This parameter specifies wait time in milliseconds between retry attempts after an error on a PUT, POST or GET operation for a given incoming tuple. Use this feature sparingly so as not to cause back pressure when this operator is processing a high volume of incoming tuples. (Default: 2000 milliseconds)" , optional =true )
@@ -1164,6 +1203,26 @@ public void setWaitTimeBetweenRetry(int val) {
11641203 waitTimeBetweenRetry = val ;
11651204 }
11661205 }
1206+
1207+ @ Parameter (name ="httpStatusCodesThatRequireRetry" ,
1208+ description ="This parameter specifies a comma separated string containing zero or more HTTP status codes that must trigger a retry attempt on a PUT, POST or GET operation for a given incoming tuple. e-g: '503, 408, 504' (Default: Empty string)" , optional =true )
1209+ public void setHttpStatusCodesThatRequireRetry (String val ) {
1210+ // We will parse this comma separated string containing zero or more
1211+ // HTTP status codes and store them as elements in a set.
1212+ if (val .length () > 0 ) {
1213+ // Tokenize the given string using comma as a separator.
1214+ StringTokenizer st = new StringTokenizer (val , "," );
1215+
1216+ // Iterate through the available tokens and store them in the set as an Integer.
1217+ while (st .hasMoreTokens ()) {
1218+ String token = st .nextToken ();
1219+ // Trim the leading and trailing space characters.
1220+ token = token .trim ();
1221+ Integer httpStatusCode = Integer .parseInt (token );
1222+ httpStatusCodesSet .add (httpStatusCode );
1223+ }
1224+ }
1225+ }
11671226
11681227 public static final String DESC = "This operator posts/sends the incoming tuple's text or binary content to a " +
11691228 "HTTP or HTTPS persistent (Keep-Alive) or non-persistent endpoint specified in the operator parameter named url. " +
0 commit comments