Skip to content

Commit 965c246

Browse files
committed
Changes done for v1.1.3.
1 parent 2578162 commit 965c246

File tree

13 files changed

+437
-64
lines changed

13 files changed

+437
-64
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ The streamsx.websocket toolkit provides the following C++ and Java operators tha
1616

1717
**HttpPost** is a utility operator provided by this toolkit to test the optional HTTP(S) text or binary data reception feature available in the WebSocketSource operator. This utility operator can send text or binary data and receive text or binary data in response from the remote server. This operator allows clients to send data via HTTP GET, PUT and POST. If other application scenarios see a fit for this utility operator, they can also use it as needed.
1818

19-
If you clone this toolkit from the IBMStreams GitHub, then you must build this toolkit via `ant all` and `ant download-clean` from this toolkit's top-level directory. If there is no direct Internet access from the IBM Streams machine and if there is a need to go through a proxy server, then the `ant all` command may not work. In that case, you can try this command instead. `ant all -Dwebsocket.archive=file://localhost$(pwd)/ext -Dwebsocket.version=0.8.2 -Dboost.archive.src0=file://localhost$(pwd)/ext/boost-install-files/boost_1_73_0.tar.gz`
19+
If you clone this toolkit from the IBMStreams GitHub, then you must build this toolkit via `ant all` and `ant download-clean` from this toolkit's top-level directory. If there is no direct Internet access from the IBM Streams machine and if there is a need to go through a proxy server, then the `ant all` command may not work. In that case, please follow the specific instructions outlined in this toolkit's documentation [page](https://ibmstreams.github.io/streamsx.websocket/docs/user/overview/).
2020

2121
In a Streams application, these operators can either be used together or independent of each other. When they are used in an IBM Streams application, the WebSocket operators in this toolkit generate important metrics data that can be viewed from the IBM Streams web console to observe details such as data transfer time, payload size, number of data items transferred etc.
2222

com.ibm.streamsx.websocket/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
Changes
22
=======
3+
## v1.1.3:
4+
* Feb/01/2023
5+
* Added a code fix to try/catch the invalid state exception thrown from within the websocketpp send method when sending data to a remote client or server and at that exact time that remote client or server closing its WebSocket connection due to that client or server application being shut down or for any other reason. This change was done for all the three WebSocket operators in this toolkit.
6+
* Optimized all the reference examples that use the WebSocketSendReceive operator to ignore if a tuple arrives with empty data to be sent to a remote WebSocket endpoint.
7+
38
## v1.1.2:
49
* Jan/30/2023
510
* Optimized all the reference examples that use the WebSocketSendReceive operator to queue the incoming tuples in a map instead of a list to improve the data sending performance.

com.ibm.streamsx.websocket/com.ibm.streamsx.websocket.op/WebSocketSendReceive/WebSocketSendReceive_cpp.cgt

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/*
99
============================================================
1010
First created on: Apr/08/2020
11-
Last modified on: Jan/29/2023
11+
Last modified on: Feb/01/2023
1212

1313
This particular operator (WebSocketSendReceive) is used to
1414
send and receive either text data or binary data to and from a
@@ -639,7 +639,9 @@ void MY_OPERATOR::process(Punctuation const & punct, uint32_t port)
639639
//
640640
void MY_OPERATOR::ws_data_sender() {
641641
SPL::int64 timeTakenToSendDataItem = 0;
642-
642+
// v1.1.3 change. (Feb/01/2023)
643+
bool dataItemSentToRemoteServer = false;
644+
643645
// v1.1.1 change (Jan/28/2023)
644646
// Since it is no longer a long running thread loop,
645647
// there is no need for this while loop.
@@ -868,15 +870,51 @@ void MY_OPERATOR::ws_data_sender() {
868870
payloadSize = blobDataSize.at(0);
869871

870872
if (buffer != NULL && payloadSize > 0) {
871-
// c->get_alog().write(websocketpp::log::alevel::app, "Sent binary Message: " + boost::to_string(buffer.size()));
872-
if (tls_ws_client != NULL) {
873-
tls_ws_client->send(wsHandle, buffer, payloadSize, websocketpp::frame::opcode::binary);
874-
} else {
875-
non_tls_ws_client->send(wsHandle, buffer, payloadSize, websocketpp::frame::opcode::binary);
876-
}
877-
878-
numberOfDataItemsSent++;
879-
numberOfDataBytesSent += payloadSize;
873+
// v1.1.3 change. (Feb/01/2023)
874+
// Catch any exception from the send method below.
875+
// I have seen it throw an invalid_state exception when
876+
// we are inside the send method to send data to the
877+
// remote server that we are connected to and at that
878+
// time that server closes its WebSocket connection
879+
// due to the remote server application getting
880+
// shut down or for any other reason.
881+
try {
882+
// c->get_alog().write(websocketpp::log::alevel::app, "Sent binary Message: " + boost::to_string(buffer.size()));
883+
if (tls_ws_client != NULL) {
884+
tls_ws_client->send(wsHandle, buffer, payloadSize, websocketpp::frame::opcode::binary);
885+
// v1.1.3 change. (Feb/01/2023)
886+
dataItemSentToRemoteServer = true;
887+
} else {
888+
non_tls_ws_client->send(wsHandle, buffer, payloadSize, websocketpp::frame::opcode::binary);
889+
// v1.1.3 change. (Feb/01/2023)
890+
dataItemSentToRemoteServer = true;
891+
}
892+
} catch (websocketpp::exception const & e) {
893+
/*
894+
// On a heavy load of data send situations, there will be
895+
// too many data item send operations that will throw
896+
// several thousands of exceptions during the few second period
897+
// in which the remote server disconnect happens. That will
898+
// write the same PE log entry several thousands of times.
899+
// We don't want to log all of them. If you want to specifically
900+
// test this condition in a lab test environment, you can
901+
// uncomment this block only for testing purposes.
902+
SPLAPPTRC(L_ERROR,
903+
"Caught an exception in the middle of sending data to the " <<
904+
"remote server we are connected to. Server URL=" <<
905+
this->url <<
906+
". We are skipping the data send to that " <<
907+
"disconnected server.", "ws_data_sender");
908+
*/
909+
910+
// Do nothing here.
911+
}
912+
913+
// v1.1.3 change. (Feb/01/2023)
914+
if(dataItemSentToRemoteServer == true) {
915+
numberOfDataItemsSent++;
916+
numberOfDataBytesSent += payloadSize;
917+
}
880918
}
881919

