1- #include < sustain/framework/pub_sub/Subscriber.h>
2-
3- #include < boost/asio.hpp>
4- #include < boost/system/error_code.hpp>
1+ #include < sustain/framework/net/patterns/pub_sub/Subscriber.h>
52
63#include < sustain/framework/util/Error.h>
74
85namespace pfc {
9- struct Subscriber ::Implementation {
10- Implementation ();
11- Implementation (const std::string& service_name, const std::string& multicast_address);
12- ~Implementation ();
13- Implementation (const Implementation&) = delete ;
14- Implementation (Implementation&&) = default ;
15- Implementation& operator =(const Implementation&) = delete ;
16- Implementation& operator =(Implementation&&) = delete ;
17-
18- pfc::Error multicast_setup (const std::string& multicast_address);
19- void multicast_announce ();
20- void multicast_announce_timeout ();
21-
22- std::thread multicast_announce_thread;
23- std::thread multicast_listen_thread;
24-
25- std::atomic<bool > thead_control_shutdown;
26- std::atomic<bool > thead_control_service_registered;
27-
28- boost::asio::io_context io_context;
29- boost::asio::ip::udp::endpoint endpoint;
30- boost::asio::ip::udp::socket socket;
31- boost::asio::steady_timer timer;
32-
33- static constexpr int max_message_count = 25 ;
34- int message_count;
35- std::string message;
36-
37- // TODO: Determine Optimum Size
38- std::array<char , 4096 > data;
39-
40- pfc::Error server_status;
41- };
42- // -----------------------------------------------------------------------------
43- Subscriber::Implementation::Implementation ()
44- : io_context()
45- , socket(io_context)
46- , timer(io_context)
47- {
48- }
49- // -----------------------------------------------------------------------------
50- Subscriber::Implementation::Implementation (const std::string& service_name, const std::string& multicast_address)
51- : io_context()
52- , socket(io_context)
53- , endpoint()
54- , timer(io_context)
55- , thead_control_shutdown(false )
56- , thead_control_service_registered(false )
57- , message_count(0 )
58- , message(" Tutorial Message" )
59- , server_status(Success())
60- {
61- server_status |= multicast_setup (multicast_address);
62- }
63- // -----------------------------------------------------------------------------
64- Subscriber::Implementation::~Implementation ()
65- {
66- thead_control_shutdown = true ;
67- if (multicast_announce_thread.joinable ()) {
68- multicast_announce_thread = std::thread ([]() {});
69- }
70- if (multicast_listen_thread.joinable ()) {
71- multicast_announce_thread = std::thread ([]() {});
72- }
73- }
74- // -----------------------------------------------------------------------------
75- pfc::Error Subscriber::Implementation::multicast_setup (const std::string& multicast_address)
76- {
77- boost::system::error_code err;
78- pfc::Error pfc_error_code = Success ();
79- auto broadcast_address = boost::asio::ip::make_address (multicast_address, err);
80- if (!err) {
81- endpoint = boost::asio::ip::udp::endpoint (broadcast_address, g_pfc_registry_announce_port);
82- socket = boost::asio::ip::udp::socket (io_context, endpoint.protocol ());
83- } else {
84- pfc_error_code = pfc::Error (Error::Code::PFC_IP_PARSE_ERROR);
85- }
86- return pfc_error_code;
87- }
88- // -----------------------------------------------------------------------------
89- void Subscriber::Implementation::multicast_announce ()
90- {
91- std::ostringstream os;
92- os << " Message " << message_count++;
93- message = os.str ();
6+ PubSub_Subscriber::~PubSub_Subscriber () {}
947
95- socket.async_send_to (
96- boost::asio::buffer (message), endpoint,
97- [this ](boost::system::error_code ec, std::size_t /* length*/ ) {
98- if (!ec && message_count < max_message_count)
99- multicast_announce_timeout ();
100- });
101- }
102- // -----------------------------------------------------------------------------
103- void Subscriber::Implementation::multicast_announce_timeout ()
104- {
105- timer.expires_after (std::chrono::seconds (1 ));
106- timer.async_wait (
107- [this ](boost::system::error_code ec) {
108- if (!ec)
109- multicast_announce ();
110- });
111- }
112- // -----------------------------------------------------------------------------
113- Subscriber::Subscriber (std::string service_name, std::string multicast_address)
114- : _impl(std::make_unique<Implementation>(std::move(service_name), std::move(multicast_address)))
115- {
116- }
117- // -----------------------------------------------------------------------------
118- Subscriber::Subscriber (Subscriber&& obj)
119- : _impl(std::move(obj._impl))
120- {
121- obj._impl = std::make_unique<Implementation>();
122- }
123- // -----------------------------------------------------------------------------
124- Subscriber::~Subscriber ()
125- {
8+ void PubSub_Subscriber::listen (std::function<void (void )>) {}
9+ void PubSub_Subscriber::async_listen (std::function<void (void )>) {}
12610
127- }
128- // -----------------------------------------------------------------------------
129- bool Subscriber::Valid ()
130- {
131- return _impl->server_status .is_ok ();
132- }
133- // -----------------------------------------------------------------------------
134- Error Subscriber::Error ()
135- {
136- return _impl->server_status ;
137- }
138- // -----------------------------------------------------------------------------
139- Subscriber& Subscriber::operator =(Subscriber&& obj)
140- {
141- _impl = std::move (obj._impl );
142- obj._impl = std::make_unique<Implementation>();
143- return *this ;
144- }
11+ void PubSub_Subscriber::standup () {}
12+ void PubSub_Subscriber::shutdown () {}
14513}
0 commit comments