1+ #include "ArqStore.h"
2+
3+ #include <stdlib.h>
4+ #include <stored>
5+ #include <string.h>
6+
7+ static stored::Synchronizer synchronizer;
8+
9+ class ArqStore
10+ : public STORE_T(ArqStore, stored::Synchronizable, stored::ArqStoreBase) {
11+ STORE_CLASS(ArqStore, stored::Synchronizable, stored::ArqStoreBase)
12+ public:
13+ ArqStore() is_default
14+ };
15+
16+ static ArqStore store;
17+
18+ /*!
19+ * \brief Simulate a lossy channel.
20+ *
21+ * Depending on the bit error rate (ber) set in the store, bits are flipped.
22+ * Moreover, it allows to set an MTU via the store.
23+ */
24+ class LossyChannel : public stored::ProtocolLayer {
25+ STORED_CLASS_NOCOPY(LossyChannel)
26+ public:
27+ typedef stored::ProtocolLayer base;
28+
29+ LossyChannel(ProtocolLayer* up = nullptr, ProtocolLayer* down = nullptr)
30+ : base(up, down)
31+ {}
32+
33+ virtual ~LossyChannel() override is_default
34+
35+ virtual void decode(void* buffer, size_t len) override
36+ {
37+ char* buffer_ = static_cast<char*>(buffer);
38+ for(size_t i = 0; i < len; i++)
39+ buffer_[i] = lossyByte(buffer_[i]);
40+
41+ //printBuffer(buffer, len, "> ");
42+ base::decode(buffer, len);
43+ }
44+
45+ virtual void encode(void const* buffer, size_t len, bool last = true) override
46+ {
47+ // cppcheck-suppress allocaCalled
48+ char* buffer_ = (char*)alloca(len);
49+ for(size_t i = 0; i < len; i++)
50+ buffer_[i] = lossyByte(static_cast<char const*>(buffer)[i]);
51+
52+ //printBuffer(buffer_, len, "< ");
53+ base::encode(buffer_, len, last);
54+ }
55+
56+ using base::encode;
57+
58+ // Bit error rate
59+ double ber() const
60+ {
61+ return store.ber;
62+ }
63+
64+ char lossyByte(char b)
65+ {
66+ for(int i = 0; i < 8; i++) {
67+ double p =
68+ #ifdef STORED_OS_WINDOWS
69+ (double)::rand() / RAND_MAX;
70+ #else
71+ // flawfinder: ignore
72+ drand48();
73+ #endif
74+ if(p < ber()) {
75+ // Inject an error.
76+ b = (char)(b ^ (char)(1 << (rand() % 8)));
77+ store.injected_errors = store.injected_errors + 1;
78+ }
79+ }
80+ return b;
81+ }
82+
83+ virtual size_t mtu() const override
84+ {
85+ return store.MTU.as<size_t>();
86+ }
87+ };
88+
89+ int main()
90+ {
91+ printf("\nStart synchorizer from ZmqLayer on port 5555.\n");
92+ synchronizer.map(store);
93+
94+ stored::SegmentationLayer segmentation;
95+ stored::SyncConnection const& connection = synchronizer.connect(segmentation);
96+
97+ stored::ArqLayer arq;
98+ arq.wrap(segmentation);
99+
100+ stored::Crc16Layer crc;
101+ crc.wrap(arq);
102+
103+ stored::AsciiEscapeLayer escape;
104+ escape.wrap(crc);
105+
106+ stored::TerminalLayer terminal;
107+ terminal.wrap(escape);
108+
109+ stored::BufferLayer buffer;
110+ buffer.wrap(terminal);
111+
112+ LossyChannel lossy;
113+ lossy.wrap(buffer);
114+
115+ stored::SyncZmqLayer zmq(nullptr, "tcp://*:5555", true);
116+ zmq.wrap(lossy);
117+
118+ stored::Poller poller;
119+ stored::PollableZmqLayer pollableZmq(zmq, stored::Pollable::PollIn);
120+
121+ while(true) {
122+ // 1 s timeout, to force keep alive once in a while.
123+ stored::Poller::Result const& result = poller.poll(1000);
124+
125+ if(result.empty()) {
126+ switch(errno) {
127+ case EAGAIN:
128+ case EINTR:
129+ break;
130+ default:
131+ perror("poll failed");
132+ return 1;
133+ }
134+ }
135+
136+ zmq.recv();
137+ synchronizer.process();
138+ }
139+ return 0;
140+ }
0 commit comments