Skip to content

Commit 05f359a

Browse files
committed
Refactor a bit to have separate Hub/Trunk/Prune logic functions
Paid for itself already - cleaner logic tipped off the fixed branch bug Haven't fixed the bug yet
1 parent 80b6438 commit 05f359a

File tree

2 files changed

+97
-34
lines changed

2 files changed

+97
-34
lines changed

src/MiniPlex.cpp

Lines changed: 83 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,36 @@ MiniPlex::MiniPlex(const CmdArgs& Args, asio::io_context& IOC):
3838
asio::socket_base::receive_buffer_size option(Args.SoRcvBuf.getValue());
3939
socket.set_option(option);
4040

41-
if(Args.Trunk || Args.Prune)
41+
if(Args.Hub)
4242
{
43-
trunk = asio::ip::udp::endpoint(asio::ip::address::from_string(Args.TrunkAddr.getValue()),Args.TrunkPort.getValue());
44-
spdlog::get("MiniPlex")->info("Operating in Trunk mode to {}:{}",Args.TrunkAddr.getValue(),Args.TrunkPort.getValue());
43+
spdlog::get("MiniPlex")->info("Operating in Hub mode.");
44+
ModeHandler = std::bind(&MiniPlex::Hub,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3,std::placeholders::_4);
45+
}
46+
else if(Args.Trunk)
47+
{
48+
spdlog::get("MiniPlex")->info("Operating in Trunk mode.");
49+
ModeHandler = std::bind(&MiniPlex::Trunk,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3,std::placeholders::_4);
50+
}
51+
else if(Args.Prune)
52+
{
53+
spdlog::get("MiniPlex")->info("Operating in Prune mode.");
54+
ModeHandler = std::bind(&MiniPlex::Prune,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3,std::placeholders::_4);
4555
}
4656
else
57+
throw std::runtime_error("Mode error");
58+
59+
if(Args.Trunk || Args.Prune)
4760
{
48-
spdlog::get("MiniPlex")->info("Operating in Hub mode.");
61+
trunk = asio::ip::udp::endpoint(asio::ip::address::from_string(Args.TrunkAddr.getValue()),Args.TrunkPort.getValue());
62+
spdlog::get("MiniPlex")->info("Trunking to {}:{}",Args.TrunkAddr.getValue(),Args.TrunkPort.getValue());
4963
}
64+
5065
for(size_t i=0; i<Args.BranchAddrs.getValue().size(); i++)
5166
{
5267
auto branch = asio::ip::udp::endpoint(asio::ip::address::from_string(Args.BranchAddrs.getValue()[i]),Args.BranchPorts.getValue()[i]);
5368
PermaBranches.emplace_back(branch);
5469
}
70+
5571
socket_strand.post([this](){Rcv();});
5672
spdlog::get("MiniPlex")->info("Listening on {}:{}",Args.LocalAddr.getValue(),Args.LocalPort.getValue());
5773
}
@@ -91,53 +107,87 @@ void MiniPlex::RcvHandler(const asio::error_code err, const uint8_t* const buf,
91107
spdlog::get("MiniPlex")->error("RcvHandler(): error code {}: '{}'",err.value(),err.message());
92108
return;
93109
}
94-
95110
auto sender_string = rcv_sender.address().to_string()+":"+std::to_string(rcv_sender.port());
96111
spdlog::get("MiniPlex")->trace("RcvHandler(): {} bytes from {}",n,sender_string);
97112

98-
if(Args.Hub || rcv_sender != trunk)
113+
const auto& branches = Branches(rcv_sender, sender_string);
114+
ModeHandler(branches,rcv_sender,buf,n);
115+
}
116+
117+
void MiniPlex::Hub(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n)
118+
{
119+
auto pForwardBuf = MakeSharedBuf(buf,n);
120+
Forward(pForwardBuf,n,rcv_sender,PermaBranches,"fixed branches");
121+
Forward(pForwardBuf,n,rcv_sender,branches,"cached branches");
122+
}
123+
124+
void MiniPlex::Trunk(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n)
125+
{
126+
auto pForwardBuf = MakeSharedBuf(buf,n);
127+
if(rcv_sender == trunk)
99128
{
100-
auto added = EndPointCache.Add(rcv_sender);
101-
spdlog::get("MiniPlex")->trace("RcvHandler(): {} cache entry for {}",added?"Inserted":"Refreshed",sender_string);
129+
Forward(pForwardBuf,n,rcv_sender,PermaBranches,"fixed branches");
130+
Forward(pForwardBuf,n,rcv_sender,branches,"cached branches");
102131
}
132+
else
133+
Forward(pForwardBuf,n,rcv_sender,std::list{trunk},"trunk");
134+
}
103135

