1
1
#include " behaviortree_cpp/loggers/bt_zmq_publisher.h"
2
2
#include " behaviortree_cpp/loggers/bt_flatbuffer_helper.h"
3
3
#include < future>
4
+ #include < zmq.hpp>
4
5
5
6
namespace BT
6
7
{
7
8
std::atomic<bool > PublisherZMQ::ref_count (false );
8
9
9
- void PublisherZMQ::createStatusBuffer ()
10
+ struct PublisherZMQ ::Pimpl
10
11
{
11
- status_buffer_.clear ();
12
- applyRecursiveVisitor (root_node_, [this ](TreeNode* node) {
13
- size_t index = status_buffer_.size ();
14
- status_buffer_.resize (index + 3 );
15
- flatbuffers::WriteScalar<uint16_t >(&status_buffer_[index], node->UID ());
16
- flatbuffers::WriteScalar<int8_t >(&status_buffer_[index + 2 ],
17
- static_cast <int8_t >(convertToFlatbuffers (node->status ())));
18
- });
19
- }
12
+ Pimpl ():
13
+ context (1 )
14
+ , publisher(context, ZMQ_PUB)
15
+ , server(context, ZMQ_REP)
16
+ {}
17
+
18
+ zmq::context_t context;
19
+ zmq::socket_t publisher;
20
+ zmq::socket_t server;
21
+ };
22
+
20
23
21
24
PublisherZMQ::PublisherZMQ (TreeNode* root_node, int max_msg_per_second)
22
25
: StatusChangeLogger(root_node)
23
26
, root_node_(root_node)
24
27
, min_time_between_msgs_(std::chrono::microseconds(1000 * 1000 ) / max_msg_per_second)
25
- , zmq_context_(1 )
26
- , zmq_publisher_(zmq_context_, ZMQ_PUB)
27
- , zmq_server_(zmq_context_, ZMQ_REP)
28
28
, send_pending_(false )
29
+ , zmq_(new Pimpl())
29
30
{
30
31
static bool first_instance = true ;
31
32
if (first_instance)
@@ -43,11 +44,11 @@ PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
43
44
tree_buffer_.resize (builder.GetSize ());
44
45
memcpy (tree_buffer_.data (), builder.GetBufferPointer (), builder.GetSize ());
45
46
46
- zmq_publisher_ .bind (" tcp://*:1666" );
47
- zmq_server_ .bind (" tcp://*:1667" );
47
+ zmq_-> publisher .bind (" tcp://*:1666" );
48
+ zmq_-> server .bind (" tcp://*:1667" );
48
49
49
50
int timeout_ms = 100 ;
50
- zmq_server_ .setsockopt (ZMQ_RCVTIMEO, &timeout_ms, sizeof (int ));
51
+ zmq_-> server .setsockopt (ZMQ_RCVTIMEO, &timeout_ms, sizeof (int ));
51
52
52
53
active_server_ = true ;
53
54
@@ -57,12 +58,12 @@ PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
57
58
zmq::message_t req;
58
59
try
59
60
{
60
- bool received = zmq_server_ .recv (&req);
61
+ bool received = zmq_-> server .recv (&req);
61
62
if (received)
62
63
{
63
64
zmq::message_t reply (tree_buffer_.size ());
64
65
memcpy (reply.data (), tree_buffer_.data (), tree_buffer_.size ());
65
- zmq_server_ .send (reply);
66
+ zmq_-> server .send (reply);
66
67
}
67
68
}
68
69
catch (zmq::error_t & err)
@@ -84,6 +85,20 @@ PublisherZMQ::~PublisherZMQ()
84
85
thread_.join ();
85
86
}
86
87
flush ();
88
+ delete zmq_;
89
+ }
90
+
91
+
92
+ void PublisherZMQ::createStatusBuffer ()
93
+ {
94
+ status_buffer_.clear ();
95
+ applyRecursiveVisitor (root_node_, [this ](TreeNode* node) {
96
+ size_t index = status_buffer_.size ();
97
+ status_buffer_.resize (index + 3 );
98
+ flatbuffers::WriteScalar<uint16_t >(&status_buffer_[index], node->UID ());
99
+ flatbuffers::WriteScalar<int8_t >(&status_buffer_[index + 2 ],
100
+ static_cast <int8_t >(convertToFlatbuffers (node->status ())));
101
+ });
87
102
}
88
103
89
104
void PublisherZMQ::callback (Duration timestamp, const TreeNode& node, NodeStatus prev_status,
@@ -139,7 +154,7 @@ void PublisherZMQ::flush()
139
154
createStatusBuffer ();
140
155
}
141
156
142
- zmq_publisher_ .send (message);
157
+ zmq_-> publisher .send (message);
143
158
send_pending_ = false ;
144
159
// printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
145
160
}
0 commit comments