diff --git a/src/api/qpid-proton/CMakeLists.txt b/src/api/qpid-proton/CMakeLists.txt index a9d20d7..757b762 100644 --- a/src/api/qpid-proton/CMakeLists.txt +++ b/src/api/qpid-proton/CMakeLists.txt @@ -61,10 +61,14 @@ add_library( reactor/handler/CommonHandler.cpp reactor/handler/ConnectorHandler.cpp reactor/handler/SenderHandler.cpp + reactor/handler/TxSenderHandler.cpp reactor/handler/ReceiverHandler.cpp + reactor/handler/TxReceiverHandler.cpp reactor/ConnectingClient.cpp reactor/SendingClient.cpp + reactor/TxSendingClient.cpp reactor/ReceivingClient.cpp + reactor/TxReceivingClient.cpp ) target_link_libraries( diff --git a/src/api/qpid-proton/clients/aac3_receiver.cpp b/src/api/qpid-proton/clients/aac3_receiver.cpp index 55a0408..10a11dc 100644 --- a/src/api/qpid-proton/clients/aac3_receiver.cpp +++ b/src/api/qpid-proton/clients/aac3_receiver.cpp @@ -1,17 +1,33 @@ /* - * aac1_sender.cpp + * aac3_sender.cpp * * Created on: Apr 14, 2015 * Author: opiske */ - #include +#include using dtests::proton::reactor::ReceivingClient; +using dtests::proton::reactor::TxReceivingClient; int main(int argc, char** argv) { - ReceivingClient client = ReceivingClient(); + int i = 0; + bool tx_mode = false; + std::string tx_opt = "--tx-"; + while (i < argc) { + if (std::string(argv[i]).rfind("--tx-", 0) == 0) { // pos=0 limits the search to the prefix + tx_mode = true; + break; + } + i++; + } - return client.run(argc, argv); + if (tx_mode) { + TxReceivingClient client = TxReceivingClient(); + return client.run(argc, argv); + } else { + ReceivingClient client = ReceivingClient(); + return client.run(argc, argv); + } } diff --git a/src/api/qpid-proton/clients/aac3_sender.cpp b/src/api/qpid-proton/clients/aac3_sender.cpp index fe9cc9f..0a2bf61 100644 --- a/src/api/qpid-proton/clients/aac3_sender.cpp +++ b/src/api/qpid-proton/clients/aac3_sender.cpp @@ -1,23 +1,33 @@ /* - * aac1_sender.cpp + * aac3_sender.cpp * * Created on: Apr 14, 2015 * Author: opiske */ -#include - #include +#include using dtests::proton::reactor::SendingClient; +using dtests::proton::reactor::TxSendingClient; int main(int argc, char** argv) { - SendingClient client = SendingClient(); - - return client.run(argc, argv); - + int i = 0; + bool tx_mode = false; + std::string tx_opt = "--tx-"; + while (i < argc) { + if (std::string(argv[i]).rfind("--tx-", 0) == 0) { // pos=0 limits the search to the prefix + tx_mode = true; + break; + } + i++; + } + + if (tx_mode) { + TxSendingClient client = TxSendingClient(); + return client.run(argc, argv); + } else { + SendingClient client = SendingClient(); + return client.run(argc, argv); + } } - - - - diff --git a/src/api/qpid-proton/reactor/ReceivingClient.h b/src/api/qpid-proton/reactor/ReceivingClient.h index 5e5f5af..9a4fe1f 100644 --- a/src/api/qpid-proton/reactor/ReceivingClient.h +++ b/src/api/qpid-proton/reactor/ReceivingClient.h @@ -52,6 +52,7 @@ class ReceivingClient : public ModernClient { private: typedef ModernClient super; + protected: void setMessageOptions(const OptionsSetter &setter, message &msg) const; }; diff --git a/src/api/qpid-proton/reactor/SendingClient.h b/src/api/qpid-proton/reactor/SendingClient.h index 559babc..59bc5ac 100644 --- a/src/api/qpid-proton/reactor/SendingClient.h +++ b/src/api/qpid-proton/reactor/SendingClient.h @@ -47,6 +47,7 @@ class SendingClient : public ModernClient { private: typedef ModernClient super; + protected: void setMessageOptions(const OptionsSetter &setter, message &msg) const; bool nameVal(const string& in, string& name, string& value, string& separator) const; void setMessageProperty(message *msg, const string &property) const; diff --git a/src/api/qpid-proton/reactor/TxReceivingClient.cpp b/src/api/qpid-proton/reactor/TxReceivingClient.cpp new file mode 100644 index 0000000..647bb83 --- /dev/null +++ b/src/api/qpid-proton/reactor/TxReceivingClient.cpp @@ -0,0 +1,425 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: TxReceivingClient.cpp + * Author: pematous + * + * Created on November 20, 2024 + */ + +#include + +#include "TxReceivingClient.h" + +using proton::container; + +namespace dtests { +namespace proton { +namespace reactor { + +using dtests::common::UriParser; + +TxReceivingClient::TxReceivingClient() + : super() +{ +} + +TxReceivingClient::~TxReceivingClient() +{ +} + +int TxReceivingClient::run(int argc, char **argv) const +{ + const string usage = "usage: %prog [OPTION]... DIR [FILE]..."; + const string version = "1.0"; + const string desc = "C/C++ AMQ reactive API receiver client for Qpid Proton"; + + ReceiverOptionsParser parser = ReceiverOptionsParser(); + UriParser uri_parser = UriParser(); + + /** + * WARNING: do not reassign the result of chainned calls to usage/version/etc + * because it causes the code to segfault. For example, this crashes: + * + * ControlOptions parser = ControlOptions().usage(usage) + */ + parser.usage(usage).version(version).description(desc); + + optparse::Values options = parser.parse_args(argc, argv); + + parser.validate(options); + + setLogLevel(options); + + // Temporary variable for address, will search for prefix + string tempAddress = options["broker-url"]; + // Variable for final address + string address; + // Variable for recognition of topic + bool is_topic = false; + // Search for prefix + std::size_t prefix_index = tempAddress.find("topic://"); + // If prefix is present + if (prefix_index != std::string::npos) { + // Delete prefix + address = tempAddress.replace(prefix_index, 8, ""); + // Set that it will be topic + is_topic = true; + // If prefix is NOT present + } else { + // Use full address + address = tempAddress; + } + + uri_parser.parse(options["broker-url"]); + + std::vector conn_urls; + if (options.is_set("conn-urls")) { + std::stringstream conn_urls_string(options["conn-urls"]); + std::string segment; + + while(std::getline(conn_urls_string, segment, ',')) { + conn_urls.push_back(segment); + } + } + + string msg_action = "no-action"; + if(options.is_set("msg-action")) { + msg_action = options["msg-action"]; + } + + int msg_action_size = 1; + if(options.is_set("msg-action-size")) { + msg_action_size = atoi(options["msg-action-size"].c_str()); + } + + string user = ""; + if (options.is_set("user")) { + user = options["user"]; + } else { + user = uri_parser.getUser(); + } + + string password = ""; + if (options.is_set("password")) { + password = options["password"]; + } else { + password = uri_parser.getPassword(); + } + + string sasl_mechanisms = ""; + if (options.is_set("sasl-mechanisms")) { + sasl_mechanisms = options["sasl-mechanisms"]; + } else if (user != "" && password != "") { + sasl_mechanisms = "PLAIN"; + } else { + sasl_mechanisms = "ANONYMOUS"; + } + + string conn_sasl_enabled = "true"; + if (options.is_set("conn-sasl-enabled")) { + conn_sasl_enabled = options["conn-sasl-enabled"]; + std::transform(conn_sasl_enabled.begin(), conn_sasl_enabled.end(), conn_sasl_enabled.begin(), ::tolower); + } + + string conn_ssl_certificate = ""; + if (options.is_set("conn-ssl-certificate")) { + conn_ssl_certificate = options["conn-ssl-certificate"]; + } + + string conn_ssl_private_key = ""; + if (options.is_set("conn-ssl-private-key")) { + conn_ssl_private_key = options["conn-ssl-private-key"]; + } + + string conn_ssl_password = ""; + if (options.is_set("conn-ssl-password")) { + conn_ssl_password = options["conn-ssl-password"]; + } + + string conn_ssl_trust_store = ""; + if (options.is_set("conn-ssl-trust-store")) { + conn_ssl_trust_store = options["conn-ssl-trust-store"]; + } + + bool conn_ssl_verify_peer = options.is_set("conn-ssl-verify-peer"); + + bool conn_ssl_verify_peer_name = options.is_set("conn-ssl-verify-peer-name"); + + bool durable_subscriber = false; + if (options.is_set("durable-subscriber")) { + string durable_subscriber_lower = options["durable-subscriber"]; + std::transform(durable_subscriber_lower.begin(), durable_subscriber_lower.end(), durable_subscriber_lower.begin(), ::tolower); + + if (durable_subscriber_lower == "true") { + durable_subscriber = true; + } + } + + bool subscriber_unsubscribe = false; + if (options.is_set("subscriber-unsubscribe")) { + string subscriber_unsubscribe_lower = options["subscriber-unsubscribe"]; + std::transform(subscriber_unsubscribe_lower.begin(), subscriber_unsubscribe_lower.end(), subscriber_unsubscribe_lower.begin(), ::tolower); + + if (subscriber_unsubscribe_lower == "true") { + subscriber_unsubscribe = true; + } + } + + string durable_subscriber_prefix = ""; + if (options.is_set("durable-subscriber-prefix")) { + durable_subscriber_prefix = options["durable-subscriber-prefix"]; + } + + string durable_subscriber_name = ""; + if (options.is_set("durable-subscriber-name")) { + durable_subscriber_name = options["durable-subscriber-name"]; + } + + bool shared_subscriber = false; + if (options.is_set("shared-subscriber")) { + string shared_subscriber_lower = options["shared-subscriber"]; + std::transform(shared_subscriber_lower.begin(), shared_subscriber_lower.end(), shared_subscriber_lower.begin(), ::tolower); + + if (shared_subscriber_lower == "true") { + shared_subscriber = true; + } + } + + string conn_clientid = ""; + if (options.is_set("conn-clientid")) { + conn_clientid = options["conn-clientid"]; + } + + string conn_clientid_prefix = ""; + if (options.is_set("conn-clientid-prefix")) { + conn_clientid_prefix = options["conn-clientid-prefix"]; + } + + bool conn_reconnect_custom = false; + + string conn_reconnect = "true"; + if (options.is_set("conn-reconnect")) { + conn_reconnect = options["conn-reconnect"]; + std::transform(conn_reconnect.begin(), conn_reconnect.end(), conn_reconnect.begin(), ::tolower); + } + + int32_t conn_reconnect_interval = 10; + if (options.is_set("conn-reconnect-interval")) { + conn_reconnect_interval = std::strtol(options["conn-reconnect-interval"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + int32_t conn_reconnect_limit = 0; + if (options.is_set("conn-reconnect-limit")) { + conn_reconnect_limit = std::strtol(options["conn-reconnect-limit"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + int32_t conn_reconnect_timeout = duration::FOREVER.milliseconds(); + if (options.is_set("conn-reconnect-timeout")) { + conn_reconnect_timeout = std::strtol(options["conn-reconnect-timeout"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + uint32_t conn_reconnect_first = 0; + if (options.is_set("conn-reconnect-first")) { + conn_reconnect_first = std::strtoul(options["conn-reconnect-first"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + uint32_t conn_reconnect_increment = 100; + if (options.is_set("conn-reconnect-increment")) { + conn_reconnect_increment = std::strtoul(options["conn-reconnect-increment"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + bool conn_reconnect_doubling = true; + if (options.is_set("conn-reconnect-doubling")) { + if (options["conn-reconnect-doubling"] == "false") { + conn_reconnect_doubling = false; + } + + conn_reconnect_custom = true; + } + + uint32_t conn_heartbeat = 0; + if (options.is_set("conn-heartbeat")) { + conn_heartbeat = std::strtoul(options["conn-heartbeat"].c_str(), NULL, 10); + } + + uint32_t max_frame_size = -1; + if (options.is_set("conn-max-frame-size")) { + max_frame_size = std::strtoul(options["conn-max-frame-size"].c_str(), NULL, 10); + } + + string log_msgs = ""; + if (options.is_set("log-msgs")) { + log_msgs = options["log-msgs"]; + } + + OptionsSetter setter = OptionsSetter(options); + + int timeout = -1; + if (options.is_set("timeout")) { + timeout = static_cast (options.get("timeout")); + } + + bool process_reply_to = false; + if (options.is_set("process-reply-to")) { + process_reply_to = options.get("process-reply-to"); + } + + bool browse = false; + if (options.is_set("recv-browse")) { + browse = options.get("recv-browse"); + } + + int count = 0; + if (options.is_set("count")) { + count = static_cast (options.get("count")); + } + + string selector = ""; + if (options.is_set("recv-selector")) { + selector = options["recv-selector"]; + } + + int duration = 0; + if (options.is_set("duration")) { + duration = static_cast (options.get("duration")); + } + + string duration_mode = "after-receive"; + if (options.is_set("duration-mode")) { + duration_mode = options["duration-mode"]; + } + + int recv_listen_port = 5672; + if(options.is_set("recv-listen-port")) { + recv_listen_port = atoi(options["recv-listen-port"].c_str()); + } + + string recv_listen = "false"; + if (options.is_set("recv-listen")) { + recv_listen = options["recv-listen"]; + std::transform(recv_listen.begin(), recv_listen.end(), recv_listen.begin(), ::tolower); + } + + int recv_credit_window = -1; + if(options.is_set("recv-credit-window")) { + recv_credit_window = atoi(options["recv-credit-window"].c_str()); + } + + bool recv_drain_after_credit_window = options.is_set("recv-drain-after-credit-window"); + + bool conn_use_config_file = options.is_set("conn-use-config-file"); + + bool trace_messages = false; + if (options.is_set("trace-messages")) { + if (options["trace-messages"] == "1" || options["trace-messages"] == "true") { + trace_messages = true; + } + } + if (trace_messages) { + enableTracing("aac3_receiver"); + } + + string tx_action = "commit"; + if (options.is_set("tx-action")) { + tx_action = options["tx-action"]; + } + + string tx_endloop_action = "none"; + if (options.is_set("tx-endloop-action")) { + tx_endloop_action = options["tx-endloop-action"]; + } + + TxReceiverHandler handler = TxReceiverHandler( + address, + conn_urls, + is_topic, + durable_subscriber, + subscriber_unsubscribe, + durable_subscriber_prefix, + durable_subscriber_name, + shared_subscriber, + msg_action, + msg_action_size, + user, + password, + sasl_mechanisms, + conn_sasl_enabled, + conn_ssl_certificate, + conn_ssl_private_key, + conn_ssl_password, + conn_ssl_trust_store, + conn_ssl_verify_peer, + conn_ssl_verify_peer_name, + timeout, + count, + duration, + duration_mode, + conn_reconnect, + conn_reconnect_interval, + conn_reconnect_limit, + conn_reconnect_timeout, + conn_reconnect_first, + conn_reconnect_increment, + conn_reconnect_doubling, + conn_reconnect_custom, + conn_heartbeat, + max_frame_size, + conn_use_config_file, + log_msgs, + process_reply_to, + browse, + recv_listen, + recv_listen_port, + recv_credit_window, + recv_drain_after_credit_window + ); + + if (selector != "") { + handler.setSelector(selector); + } + + // TODO python defaults to 10 + // int tx_size = 10; + int tx_size = 0; + if (options.is_set("tx-size")) { + tx_size = static_cast (options.get("tx-size")); + } + handler.setBatchSize(tx_size); + + try { + if (conn_clientid != "") { + container(handler, conn_clientid).run(); + } else if (conn_clientid_prefix != "") { + conn_clientid = conn_clientid_prefix + ::proton::uuid::random().str(); + + container(handler, conn_clientid).run(); + } else { + container(handler).run(); + } + + return 0; + } catch (const std::exception& e) { + std::cerr << "error: " << e.what() << std::endl; + } + + return 1; +} + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ diff --git a/src/api/qpid-proton/reactor/TxReceivingClient.h b/src/api/qpid-proton/reactor/TxReceivingClient.h new file mode 100644 index 0000000..ac5a2e0 --- /dev/null +++ b/src/api/qpid-proton/reactor/TxReceivingClient.h @@ -0,0 +1,48 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: TxReceivingClient.h + * Author: pematous + * + * Created on October 19, 2015, 4:25 PM + */ + +#ifndef TXRECEIVINGCLIENT_H +#define TXRECEIVINGCLIENT_H + +#include "ReceivingClient.h" +#include "handler/TxReceiverHandler.h" + +using proton::message; +using proton::uuid; + +namespace dtests { +namespace proton { +namespace reactor { + +using namespace dtests::common; + +/** + * Implements a receiving client using the proton reactive API + */ +class TxReceivingClient : public ReceivingClient { + public: + TxReceivingClient(); + virtual ~TxReceivingClient(); + + virtual int run(int argc, char **argv) const; + + private: + typedef ReceivingClient super; +}; + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ + +#endif /* TXRECEIVINGCLIENT_H */ + diff --git a/src/api/qpid-proton/reactor/TxSendingClient.cpp b/src/api/qpid-proton/reactor/TxSendingClient.cpp new file mode 100644 index 0000000..8148885 --- /dev/null +++ b/src/api/qpid-proton/reactor/TxSendingClient.cpp @@ -0,0 +1,340 @@ +/* + * TxSendingClient.cpp + * + * Created on: Nov 20, 2024 + * Author: pematous + */ + +#include "TxSendingClient.h" + +using proton::message; +using proton::container; + + +namespace dtests { +namespace proton { +namespace reactor { + +using namespace std; +using dtests::common::UriParser; + +TxSendingClient::TxSendingClient() +: super() + +{ + +} + +TxSendingClient::~TxSendingClient() +{ + +} + +int TxSendingClient::run(int argc, char **argv) const +{ + const string usage = "usage: %prog [OPTION]... DIR [FILE]..."; + const string version = "1.0"; + const string desc = "C/C++ AMQ reactive API sender client for Qpid Proton"; + + SenderOptionsParser parser = SenderOptionsParser(); + UriParser uri_parser = UriParser(); + + /** + * WARNING: do not reassign the result of chainned calls to usage/version/etc + * because it causes the code to segfault. For example, this crashes: + * + * ControlOptions parser = ControlOptions().usage(usage) + */ + parser.usage(usage).version(version).description(desc); + + optparse::Values options = parser.parse_args(argc, argv); + + parser.validate(options); + + setLogLevel(options); + + // Temporary variable for address, will search for prefix + string tempAddress = options["broker-url"]; + // Variable for final address + string address; + // Variable for recognition of topic + bool is_topic = false; + // Search for prefix + std::size_t prefix_index = tempAddress.find("topic://"); + // If prefix is present + if (prefix_index != std::string::npos) { + // Delete prefix + address = tempAddress.replace(prefix_index, 8, ""); + // Set that it will be topic + is_topic = true; + // If prefix is NOT present + } else { + // Use full address + address = tempAddress; + } + + uri_parser.parse(options["broker-url"]); + + std::vector conn_urls; + if (options.is_set("conn-urls")) { + std::stringstream conn_urls_string(options["conn-urls"]); + std::string segment; + + while(std::getline(conn_urls_string, segment, ',')) { + conn_urls.push_back(segment); + } + } + + string user = ""; + if (options.is_set("user")) { + user = options["user"]; + } else { + user = uri_parser.getUser(); + } + + string password = ""; + if (options.is_set("password")) { + password = options["password"]; + } else { + password = uri_parser.getPassword(); + } + + string sasl_mechanisms = ""; + if (options.is_set("sasl-mechanisms")) { + sasl_mechanisms = options["sasl-mechanisms"]; + } else if (user != "" && password != "") { + sasl_mechanisms = "PLAIN"; + } else { + sasl_mechanisms = "ANONYMOUS"; + } + + string conn_sasl_enabled = "true"; + if (options.is_set("conn-sasl-enabled")) { + conn_sasl_enabled = options["conn-sasl-enabled"]; + std::transform(conn_sasl_enabled.begin(), conn_sasl_enabled.end(), conn_sasl_enabled.begin(), ::tolower); + } + + string conn_ssl_certificate = ""; + if (options.is_set("conn-ssl-certificate")) { + conn_ssl_certificate = options["conn-ssl-certificate"]; + } + + string conn_ssl_private_key = ""; + if (options.is_set("conn-ssl-private-key")) { + conn_ssl_private_key = options["conn-ssl-private-key"]; + } + + string conn_ssl_password = ""; + if (options.is_set("conn-ssl-password")) { + conn_ssl_password = options["conn-ssl-password"]; + } + + string conn_ssl_trust_store = ""; + if (options.is_set("conn-ssl-trust-store")) { + conn_ssl_trust_store = options["conn-ssl-trust-store"]; + } + + bool conn_ssl_verify_peer = options.is_set("conn-ssl-verify-peer"); + + bool conn_ssl_verify_peer_name = options.is_set("conn-ssl-verify-peer-name"); + + bool conn_reconnect_custom = false; + + string conn_reconnect = "true"; + if (options.is_set("conn-reconnect")) { + conn_reconnect = options["conn-reconnect"]; + std::transform(conn_reconnect.begin(), conn_reconnect.end(), conn_reconnect.begin(), ::tolower); + } + + int32_t conn_reconnect_interval = 10; + if (options.is_set("conn-reconnect-interval")) { + conn_reconnect_interval = std::strtol(options["conn-reconnect-interval"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + int32_t conn_reconnect_limit = 0; + if (options.is_set("conn-reconnect-limit")) { + conn_reconnect_limit = std::strtol(options["conn-reconnect-limit"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + int32_t conn_reconnect_timeout = duration::FOREVER.milliseconds(); + if (options.is_set("conn-reconnect-timeout")) { + conn_reconnect_timeout = std::strtol(options["conn-reconnect-timeout"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + uint32_t conn_reconnect_first = 0; + if (options.is_set("conn-reconnect-first")) { + conn_reconnect_first = std::strtoul(options["conn-reconnect-first"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + uint32_t conn_reconnect_increment = 100; + if (options.is_set("conn-reconnect-increment")) { + conn_reconnect_increment = std::strtoul(options["conn-reconnect-increment"].c_str(), NULL, 10); + + conn_reconnect_custom = true; + } + + bool conn_reconnect_doubling = true; + if (options.is_set("conn-reconnect-doubling")) { + if (options["conn-reconnect-doubling"] == "false") { + conn_reconnect_doubling = false; + } + + conn_reconnect_custom = true; + } + + uint32_t conn_heartbeat = 0; + if (options.is_set("conn-heartbeat")) { + conn_heartbeat = std::strtoul(options["conn-heartbeat"].c_str(), NULL, 10); + } + + uint32_t max_frame_size = -1; + if (options.is_set("conn-max-frame-size")) { + max_frame_size = std::strtoul(options["conn-max-frame-size"].c_str(), NULL, 10); + } + + OptionsSetter setter = OptionsSetter(options); + + int timeout = 1; + if (options.is_set("timeout")) { + timeout = static_cast (options.get("timeout")); + } + + string log_msgs = ""; + if (options.is_set("log-msgs")) { + log_msgs = options["log-msgs"]; + } + + int duration = 0; + if (options.is_set("duration")) { + duration = static_cast (options.get("duration")); + } + + string duration_mode = "after-send"; + if (options.is_set("duration-mode")) { + duration_mode = options["duration-mode"]; + } + + bool trace_messages = false; + if (options.is_set("trace-messages")) { + if (options["trace-messages"] == "1" || options["trace-messages"] == "true") { + trace_messages = true; + } + } + if (trace_messages) { + enableTracing("aac3_sender"); + } + + string tx_action = "commit"; + if (options.is_set("tx-action")) { + tx_action = options["tx-action"]; + } + + string tx_endloop_action = "none"; + if (options.is_set("tx-endloop-action")) { + tx_endloop_action = options["tx-endloop-action"]; + } + + message msg; + + setMessageOptions(setter, msg); + setMessageProperties(parser.callbackProperty, &msg); + + if (parser.callbackList.str.length() > 0) { + // List + setMessageList(parser.callbackList, &msg); + } else if (parser.callbackMap.str.length() > 0) { + // Map + setMessageMap(parser.callbackMap, &msg); + } else { + // Text + setMessageText(options["msg-content"], &msg); + } + + +/* + * Note 1: this is a left-over from setMessageOptions. Since I don't want to + * change the method signature there, I check again here and set the remaining + * option that cannot be done implicitly above. + * + * Note 2: this is a hack for GCC ~4.4.7 on i686. + */ +#ifndef ENABLE_IMPLICIT_CONVERSIONS + long value = options.get("msg-ttl"); + + msg.ttl(::proton::duration(value)); +#endif + if (options.is_set("msg-group-seq")) { + msg.group_sequence(std::strtol(options["msg-group-seq"].c_str(), NULL, 10)); + } + + bool conn_use_config_file = options.is_set("conn-use-config-file"); + + TxSenderHandler handler = TxSenderHandler( + address, + conn_urls, + is_topic, + user, + password, + sasl_mechanisms, + conn_sasl_enabled, + conn_ssl_certificate, + conn_ssl_private_key, + conn_ssl_password, + conn_ssl_trust_store, + conn_ssl_verify_peer, + conn_ssl_verify_peer_name, + timeout, + duration, + duration_mode, + conn_reconnect, + conn_reconnect_interval, + conn_reconnect_limit, + conn_reconnect_timeout, + conn_reconnect_first, + conn_reconnect_increment, + conn_reconnect_doubling, + conn_reconnect_custom, + conn_heartbeat, + max_frame_size, + conn_use_config_file, + log_msgs, + tx_action, + tx_endloop_action + ); + + handler.setMessage(msg); + + int count = 1; + if (options.is_set("count")) { + count = static_cast (options.get("count")); + } + handler.setCount(count); + + int tx_size = 0; + if (options.is_set("tx-size")) { + tx_size = static_cast (options.get("tx-size")); + } + handler.setBatchSize(tx_size); + + try { + container(handler).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << "error: " << e.what() << std::endl; + } + + return 1; +} + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ diff --git a/src/api/qpid-proton/reactor/TxSendingClient.h b/src/api/qpid-proton/reactor/TxSendingClient.h new file mode 100644 index 0000000..55bc01c --- /dev/null +++ b/src/api/qpid-proton/reactor/TxSendingClient.h @@ -0,0 +1,41 @@ +/* + * TxSendingClient.h + * + * Created on: Nov 20, 2024 + * Author: pematous + */ + +#ifndef DTESTS_NODE_DATA_CLIENTS_LANG_CPP_APIS_PROTON_REACTOR_TXSENDINGCLIENT_H_ +#define DTESTS_NODE_DATA_CLIENTS_LANG_CPP_APIS_PROTON_REACTOR_TXSENDINGCLIENT_H_ + +#include "SendingClient.h" +#include "handler/TxSenderHandler.h" + +using proton::message; + +namespace dtests { +namespace proton { +namespace reactor { + +using namespace dtests::common; + +/** + * Implements a connecting client using the proton reactive API + */ +class TxSendingClient : public SendingClient { + public: + TxSendingClient(); + virtual ~TxSendingClient(); + + virtual int run(int argc, char **argv) const; + + private: + typedef SendingClient super; +}; + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ + + +#endif /* DTESTS_NODE_DATA_CLIENTS_LANG_CPP_APIS_PROTON_REACTOR_TXSENDINGCLIENT_H_ */ diff --git a/src/api/qpid-proton/reactor/handler/ReceiverHandler.h b/src/api/qpid-proton/reactor/handler/ReceiverHandler.h index 0ff9ff0..b23101c 100644 --- a/src/api/qpid-proton/reactor/handler/ReceiverHandler.h +++ b/src/api/qpid-proton/reactor/handler/ReceiverHandler.h @@ -181,13 +181,13 @@ class ReceiverHandler : public CommonHandler { void setSelector(string selector); void createSubscriptionName(string customPrefix); - private: + protected: typedef CommonHandler super; receiver recv; listener lsnr; container *cont; double ts; - + struct timer_event_t : public void_function0 { ReceiverHandler &parent; timer_event_t(ReceiverHandler &handler): parent(handler) { } diff --git a/src/api/qpid-proton/reactor/handler/SenderHandler.h b/src/api/qpid-proton/reactor/handler/SenderHandler.h index 0b81439..27b8897 100644 --- a/src/api/qpid-proton/reactor/handler/SenderHandler.h +++ b/src/api/qpid-proton/reactor/handler/SenderHandler.h @@ -151,7 +151,7 @@ class SenderHandler : public CommonHandler { */ message getMessage() const; - private: + protected: typedef CommonHandler super; bool ready; int count; diff --git a/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp new file mode 100644 index 0000000..316c495 --- /dev/null +++ b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp @@ -0,0 +1,482 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: TxReceiverHandler.cpp + * Author: pematous + * + * Created on October 20, 2024 + */ + +#include + +#include + +#include "TxReceiverHandler.h" + +using namespace dtests::common; +using namespace dtests::common::log; +using namespace dtests::proton::reactor; + +TxReceiverHandler::TxReceiverHandler( + const string &url, + vector conn_urls, + bool is_topic, + bool durable_subscriber, + bool subscriber_unsubscribe, + string durable_subscriber_prefix, + string durable_subscriber_name, + bool shared_subscriber, + string msg_action, + int msg_action_size, + string user, + string password, + string sasl_mechanisms, + string conn_sasl_enabled, + string conn_ssl_certificate, + string conn_ssl_private_key, + string conn_ssl_password, + string conn_ssl_trust_store, + bool conn_ssl_verify_peer, + bool conn_ssl_verify_peer_name, + int timeout, + int count, + int duration_time, + string duration_mode, + string conn_reconnect, + int32_t conn_reconnect_interval, + int32_t conn_reconnect_limit, + int32_t conn_reconnect_timeout, + uint32_t conn_reconnect_first, + uint32_t conn_reconnect_increment, + bool conn_reconnect_doubling, + bool conn_reconnect_custom, + uint32_t conn_heartbeat, + uint32_t max_frame_size, + bool conn_use_config_file, + string log_msgs, + bool process_reply_to, + bool browse, + string recv_listen, + int recv_listen_port, + int recv_credit_window, + bool recv_drain_after_credit_window, + string tx_action, + string tx_endloop_action +) + : super( + url, + conn_urls, + is_topic, + durable_subscriber, + subscriber_unsubscribe, + durable_subscriber_prefix, + durable_subscriber_name, + shared_subscriber, + msg_action, + msg_action_size, + user, + password, + sasl_mechanisms, + conn_sasl_enabled, + conn_ssl_certificate, + conn_ssl_private_key, + conn_ssl_password, + conn_ssl_trust_store, + conn_ssl_verify_peer, + conn_ssl_verify_peer_name, + timeout, + count, + duration_time, + duration_mode, + conn_reconnect, + conn_reconnect_interval, + conn_reconnect_limit, + conn_reconnect_timeout, + conn_reconnect_first, + conn_reconnect_increment, + conn_reconnect_doubling, + conn_reconnect_custom, + conn_heartbeat, + max_frame_size, + conn_use_config_file, + log_msgs, + process_reply_to, + browse, + recv_listen, + recv_listen_port, + recv_credit_window, + recv_drain_after_credit_window + ), + tx_action(tx_action), + tx_endloop_action(tx_endloop_action) +{ +} + +TxReceiverHandler::~TxReceiverHandler() +{ +} + +void TxReceiverHandler::setBatchSize(int batchSize) +{ + this->batch_size = batchSize; +} + +int TxReceiverHandler::getBatchSize() const +{ + return batch_size; +} + +// reactor methods + +void TxReceiverHandler::on_session_open(session &s) { + logger(trace) << "[on_session_open] declare_txn started..."; + s.declare_transaction(*this); + logger(trace) << "[on_session_open] declare_txn ended..."; + logger(debug) << "[on_session_open] transaction batch size: " << batch_size; +} + +void TxReceiverHandler::on_transaction_declare_failed(session) {} + +void TxReceiverHandler::on_transaction_commit_failed(session s) { + logger(debug) << "[on_transaction_commit_failed] Transaction Commit Failed"; + s.connection().close(); + exit(-1); +} + +void TxReceiverHandler::on_transaction_declared(session s) { + // TODO python some weird magic around count 0, doesn't make much sense to me yet + // when fixes take care about all count checks ofr zero + if (count != 0 && processed + batch_size > count) { + batch_size = count % batch_size; + } else if (count != 0) { + batch_size = count; + } + logger(trace) << "[on_transaction_declared] txn called " << (&s); + logger(debug) << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty()); +} + +void TxReceiverHandler::on_transaction_aborted(session s) { + processed += current_batch; + current_batch = 0; + logger(debug) << "[on_transaction_aborted] messages aborted, processed: " << processed; + if (count == 0 || processed < count) { + s.declare_transaction(*this); + } else { + logger(info) << "[on_transaction_aborted] All messages processed"; + s.connection().close(); + } +} + +void TxReceiverHandler::on_transaction_committed(session s) { + processed += current_batch; + current_batch = 0; + logger(debug) << "[on_transaction_committed] messages committed, processed: " << processed; + if (count == 0 || processed < count) { + s.declare_transaction(*this); + } else { + logger(info) << "[on_transaction_committed] All messages processed"; + s.connection().close(); + } +} + +void TxReceiverHandler::on_container_start(container &c) +{ + logger(debug) << "[on_container_start] Starting messaging transaction handler"; + logger(debug) << "[on_container_start] User: " << user; + logger(debug) << "[on_container_start] Password: " << password; + logger(debug) << "[on_container_start] SASL mechanisms: " << sasl_mechanisms; + logger(debug) << "[on_container_start] SASL enabled: " << conn_sasl_enabled; + logger(debug) << "[on_container_start] Maximum frame size: " << max_frame_size; + logger(debug) << "[on_container_start] Topic: " << is_topic; + logger(debug) << "[on_container_start] Transaction batch size: " << batch_size; + logger(debug) << "[on_container_start] Transaction action: " << tx_action; + logger(debug) << "[on_container_start] Transaction endloop action: " << tx_endloop_action; + logger(trace) << "[on_container_start] Messages count: " << count; + logger(debug) << "[on_container_start] Messages processed: " << processed; + logger(debug) << "[on_container_start] Peer to Peer: " << recv_listen; + + if (recv_listen == "true") { + cont = &c; + } + + connection_options conn_opts; + std::vector< ::proton::symbol > caps; + + if (is_topic) { + caps.push_back("topic"); + + if (durable_subscriber || subscriber_unsubscribe) { + createSubscriptionName(durable_subscriber_prefix); + } + + if (shared_subscriber) { + caps.push_back("shared"); + caps.push_back("global"); + } + } + + logger(debug) << "[on_container_start] Source capabilities: "; + for (std::vector< ::proton::symbol >::const_iterator i = caps.begin(); i != caps.end(); ++i) { + logger(debug) << *i; + } + + if (!user.empty()) conn_opts.user(user); + if (!password.empty()) conn_opts.password(password); + + if (conn_sasl_enabled == "false") { + conn_opts.sasl_enabled(false); + } else { + conn_opts.sasl_enabled(true); + } + + conn_opts.sasl_allow_insecure_mechs(true); + conn_opts.sasl_allowed_mechs(sasl_mechanisms); + // conn_opts.max_frame_size(max_frame_size); + conn_opts.failover_urls(conn_urls); + + logger(debug) << "[on_container_start] Setting a reconnect timer: " << conn_reconnect; + logger(debug) << "[on_container_start] Custom reconnect: " << conn_reconnect_custom; + + configure_reconnect(conn_opts); + configure_ssl(c); + + if (conn_heartbeat != 0) { + logger(debug) << "[on_container_start] Heartbeat: " << conn_heartbeat; + + duration heartbeat_seconds = conn_heartbeat * duration::SECOND; + + conn_opts.idle_timeout(heartbeat_seconds); + } + + logger(debug) << "[on_container_start] Browsing: " << browse; + + if (browse) { + logger(debug) << "[on_container_start] Creating a receiver and connecting to the server"; + + source_options s_opts = source_options() + .distribution_mode(source::COPY) + .filters(this->fm) + .capabilities(caps); + + if (durable_subscriber || subscriber_unsubscribe) { + s_opts.durability_mode( ::proton::source::UNSETTLED_STATE ); + s_opts.expiry_policy( ::proton::source::NEVER ); + } + + receiver_options r_opts = c.receiver_options() + .source( + s_opts + ); + + if (duration_time > 0 && recv_credit_window == -1) { + r_opts.credit_window(1); + } else if (recv_credit_window != -1) { + r_opts.credit_window(recv_credit_window); + } + + if (durable_subscriber || subscriber_unsubscribe) { + r_opts.name(durable_subscriber_name); + } + + connection conn; + if (conn_use_config_file) { + conn = c.connect(); + } else { + conn = c.connect(broker_url.getUri(), conn_opts); + } + + recv = conn.open_receiver( + broker_url.getPath(), + r_opts + ); + + work_q = &recv.work_queue(); + } else { + logger(debug) << "[on_container_start] Peer-to-peer: " << recv_listen; + logger(debug) << "[on_container_start] Peer-to-peer port: " << recv_listen_port; + + if (recv_listen == "true") { + logger(debug) << "[on_container_start] Creating a listener"; + // P2P + stringstream ss; + ss << "0.0.0.0:"; + ss << recv_listen_port; + lsnr = c.listen(ss.str(), conn_opts); + } else { + logger(debug) << "[on_container_start] Creating a receiver and connecting to the server"; + + source_options s_opts = source_options().filters(this->fm).capabilities(caps); + + if (durable_subscriber || subscriber_unsubscribe) { + s_opts.durability_mode( ::proton::source::UNSETTLED_STATE ); + s_opts.expiry_policy( ::proton::source::NEVER ); + } + + receiver_options r_opts = c.receiver_options() + .source( + s_opts + ); + + if (durable_subscriber || subscriber_unsubscribe) { + r_opts.name(durable_subscriber_name); + } + + if (duration_time > 0 && recv_credit_window == -1) { + r_opts.credit_window(1); + } else if (recv_credit_window != -1) { + r_opts.credit_window(recv_credit_window); + } + + connection conn; + if (conn_use_config_file) { + conn = c.connect(); + } else { + conn = c.connect(broker_url.getUri(), conn_opts); + } + + recv = conn.open_receiver( + broker_url.getPath(), + r_opts + ); + + work_q = &recv.work_queue(); + } + } + logger(debug) << "[on_container_start] Connected to the broker/p2p and waiting for messages"; + + if (subscriber_unsubscribe && durable_subscriber_name != "") { + recv.close(); + recv.connection().close(); + } else { + ts = get_time(); +#if defined(__REACTOR_HAS_TIMER) + if (recv_listen != "true") { + work_q->schedule(duration::IMMEDIATE, make_work(&TxReceiverHandler::timerEvent, this)); + } else { + cont->schedule(duration::IMMEDIATE, make_work(&TxReceiverHandler::timerEvent, this)); + } +#endif + } + +} + +void TxReceiverHandler::on_message(delivery &d, message &m) +{ + logger(debug) << "[on_message] Processing received message"; + + // TODO legit? + session s = d.session(); + + s.txn_accept(d); + current_batch += 1; + + logger(debug) << "[on_message] current batch: " << current_batch; + + if (log_msgs == "dict") { + logger(trace) << "[on_message] Decoding message"; + ReactorDecoder decoder = ReactorDecoder(m); + + std::ostringstream stream; + DictWriter writer = DictWriter(&stream); + + DictFormatter formatter = DictFormatter(); + formatter.printMessage(&decoder, &writer); + + writer.endLine(); + std::cout << writer.toString(); + } else if (log_msgs == "interop") { + DictFormatter formatter = DictFormatter(); + + formatter.printMessageInterop(m); + } + + if (duration_time > 0 && duration_mode == "after-receive") { + logger(debug) << "[on_message] Waiting..."; + sleep4next(ts, count, duration_time, processed + current_batch); + } + + if(((processed + current_batch) % msg_action_size) == 0) { + do_message_action(d); + } + + if (duration_time > 0 && duration_mode == "after-receive-action") { + sleep4next(ts, count, duration_time, processed + current_batch); + } + + logger(debug) << "[on_message] Process-reply-to: " << process_reply_to; + + if (process_reply_to) { + if (m.reply_to() != "") { + logger(debug) << "[on_message] Reply-to address: " << m.reply_to(); + + do_process_reply_to(m); + } else { + logger(debug) << "[on_message] Reply-to address is not set"; + } + } + + if (recv_drain_after_credit_window && processed + current_batch == recv_credit_window) { + logger(debug) << "[on_message] Scheduling drain"; + d.receiver().work_queue().add(make_work(&TxReceiverHandler::drain, this)); + } + + if (!process_reply_to && processed + current_batch == count) { + if (durable_subscriber) { + d.receiver().detach(); + } else { + d.receiver().close(); + } + d.connection().close(); + } else { +#if defined(__REACTOR_HAS_TIMER) + super::timer.reset(); +#endif + } + + if(current_batch == batch_size) { + logger(debug) << "[send] Transaction attempt: " << tx_action; + if (tx_action == "commit") { + s.txn_commit(); + } else if (tx_action == "rollback") { + s.txn_abort(); + } + + if (tx_action == "none") { + if (processed + current_batch == count) { + recv.connection().close(); + } else { + processed += current_batch; + current_batch = 0; + s.declare_transaction(*this); + } + } + + if (duration_time > 0 && duration_mode == "after-receive-action-tx-action") { + // TODO: not implemented yet + } + + } else if (count != 0 && processed + current_batch == count) { + logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action; + if (tx_endloop_action == "commit") { + s.txn_commit(); + } else if (tx_endloop_action == "rollback") { + s.txn_abort(); + } else { + recv.connection().close(); + } + } +} + +void TxReceiverHandler::on_transport_close(transport &t) { + logger(debug) << "[on_transport_close] Closing the transport"; + current_batch = 0; + if (conn_reconnect == "false") { + exit(1); + } else if (processed == count) { + exit(0); + } +} diff --git a/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h new file mode 100644 index 0000000..ac642e1 --- /dev/null +++ b/src/api/qpid-proton/reactor/handler/TxReceiverHandler.h @@ -0,0 +1,174 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: TxReceiverHandler.h + * Author: pematous + * + * Created on November 20, 2024 + */ + +#ifndef TXRECEIVERHANDLER_H +#define TXRECEIVERHANDLER_H + +#include + +#include "ReceiverHandler.h" + +using proton::transaction; +using proton::session; +using proton::transaction_handler; + +#ifdef PN_CPP_HAS_STD_FUNCTION +#undef PN_CPP_HAS_STD_FUNCTION +#endif + +namespace dtests { +namespace proton { +namespace reactor { + +/** + * A proton transaction message handler that handles message receive events + */ +class TxReceiverHandler : public ReceiverHandler, transaction_handler { + public: + /** + * Constructor + * @param url broker URL + * @param conn_urls connection URLs + * @param is_topic if target is topic + * @param durable_subscriber durable subscription to topic + * @param subscriber_unsubscribe unsubscribe durable subscriptor + * @param durable_subscriber_prefix prefix to use to identify subscriber + * @param durable_subscriber_name name of the durable subscriber to be unsubscribe + * @param shared_subscriber shared subscription to topic + * @param msg_action message action + * @param msg_action_size apply action in the batch of given size + * @param user username + * @param password password + * @param sasl_mechanisms SASL mechanisms + * @param conn_sasl_enabled enable connection SASL + * @param conn_ssl_certificate path to client certificate + * @param conn_ssl_private_key path to client private key + * @param conn_ssl_password client's certificate database password + * @param conn_ssl_trust_store path to client trust store + * @param conn_ssl_verify_peer verifies server certificate + * @param conn_ssl_verify_peer_name verifies connection url against server hostname + * @param timeout timeout + * @param count count of messages to receive + * @param duration_time message actions total duration + * @param duration_mode specifies where to wait to achieve expected duration + * @param conn_reconnect type of reconnection + * @param conn_reconnect_interval reconnect interval + * @param conn_reconnect_limit reconnect limit + * @param conn_reconnect_timeout reconnect timeout + * @param conn_reconnect_first reconnect first + * @param conn_reconnect_increment reconnect increment + * @param conn_reconnect_doubling reconnect doubling + * @param conn_reconnect_custom custom reconnect values + * @param conn_heartbeat connection heartbeat in seconds + * @param max_frame_size maximum frame size + * @param conn_use_config_file use configuration file for connection + * @param log_msgs message log format + * @param process_reply_to send message to reply-to address if enabled and message got reply-to address + * @param browse enable browsing receiver + * @param recv_listen enable p2p listener + * @param recv_listen_port p2p listener port + * @param recv_credit_window receiver credit window + * @param recv_drain_after_credit_window drain aqfter credit window + * @param tx_action transaction action on batch + * @param tx_endloop_action transaction action on last batch + */ + TxReceiverHandler( + const string &url, + vector conn_urls, + bool is_topic, + bool durable_subscriber, + bool subscriber_unsubscribe, + string durable_subscriber_prefix, + string durable_subscriber_name, + bool shared_subscriber, + string msg_action, + int msg_action_size, + string user, + string password, + string sasl_mechanisms, + string conn_sasl_enabled = "true", + string conn_ssl_certificate = "", + string conn_ssl_private_key = "", + string conn_ssl_password = "", + string conn_ssl_trust_store = "", + bool conn_ssl_verify_peer = false, + bool conn_ssl_verify_peer_name = false, + int timeout = 10, + int count = 0, + int duration_time = 0, + string duration_mode = "after-receive", + string conn_reconnect = "true", + int32_t conn_reconnect_interval = -1, + int32_t conn_reconnect_limit = -1, + int32_t conn_reconnect_timeout = -1, + uint32_t conn_reconnect_first = 0, + uint32_t conn_reconnect_increment = 100, + bool conn_reconnect_doubling = true, + bool conn_reconnect_custom = false, + uint32_t conn_heartbeat = 0, + uint32_t max_frame_size = -1, + bool conn_use_config_file = false, + string log_msgs = "", + bool process_reply_to = false, + bool browse = false, + string recv_listen = "false", + int recv_listen_port = 5672, + int recv_credit_window = -1, + bool recv_drain_after_credit_window = false, + string tx_action = "commit", + string tx_endloop_action = "commit" + ); + + virtual ~TxReceiverHandler(); + + /** + * Sets the transaction batch size + * @param batch_size the transaction batch size + */ + void setBatchSize(int batchSize); + + /** + * Gets the transaction batch size + * @return the transaction batch size + */ + int getBatchSize() const; + + // reactor method + void on_session_open(session &s); + void on_transaction_declare_failed(session); + void on_transaction_commit_failed(session s); + void on_transaction_declared(session s); + void on_transaction_committed(session s); + void on_transaction_aborted(session s); + + // overrides + void on_container_start(container &c); + void on_message(delivery &d, message &m); + void on_transport_close(transport &t); + + private: + typedef ReceiverHandler super; + + int batch_size = 0; + int current_batch = 0; + int processed = 0; + string tx_action = "commit"; + string tx_endloop_action = "commit"; +}; + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ + +#endif /* TXRECEIVERHANDLER_H */ + diff --git a/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp b/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp new file mode 100644 index 0000000..be5c475 --- /dev/null +++ b/src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp @@ -0,0 +1,393 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: TxSenderHandler.cpp + * Author: pematous + * + * Created on November 20, 2024 + */ + +#include "TxSenderHandler.h" + +#include "reactor/formatter/ReactorDecoder.h" +#include "formatter/DictFormatter.h" +#include "formatter/DictWriter.h" + +namespace dtests { +namespace proton { +namespace reactor { + +using namespace dtests::common; +using namespace dtests::common::log; + +TxSenderHandler::TxSenderHandler( + const string &url, + vector conn_url, + bool is_topic, + string user, + string password, + string sasl_mechanisms, + string conn_sasl_enabled, + string conn_ssl_certificate, + string conn_ssl_private_key, + string conn_ssl_password, + string conn_ssl_trust_store, + bool conn_ssl_verify_peer, + bool conn_ssl_verify_peer_name, + int timeout, + int duration_time, + string duration_mode, + string conn_reconnect, + int32_t conn_reconnect_interval, + int32_t conn_reconnect_limit, + int32_t conn_reconnect_timeout, + uint32_t conn_reconnect_first, + uint32_t conn_reconnect_increment, + bool conn_reconnect_doubling, + bool conn_reconnect_custom, + uint32_t conn_heartbeat, + uint32_t max_frame_size, + bool conn_use_config_file, + string log_msgs, + string tx_action, + string tx_endloop_action +) + : super( + url, + conn_url, + is_topic, + user, + password, + sasl_mechanisms, + conn_sasl_enabled, + conn_ssl_certificate, + conn_ssl_private_key, + conn_ssl_password, + conn_ssl_trust_store, + conn_ssl_verify_peer, + conn_ssl_verify_peer_name, + timeout, + duration_time, + duration_mode, + conn_reconnect, + conn_reconnect_interval, + conn_reconnect_limit, + conn_reconnect_timeout, + conn_reconnect_first, + conn_reconnect_increment, + conn_reconnect_doubling, + conn_reconnect_custom, + conn_heartbeat, + max_frame_size, + conn_use_config_file, + log_msgs + ), + batch_size(0), + current_batch(0), + processed(0), + tx_action(tx_action), + tx_endloop_action(tx_endloop_action) +{ + +} + +TxSenderHandler::~TxSenderHandler() +{ + logger(debug) << "Destroying the sender handler"; +} + +void TxSenderHandler::setBatchSize(int batchSize) +{ + this->batch_size = batchSize; +} + +int TxSenderHandler::getBatchSize() const +{ + return batch_size; +} + +void TxSenderHandler::checkIfCanSend() { + if (processed < count) { + work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this)); + + if (sndr.credit() > 0) { + logger(debug) << "[checkIfCanSend] Preparing to send message"; + // TODO test w/ session defined + // send(); + } else { + ready = true; + } + } +} + +void TxSenderHandler::send(session s) +{ + logger(debug) << "[send] Preparing to send message"; + int credit = sndr.credit(); + + if (credit == 0) { + logger(warning) << "[send] There not enough credit to send messages"; + } + + logger(debug) << "[send] The handler has enough credit to send " << credit + << " message" << (credit > 1 ? "s" : "" ); + + logger(trace) << "[send] Sending messages through the link"; + + message message_to_send = message(m); + + try { + if (get(message_to_send.body()).find("%d") != string::npos) { + size_t percent_position = get(message_to_send.body()).find("%d"); + stringstream ss; + ss << processed; + string replaced_number = get(message_to_send.body()).replace(percent_position, 2, ss.str()); + message_to_send.body(replaced_number); + } + } catch (conversion_error &) { + } + + + logger(trace) << "[send] Transaction is empty: " << s.txn_is_empty(); + logger(debug) << "[send] Messages processed: " << processed; + logger(trace) << "[send] Current batch: " << current_batch; + while (s.txn_is_declared() && sndr.credit() && (processed + current_batch) < count) + { + s.txn_send(sndr, message_to_send); + current_batch += 1; + + if (log_msgs == "dict") { + ReactorDecoder decoder = ReactorDecoder(message_to_send); + + std::ostringstream stream; + DictWriter writer = DictWriter(&stream); + + DictFormatter formatter = DictFormatter(); + formatter.printMessage(&decoder, &writer); + + writer.endLine(); + std::cout << writer.toString(); + } else if (log_msgs == "interop") { + DictFormatter formatter = DictFormatter(); + + formatter.printMessageInterop(message_to_send); + } + + if (duration_time > 0 && duration_mode == "after-send-tx-action") { + // TODO: Not implemented yet + } + + logger(debug) << "[send] Messages processed: " << processed; + logger(debug) << "[send] Current batch: " << current_batch; + + if(current_batch == batch_size) { + logger(debug) << "[send] Transaction attempt: " << tx_action; + if (tx_action == "commit") { + s.txn_commit(); + } else if (tx_action == "rollback") { + s.txn_abort(); + } + + if (tx_action == "none") { + if (processed + current_batch == count) { + sndr.connection().close(); + } else { + processed += current_batch; + current_batch = 0; + s.declare_transaction(*this); + } + } + } else if (processed + current_batch == count) { + logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action; + if (tx_endloop_action == "commit") { + s.txn_commit(); + } else if (tx_endloop_action == "rollback") { + s.txn_abort(); + } + sndr.connection().close(); + } + } + +#if defined(__REACTOR_HAS_TIMER) + timer.reset(); +#endif + ready = false; +} + +// reactor methods + +void TxSenderHandler::on_sendable(sender &s) +{ + logger(trace) << "[on_sendable] transaction: " << &s; + if (ready) { + send(s.session()); + } +} + +void TxSenderHandler::on_tracker_accept(tracker &t) +{ + logger(trace) << "[on_tracker_accept] Message accepted, confirmed message delivery: " << processed; +} + +void TxSenderHandler::on_connection_close(connection &c) +{ + current_batch = 0; + logger(debug) << "[on_connection_close] Closing connection"; +} + +void TxSenderHandler::on_transaction_declared(session s) { + logger(trace) << "[on_transaction_declared] txn called " << (&s); + logger(trace) << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty()) + << "\t" << s.txn_is_empty(); + send(s); +} + +void TxSenderHandler::on_transaction_committed(session s) { + logger(trace) << "[on_transaction_committed] Messages committed"; + processed += current_batch; + logger(debug) << "[on_transaction_committed] Messages processed" << processed; + if (processed == count) { + logger(trace) << "[on_transaction_committed] All messages processed"; + s.connection().close(); + } else { + logger(trace) << "[on_transaction_committed] Declaring new transaction"; + current_batch = 0; + s.declare_transaction(*this); + } +} + +void TxSenderHandler::on_transaction_aborted(session s) { + logger(trace) << "[on_transaction_aborted] Messages aborted"; + processed += current_batch; + logger(debug) << "[on_transaction_aborted] Messages processed" << processed; + if (processed == count) { + logger(trace) << "[on_transaction_aborted] All messages processed"; + s.connection().close(); + } else { + logger(trace) << "[on_transaction_aborted] Declaring new transaction"; + current_batch = 0; + s.declare_transaction(*this); + } +} + +void TxSenderHandler::on_sender_close(sender &s) { + current_batch = 0; +} + +void TxSenderHandler::on_session_open(session &s) { + logger(trace) << "[on_session_open] declare_txn started..."; + s.declare_transaction(*this); + logger(trace) << "[on_session_open] declare_txn ended..."; +} + +void TxSenderHandler::on_container_start(container &c) +{ + logger(debug) << "[on_container_start] Starting messaging transaction handler"; + logger(debug) << "[on_container_start] User: " << user; + logger(debug) << "[on_container_start] Password: " << password; + logger(debug) << "[on_container_start] SASL mechanisms: " << sasl_mechanisms; + logger(debug) << "[on_container_start] SASL enabled: " << conn_sasl_enabled; + logger(debug) << "[on_container_start] Maximum frame size: " << max_frame_size; + logger(debug) << "[on_container_start] Topic: " << is_topic; + logger(debug) << "[on_container_start] Transaction batch size: " << batch_size; + logger(debug) << "[on_container_start] Transaction action: " << tx_action; + logger(debug) << "[on_container_start] Transaction endloop action: " << tx_endloop_action; + logger(trace) << "[on_container_start] Messages count: " << count; + logger(debug) << "[on_container_start] Messages processed: " << processed; + + std::vector< ::proton::symbol > caps; + + if (is_topic) { + caps.push_back("topic"); + } + + logger(debug) << "[on_container_start] Source capabilities: "; + for (std::vector< ::proton::symbol >::const_iterator i = caps.begin(); i != caps.end(); ++i) { + logger(debug) << *i; + } + connection_options conn_opts; + + if (!user.empty()) conn_opts.user(user); + if (!password.empty()) conn_opts.password(password); + + if (conn_sasl_enabled == "false") { + conn_opts.sasl_enabled(false); + } else { + conn_opts.sasl_enabled(true); + } + + conn_opts.sasl_allow_insecure_mechs(true); + conn_opts.sasl_allowed_mechs(sasl_mechanisms); + // conn_opts.max_frame_size(max_frame_size); + conn_opts.failover_urls(conn_urls); + + logger(debug) << "[on_container_start] Setting a reconnect timer: " << conn_reconnect; + logger(debug) << "[on_container_start] Custom reconnect: " << conn_reconnect_custom; + + configure_reconnect(conn_opts); + configure_ssl(c); + + if (conn_heartbeat != 0) { + logger(debug) << "[on_container_start] Heartbeat: " << conn_heartbeat; + + duration heartbeat_seconds = conn_heartbeat * duration::SECOND; + + conn_opts.idle_timeout(heartbeat_seconds); + } + + logger(debug) << "[on_container_start] Creating a sender"; + + connection conn; + if (conn_use_config_file) { + conn = c.connect(); + } else { + conn = c.connect(broker_url.getUri(), conn_opts); + } + + sndr = conn.open_sender( + broker_url.getPath(), + c.sender_options() + .source( + source_options().capabilities(caps) + ) + ); + + work_q = &sndr.work_queue(); + + logger(trace) << "[on_container_start] Setting up timer"; + + if (duration_time > 0 && count > 0) { + interval = duration((duration_time * duration::SECOND) / count); + + logger(trace) << "[on_container_start] Interval for duration: " << interval.milliseconds() << " ms"; + } + +// TODO +// #if defined(__REACTOR_HAS_TIMER) +// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this)); +// +// if (duration_time > 0 && duration_mode == "after-send") { +// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this)); +// } else if (duration_time > 0 && duration_mode == "before-send") { +// work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this)); +// } else { +// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this)); +// } +// #endif +} + +void TxSenderHandler::on_transaction_declare_failed(session) {} + +void TxSenderHandler::on_transaction_commit_failed(session s) { + logger(error) << "[on_transaction_commit_failed] Transaction Commit Failed"; + s.connection().close(); + exit(1); +} + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ diff --git a/src/api/qpid-proton/reactor/handler/TxSenderHandler.h b/src/api/qpid-proton/reactor/handler/TxSenderHandler.h new file mode 100644 index 0000000..d4997fb --- /dev/null +++ b/src/api/qpid-proton/reactor/handler/TxSenderHandler.h @@ -0,0 +1,150 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +/* + * File: TxSenderHandler.h + * Author: pematous + * + * Created on November 20, 2024 + */ + +#ifndef TXSENDERHANDLER_H +#define TXSENDERHANDLER_H + +#include + +#include "SenderHandler.h" + +using proton::transaction; +using proton::session; +using proton::transaction_handler; + +namespace dtests { +namespace proton { +namespace reactor { + +using dtests::common::Timer; + +/** + * A proton transaction message handler that handles message send events + */ +class TxSenderHandler : public SenderHandler, transaction_handler { + public: + /** + * Constructor + * @param url broker URL + * @param conn_urls connection URLs + * @param is_topic if target is topic + * @param user username + * @param password password + * @param sasl_mechanisms SASL mechanisms + * @param conn_sasl_enabled enable connection SASL + * @param conn_ssl_certificate path to client certificate + * @param conn_ssl_private_key path to client private key + * @param conn_ssl_password client's certificate database password + * @param conn_ssl_trust_store path to client trust store + * @param conn_ssl_verify_peer verifies server certificate + * @param conn_ssl_verify_peer_name verifies connection url against server hostname + * @param timeout timeout + * @param duration message actions total duration + * @param duration_mode specifies where to wait to achieve expected duration + * @param conn_reconnect type or reconnection + * @param conn_reconnect_interval reconnect interval + * @param conn_reconnect_limit reconnect limit + * @param conn_reconnect_timeout reconnect timeout + * @param conn_reconnect_first reconnect first + * @param conn_reconnect_increment reconnect increment + * @param conn_reconnect_doubling reconnect doubling + * @param conn_reconnect_custom custom reconnect values + * @param conn_heartbeat connection heartbeat in seconds + * @param max_frame_size maximum frame size + * @param conn_use_config_file use configuration file for connection + * @param log_msgs message log format + * @param tx_action transaction action on batch + * @param tx_endloop_action transaction action on last batch + */ + TxSenderHandler( + const string &url, + vector conn_urls, + bool is_topic, + string user, + string password, + string sasl_mechanisms, + string conn_sasl_enabled = "true", + string conn_ssl_certificate = "", + string conn_ssl_private_key = "", + string conn_ssl_password = "", + string conn_ssl_trust_store = "", + bool conn_ssl_verify_peer = false, + bool conn_ssl_verify_peer_name = false, + int timeout = 10, + int duration_time = 0, + string duration_mode = "after-send", + string conn_reconnect = "true", + int32_t conn_reconnect_interval = -1, + int32_t conn_reconnect_limit = -1, + int32_t conn_reconnect_timeout = -1, + uint32_t conn_reconnect_first = 0, + uint32_t conn_reconnect_increment = 100, + bool conn_reconnect_doubling = true, + bool conn_reconnect_custom = false, + uint32_t conn_heartbeat = 0, + uint32_t max_frame_size = -1, + bool conn_use_config_file = false, + string log_msgs = "", + string tx_action = "commit", + string tx_endloop_action = "commit" + ); + + virtual ~TxSenderHandler(); + + /** + * Sets the transaction batch size + * @param batch_size the transaction batch size + */ + void setBatchSize(int batchSize); + + /** + * Gets the transaction batch size + * @return the transaction batch size + */ + int getBatchSize() const; + + // overrides + void checkIfCanSend(); + void send(session s); + + // reactor methods + void on_sender_close(sender &s); + void on_transaction_declared(session s); + void on_transaction_committed(session s); + void on_transaction_aborted(session s); + void on_transaction_declare_failed(session s); + void on_transaction_commit_failed(session s); + + // overrides + void on_container_start(container &c); + void on_session_open(session &s); + void on_sendable(sender &s); + void on_tracker_accept(tracker &t); + void on_connection_close(connection &c); + + private: + typedef SenderHandler super; + + int batch_size = 0; + int current_batch = 0; + int processed = 0; + string tx_action = "commit"; + string tx_endloop_action = "commit"; +}; + +} /* namespace reactor */ +} /* namespace proton */ +} /* namespace dtests */ + +#endif /* TXSENDERHANDLER_H */ + diff --git a/src/common/options/modern/ModernOptionsParser.cpp b/src/common/options/modern/ModernOptionsParser.cpp index 8a95a01..03244de 100644 --- a/src/common/options/modern/ModernOptionsParser.cpp +++ b/src/common/options/modern/ModernOptionsParser.cpp @@ -126,6 +126,27 @@ ModernOptionsParser::ModernOptionsParser() .help("client reconnect TIMEOUT (default: -1)") .metavar("TIMEOUT"); + // transactions + add_option("--tx-size") + .dest("tx-size") + .help("transactional mode: batch message count size (default: 0)") + .metavar("TX_SIZE"); + + char const* const choices[] = { "commit", "rollback", "none" }; + add_option("--tx-action") + .dest("tx-action") + .help("transactional action at the end of tx batch (default: commit)") + .type("choice") + .choices(std::begin(choices), std::end(choices)) + .metavar("TX_ACTION"); + + add_option("--tx-endloop-action") + .dest("tx-endloop-action") + .help("transactional action after sending all messages in loop") + .type("choice") + .choices(std::begin(choices), std::end(choices)) + .metavar("TX_ENDLOOP_ACTION"); + /*********************** Reactive C++ API client extras ***********************/ add_option("--conn-reconnect-first") .dest("conn-reconnect-first")