Skip to content

Commit afa664d

Browse files
authored
Update tx.cpp
Support IPv6
1 parent 4ffa530 commit afa664d

File tree

1 file changed

+44
-60
lines changed

1 file changed

+44
-60
lines changed

tx.cpp

Lines changed: 44 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ TransmissionBlock::~TransmissionBlock()
2929
}
3030

3131

32-
const u16 TransmissionBlock::AckIndex()
32+
const uint16_t TransmissionBlock::AckIndex()
3333
{
3434
return m_BlockSequenceNumber%(Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION*2);
3535
}
3636

37-
bool TransmissionBlock::Send(u08* buffer, u16 buffersize)
37+
bool TransmissionBlock::Send(uint8_t* buffer, uint16_t buffersize)
3838
{
3939
if(p_Session->m_IsConnected == false)
4040
{
@@ -47,7 +47,7 @@ bool TransmissionBlock::Send(u08* buffer, u16 buffersize)
4747
#if ENABLE_CRITICAL_EXCEPTIONS
4848
TEST_EXCEPTION(std::bad_alloc());
4949
#endif
50-
m_OriginalPacketBuffer.emplace_back(std::unique_ptr<u08[]>(new u08[sizeof(Header::Data) + (m_BlockSize - 1) + buffersize]));
50+
m_OriginalPacketBuffer.emplace_back(std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(Header::Data) + (m_BlockSize - 1) + buffersize]));
5151
}
5252
catch(const std::bad_alloc& ex)
5353
{
@@ -77,7 +77,7 @@ bool TransmissionBlock::Send(u08* buffer, u16 buffersize)
7777
m_LargestOriginalPacketSize = buffersize;
7878
}
7979
DataHeader->m_LastIndicator = 1;/* This field is reserved to support fragmentation */
80-
for(u08 i = 0 ; i < m_BlockSize ; i++)
80+
for(uint8_t i = 0 ; i < m_BlockSize ; i++)
8181
{
8282
if(i != m_TransmissionCount)
8383
{
@@ -89,11 +89,7 @@ bool TransmissionBlock::Send(u08* buffer, u16 buffersize)
8989
}
9090
}
9191
memcpy(DataHeader->m_Codes + m_BlockSize, buffer, buffersize);
92-
sockaddr_in RemoteAddress = {0};
93-
RemoteAddress.sin_family = AF_INET;
94-
RemoteAddress.sin_addr.s_addr = p_Session->c_IPv4;
95-
RemoteAddress.sin_port = p_Session->c_Port;
96-
sendto(p_Session->c_Socket, m_OriginalPacketBuffer[m_TransmissionCount++].get(), ntohs(DataHeader->m_TotalSize), 0, (sockaddr*)&RemoteAddress, sizeof(RemoteAddress));
92+
sendto(p_Session->c_Socket, m_OriginalPacketBuffer[m_TransmissionCount++].get(), ntohs(DataHeader->m_TotalSize), 0, (sockaddr*)&p_Session->c_Addr.Addr, p_Session->c_Addr.AddrLength);
9793

