@@ -872,7 +872,7 @@ class CPacketLagger : private IThinker
872872public:
873873 ~CPacketLagger () { Clear (); }
874874
875- void LagPacket ( bool bSend, CRawUDPSocketImpl *pSock, const netadr_t &adr, int msDelay, int nChunks, const iovec *pChunks )
875+ void LagPacket ( CRawUDPSocketImpl *pSock, const netadr_t &adr, int msDelay, int nChunks, const iovec *pChunks )
876876 {
877877 SteamNetworkingGlobalLock::AssertHeldByCurrentThread ( " LagPacket" );
878878
@@ -919,7 +919,6 @@ class CPacketLagger : private IThinker
919919 pkt = &m_list[ m_list.AddToHead () ];
920920 }
921921
922- pkt->m_bSend = bSend;
923922 pkt->m_pSockOwner = pSock;
924923 pkt->m_adrRemote = adr;
925924 pkt->m_usecTime = usecTime;
@@ -951,33 +950,18 @@ class CPacketLagger : private IThinker
951950
952951 // Make sure socket is still in good shape.
953952 CRawUDPSocketImpl *pSock = pkt.m_pSockOwner ;
954- if ( pSock-> m_socket == INVALID_SOCKET || !pSock-> m_callback . m_fnCallback )
953+ if ( pSock )
955954 {
956- AssertMsg ( false , " Lagged packet remains in queue after socket destroyed or queued for destruction!" );
957- }
958- else
959- {
960-
961- // Sending, or receiving?
962- if ( pkt.m_bSend )
955+ if ( pSock->m_socket == INVALID_SOCKET || !pSock->m_callback .m_fnCallback )
963956 {
964- iovec temp;
965- temp.iov_len = pkt.m_cbPkt ;
966- temp.iov_base = pkt.m_pkt ;
967- pSock->BReallySendRawPacket ( 1 , &temp, pkt.m_adrRemote );
957+ AssertMsg ( false , " Lagged packet remains in queue after socket destroyed or queued for destruction!" );
968958 }
969959 else
970960 {
971- // Copy data out of queue into local variables, just in case a
972- // packet is queued while we're in this function. We don't want
973- // our list to shift in memory, and the pointer we pass to the
974- // caller to dangle.
975- char temp[ k_cbSteamNetworkingSocketsMaxUDPMsgLen ];
976- memcpy ( temp, pkt.m_pkt , pkt.m_cbPkt );
977- pSock->m_callback ( RecvPktInfo_t{ temp, pkt.m_cbPkt , pkt.m_adrRemote , pSock } );
961+ ProcessPacket ( pkt );
978962 }
963+ m_list.RemoveFromHead ();
979964 }
980- m_list.RemoveFromHead ();
981965 }
982966
983967 Schedule ();
@@ -1001,14 +985,12 @@ class CPacketLagger : private IThinker
1001985 {
1002986 int idxNext = m_list.Next ( idx );
1003987 if ( m_list[idx].m_pSockOwner == pSock )
1004- m_list. Remove ( idx ) ;
988+ m_list[idx]. m_pSockOwner = nullptr ;
1005989 idx = idxNext;
1006990 }
1007-
1008- Schedule ();
1009991 }
1010992
1011- private :
993+ protected :
1012994
1013995 // / Set the next think time as appropriate
1014996 void Schedule ()
@@ -1021,7 +1003,6 @@ class CPacketLagger : private IThinker
10211003
10221004 struct LaggedPacket
10231005 {
1024- bool m_bSend; // true for outbound, false for inbound
10251006 CRawUDPSocketImpl *m_pSockOwner;
10261007 netadr_t m_adrRemote;
10271008 SteamNetworkingMicroseconds m_usecTime; // / Time when it should be sent or received
@@ -1030,9 +1011,39 @@ class CPacketLagger : private IThinker
10301011 };
10311012 CUtlLinkedList<LaggedPacket> m_list;
10321013
1014+ // / Do whatever we're supposed to do with the next packet
1015+ virtual void ProcessPacket ( const LaggedPacket &pkt ) = 0;
1016+ };
1017+
1018+ class CPacketLaggerSend final : public CPacketLagger
1019+ {
1020+ public:
1021+ virtual void ProcessPacket ( const LaggedPacket &pkt ) override
1022+ {
1023+ iovec temp;
1024+ temp.iov_len = pkt.m_cbPkt ;
1025+ temp.iov_base = (void *)pkt.m_pkt ;
1026+ pkt.m_pSockOwner ->BReallySendRawPacket ( 1 , &temp, pkt.m_adrRemote );
1027+ }
1028+ };
1029+
1030+ class CPacketLaggerRecv final : public CPacketLagger
1031+ {
1032+ public:
1033+ virtual void ProcessPacket ( const LaggedPacket &pkt ) override
1034+ {
1035+ // Copy data out of queue into local variables, just in case a
1036+ // packet is queued while we're in this function. We don't want
1037+ // our list to shift in memory, and the pointer we pass to the
1038+ // caller to dangle.
1039+ char temp[ k_cbSteamNetworkingSocketsMaxUDPMsgLen ];
1040+ memcpy ( temp, pkt.m_pkt , pkt.m_cbPkt );
1041+ pkt.m_pSockOwner ->m_callback ( RecvPktInfo_t{ temp, pkt.m_cbPkt , pkt.m_adrRemote , pkt.m_pSockOwner } );
1042+ }
10331043};
10341044
1035- static CPacketLagger s_packetLagQueue;
1045+ static CPacketLaggerSend s_packetLagQueueSend;
1046+ static CPacketLaggerRecv s_packetLagQueueRecv;
10361047
10371048// / Object used to wake our background thread efficiently
10381049#if defined( _WIN32 )
@@ -1115,23 +1126,22 @@ bool CRawUDPSocketImpl::BSendRawPacketGather( int nChunks, const iovec *pChunks,
11151126 {
11161127 int32 nDupLag = nPacketFakeLagTotal + WeakRandomInt ( 0 , g_Config_FakePacketDup_TimeMax.Get () );
11171128 nDupLag = std::max ( 1 , nDupLag );
1118- s_packetLagQueue .LagPacket ( true , const_cast <CRawUDPSocketImpl *>( this ), adrTo, nDupLag, nChunks, pChunks );
1129+ s_packetLagQueueSend .LagPacket ( const_cast <CRawUDPSocketImpl *>( this ), adrTo, nDupLag, nChunks, pChunks );
11191130 }
11201131
11211132 // Lag the original packet?
11221133 if ( nPacketFakeLagTotal > 0 )
11231134 {
1124- s_packetLagQueue .LagPacket ( true , const_cast <CRawUDPSocketImpl *>( this ), adrTo, nPacketFakeLagTotal, nChunks, pChunks );
1135+ s_packetLagQueueSend .LagPacket ( const_cast <CRawUDPSocketImpl *>( this ), adrTo, nPacketFakeLagTotal, nChunks, pChunks );
11251136 return true ;
11261137 }
11271138
11281139 // Now really send it
11291140 return BReallySendRawPacket ( nChunks, pChunks, adrTo );
11301141}
11311142
1132- void CRawUDPSocketImpl::Close ()
1143+ void CRawUDPSocketImpl::InternalAddToCleanupQueue ()
11331144{
1134- SteamNetworkingGlobalLock::AssertHeldByCurrentThread ( " IRawUDPSocket::Close" );
11351145
11361146 // / Clear the callback, to ensure that no further callbacks will be executed.
11371147 // / This marks the socket as pending destruction.
@@ -1144,7 +1154,16 @@ void CRawUDPSocketImpl::Close()
11441154 s_vecRawSocketsPendingDeletion.AddToTail ( this );
11451155
11461156 // Clean up lagged packets, if any
1147- s_packetLagQueue.AboutToDestroySocket ( this );
1157+ s_packetLagQueueSend.AboutToDestroySocket ( this );
1158+ s_packetLagQueueRecv.AboutToDestroySocket ( this );
1159+ }
1160+
1161+ void CRawUDPSocketImpl::Close ()
1162+ {
1163+ SteamNetworkingGlobalLock::AssertHeldByCurrentThread ( " IRawUDPSocket::Close" );
1164+
1165+ // Mark the callback as detached, and put us in the queue for cleanup when it's safe.
1166+ InternalAddToCleanupQueue ();
11481167
11491168 // Make sure we don't delay doing this too long
11501169 if ( s_bManualPollMode || ( s_pThreadSteamDatagram && s_pThreadSteamDatagram->get_id () != std::this_thread::get_id () ) )
@@ -1697,7 +1716,7 @@ static bool PollRawUDPSockets( int nMaxTimeoutMS, bool bManualPoll )
16971716 iovec temp;
16981717 temp.iov_len = ret;
16991718 temp.iov_base = buf;
1700- s_packetLagQueue .LagPacket ( false , pSock, info.m_adrFrom , nDupLag, 1 , &temp );
1719+ s_packetLagQueueRecv .LagPacket ( pSock, info.m_adrFrom , nDupLag, 1 , &temp );
17011720 }
17021721
17031722 // Check for simulating lag
@@ -1706,7 +1725,7 @@ static bool PollRawUDPSockets( int nMaxTimeoutMS, bool bManualPoll )
17061725 iovec temp;
17071726 temp.iov_len = ret;
17081727 temp.iov_base = buf;
1709- s_packetLagQueue .LagPacket ( false , pSock, info.m_adrFrom , nPacketFakeLagTotal, 1 , &temp );
1728+ s_packetLagQueueRecv .LagPacket ( pSock, info.m_adrFrom , nPacketFakeLagTotal, 1 , &temp );
17101729 }
17111730 else
17121731 {
@@ -2473,6 +2492,10 @@ void SteamNetworkingSocketsLowLevelDecRef()
24732492 Assert ( s_vecRawSocketsPendingDeletion.IsEmpty () );
24742493 s_vecRawSocketsPendingDeletion.Purge ();
24752494
2495+ // Nuke packet lagger queues and make sure we are not registered to think
2496+ s_packetLagQueueRecv.Clear ();
2497+ s_packetLagQueueSend.Clear ();
2498+
24762499 // Shutdown event tracing
24772500 ETW_Kill ();
24782501
0 commit comments