Skip to content

Commit 3594512

Browse files
committed
Changes done for v1.1.4.
1 parent 965c246 commit 3594512

File tree

18 files changed

+448
-83
lines changed

18 files changed

+448
-83
lines changed

com.ibm.streamsx.websocket/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
Changes
22
=======
3+
## v1.1.4:
4+
* Mar/20/2023
5+
* Made the second output port of the WebSocketSendReceive operator optional.
6+
* Made the metrics counters for the three WebSocket operators to be reset to zero at a user defined time interval via a new optional parameter named metricsResetInterval.
7+
38
## v1.1.3:
49
* Feb/01/2023
510
* Added a code fix to try/catch the invalid state exception thrown from within the websocketpp send method when sending data to a remote client or server and at that exact time that remote client or server closing its WebSocket connection due to that client or server application being shut down or for any other reason. This change was done for all the three WebSocket operators in this toolkit.

com.ibm.streamsx.websocket/com.ibm.streamsx.websocket.op/WebSocketSendReceive/WebSocketSendReceive.xml

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@
4444
remote WebSocket server. This input port must have either rstring strData or
4545
blob blobData or both of them as the incoming tuple attribute(s).
4646

47-
This operator provides two output ports. On the first output port, it keeps returning the data
47+
This operator provides two output ports where the first output port is mandatory and
48+
the second output port is optional. On the first output port, it keeps returning the data
4849
received from the remote server. This output port must have either rstring strData or
4950
blob blobData or both of them as the output tuple attribute(s). Optionally, it can have
5051
uint64 numberOfDataItemsReceived and uint64 numberOfDataBytesReceived attributes to be
51-
assigned via the output functions provided by this operator. On the second output port,
52+
assigned via the output functions provided by this operator. If the second output port is present,
5253
it keeps returning the result about whether a given incoming tuple was successfully
5354
sent to the remote server or not. This output port must have int32 sendResultCode and
5455
rstring sendResultMessage as its output tuple attributes. A value of 0 in sendResultCode means
@@ -314,6 +315,16 @@
314315
<type>boolean</type>
315316
<cardinality>1</cardinality>
316317
</parameter>
318+
319+
<parameter>
320+
<name>metricsResetInterval</name>
321+
<description>This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)</description>
322+
<optional>true</optional>
323+
<rewriteAllowed>true</rewriteAllowed>
324+
<expressionMode>AttributeFree</expressionMode>
325+
<type>uint32</type>
326+
<cardinality>1</cardinality>
327+
</parameter>
317328
</parameters>
318329

319330
<inputPorts>
@@ -367,9 +378,10 @@
367378

368379
<outputPortSet>
369380
<description>
370-
This port returns the result about whether a given input tuple consumed by this
371-
operator was successfully sent to the remote WebSocket server or not. The schema
372-
for this port must have two attributes: int32 sendResultCode, rstring sendResultMessage.
381+
This is an optional output port. If it is present, it returns the result about
382+
whether a given input tuple consumed by this operator was successfully sent to
383+
the remote WebSocket server or not. The schema for this port must have two attributes:
384+
int32 sendResultCode, rstring sendResultMessage.
373385
sendResultCode will carry a value of 0 on a successful send to the remote server and a
374386
non-zero result in case of an error in sending the data. sendResultMessage will
375387
carry a descriptive message about the send result. Application logic can check these
@@ -383,7 +395,7 @@
383395
<windowPunctuationOutputMode>Free</windowPunctuationOutputMode>
384396
<tupleMutationAllowed>false</tupleMutationAllowed>
385397
<cardinality>1</cardinality>
386-
<optional>false</optional>
398+
<optional>true</optional>
387399
</outputPortSet>
388400
</outputPorts>
389401
</cppOperatorModel>

com.ibm.streamsx.websocket/com.ibm.streamsx.websocket.op/WebSocketSendReceive/WebSocketSendReceive_cpp.cgt

