Skip to content

Commit a58ce89

Browse files
committed
Add bidirectional pairs of processes
This adds cwpl where processes communicate in pairs, each process publishing N topics in partition Pk and subscribing in Pj, where k is the specified id and j=k+1-2(k%2), so 0 and 1 form a pair, 2 and 3 form a pair, etc. A number of different types are available. Each writer has its own thread, readers record latency and optional write it to a file on termination. Signed-off-by: Erik Boasson <eb@ilities.com>
1 parent 16560b5 commit a58ce89

File tree

2 files changed

+296
-0
lines changed

2 files changed

+296
-0
lines changed

examples/hop/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,6 @@ target_link_libraries(hop CycloneDDS-CXX::ddscxx hop_type)
2727

2828
add_executable(mop mop.cpp)
2929
target_link_libraries(mop CycloneDDS-CXX::ddscxx mop_type)
30+
31+
add_executable(cwpl cwpl.cpp)
32+
target_link_libraries(cwpl CycloneDDS-CXX::ddscxx mop_type)

examples/hop/cwpl.cpp

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
/*
2+
* Copyright(c) 2024 ZettaScale Technology and others
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
7+
* v. 1.0 which is available at
8+
* http://www.eclipse.org/org/documents/edl-v10.php.
9+
*
10+
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
11+
*/
12+
13+
#include <algorithm>
14+
#include <iostream>
15+
#include <fstream>
16+
#include <sstream>
17+
#include <iomanip>
18+
#include <chrono>
19+
#include <thread>
20+
#include <string>
21+
#include <cstdlib>
22+
#include <random>
23+
24+
#include <unistd.h>
25+
26+
#include "dds/dds.hpp"
27+
#include "mop_type.hpp"
28+
29+
using namespace org::eclipse::cyclonedds;
30+
using namespace std::chrono_literals;
31+
using namespace std::chrono;
32+
33+
using CLK = high_resolution_clock;
34+
35+
enum class Type { T8, T128, T1k, T8k, T128k };
36+
37+
static Type type = Type::T128;
38+
static uint32_t pairid = 0;
39+
static uint32_t ntopics = 10;
40+
static bool random_timing = false;
41+
static std::optional<std::string> datafile;
42+
43+
static dds::core::Time mkDDSTime (const time_point<CLK> x)
44+
{
45+
int64_t t = duration_cast<nanoseconds>(x.time_since_epoch()).count();
46+
return dds::core::Time(t / 1000000000, static_cast<uint32_t>(t % 1000000000));
47+
}
48+
49+
static volatile std::atomic<bool> interrupted = false;
50+
static void sigh(int sig)
51+
{
52+
static_cast<void>(sig);
53+
interrupted = true;
54+
}
55+
56+
template<typename T>
57+
static dds::sub::DataReader<T> make_reader(dds::topic::Topic<T> tp)
58+
{
59+
dds::domain::DomainParticipant dp = tp.domain_participant();
60+
std::vector<std::string> spart{"P" + std::to_string(pairid + 1 - 2 * (pairid % 2))};
61+
dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart);
62+
dds::sub::Subscriber sub{dp, sqos};
63+
return dds::sub::DataReader<T>{sub, tp, tp.qos()};
64+
}
65+
66+
template<typename T>
67+
static dds::pub::DataWriter<T> make_writer(dds::topic::Topic<T> tp)
68+
{
69+
dds::domain::DomainParticipant dp = tp.domain_participant();
70+
std::vector<std::string> ppart{"P" + std::to_string(pairid)};
71+
dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart);
72+
dds::pub::Publisher pub{dp, pqos};
73+
return dds::pub::DataWriter<T>{pub, tp, tp.qos()};
74+
}
75+
76+
template<typename T>
77+
static void source(dds::pub::DataWriter<T> wr)
78+
{
79+
T sample{};
80+
sample.k(0);
81+
auto now = CLK::now();
82+
while (!interrupted)
83+
{
84+
wr.write(sample, mkDDSTime(CLK::now()));
85+
++sample.seq();
86+
now += 10ms;
87+
std::this_thread::sleep_until(now);
88+
}
89+
}
90+
91+
template<typename T>
92+
static void randomsource(dds::pub::DataWriter<T> wr)
93+
{
94+
std::random_device ran_dev;
95+
std::mt19937 prng(ran_dev());
96+
std::exponential_distribution<double> intvdist(100);
97+
98+
T sample{};
99+
sample.k(0);
100+
auto now = CLK::now();
101+
while (!interrupted)
102+
{
103+
wr.write(sample, mkDDSTime(CLK::now()));
104+
++sample.seq();
105+
auto delay = std::chrono::duration<double>(intvdist(prng));
106+
if (delay > 1s)
107+
delay = 1s;
108+
now += std::chrono::duration_cast<std::chrono::nanoseconds>(delay);
109+
std::this_thread::sleep_until(now);
110+
}
111+
}
112+
113+
// t = reception time, l = latency, i = topic index, k = source key
114+
struct TLK { int64_t t; double l; uint32_t k; };
115+
struct TLIK { int64_t t; double l; size_t i; uint32_t k; };
116+
struct LIK { double l; size_t i; uint32_t k; };
117+
118+
template<typename T>
119+
class Sink : public dds::sub::NoOpDataReaderListener<T> {
120+
public:
121+
Sink() = default;
122+
123+
const std::vector<TLK>& lats() const {
124+
return lats_;
125+
};
126+
127+
private:
128+
void on_data_available(dds::sub::DataReader<T>& rd)
129+
{
130+
const auto now_clk = CLK::now();
131+
const int64_t now = duration_cast<nanoseconds>(now_clk.time_since_epoch()).count();
132+
auto xs = rd.take();
133+
for (const auto& x : xs) {
134+
if (x.info().valid()) {
135+
const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec());
136+
lats_.push_back(TLK{now, lat / 1e3, x.data().k()});
137+
} else {
138+
interrupted = true;
139+
}
140+
};
141+
}
142+
143+
std::vector<TLK> lats_;
144+
};
145+
146+
template<typename T>
147+
static void run()
148+
{
149+
dds::domain::DomainParticipant dp{0};
150+
auto tpqos = dp.default_topic_qos()
151+
<< dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite())
152+
<< dds::core::policy::History::KeepLast(1);
153+
std::vector<dds::topic::Topic<T>> tps;
154+
std::vector<dds::pub::DataWriter<T>> wrs;
155+
for (uint32_t i = 0; i < ntopics; i++)
156+
tps.push_back(dds::topic::Topic<T>{dp, "Mop" + std::to_string(i), tpqos});
157+
for (auto& tp : tps)
158+
wrs.push_back(make_writer(tp));
159+
std::vector<dds::sub::DataReader<T>> rds;
160+
std::vector<Sink<T>> ls;
161+
for (size_t i = 0; i < tps.size(); i++)
162+
ls.push_back(Sink<T>{});
163+
for (size_t i = 0; i < tps.size(); i++)
164+
rds.push_back(make_reader(tps[i]));
165+
for (size_t i = 0; i < tps.size(); i++)
166+
rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available());
167+
168+
signal(SIGINT, sigh);
169+
signal(SIGTERM, sigh);
170+
std::vector<std::thread> threads;
171+
for (auto wr : wrs)
172+
threads.push_back(std::thread(random_timing ? randomsource<T> : source<T>, wr));
173+
174+
// latencies in microseconds
175+
std::vector<LIK> lats;
176+
while (!interrupted)
177+
std::this_thread::sleep_for(103ms);
178+
for (auto& t : threads)
179+
t.join();
180+
for (auto rd : rds)
181+
rd.close();
182+
for (auto wr : wrs)
183+
wr.close();
184+
// collect latencies for all topics and sort by reception time
185+
std::vector<TLIK> tlats;
186+
for (size_t i = 0; i < ls.size(); i++)
187+
for (const auto& x : ls[i].lats())
188+
tlats.push_back(TLIK{x.t, x.l, i, x.k});
189+
std::sort(tlats.begin(), tlats.end(), [](const TLIK& a, const TLIK& b) -> bool { return a.t < b.t; });
190+
// then reduce to just latency, topic and key
191+
for (const auto& x : tlats)
192+
lats.push_back(LIK{x.l, x.i, x.k});
193+
194+
if (datafile.has_value())
195+
{
196+
std::ofstream f;
197+
f.open(datafile.value());
198+
for (const auto& l : lats)
199+
f << l.l << " " << l.i << " " << l.k << std::endl;
200+
f.close();
201+
}
202+
const size_t n = lats.size();
203+
if (n < 2) {
204+
std::cout << "insufficient data" << std::endl;
205+
} else {
206+
std::sort(lats.begin(), lats.end(), [](const LIK& a, const LIK& b) -> bool { return a.l < b.l; });
207+
std::cout
208+
<< "received " << n
209+
<< " samples; min " << lats[0].l
210+
<< " max-1 " << lats[n-2].l
211+
<< " max " << lats[n-1].l << std::endl;
212+
}
213+
}
214+
215+
[[noreturn]]
216+
static void usage()
217+
{
218+
std::cout
219+
<< "usage: cwpl [OPTIONS] id" << std::endl
220+
<< std::endl
221+
<< "OPTIONS:" << std::endl
222+
<< "-tTYPE type to use one of 8, 128 (def), 1k, 8k, 128k" << std::endl
223+
<< "-nNTPS use N (def = 10) topics in parallel" << std::endl
224+
<< "-r use randomized write intervals with average 10ms" << std::endl
225+
<< "-oFILE write latencies to FILE" << std::endl
226+
<< std::endl
227+
<< "id = 0 writes in partition P0, reads from P1" << std::endl
228+
<< "id = 1 writes in partition P1, reads from P0" << std::endl
229+
<< "id = 2 writes in partition P2, reads from P3" << std::endl
230+
<< "etc." << std::endl;
231+
std::exit(1);
232+
}
233+
234+
static Type convert_typestr (const std::string& typestr)
235+
{
236+
if (typestr == "8") {
237+
return Type::T8;
238+
} else if (typestr == "128") {
239+
return Type::T128;
240+
} else if (typestr == "1k") {
241+
return Type::T1k;
242+
} else if (typestr == "8k") {
243+
return Type::T8k;
244+
} else if (typestr == "128k") {
245+
return Type::T128k;
246+
} else {
247+
std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl;
248+
std::exit(1);
249+
return Type::T128;
250+
}
251+
}
252+
253+
int main (int argc, char **argv)
254+
{
255+
if (argc < 1)
256+
usage();
257+
258+
int opt;
259+
while ((opt = getopt (argc, argv, "n:o:rt:")) != EOF)
260+
{
261+
switch (opt)
262+
{
263+
case 'n':
264+
ntopics = static_cast<uint32_t>(std::atoi(optarg));
265+
break;
266+
case 'o':
267+
datafile = std::string(optarg);
268+
break;
269+
case 'r':
270+
random_timing = true;
271+
break;
272+
case 't':
273+
type = convert_typestr(std::string(optarg));
274+
break;
275+
default:
276+
usage();
277+
}
278+
}
279+
if (argc - optind != 1)
280+
{
281+
usage();
282+
}
283+
pairid = static_cast<uint32_t>(std::atoi(argv[optind]));
284+
switch (type)
285+
{
286+
case Type::T8: run<Mop8>(); break;
287+
case Type::T128: run<Mop128>(); break;
288+
case Type::T1k: run<Mop1k>(); break;
289+
case Type::T8k: run<Mop8k>(); break;
290+
case Type::T128k: run<Mop128k>(); break;
291+
}
292+
return 0;
293+
}

0 commit comments

Comments
 (0)