9894
if((m_TransmissionCount == m_BlockSize)/* || reqack == true*/)
9995
{
@@ -110,7 +106,7 @@ bool TransmissionBlock::Send(u08* buffer, u16 buffersize)
110106

111107
void TransmissionBlock::Retransmission()
112108
{
113-
const u08 c_AckIndex = AckIndex();
109+
const uint8_t c_AckIndex = AckIndex();
114110
if(m_TransmissionMode == Parameter::BEST_EFFORT_TRANSMISSION_MODE)
115111
{
116112
if(m_TransmissionCount >= m_OriginalPacketBuffer.size() + m_RetransmissionRedundancy)
@@ -120,7 +116,7 @@ void TransmissionBlock::Retransmission()
120116
}
121117
if(p_Session->m_AckList[c_AckIndex] == true)
122118
{
123-
for(u16 i = p_Session->m_MinBlockSequenceNumber ; i != p_Session->m_MaxBlockSequenceNumber ; i++)
119+
for(uint16_t i = p_Session->m_MinBlockSequenceNumber ; i != p_Session->m_MaxBlockSequenceNumber ; i++)
124120
{
125121
if(p_Session->m_AckList[i%(Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION*2)] == true)
126122
{
@@ -139,13 +135,8 @@ void TransmissionBlock::Retransmission()
139135
delete this;
140136
return;
141137
}
142-
sockaddr_in RemoteAddress = {0};
143-
RemoteAddress.sin_family = AF_INET;
144-
RemoteAddress.sin_addr.s_addr = p_Session->c_IPv4;
145-
RemoteAddress.sin_port = p_Session->c_Port;
146-
147138
{
148-
std::vector<u08> RandomCoefficients;
139+
std::vector<uint8_t> RandomCoefficients;
149140
Header::Data* RemedyHeader = reinterpret_cast<Header::Data*>(m_RemedyPacketBuffer);
150141
if(m_OriginalPacketBuffer.size() == 1)
151142
{
@@ -168,7 +159,7 @@ void TransmissionBlock::Retransmission()
168159
}
169160
else
170161
{
171-
for(u08 Coef = 0 ; Coef < m_OriginalPacketBuffer.size() ; Coef++)
162+
for(uint8_t Coef = 0 ; Coef < m_OriginalPacketBuffer.size() ; Coef++)
172163
{
173164
do
174165
{
@@ -198,18 +189,18 @@ void TransmissionBlock::Retransmission()
198189
RemedyHeader->m_MaximumRank = m_BlockSize;
199190
RemedyHeader->m_Flags = Header::Data::FLAGS_END_OF_BLK;
200191
RemedyHeader->m_TxCount = ++m_TransmissionCount;
201-
for(u08 PacketIndex = 0 ; PacketIndex < m_OriginalPacketBuffer.size() ; PacketIndex++)
192+
for(uint8_t PacketIndex = 0 ; PacketIndex < m_OriginalPacketBuffer.size() ; PacketIndex++)
202193
{
203-
u08* OriginalBuffer = reinterpret_cast<u08*>(m_OriginalPacketBuffer[PacketIndex].get());
194+
uint8_t* OriginalBuffer = reinterpret_cast<uint8_t*>(m_OriginalPacketBuffer[PacketIndex].get());
204195
Header::Data* OriginalHeader = reinterpret_cast<Header::Data*>(OriginalBuffer);
205-
const u16 length = ntohs(OriginalHeader->m_TotalSize);
196+
const uint16_t length = ntohs(OriginalHeader->m_TotalSize);
206197
#if 0
207-
for(u16 CodingOffset = Header::Data::OffSets::CodingOffset ; CodingOffset < length ; CodingOffset++)
198+
for(uint16_t CodingOffset = Header::Data::OffSets::CodingOffset ; CodingOffset < length ; CodingOffset++)
208199
{
209200
m_RemedyPacketBuffer[CodingOffset] ^= FiniteField::instance()->mul(OriginalBuffer[CodingOffset], RandomCoefficients[PacketIndex]);
210201
}
211202
#else
212-
u16 CodingOffset = Header::Data::OffSets::CodingOffset;
203+
uint16_t CodingOffset = Header::Data::OffSets::CodingOffset;
213204
while(CodingOffset < length)
214205
{
215206
/*if(length - CodingOffset > 1024)
@@ -262,7 +253,7 @@ void TransmissionBlock::Retransmission()
262253
}
263254
#endif
264255
}
265-
sendto(p_Session->c_Socket, m_RemedyPacketBuffer, ntohs(RemedyHeader->m_TotalSize), 0, (sockaddr*)&RemoteAddress, sizeof(RemoteAddress));
256+
sendto(p_Session->c_Socket, m_RemedyPacketBuffer, ntohs(RemedyHeader->m_TotalSize), 0, (sockaddr*)&p_Session->c_Addr.Addr, p_Session->c_Addr.AddrLength);
266257
}
267258
while(p_Session->m_IsConnected && p_Session->m_Timer.ScheduleTask(p_Session->m_RetransmissionInterval, [this](){
268259
const auto Priority = (p_Session->m_MinBlockSequenceNumber == m_BlockSequenceNumber?TransmissionSession::MIDDLE_PRIORITY:TransmissionSession::LOW_PRIORITY);
@@ -275,7 +266,7 @@ void TransmissionBlock::Retransmission()
275266
////////////////////////////////////////////////////////////
276267
/////////////// TransmissionSession
277268
/*OK*/
278-
TransmissionSession::TransmissionSession(Transmission* const transmission, s32 Socket, u32 IPv4, u16 Port, Parameter::TRANSMISSION_MODE TransmissionMode, Parameter::BLOCK_SIZE BlockSize, u16 RetransmissionRedundancy): c_Transmission(transmission), c_Socket(Socket),c_IPv4(IPv4), c_Port(Port)
269+
TransmissionSession::TransmissionSession(Transmission* const transmission, int32_t Socket, const DataStructures::AddressType Addr, Parameter::TRANSMISSION_MODE TransmissionMode, Parameter::BLOCK_SIZE BlockSize, uint16_t RetransmissionRedundancy): c_Transmission(transmission), c_Socket(Socket), c_Addr(Addr)
279270
{
280271
m_TransmissionMode = TransmissionMode;
281272
m_BlockSize = BlockSize;
@@ -284,11 +275,12 @@ TransmissionSession::TransmissionSession(Transmission* const transmission, s32 S
284275
m_MinBlockSequenceNumber = 0;
285276
m_MaxBlockSequenceNumber = 0;
286277
p_TransmissionBlock = nullptr;
287-
for(u32 i = 0 ; i < (Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION*2) ; i++)
278+
for(uint32_t i = 0 ; i < (Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION*2) ; i++)
288279
{
289280
m_AckList[i] = true;
290281
}
291282
m_ConcurrentRetransmissions = 0;
283+
m_IsConnected = false;
292284
}
293285

294286

@@ -318,7 +310,7 @@ void TransmissionSession::ChangeBlockSize(const Parameter::BLOCK_SIZE BlockSize)
318310
}
319311

320312
/*OK*/
321-
void TransmissionSession::ChangeRetransmissionRedundancy(const u16 RetransmissionRedundancy)
313+
void TransmissionSession::ChangeRetransmissionRedundancy(const uint16_t RetransmissionRedundancy)
322314
{
323315
m_TaskQueue.Enqueue([this, RetransmissionRedundancy]()
324316
{
@@ -327,7 +319,7 @@ void TransmissionSession::ChangeRetransmissionRedundancy(const u16 Retransmissio
327319
}
328320

329321
/*OK*/
330-
void TransmissionSession::ChangeSessionParameter(const Parameter::TRANSMISSION_MODE TransmissionMode, const Parameter::BLOCK_SIZE BlockSize, const u16 RetransmissionRedundancy)
322+
void TransmissionSession::ChangeSessionParameter(const Parameter::TRANSMISSION_MODE TransmissionMode, const Parameter::BLOCK_SIZE BlockSize, const uint16_t RetransmissionRedundancy)
331323
{
332324
m_TaskQueue.Enqueue([this, TransmissionMode, BlockSize, RetransmissionRedundancy]()
333325
{
@@ -351,25 +343,21 @@ void TransmissionSession::SendPing()
351343
TransmissionSession const * self = this;
352344
std::cout<<"Pong Timeout...Client is disconnected.["<<TimeSinceLastPongTime.count()<<"]"<<std::endl;
353345
std::thread DisconnectThread = std::thread([self](){
354-
self->c_Transmission->Disconnect(self->c_IPv4, self->c_Port);
346+
//self->c_Transmission->Disconnect(self->c_IPv4, self->c_Port);
355347
});
356348
DisconnectThread.detach();
357349
return;
358350
}
359351

360-
sockaddr_in RemoteAddress = {0};
361-
RemoteAddress.sin_family = AF_INET;
362-
RemoteAddress.sin_addr.s_addr = c_IPv4;
363-
RemoteAddress.sin_port = c_Port;
364-
sendto(c_Socket, reinterpret_cast<u08*>(&ping), sizeof(Header::Ping), 0, (sockaddr*)&RemoteAddress, sizeof(RemoteAddress));
352+
sendto(c_Socket, reinterpret_cast<uint8_t*>(&ping), sizeof(Header::Ping), 0, (sockaddr*)&c_Addr.Addr, c_Addr.AddrLength);
365353
while(m_IsConnected && m_Timer.ScheduleTask(Parameter::PING_INTERVAL, [this](){
366354
while(m_IsConnected && m_TaskQueue.Enqueue([this](){
367355
SendPing();
368356
}, TransmissionSession::HIGH_PRIORITY)==false);
369357
})==false);
370358
}
371359

372-
void TransmissionSession::UpdateRetransmissionInterval(const u16 rtt)
360+
void TransmissionSession::UpdateRetransmissionInterval(const uint16_t rtt)
373361
{
374362
m_TaskQueue.Enqueue([this, rtt]()
375363
{
@@ -380,7 +368,7 @@ void TransmissionSession::UpdateRetransmissionInterval(const u16 rtt)
380368
////////////////////////////////////////////////////////////
381369
/////////////// Transmission
382370
/* OK */
383-
Transmission::Transmission(s32 Socket) : c_Socket(Socket){}
371+
Transmission::Transmission(int32_t Socket) : c_Socket(Socket){}
384372

385373
/* OK */
386374
Transmission::~Transmission()
@@ -390,12 +378,12 @@ Transmission::~Transmission()
390378
}
391379

392380
/* OK */
393-
bool Transmission::Connect(u32 IPv4, u16 Port, u32 ConnectionTimeout, Parameter::TRANSMISSION_MODE TransmissionMode, Parameter::BLOCK_SIZE BlockSize, u16 RetransmissionRedundancy)
381+
bool Transmission::Connect(const DataStructures::AddressType Addr, uint32_t ConnectionTimeout, Parameter::TRANSMISSION_MODE TransmissionMode, Parameter::BLOCK_SIZE BlockSize, uint16_t RetransmissionRedundancy)
394382
{
395383
TransmissionSession* newsession = nullptr;
396384
{
397385
std::unique_lock< std::mutex > lock(m_Lock);
398-
const DataStructures::IPv4PortKey key = {IPv4, Port};
386+
const DataStructures::SessionKey key = DataStructures::GetSessionKey((sockaddr*)&Addr.Addr, Addr.AddrLength);
399387
TransmissionSession** const session = m_Sessions.GetPtr(key);
400388
if(session != nullptr)
401389
{
@@ -406,7 +394,7 @@ bool Transmission::Connect(u32 IPv4, u16 Port, u32 ConnectionTimeout, Parameter:
406394
try
407395
{
408396
TEST_EXCEPTION(std::bad_alloc());
409-
newsession = new TransmissionSession(this, c_Socket, IPv4, Port, TransmissionMode, BlockSize, RetransmissionRedundancy);
397+
newsession = new TransmissionSession(this, c_Socket, Addr, TransmissionMode, BlockSize, RetransmissionRedundancy);
410398
}
411399
catch(const std::bad_alloc& ex)
412400
{
@@ -429,11 +417,7 @@ bool Transmission::Connect(u32 IPv4, u16 Port, u32 ConnectionTimeout, Parameter:
429417
SyncPacket.m_Type = Header::Common::HeaderType::SYNC;
430418
SyncPacket.m_Sequence = htons(newsession->m_MaxBlockSequenceNumber);
431419

432-
sockaddr_in RemoteAddress = {0};
433-
RemoteAddress.sin_family = AF_INET;
434-
RemoteAddress.sin_addr.s_addr = newsession->c_IPv4;
435-
RemoteAddress.sin_port = newsession->c_Port;
436-
if(sendto(c_Socket, (u08*)&SyncPacket, sizeof(SyncPacket), 0, (sockaddr*)&RemoteAddress, sizeof(RemoteAddress)) != sizeof(SyncPacket))
420+
if(sendto(c_Socket, (uint8_t*)&SyncPacket, sizeof(SyncPacket), 0, (sockaddr*)(&newsession->c_Addr.Addr), newsession->c_Addr.AddrLength) != sizeof(SyncPacket))
437421
{
438422
return false;
439423
}
@@ -456,12 +440,12 @@ bool Transmission::Connect(u32 IPv4, u16 Port, u32 ConnectionTimeout, Parameter:
456440
}
457441

458442
/* OK */
459-
bool Transmission::Send(u32 IPv4, u16 Port, u08* buffer, u16 buffersize)
443+
bool Transmission::Send(const DataStructures::AddressType Addr, uint8_t* buffer, uint16_t buffersize)
460444
{
461445
TransmissionSession* p_session = nullptr;
462446
{
463447
std::unique_lock< std::mutex > lock(m_Lock);
464-
const DataStructures::IPv4PortKey key = {IPv4, Port};
448+
const DataStructures::SessionKey key = DataStructures::GetSessionKey((sockaddr*)&Addr.Addr, Addr.AddrLength);
465449
TransmissionSession** const pp_session = m_Sessions.GetPtr(key);
466450
if(pp_session)
467451
{
@@ -480,7 +464,7 @@ bool Transmission::Send(u32 IPv4, u16 Port, u08* buffer, u16 buffersize)
480464
std::atomic<bool> TransmissionIsCompleted(false);
481465
std::atomic<bool> TransmissionResult(false);
482466
while(p_session->m_ConcurrentRetransmissions >= Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION ||
483-
(u16)(p_session->m_MaxBlockSequenceNumber - p_session->m_MinBlockSequenceNumber) >= (u16)Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION)
467+
(uint16_t)(p_session->m_MaxBlockSequenceNumber - p_session->m_MinBlockSequenceNumber) >= (uint16_t)Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION)
484468
{
485469
if(!p_session->m_IsConnected)
486470
{
@@ -531,12 +515,12 @@ bool Transmission::Send(u32 IPv4, u16 Port, u08* buffer, u16 buffersize)
531515
return TransmissionResult;
532516
}
533517

534-
bool Transmission::Flush(u32 IPv4, u16 Port)
518+
bool Transmission::Flush(const DataStructures::AddressType Addr)
535519
{
536520
TransmissionSession* p_session = nullptr;
537521
{
538522
std::unique_lock< std::mutex > lock(m_Lock);
539-
const DataStructures::IPv4PortKey key = {IPv4, Port};
523+
const DataStructures::SessionKey key = DataStructures::GetSessionKey((sockaddr*)&Addr.Addr, Addr.AddrLength);
540524
TransmissionSession** const pp_session = m_Sessions.GetPtr(key);
541525
if(pp_session)
542526
{
@@ -561,12 +545,12 @@ bool Transmission::Flush(u32 IPv4, u16 Port)
561545
}, TransmissionSession::LOW_PRIORITY);
562546
}
563547

564-
void Transmission::WaitUntilTxIsCompleted(u32 IPv4, u16 Port)
548+
void Transmission::WaitUntilTxIsCompleted(const DataStructures::AddressType Addr)
565549
{
566550
TransmissionSession* p_session = nullptr;
567551
{
568552
std::unique_lock< std::mutex > lock(m_Lock);
569-
const DataStructures::IPv4PortKey key = {IPv4, Port};
553+
const DataStructures::SessionKey key = DataStructures::GetSessionKey((sockaddr*)&Addr.Addr, Addr.AddrLength);
570554
TransmissionSession** const pp_session = m_Sessions.GetPtr(key);
571555
if(pp_session)
572556
{
@@ -587,9 +571,9 @@ void Transmission::WaitUntilTxIsCompleted(u32 IPv4, u16 Port)
587571
}
588572
}
589573

590-
bool Transmission::Disconnect(u32 IPv4, u16 Port)
574+
bool Transmission::Disconnect(const DataStructures::AddressType Addr)
591575
{
592-
const DataStructures::IPv4PortKey key = {IPv4, Port};
576+
const DataStructures::SessionKey key = DataStructures::GetSessionKey((sockaddr*)&Addr.Addr, Addr.AddrLength);
593577
TransmissionSession** pp_session = nullptr;
594578
{
595579
std::unique_lock< std::mutex > lock(m_Lock);
@@ -617,22 +601,22 @@ bool Transmission::Disconnect(u32 IPv4, u16 Port)
617601
}
618602

619603
/* OK */
620-
void Transmission::RxHandler(u08* buffer, u16 size, const sockaddr_in * const sender_addr, const u32 sender_addr_len)
604+
void Transmission::RxHandler(uint8_t* buffer, uint16_t size, const sockaddr* const sender_addr, const uint32_t sender_addr_len)
621605
{
622606
Header::Common* CommonHeader = reinterpret_cast< Header::Common* >(buffer);
623607
switch(CommonHeader->m_Type)
624608
{
625609
case Header::Common::HeaderType::DATA_ACK:
626610
{
627611
const Header::DataAck* Ack = reinterpret_cast< Header::DataAck* >(buffer);
628-
const DataStructures::IPv4PortKey key = {sender_addr->sin_addr.s_addr, sender_addr->sin_port};
612+
const DataStructures::SessionKey key = DataStructures::GetSessionKey(sender_addr, sender_addr_len);
629613
std::unique_lock< std::mutex > lock(m_Lock);
630614
TransmissionSession** const pp_session = m_Sessions.GetPtr(key);
631615
if(pp_session)
632616
{
633617
TransmissionSession* const SessionAddress = (*pp_session);
634-
u16 Sequence = ntohs(Ack->m_Sequence);
635-
u08 Loss = Ack->m_Losses;
618+
uint16_t Sequence = ntohs(Ack->m_Sequence);
619+
uint8_t Loss = Ack->m_Losses;
636620
(*pp_session)->m_TaskQueue.Enqueue([SessionAddress,Sequence,Loss](){
637621
if(SessionAddress->m_AckList[Sequence%(Parameter::MAXIMUM_NUMBER_OF_CONCURRENT_RETRANSMISSION*2)] == true)
638622
{
@@ -649,7 +633,7 @@ void Transmission::RxHandler(u08* buffer, u16 size, const sockaddr_in * const se
649633
case Header::Common::HeaderType::SYNC_ACK:
650634
{
651635
const Header::Sync* sync = reinterpret_cast< Header::Sync* >(buffer);
652-
const DataStructures::IPv4PortKey key = {sender_addr->sin_addr.s_addr, sender_addr->sin_port};
636+
const DataStructures::SessionKey key = DataStructures::GetSessionKey(sender_addr, sender_addr_len);
653637
std::unique_lock< std::mutex > lock(m_Lock);
654638
TransmissionSession** const pp_session = m_Sessions.GetPtr(key);
655639
if(pp_session)
@@ -665,7 +649,7 @@ void Transmission::RxHandler(u08* buffer, u16 size, const sockaddr_in * const se
665649
case Header::Common::PONG:
666650
{
667651
const Header::Pong* pong = reinterpret_cast< Header::Pong* >(buffer);
668-
const DataStructures::IPv4PortKey key = {sender_addr->sin_addr.s_addr, sender_addr->sin_port};
652+
const DataStructures::SessionKey key = DataStructures::GetSessionKey(sender_addr, sender_addr_len);
669653
std::unique_lock< std::mutex > lock(m_Lock);
670654
TransmissionSession** const pp_session = m_Sessions.GetPtr(key);
671655
if(pp_session)
@@ -680,7 +664,7 @@ void Transmission::RxHandler(u08* buffer, u16 size, const sockaddr_in * const se
680664
}
681665
else
682666
{
683-
(*pp_session)->UpdateRetransmissionInterval((u16)(rtt.count()*1000.));
667+
(*pp_session)->UpdateRetransmissionInterval((uint16_t)(rtt.count()*1000.));
684668
}
685669
}
686670
}

0 commit comments

Comments
 (0)