Lines changed: 148 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/*
99
============================================================
1010
First created on: Apr/08/2020
11-
Last modified on: Feb/01/2023
11+
Last modified on: Mar/20/2023
1212

1313
This particular operator (WebSocketSendReceive) is used to
1414
send and receive either text data or binary data to and from a
@@ -154,51 +154,56 @@ if ($blobDataOutputAttributeFound == 1 and !(defined($dataOutputAsBlob))) {
154154
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'blobData' is not of type 'blob' in the first output port.");
155155
}
156156

157-
# Check the output port number 1 i.e. the second output port.
158-
# It will carry the send result code of the most recent tuple transmission to
159-
# the remote WebSocket server and the send error message if there is any error during transmission.
160-
my $outputPort2 = $model->getOutputPortAt(1);
161-
my $outputTupleName2 = $outputPort2->getCppTupleName();
162-
my $sendResultCodeInt32 = undef;
163-
my $sendResultMessageRString = undef;
164-
my $outputAttrs2 = $outputPort2->getAttributes();
165-
my $sendResultCodeAttributeFound = 0;
166-
my $sendResultMessageAttributeFound = 0;
167-
168-
foreach my $outputAttr2 (@$outputAttrs2) {
169-
my $outAttrName2 = $outputAttr2->getName();
170-
my $outAttrType2 = $outputAttr2->getSPLType();
157+
158+
# Second output port i.e. output port number 1 is optional in this operator.
159+
# Do the following if that output port is present.
160+
if ($numberOfOutputPorts > 1) {
161+
# Check the output port number 1 i.e. the second output port.
162+
# It will carry the send result code of the most recent tuple transmission to
163+
# the remote WebSocket server and the send error message if there is any error during transmission.
164+
my $outputPort2 = $model->getOutputPortAt(1);
165+
my $outputTupleName2 = $outputPort2->getCppTupleName();
166+
my $sendResultCodeInt32 = undef;
167+
my $sendResultMessageRString = undef;
168+
my $outputAttrs2 = $outputPort2->getAttributes();
169+
my $sendResultCodeAttributeFound = 0;
170+
my $sendResultMessageAttributeFound = 0;
171171

172-
if ($outAttrName2 eq "sendResultCode") {
173-
$sendResultCodeAttributeFound = 1;
172+
foreach my $outputAttr2 (@$outputAttrs2) {
173+
my $outAttrName2 = $outputAttr2->getName();
174+
my $outAttrType2 = $outputAttr2->getSPLType();
174175

175-
if ($outAttrType2 eq "int32") {
176-
# This tuple attribute will carry the result code for sending the most recent tuple.
177-
$sendResultCodeInt32 = 1;
178-
}
179-
}
180-
181-
if ($outAttrName2 eq "sendResultMessage") {
182-
$sendResultMessageAttributeFound = 1;
176+
if ($outAttrName2 eq "sendResultCode") {
177+
$sendResultCodeAttributeFound = 1;
178+
179+
if ($outAttrType2 eq "int32") {
180+
# This tuple attribute will carry the result code for sending the most recent tuple.
181+
$sendResultCodeInt32 = 1;
182+
}
183+
}
183184

184-
if ($outAttrType2 eq "rstring") {
185-
# This tuple attribute will carry the error message if any while sending the most recent tuple.
186-
$sendResultMessageRString = 1;
187-
}
188-
}
185+
if ($outAttrName2 eq "sendResultMessage") {
186+
$sendResultMessageAttributeFound = 1;
187+
188+
if ($outAttrType2 eq "rstring") {
189+
# This tuple attribute will carry the error message if any while sending the most recent tuple.
190+
$sendResultMessageRString = 1;
191+
}
192+
}
193+
194+
}
189195

190-
}
191-
192-
if ($sendResultCodeAttributeFound == 0 and $sendResultMessageAttributeFound == 0) {
193-
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'sendResultCode' or 'sendResultMessage' is missing in the second output port.");
194-
}
195-
196-
if ($sendResultCodeAttributeFound == 1 and !(defined($sendResultCodeInt32))) {
197-
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'sendResultCode' is not of type 'int32' in the second output port.");
198-
}
199-
200-
if ($sendResultMessageAttributeFound == 1 and !(defined($sendResultMessageRString))) {
201-
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'sendResultMessage' is not of type 'rstring' in the second output port.");
196+
if ($sendResultCodeAttributeFound == 0 and $sendResultMessageAttributeFound == 0) {
197+
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'sendResultCode' or 'sendResultMessage' is missing in the second output port.");
198+
}
199+
200+
if ($sendResultCodeAttributeFound == 1 and !(defined($sendResultCodeInt32))) {
201+
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'sendResultCode' is not of type 'int32' in the second output port.");
202+
}
203+
204+
if ($sendResultMessageAttributeFound == 1 and !(defined($sendResultMessageRString))) {
205+
SPL::CodeGen::exitln("WebSocketSendReceive_cpp.cgt: The required output tuple attribute 'sendResultMessage' is not of type 'rstring' in the second output port.");
206+
}
202207
}
203208

204209
# Following are the operator parameters.
@@ -257,6 +262,10 @@ $noDataCpuYieldTimeInSenderThread = $noDataCpuYieldTimeInSenderThread ? $noDataC
257262
my $reconnectionInterval = $model->getParameterByName("reconnectionInterval");
258263
# Default: 60.0 seconds
259264
$reconnectionInterval = $reconnectionInterval ? $reconnectionInterval->getValueAt(0)->getCppExpression() : 60.0;
265+
266+
my $metricsResetInterval = $model->getParameterByName("metricsResetInterval");
267+
# Default: 0 minutes
268+
$metricsResetInterval = $metricsResetInterval ? $metricsResetInterval->getValueAt(0)->getCppExpression() : 0;
260269
%>
261270

262271
<%SPL::CodeGen::implementationPrologue($model);%>
@@ -362,6 +371,8 @@ MY_OPERATOR::MY_OPERATOR()
362371
boost::to_string(reconnectionInterval) + " is given for the reconnectionInterval parameter." +
363372
" Valid value must be greater than 0.0.");
364373
}
374+
375+
metricsResetInterval = <%=$metricsResetInterval%>;
365376

366377
operatorPhysicalName = getContext().getName();
367378
udpChannelNumber = getContext().getChannel();
@@ -382,7 +393,8 @@ MY_OPERATOR::MY_OPERATOR()
382393
", wsClientSessionLoggingNeeded=" << wsClientSessionLoggingNeeded <<
383394
", newDataCpuYieldTimeInSenderThread=" << newDataCpuYieldTimeInSenderThread <<
384395
", noDataCpuYieldTimeInSenderThread=" << noDataCpuYieldTimeInSenderThread <<
385-
", reconnectionInterval=" << reconnectionInterval, "constructor");
396+
", reconnectionInterval=" << reconnectionInterval <<
397+
", metricsResetInterval=" << metricsResetInterval, "constructor");
386398
} // End of Constructor
387399

388400
// Destructor
@@ -459,6 +471,25 @@ void MY_OPERATOR::allPortsReady()
459471
SPL::Functions::Utility::abort(__FILE__, __LINE__);
460472
}
461473
*/
474+
475+
// v1.1.4 change (Mar/15/2023)
476+
// If this operator is configured with a non-zero metrics reset interval,
477+
// we will go ahead and start a thread to periodically reset all the
478+
// metrics counters to zero. Metrics reset interval is given in minutes.
479+
// We will also start this thread only when the live update metrics is enabled.
480+
if(metricsResetInterval > 0 && websocketLiveMetricsUpdateNeeded == true) {
481+
try {
482+
auto callback3 = std::bind(&MY_OPERATOR::reset_metrics_counters, this);
483+
boost::thread* myThread3 = new boost::thread(callback3);
484+
} catch(std::exception& e) {
485+
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
486+
"-->Channel " << boost::to_string(udpChannelNumber) <<
487+
"-->Exception in creating the reset_metrics_counters thread. " <<
488+
e.what(), "reset_metrics_counters thread creation");
489+
// We can't do what the user asked for without this thread. Abort this operator now.
490+
SPL::Functions::Utility::abort(__FILE__, __LINE__);
491+
}
492+
}
462493
} // End of allPortsReady.
463494

