Skip to content

Commit 692efbd

Browse files
authored
Merge pull request #108 from DUNE-DAQ/ron/bad_omen
add sleep and send timeout checking in order to potentially log "bad …
2 parents 5d5708b + f8c51bb commit 692efbd

File tree

2 files changed

+69
-22
lines changed

2 files changed

+69
-22
lines changed

test/apps/zmq_recv.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@ using namespace dunedaq::ipm;
99

1010
int main(int argc, char* argv[]){
1111
std::string conString="tcp://127.0.0.1:12345";
12-
int npackets=1;
12+
size_t npackets=1;
1313
int nthreads=1;
1414
int timeout = 10;
1515

1616
namespace po = boost::program_options;
1717
po::options_description desc("Simple test program for ZmqReceiver");
1818
desc.add_options()(
19-
"connection,c", po::value<std::string>(&conString), "Connection to listen on")(
20-
"threads,t", po::value<int>(&nthreads), "Number of ZMQ threads")(
21-
"packets,p", po::value<int>(&npackets), "Number of packets per group for reporting")(
22-
"timeout,o", po::value<int>(&timeout), "Timeout, in seconds");
19+
"connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
20+
"threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
21+
"packets,p", po::value<size_t>(&npackets)->default_value(npackets), "Number of packets per group for reporting")(
22+
"timeout,o", po::value<int>(&timeout)->default_value(timeout), "Timeout, in seconds");
2323
try {
2424
po::variables_map vm;
2525
po::store(po::parse_command_line(argc, argv, desc), vm);
@@ -30,6 +30,7 @@ int main(int argc, char* argv[]){
3030
return 0;
3131
}
3232

33+
dunedaq::logging::Logging::setup("ZMQ Test", "zmq_recv");
3334
if (nthreads > 1) {
3435
dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
3536
}
@@ -38,20 +39,36 @@ int main(int argc, char* argv[]){
3839
std::shared_ptr<Receiver> receiver=make_ipm_receiver("ZmqReceiver");
3940
receiver->connect_for_receives({ {"connection_string", conString} });
4041

42+
std::map<uint32_t, uint32_t> last_received_sequence;
43+
int64_t first_latency = 0;
4144
try {
4245
while (true) {
4346
// Last arg is receive timeout
4447
auto start=std::chrono::steady_clock::now();
4548
float bytesReceived=0;
46-
for (int p=0;p<npackets;p++) {
49+
for (size_t p=0;p<npackets;p++) {
4750
Receiver::Response resp=receiver->receive(std::chrono::seconds(timeout));
51+
int64_t recvd_ts = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
4852
bytesReceived+=resp.data.size();
53+
auto this_id = *(reinterpret_cast<uint32_t*>(resp.data.data()));
54+
auto this_sequence = *(reinterpret_cast<uint32_t*>(resp.data.data()) + 1);
55+
auto this_ts = *(reinterpret_cast<uint64_t*>(resp.data.data()) + 1);
56+
57+
if(this_sequence < last_received_sequence[this_id] + 1) {
58+
TLOG() << "Received sequence ID " << this_sequence << " < expected sequence " << (last_received_sequence[this_id] + 1) << " from sender " << this_id;
59+
}
60+
int64_t this_latency = recvd_ts - this_ts;
61+
if(first_latency == 0) {
62+
first_latency = this_latency;
63+
}
64+
TLOG_DEBUG(6) << "Received message " << this_sequence << " from sender " << this_id << ", latency= " << this_latency << " us (diff= " << (this_latency - first_latency) << " us)";
65+
last_received_sequence[this_id] = this_sequence;
4966
}
5067
auto elapsed=std::chrono::steady_clock::now()-start;
5168
auto nano=std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
5269
float bw=bytesReceived/nano;
53-
std::cout << "Received " << bytesReceived << " bytes in "
54-
<< nano << " ns " << bw << " GB/s" << std::endl;
70+
TLOG() << "Received " << bytesReceived << " bytes in "
71+
<< nano << " ns " << bw << " GB/s";
5572
// std::cout << "resp.data=";
5673
// for (auto d: resp.data) {
5774
// std::cout << d << std::endl;

test/apps/zmq_send.cpp

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,31 @@
11
#include "ipm/Sender.hpp"
22
#include "ipm/ZmqContext.hpp"
3-
3+
#include "logging/Logging.hpp"
44
#include "boost/program_options.hpp"
55

66
#include <memory>
77
#include <chrono>
88
#include <cstdlib>
99
#include <cerrno>
10+
#include <unistd.h> // sleep
1011

1112
int main(int argc, char* argv[]){
12-
int npackets=1;
13-
int packetSize=100;
13+
uint32_t npackets=1;
14+
size_t packetSize=100;
15+
size_t interval = 0;
1416
std::string conString="tcp://127.0.0.1:12345";
1517
int nthreads=1;
18+
uint32_t id = 0;
1619

1720
namespace po = boost::program_options;
1821
po::options_description desc("Simple test program for ZmqSender");
1922
desc.add_options()(
20-
"connection,c", po::value<std::string>(&conString), "Connection to listen on")(
21-
"threads,t", po::value<int>(&nthreads), "Number of ZMQ threads")(
22-
"packets,p", po::value<int>(&npackets), "Number of packets to send")(
23-
"packetSize,s", po::value<int>(&packetSize), "Number of bytes per packet");
23+
"connection,c", po::value<std::string>(&conString)->default_value(conString), "Connection to listen on")(
24+
"threads,t", po::value<int>(&nthreads)->default_value(nthreads), "Number of ZMQ threads")(
25+
"packets,p", po::value<uint32_t>(&npackets)->default_value(npackets), "Number of packets to send")(
26+
"packetSize,s", po::value<size_t>(&packetSize)->default_value(packetSize), "Number of bytes per packet")(
27+
"interval,i", po::value<size_t>(&interval)->default_value(interval), "Microseconds to sleep between messages")(
28+
"id", po::value<uint32_t>(&id)->default_value(id), "Identifier for this Sender");
2429
try {
2530
po::variables_map vm;
2631
po::store(po::parse_command_line(argc, argv, desc), vm);
@@ -31,25 +36,50 @@ int main(int argc, char* argv[]){
3136
return 0;
3237
}
3338

39+
dunedaq::logging::Logging::setup("ZMQ Test", "zmq_send");
3440
if (nthreads > 1) {
3541
dunedaq::ipm::ZmqContext::instance().set_context_threads(nthreads);
3642
}
43+
44+
// Set the minimum packet size to 16 bytes, 8 bytes for sequence number and 8 bytes for current time
45+
if(packetSize < 16) {
46+
packetSize = 16;
47+
}
3748

3849
std::shared_ptr<dunedaq::ipm::Sender> sender=dunedaq::ipm::make_ipm_sender("ZmqSender");
3950
sender->connect_for_sends({ {"connection_string", conString} });
4051

41-
std::vector message(packetSize,0);
52+
std::vector<char> message(packetSize,0);
53+
*reinterpret_cast<uint32_t*>(message.data()) = id;
4254

4355
auto start=std::chrono::steady_clock::now();
44-
for (int p=0; p<npackets;p++) {
45-
// Last arg is send timeout
46-
sender->send((void*)message.data(), packetSize, std::chrono::milliseconds(100));
56+
for (uint64_t p=0; p<npackets;p++) {
57+
TLOG_DEBUG(3) << "Preparing Message " << p;
58+
// Last arg is send timeout
59+
int attempt=0;
60+
bool success;
61+
62+
*(reinterpret_cast<uint32_t*>(message.data()) + 1)= p;
63+
*(reinterpret_cast<uint64_t*>(message.data()) + 1) = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
64+
65+
TLOG_DEBUG(4) << "Sending Message " << p;
66+
do {
67+
success = sender->send((void*)message.data(), packetSize, std::chrono::milliseconds(2),"",true);
68+
if (success == false && ++attempt == 1)
69+
TLOG() << "bad omen";
70+
} while (success == false);
71+
72+
TLOG_DEBUG(5) << "Message sent " << p;
73+
if(interval > 0) {
74+
usleep(interval);
75+
}
4776
}
4877

4978
auto elapsed=std::chrono::steady_clock::now()-start;
5079
auto nano=std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
5180
float bw=((float)packetSize*npackets)/nano;
52-
std::cout << "Sent " << packetSize*npackets << " bytes in "
53-
<< nano << " ns " << bw << " GB/s" << std::endl;
54-
81+
TLOG() << "Sent " << packetSize*npackets << " bytes in "
82+
<< nano << " ns " << bw << " GB/s";
83+
TLOG() << "Sleep 2 to allow finish???";
84+
sleep(2); // Sleep for 2
5585
}

0 commit comments

Comments
 (0)