882920
// Remove the items from the vector. It is no longer needed. Also free the original
@@ -893,21 +931,57 @@ void MY_OPERATOR::ws_data_sender() {
893931
payloadSize = strDataToSend.at(0).length();
894932

895933
if (payloadSize > 0) {
896-
if (tls_ws_client != NULL) {
897-
tls_ws_client->send(wsHandle, strDataToSend.at(0), websocketpp::frame::opcode::text);
898-
} else {
899-
non_tls_ws_client->send(wsHandle, strDataToSend.at(0), websocketpp::frame::opcode::text);
900-
}
901-
902-
numberOfDataItemsSent++;
903-
numberOfDataBytesSent += payloadSize;
934+
// v1.1.3 change. (Feb/01/2023)
935+
// Catch any exception from the send method below.
936+
// I have seen it throw an invalid_state exception when
937+
// we are inside the send method to send data to the
938+
// remote server that we are connected to and at that
939+
// time that server closes its WebSocket connection
940+
// due to the remote server application getting
941+
// shut down or for any other reason.
942+
try {
943+
if (tls_ws_client != NULL) {
944+
tls_ws_client->send(wsHandle, strDataToSend.at(0), websocketpp::frame::opcode::text);
945+
// v1.1.3 change. (Feb/01/2023)
946+
dataItemSentToRemoteServer = true;
947+
} else {
948+
non_tls_ws_client->send(wsHandle, strDataToSend.at(0), websocketpp::frame::opcode::text);
949+
// v1.1.3 change. (Feb/01/2023)
950+
dataItemSentToRemoteServer = true;
951+
}
952+
} catch (websocketpp::exception const & e) {
953+
/*
954+
// On a heavy load of data send situations, there will be
955+
// too many data item send operations that will throw
956+
// several thousands of exceptions during the few second period
957+
// in which the remote server disconnect happens. That will
958+
// write the same PE log entry several thousands of times.
959+
// We don't want to log all of them. If you want to specifically
960+
// test this condition in a lab test environment, you can
961+
// uncomment this block only for testing purposes.
962+
SPLAPPTRC(L_ERROR,
963+
"Caught an exception in the middle of sending data to the " <<
964+
"remote server we are connected to. Server URL=" <<
965+
this->url <<
966+
". We are skipping the data send to that " <<
967+
"disconnected server.", "ws_data_sender");
968+
*/
969+
970+
// Do nothing here.
971+
}
972+
973+
// v1.1.3 change. (Feb/01/2023)
974+
if(dataItemSentToRemoteServer == true) {
975+
numberOfDataItemsSent++;
976+
numberOfDataBytesSent += payloadSize;
977+
}
904978
}
905979

906980
// Erase the item pushed into the vector member (cache) earlier in the
907981
// process method for this data item. It is no longer needed.
908982
strDataToSend.erase(strDataToSend.begin() + 0);
909983
strDataVectorSize--;
910-
}
984+
}
911985