104-
const auto& cache_EPs = EndPointCache.Keys();
105-
106-
if(Args.Prune && rcv_sender != trunk && !cache_EPs.empty() && rcv_sender != *cache_EPs.begin())
136+
void MiniPlex::Prune(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n)
137+
{
138+
if(rcv_sender != trunk && !branches.empty() && rcv_sender != *branches.begin())
107139
{
108-
spdlog::get("MiniPlex")->debug("RcvHandler(): pruned packet for branch {}",sender_string);
140+
auto sender_string = rcv_sender.address().to_string()+":"+std::to_string(rcv_sender.port());
141+
spdlog::get("MiniPlex")->debug("RcvHandler(): pruned packet from branch {}",sender_string);
109142
return;
110143
}
144+
auto pForwardBuf = MakeSharedBuf(buf,n);
145+
if(rcv_sender == trunk)
146+
{
147+
if(branches.empty())
148+
Forward(pForwardBuf,n,rcv_sender,PermaBranches,"fixed branches");
149+
else
150+
Forward(pForwardBuf,n,rcv_sender,std::list{*branches.begin()},"active prune branch");
151+
}
152+
else
153+
Forward(pForwardBuf,n,rcv_sender,std::list{trunk},"pruned branch");
154+
}
111155

156+
inline std::shared_ptr<uint8_t> MiniPlex::MakeSharedBuf(const uint8_t* const buf, const size_t n)
157+
{
112158
// The C++20 way causes a malloc error when asio tries to copy a handler with this style shared_ptr
113159
//auto pForwardBuf = std::make_shared<uint8_t[]>(n);
114160
// Use the old way instead - only difference should be the control block is allocated separately
115161
auto pForwardBuf = std::shared_ptr<uint8_t>(new uint8_t[n],[](uint8_t* p){delete[] p;});
116162
std::memcpy(pForwardBuf.get(),buf,n);
163+
return pForwardBuf;
164+
}
117165

118-
auto forward_to_branches = [&](const auto& branches, const char* desc)
119-
{
120-
spdlog::get("MiniPlex")->trace("RcvHandler(): Forwarding to {} {}",branches.size(),desc);
121-
for(const auto& endpoint : branches)
122-
if(endpoint != rcv_sender)
123-
socket_strand.post([this,pForwardBuf,n,ep{endpoint}]()
124-
{
125-
socket.async_send_to(asio::buffer(pForwardBuf.get(),n),ep,[pForwardBuf](asio::error_code,size_t){});
126-
});
127-
};
128-
129-
if(Args.Hub || rcv_sender == trunk)
166+
void MiniPlex::Forward(
167+
const std::shared_ptr<uint8_t>& pBuf,
168+
const size_t size,
169+
const asio::ip::udp::endpoint& sender,
170+
const std::list<asio::ip::udp::endpoint>& branches,
171+
const char* desc)
172+
{
173+
spdlog::get("MiniPlex")->trace("Forward(): sending to {} {}",branches.size(),desc);
174+
for(const auto& endpoint : branches)
175+
if(endpoint != sender)
176+
socket_strand.post([this,pBuf,size,ep{endpoint}]()
177+
{
178+
socket.async_send_to(asio::buffer(pBuf.get(),size),ep,[pBuf](asio::error_code,size_t){});
179+
});
180+
}
181+
182+
//Cache or refresh given branch endpoint, and return the list
183+
const std::list<asio::ip::udp::endpoint>& MiniPlex::Branches(const asio::ip::udp::endpoint& ep, const std::string& sender_string)
184+
{
185+
if(ep != trunk)
130186
{
131-
if(!Args.Prune || cache_EPs.empty())
132-
{
133-
forward_to_branches(PermaBranches,"fixed branches");
134-
forward_to_branches(cache_EPs,"cached branches");
135-
}
136-
else
137-
forward_to_branches(std::list{*cache_EPs.begin()},"pruned branch");
187+
auto added = EndPointCache.Add(ep);
188+
spdlog::get("MiniPlex")->trace("RcvHandler(): {} cache entry for {}",added?"Inserted":"Refreshed",sender_string);
138189
}
139-
else
140-
forward_to_branches(std::list{trunk},"trunk");
190+
return EndPointCache.Keys();
141191
}
142192

143193
void MiniPlex::Benchmark()

src/MiniPlex.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020

2121
#include "TimeoutCache.h"
2222
#include <asio.hpp>
23-
#include <string>
2423
#include <atomic>
2524
#include <deque>
2625
#include <list>
26+
#include <functional>
2727

2828
struct CmdArgs;
2929

@@ -38,6 +38,19 @@ class MiniPlex
3838

3939
void Rcv();
4040
void RcvHandler(const asio::error_code err, const uint8_t* const buf, const asio::ip::udp::endpoint& rcv_sender, const size_t n);
41+
inline std::shared_ptr<uint8_t> MakeSharedBuf(const uint8_t* const buf, const size_t n);
42+
void Forward(
43+
const std::shared_ptr<uint8_t>& pBuf,
44+
const size_t size,
45+
const asio::ip::udp::endpoint& sender,
46+
const std::list<asio::ip::udp::endpoint>& branches,
47+
const char* desc);
48+
const std::list<asio::ip::udp::endpoint>& Branches(const asio::ip::udp::endpoint& ep, const std::string& sender_string);
49+
50+
void Hub(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n);
51+
void Trunk(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n);
52+
void Prune(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n);
53+
std::function<void(const std::list<asio::ip::udp::endpoint>&, const asio::ip::udp::endpoint&, const uint8_t* const, const size_t)> ModeHandler;
4154

4255
const CmdArgs& Args;
4356
asio::io_context& IOC;

0 commit comments

Comments
 (0)