464495
// Notify pending shutdown
@@ -768,6 +799,12 @@ void MY_OPERATOR::ws_data_sender() {
768799
strDataVectorSize--;
769800
}
770801

802+
<%
803+
if ($numberOfOutputPorts > 1) {
804+
%>
805+
806+
// Second output port for this operator is optional.
807+
// Do the following only if that output port is present.
771808
// We will return the status as connection error via the send status output port.
772809
// On receiving this error status, the application logic that invoked this operator can
773810
// decide to retransmit that data at a later time.
@@ -789,6 +826,10 @@ void MY_OPERATOR::ws_data_sender() {
789826
std::string(" Unable to connect to the remote WebSocket server. You may try sending your data again later."));
790827
submit(oTuple, 1);
791828

829+
<%
830+
}
831+
%>
832+
792833
if (wsClientSessionLoggingNeeded == true) {
793834
SPLAPPTRC(L_ERROR,
794835
"Operator "
@@ -1000,6 +1041,12 @@ void MY_OPERATOR::ws_data_sender() {
10001041
nSizeOfMostRecentDataItemSentMetric->setValueNoLock(payloadSize);
10011042
}
10021043

1044+
<%
1045+
if ($numberOfOutputPorts > 1) {
1046+
%>
1047+
1048+
// Second output port for this operator is optional.
1049+
// Do the following only if that output port is present.
10031050
// We will return the status as success via the send status output port.
10041051
OPort1Type oTuple;
10051052

@@ -1018,6 +1065,10 @@ void MY_OPERATOR::ws_data_sender() {
10181065

10191066
submit(oTuple, 1);
10201067

1068+
<%
1069+
}
1070+
%>
1071+
10211072
// v1.1.3 change. (Feb/01/2023)
10221073
if (wsClientSessionLoggingNeeded == true &&
10231074
payloadSize > 0 && dataItemSentToRemoteServer) {
@@ -1055,6 +1106,60 @@ void MY_OPERATOR::ws_data_sender() {
10551106
// } // End of the outer while loop
10561107
} // End of ws_data_sender
10571108

1109+
// v1.1.4 change.
1110+
// This is a thread that gets started in the allPortsReady method above to
1111+
// reset metrics counters if the user configured this operator to do that.
1112+
// In this method, we will keep resetting the metrics counters to zero at a
1113+
// user configured time interval in minutes.
1114+
void MY_OPERATOR::reset_metrics_counters() {
1115+
// If the metrics reset interval is set to zero,
1116+
// we can return from this method now.
1117+
// We can also return from this method when the live metrics update is not enabled.
1118+
if(metricsResetInterval <= 0 || websocketLiveMetricsUpdateNeeded == false) {
1119+
return;
1120+
}
1121+
1122+
SPL::uint32 timeElapsed = 0;
1123+
1124+
while (!getPE().getShutdownRequested()) {
1125+
// Wait for a minute.
1126+
SPL::Functions::Utility::block(60.0);
1127+
// Advance it by a minute.
1128+
timeElapsed += 1;
1129+
1130+
if (timeElapsed >= metricsResetInterval) {
1131+
// Set our time counter to 0 for a fresh start.
1132+
timeElapsed = 0;
1133+
1134+
SPLAPPTRC(L_ERROR,
1135+
"Operator " <<
1136+
operatorPhysicalName <<
1137+
"-->Channel " << boost::to_string(udpChannelNumber) <<
1138+
", metricsResetInterval=" << metricsResetInterval <<
1139+
". Values of the metrics counters just before resetting them to zero. " <<
1140+
"numberOfDataItemsSent=" << numberOfDataItemsSent <<
1141+
", numberOfDataBytesSent=" << numberOfDataBytesSent <<
1142+
", numberOfDataItemsReceived=" << numberOfDataItemsReceived <<
1143+
", numberOfDataBytesReceived=" << numberOfDataBytesReceived, "reset_metrics_counters");
1144+
1145+
// It is time now to reset the metrics counters to zero.
1146+
// Please note that we are blindly resetting the
1147+
// counters here without acquiring any mutex to avoid
1148+
// variable write collisions with other threds (data sender and data receiver)
1149+
// If tests prove that this kind of resetting counters is
1150+
// not effective, plan to wait on a mutex here to coordinate with the other threads.
1151+
numberOfDataItemsSent = 0;
1152+
numberOfDataBytesSent = 0;
1153+
numberOfDataItemsReceived = 0;
1154+
numberOfDataBytesReceived = 0;
1155+
nDataItemsSentToRemoteServerMetric->setValueNoLock(numberOfDataItemsSent);
1156+
nDataBytesSentToRemoteServerMetric->setValueNoLock(numberOfDataBytesSent);
1157+
nDataItemsReceivedFromRemoteServerMetric->setValueNoLock(numberOfDataItemsReceived);
1158+
nDataBytesReceivedFromRemoteServerMetric->setValueNoLock(numberOfDataBytesReceived);
1159+
}
1160+
} // End of the while loop.
1161+
} // End of reset_metrics_counters
1162+
10581163
// This method initializes the Websocket driver, TLS or non-TLS and then
10591164
// opens a connection. This is going to run on its own thread.
10601165
// See the commentary in the allPortsReady method above to

com.ibm.streamsx.websocket/com.ibm.streamsx.websocket.op/WebSocketSendReceive/WebSocketSendReceive_h.cgt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/*
99
============================================================
1010
First created on: Apr/08/2020
11-
Last modified on: Jan/29/2023
11+
Last modified on: Mar/16/2023
1212

1313
This include file contains the necessary inclusion of
1414
other C++ header files, declaration of member variables and
@@ -86,6 +86,7 @@ public:
8686
SPL::float64 newDataCpuYieldTimeInSenderThread;
8787
SPL::float64 noDataCpuYieldTimeInSenderThread;
8888
SPL::float64 reconnectionInterval;
89+
SPL::uint32 metricsResetInterval;
8990

9091
// Custom metrics for this operator.
9192
Metric *nDataItemsSentToRemoteServerMetric;
@@ -191,6 +192,9 @@ public:
191192

192193
// Websocket data sender thread method
193194
void ws_data_sender();
195+
196+
// Reset metrics counters thread method
197+
void reset_metrics_counters();
194198

195199
private:
196200
// These are the output attribute assignment functions for this operator.

com.ibm.streamsx.websocket/com.ibm.streamsx.websocket.op/WebSocketSink/WebSocketSink.xml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,17 @@
369369
<expressionMode>AttributeFree</expressionMode>
370370
<type>rstring</type>
371371
<cardinality>1</cardinality>
372-
</parameter>
372+
</parameter>
373+
374+
<parameter>
375+
<name>metricsResetInterval</name>
376+
<description>This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)</description>
377+
<optional>true</optional>
378+
<rewriteAllowed>true</rewriteAllowed>
379+
<expressionMode>AttributeFree</expressionMode>
380+
<type>uint32</type>
381+
<cardinality>1</cardinality>
382+
</parameter>
373383
</parameters>
374384

375385
<inputPorts>

0 commit comments

Comments
 (0)