912986
// Calculate the time it took to send the data item.
913987
SPL:timestamp tsAfterSendingDataItem = SPL::Functions::Time::getTimestamp();
@@ -917,7 +991,9 @@ void MY_OPERATOR::ws_data_sender() {
917991
timeTakenToSendDataItem /= 1000000;
918992

919993
// Update the operator metric only if the user asked for a live update.
920-
if (websocketLiveMetricsUpdateNeeded == true && payloadSize > 0) {
994+
// v1.1.3 change. (Feb/01/2023)
995+
if (websocketLiveMetricsUpdateNeeded == true &&
996+
payloadSize > 0 && dataItemSentToRemoteServer == true) {
921997
nDataItemsSentToRemoteServerMetric->setValueNoLock(numberOfDataItemsSent);
922998
nDataBytesSentToRemoteServerMetric->setValueNoLock(numberOfDataBytesSent);
923999
nTimeTakenToSendMostRecentDataItemMetric->setValueNoLock((SPL::uint64)timeTakenToSendDataItem);
@@ -926,13 +1002,25 @@ void MY_OPERATOR::ws_data_sender() {
9261002

9271003
// We will return the status as success via the send status output port.
9281004
OPort1Type oTuple;
929-
// Result code 0 means successful send.
930-
oTuple.set_sendResultCode(0);
931-
oTuple.set_sendResultMessage("Successfully sent " +
932-
boost::to_string(payloadSize) + " bytes of data to the remote WebSocket server.");
1005+
1006+
// v1.1.3 change. (Feb/01/2023)
1007+
if(payloadSize > 0 && dataItemSentToRemoteServer == false) {
1008+
// Result code 2 means unsuccessful send.
1009+
oTuple.set_sendResultCode(2);
1010+
oTuple.set_sendResultMessage(boost::to_string("Unable to send data to the remote WebSocket server as it may have ") +
1011+
boost::to_string("closed its connection. You may try sending your data again later."));
1012+
} else {
1013+
// Result code 0 means successful send.
1014+
oTuple.set_sendResultCode(0);
1015+
oTuple.set_sendResultMessage("Successfully sent " +
1016+
boost::to_string(payloadSize) + " bytes of data to the remote WebSocket server.");
1017+
}
1018+
9331019
submit(oTuple, 1);
9341020

935-
if (wsClientSessionLoggingNeeded == true && payloadSize > 0) {
1021+
// v1.1.3 change. (Feb/01/2023)
1022+
if (wsClientSessionLoggingNeeded == true &&
1023+
payloadSize > 0 && dataItemSentToRemoteServer) {
9361024
SPLAPPTRC(L_ERROR,
9371025
"Operator "
9381026
<< operatorPhysicalName

com.ibm.streamsx.websocket/com.ibm.streamsx.websocket.op/WebSocketSink/WebSocketSink_cpp.cgt

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
/*
99
============================================================
1010
First created on: Apr/28/2020
11-
Last modified on: Jan/29/2023
11+
Last modified on: Feb/01/2023
1212

1313
This particular operator (WebSocketSink) is used to
1414
send text (plain text, JSON, XML) and/or binary data to
@@ -1093,6 +1093,8 @@ void MY_OPERATOR::ws_data_sender() {
10931093
SPL::int64 timeTakenToSendDataItem = 0;
10941094
SPL::int64 timeTakenToAcquireMutex1 = 0;
10951095
SPL::int64 timeTakenToAcquireMutex3 = 0;
1096+
// v1.1.3 change. (Feb/01/2023)
1097+
bool dataItemSentToRemoteClient = false;
10961098

10971099
// v1.1.1 change (Jan/28/2023)
10981100
// Since it is no longer a long running thread loop,
@@ -1281,18 +1283,54 @@ void MY_OPERATOR::ws_data_sender() {
12811283
continue;
12821284
}
12831285

1284-
// We can now send a blob via either TLS or non-TLS.
1285-
if(con_metadata.isTlsConnection == false) {
1286-
endpoint_non_tls.send(hdl, buffer, payloadSize, websocketpp::frame::opcode::binary);
1287-
} else {
1288-
endpoint_tls.send(hdl, buffer, payloadSize, websocketpp::frame::opcode::binary);
1289-
}
1286+
// v1.1.3 change. (Feb/01/2023)
1287+
// Catch any exception from the send method below.
1288+
// I have seen it throw an invalid_state exception when
1289+
// we are inside the send method to send data to a
1290+
// connected client and at that time that client
1291+
// closes its WebSocket connection due to the
1292+
// remote client application getting shut down or
1293+
// for any other reason.
1294+
try {
1295+
// We can now send a blob via either TLS or non-TLS.
1296+
if(con_metadata.isTlsConnection == false) {
1297+
endpoint_non_tls.send(hdl, buffer, payloadSize, websocketpp::frame::opcode::binary);
1298+
// v1.1.3 change. (Feb/01/2023)
1299+
dataItemSentToRemoteClient = true;
1300+
} else {
1301+
endpoint_tls.send(hdl, buffer, payloadSize, websocketpp::frame::opcode::binary);
1302+
// v1.1.3 change. (Feb/01/2023)
1303+
dataItemSentToRemoteClient = true;
1304+
}
1305+
} catch (websocketpp::exception const & e) {
1306+
/*
1307+
// On a heavy load of data send situations, there will be
1308+
// too many data item send operations that will throw
1309+
// several thousands of exceptions during the few second period
1310+
// in which the remote client disconnect happens. That will
1311+
// write the same PE log entry several thousands of times.
1312+
// We don't want to log all of them. If you want to specifically
1313+
// test this condition in a lab test environment, you can
1314+
// uncomment this block only for testing purposes.
1315+
SPLAPPTRC(L_ERROR,
1316+
"Caught an exception in the middle of sending data to a " <<
1317+
"remote client connected here from the client IP address " <<
1318+
con_metadata.clientIpAddress <<
1319+
" and a client WebSocket port " <<
1320+
con_metadata.clientPort <<
1321+
". We are skipping the data send to that " <<
1322+
"disconnected client.", "ws_data_sender");
1323+
*/
1324+
1325+
// Do nothing here.
1326+
}
12901327

12911328
it++;
12921329
} // End of the inner while loop.
12931330
}
12941331

1295-
if (buffer != NULL && payloadSize > 0) {
1332+
// v1.1.3 change. (Feb/01/2023)
1333+
if (buffer != NULL && payloadSize > 0 && dataItemSentToRemoteClient == true) {
12961334
// We compute the following two values only for one remote
12971335
// client connected to this operator. That is what will get
12981336
// displayed in the PE log file and in the Streams web console's
@@ -1348,19 +1386,55 @@ void MY_OPERATOR::ws_data_sender() {
13481386
it++;
13491387
continue;
13501388
}
1351-
1352-
// We can now send a string via either TLS or non-TLS.
1353-
if(con_metadata.isTlsConnection == false) {
1354-
endpoint_non_tls.send(hdl, strDataToSend.at(0), websocketpp::frame::opcode::text);
1355-
} else {
1356-
endpoint_tls.send(hdl, strDataToSend.at(0), websocketpp::frame::opcode::text);
1357-
}
1389+
1390+
// v1.1.3 change. (Feb/01/2023)
1391+
// Catch any exception from the send method below.
1392+
// I have seen it throw an invalid_state exception when
1393+
// we are inside the send method to send data to a
1394+
// connected client and at that time that client
1395+
// closes its WebSocket connection due to the
1396+
// remote client application getting shut down or
1397+
// for any other reason.
1398+
try {
1399+
// We can now send a string via either TLS or non-TLS.
1400+
if(con_metadata.isTlsConnection == false) {
1401+
endpoint_non_tls.send(hdl, strDataToSend.at(0), websocketpp::frame::opcode::text);
1402+
// v1.1.3 change. (Feb/01/2023)
1403+
dataItemSentToRemoteClient = true;
1404+
} else {
1405+
endpoint_tls.send(hdl, strDataToSend.at(0), websocketpp::frame::opcode::text);
1406+
// v1.1.3 change. (Feb/01/2023)
1407+
dataItemSentToRemoteClient = true;
1408+
}
1409+
} catch (websocketpp::exception const & e) {
1410+
/*
1411+
// On a heavy load of data send situations, there will be
1412+
// too many data item send operations that will throw
1413+
// several thousands of exceptions during the few second period
1414+
// in which the remote client disconnect happens. That will
1415+
// write the same PE log entry several thousands of times.
1416+
// We don't want to log all of them. If you want to specifically
1417+
// test this condition in a lab test environment, you can
1418+
// uncomment this block only for testing purposes.
1419+
SPLAPPTRC(L_ERROR,
1420+
"Caught an exception in the middle of sending data to a " <<
1421+
"remote client connected here from the client IP address " <<
1422+
con_metadata.clientIpAddress <<
1423+
" and a client WebSocket port " <<
1424+
con_metadata.clientPort <<
1425+
". We are skipping the data send to that " <<
1426+
"disconnected client.", "ws_data_sender");
1427+
*/
1428+
1429+
// Do nothing here.
1430+
}
13581431

13591432
it++;
13601433
} // End of the inner while loop.
13611434
}
13621435

1363-
if (payloadSize > 0) {
1436+
// v1.1.3 change. (Feb/01/2023)
1437+
if (payloadSize > 0 && dataItemSentToRemoteClient == true) {
13641438
// We compute the following two values only for one remote
13651439
// client connected to this operator. That is what will get
13661440
// displayed in the PE log file and in the Streams web console's
@@ -1387,16 +1461,20 @@ void MY_OPERATOR::ws_data_sender() {
13871461
timeTakenToSendDataItem /= 1000000;
13881462

13891463
// Update the operator metric only if the user asked for a live update.
1390-
if (websocketLiveMetricsUpdateNeeded == true && payloadSize > 0) {
1464+
// v1.1.3 change. (Feb/01/2023)
1465+
if (websocketLiveMetricsUpdateNeeded == true &&
1466+
payloadSize > 0 && dataItemSentToRemoteClient == true) {
13911467
nDataItemsSentToRemoteClientsMetric->setValueNoLock(numberOfDataItemsSent);
13921468
nDataBytesSentToRemoteClientsMetric->setValueNoLock(numberOfDataBytesSent);
13931469
nBlobDataItemsWaitingToBeSentToRemoteClientsMetric->setValueNoLock(blobDataVectorSize);
13941470
nStringDataItemsWaitingToBeSentToRemoteClientsMetric->setValueNoLock(strDataVectorSize);
13951471
nTimeTakenToSendMostRecentDataItemMetric->setValueNoLock((SPL::uint64)timeTakenToSendDataItem);
13961472
nSizeOfMostRecentDataItemSentMetric->setValueNoLock(payloadSize);
13971473
}
1398-
1399-
if (wsClientSessionLoggingNeeded == true && payloadSize > 0) {
1474+
1475+
// v1.1.3 change. (Feb/01/2023)
1476+
if (wsClientSessionLoggingNeeded == true &&
1477+
payloadSize > 0 && dataItemSentToRemoteClient == true) {
14001478
SPLAPPTRC(L_ERROR,
14011479
"Operator "
14021480
<< operatorPhysicalName

0 commit comments

Comments
 (0)