@@ -18,6 +18,18 @@ enum {
18
18
IDLE_FROM_RUNNING = 10 + static_cast <int >(NodeStatus::RUNNING)
19
19
};
20
20
21
+ struct Transition
22
+ {
23
+ // when serializing, we will remove the initial time and serialize only
24
+ // 6 bytes, instead of 8
25
+ uint64_t timestamp_usec;
26
+ // if you have more than 64.000 nodes, you are doing something wrong :)
27
+ uint16_t node_uid;
28
+ // enough bits to contain NodeStatus
29
+ uint8_t status;
30
+
31
+ uint8_t padding[5 ];
32
+ };
21
33
22
34
std::array<char ,16 > CreateRandomUUID ()
23
35
{
@@ -82,6 +94,10 @@ struct Groot2Publisher::PImpl
82
94
std::chrono::system_clock::time_point last_heartbeat;
83
95
std::chrono::milliseconds max_heartbeat_delay = std::chrono::milliseconds(5000 );
84
96
97
+ std::atomic_bool recording = false ;
98
+ std::deque<Transition> transitions_buffer;
99
+ std::chrono::microseconds recording_fist_time;
100
+
85
101
std::thread heartbeat_thread;
86
102
87
103
zmq::context_t context;
@@ -183,7 +199,7 @@ Groot2Publisher::~Groot2Publisher()
183
199
}
184
200
}
185
201
186
- void Groot2Publisher::callback (Duration, const TreeNode& node,
202
+ void Groot2Publisher::callback (Duration ts , const TreeNode& node,
187
203
NodeStatus prev_status, NodeStatus new_status)
188
204
{
189
205
std::unique_lock<std::mutex> lk (_p->status_mutex );
@@ -193,6 +209,20 @@ void Groot2Publisher::callback(Duration, const TreeNode& node,
193
209
status = 10 + static_cast <char >(prev_status);
194
210
}
195
211
*(_p->status_buffermap .at (node.UID ())) = status;
212
+
213
+ if (_p->recording )
214
+ {
215
+ Transition trans;
216
+ trans.node_uid = node.UID ();
217
+ trans.status = static_cast <uint8_t >(new_status);
218
+ auto timestamp = ts -_p->recording_fist_time ;
219
+ trans.timestamp_usec =
220
+ std::chrono::duration_cast<std::chrono::microseconds>(timestamp).count ();
221
+ _p->transitions_buffer .push_back (trans);
222
+ while (_p->transitions_buffer .size () > 1000 ) {
223
+ _p->transitions_buffer .pop_front ();
224
+ }
225
+ }
196
226
}
197
227
198
228
void Groot2Publisher::flush ()
@@ -239,6 +269,7 @@ void Groot2Publisher::serverLoop()
239
269
240
270
Monitor::ReplyHeader reply_header;
241
271
reply_header.request = request_header;
272
+ reply_header.request .protocol = Monitor::kProtocolID ;
242
273
reply_header.tree_id = serialized_uuid;
243
274
244
275
zmq::multipart_t reply_msg;
@@ -383,6 +414,54 @@ void Groot2Publisher::serverLoop()
383
414
}
384
415
reply_msg.addstr ( json_out.dump () );
385
416
} break ;
417
+
418
+ case Monitor::RequestType::TOGGLE_RECORDING:
419
+ {
420
+ if (requestMsg.size () != 2 ) {
421
+ sendErrorReply (" must be 2 parts message" );
422
+ continue ;
423
+ }
424
+
425
+ auto const cmd = (requestMsg[1 ].to_string ());
426
+ if (cmd == " start" )
427
+ {
428
+ _p->recording = true ;
429
+ auto now = std::chrono::system_clock::now ();
430
+
431
+ _p->recording_fist_time = std::chrono::duration_cast<std::chrono::microseconds>
432
+ (now.time_since_epoch ());
433
+
434
+ reply_msg.addstr (std::to_string (_p->recording_fist_time .count ()));
435
+ std::unique_lock<std::mutex> lk (_p->status_mutex );
436
+ _p->transitions_buffer .clear ();
437
+ }
438
+ else if (cmd == " stop" )
439
+ {
440
+ _p->recording = false ;
441
+ }
442
+ } break ;
443
+
444
+ case Monitor::RequestType::GET_TRANSITIONS:
445
+ {
446
+ thread_local std::string trans_buffer;
447
+ trans_buffer.resize (9 * _p->transitions_buffer .size ());
448
+
449
+ std::unique_lock<std::mutex> lk (_p->status_mutex );
450
+ size_t offset = 0 ;
451
+ for (const auto & trans: _p->transitions_buffer )
452
+ {
453
+ std::memcpy (&trans_buffer[offset], &trans.timestamp_usec , 6 );
454
+ offset += 6 ;
455
+ std::memcpy (&trans_buffer[offset], &trans.node_uid , 2 );
456
+ offset += 2 ;
457
+ std::memcpy (&trans_buffer[offset], &trans.status , 1 );
458
+ offset += 1 ;
459
+ }
460
+ _p->transitions_buffer .clear ();
461
+ trans_buffer.resize (offset);
462
+ reply_msg.addstr (trans_buffer);
463
+ } break ;
464
+
386
465
default : {
387
466
sendErrorReply (" Request not recognized" );
388
467
continue ;
0 commit comments