Skip to content

Commit 027764a

Browse files
committed
Fix permanent branch bug
perm branches were getting duplicate messages in Hub and Trunk mode Prune mode wasn't affected, that's why the original test didn't pick it up.
1 parent 05f359a commit 027764a

File tree

2 files changed

+22
-15
lines changed

2 files changed

+22
-15
lines changed

src/MiniPlex.cpp

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ MiniPlex::MiniPlex(const CmdArgs& Args, asio::io_context& IOC):
2929
socket(IOC,local_ep),
3030
socket_strand(IOC),
3131
process_strand(IOC),
32-
EndPointCache(process_strand,Args.CacheTimeout.getValue(),[](const asio::ip::udp::endpoint& ep)
32+
ActiveBranches(process_strand,Args.CacheTimeout.getValue(),[this](const asio::ip::udp::endpoint& ep)
3333
{
34+
if(PermaBranches.contains(ep))
35+
InactivePermaBranches.insert(ep);
3436
auto ep_string = ep.address().to_string()+":"+std::to_string(ep.port());
3537
spdlog::get("MiniPlex")->debug("Cache entry for {} timed out.",ep_string);
3638
})
@@ -65,7 +67,8 @@ MiniPlex::MiniPlex(const CmdArgs& Args, asio::io_context& IOC):
6567
for(size_t i=0; i<Args.BranchAddrs.getValue().size(); i++)
6668
{
6769
auto branch = asio::ip::udp::endpoint(asio::ip::address::from_string(Args.BranchAddrs.getValue()[i]),Args.BranchPorts.getValue()[i]);
68-
PermaBranches.emplace_back(branch);
70+
PermaBranches.insert(branch);
71+
InactivePermaBranches.insert(branch);
6972
}
7073

7174
socket_strand.post([this](){Rcv();});
@@ -117,17 +120,17 @@ void MiniPlex::RcvHandler(const asio::error_code err, const uint8_t* const buf,
117120
void MiniPlex::Hub(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n)
118121
{
119122
auto pForwardBuf = MakeSharedBuf(buf,n);
120-
Forward(pForwardBuf,n,rcv_sender,PermaBranches,"fixed branches");
121-
Forward(pForwardBuf,n,rcv_sender,branches,"cached branches");
123+
Forward(pForwardBuf,n,rcv_sender,branches,"active branches");
124+
Forward(pForwardBuf,n,rcv_sender,InactivePermaBranches,"inactive fixed branches");
122125
}
123126

124127
void MiniPlex::Trunk(const std::list<asio::ip::udp::endpoint>& branches, const asio::ip::udp::endpoint& rcv_sender, const uint8_t* const buf, const size_t n)
125128
{
126129
auto pForwardBuf = MakeSharedBuf(buf,n);
127130
if(rcv_sender == trunk)
128131
{
129-
Forward(pForwardBuf,n,rcv_sender,PermaBranches,"fixed branches");
130-
Forward(pForwardBuf,n,rcv_sender,branches,"cached branches");
132+
Forward(pForwardBuf,n,rcv_sender,branches,"active branches");
133+
Forward(pForwardBuf,n,rcv_sender,InactivePermaBranches,"inactive fixed branches");
131134
}
132135
else
133136
Forward(pForwardBuf,n,rcv_sender,std::list{trunk},"trunk");
@@ -150,7 +153,7 @@ void MiniPlex::Prune(const std::list<asio::ip::udp::endpoint>& branches, const a
150153
Forward(pForwardBuf,n,rcv_sender,std::list{*branches.begin()},"active prune branch");
151154
}
152155
else
153-
Forward(pForwardBuf,n,rcv_sender,std::list{trunk},"pruned branch");
156+
Forward(pForwardBuf,n,rcv_sender,std::list{trunk},"trunk");
154157
}
155158

156159
inline std::shared_ptr<uint8_t> MiniPlex::MakeSharedBuf(const uint8_t* const buf, const size_t n)
@@ -163,11 +166,11 @@ inline std::shared_ptr<uint8_t> MiniPlex::MakeSharedBuf(const uint8_t* const buf
163166
return pForwardBuf;
164167
}
165168

166-
void MiniPlex::Forward(
169+
template<typename T> void MiniPlex::Forward(
167170
const std::shared_ptr<uint8_t>& pBuf,
168171
const size_t size,
169172
const asio::ip::udp::endpoint& sender,
170-
const std::list<asio::ip::udp::endpoint>& branches,
173+
const T& branches,
171174
const char* desc)
172175
{
173176
spdlog::get("MiniPlex")->trace("Forward(): sending to {} {}",branches.size(),desc);
@@ -184,10 +187,12 @@ const std::list<asio::ip::udp::endpoint>& MiniPlex::Branches(const asio::ip::udp
184187
{
185188
if(ep != trunk)
186189
{
187-
auto added = EndPointCache.Add(ep);
190+
auto added = ActiveBranches.Add(ep);
188191
spdlog::get("MiniPlex")->trace("RcvHandler(): {} cache entry for {}",added?"Inserted":"Refreshed",sender_string);
192+
if(added)
193+
InactivePermaBranches.erase(ep);
189194
}
190-
return EndPointCache.Keys();
195+
return ActiveBranches.Keys();
191196
}
192197

193198
void MiniPlex::Benchmark()

src/MiniPlex.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <atomic>
2424
#include <deque>
2525
#include <list>
26+
#include <set>
2627
#include <functional>
2728

2829
struct CmdArgs;
@@ -39,11 +40,11 @@ class MiniPlex
3940
void Rcv();
4041
void RcvHandler(const asio::error_code err, const uint8_t* const buf, const asio::ip::udp::endpoint& rcv_sender, const size_t n);
4142
inline std::shared_ptr<uint8_t> MakeSharedBuf(const uint8_t* const buf, const size_t n);
42-
void Forward(
43+
template<typename T> void Forward(
4344
const std::shared_ptr<uint8_t>& pBuf,
4445
const size_t size,
4546
const asio::ip::udp::endpoint& sender,
46-
const std::list<asio::ip::udp::endpoint>& branches,
47+
const T& branches,
4748
const char* desc);
4849
const std::list<asio::ip::udp::endpoint>& Branches(const asio::ip::udp::endpoint& ep, const std::string& sender_string);
4950

@@ -58,8 +59,9 @@ class MiniPlex
5859
asio::ip::udp::socket socket;
5960
asio::io_context::strand socket_strand;
6061
asio::io_context::strand process_strand;
61-
TimeoutCache<asio::ip::udp::endpoint> EndPointCache;
62-
std::list<asio::ip::udp::endpoint> PermaBranches;
62+
std::set<asio::ip::udp::endpoint> PermaBranches;
63+
TimeoutCache<asio::ip::udp::endpoint> ActiveBranches;
64+
std::set<asio::ip::udp::endpoint> InactivePermaBranches;
6365
asio::ip::udp::endpoint trunk;
6466

6567
std::deque<std::shared_ptr<rbuf_t>> rcv_buf_q;

0 commit comments

Comments
 (0)