3636import java .net .Inet6Address ;
3737import java .net .InetSocketAddress ;
3838import java .util .ArrayDeque ;
39+ import java .util .Collection ;
40+ import java .util .Iterator ;
3941import java .util .Queue ;
4042import java .util .concurrent .TimeUnit ;
4143import java .util .concurrent .atomic .AtomicInteger ;
44+ import java .util .function .BiConsumer ;
4245import java .util .function .Consumer ;
46+ import java .util .function .ObjIntConsumer ;
4347
4448import static org .cloudburstmc .netty .channel .raknet .RakConstants .*;
4549
@@ -185,6 +189,13 @@ private void initHeapWeights() {
185189
186190 @ Override
187191 public void write (ChannelHandlerContext ctx , Object msg , ChannelPromise promise ) {
192+ if (!this .channel .parent ().eventLoop ().inEventLoop ()) {
193+ // Make sure this runs on correct thread
194+ log .error ("Tried to write packet from wrong thread: {}" , Thread .currentThread ().getName (), new Throwable ());
195+ final Object finalMsg = msg ;
196+ this .channel .parent ().eventLoop ().execute (() -> this .write (ctx , finalMsg , promise ));
197+ return ;
198+ }
188199 if (msg instanceof ByteBuf ) {
189200 msg = new RakMessage ((ByteBuf ) msg );
190201 } else if (!(msg instanceof RakMessage )) {
@@ -543,7 +554,7 @@ private void onIncomingNack(ChannelHandlerContext ctx, RakDatagramPacket datagra
543554 }
544555
545556 this .slidingWindow .onNak (); // TODO: verify this
546- this .sendDatagram (ctx , datagram , curTime );
557+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
547558 }
548559
549560 private int sendStaleDatagrams (ChannelHandlerContext ctx , long curTime ) {
@@ -555,7 +566,10 @@ private int sendStaleDatagrams(ChannelHandlerContext ctx, long curTime) {
555566 int resendCount = 0 ;
556567 int transmissionBandwidth = this .slidingWindow .getRetransmissionBandwidth ();
557568
558- for (RakDatagramPacket datagram : this .sentDatagrams .values ()) {
569+ IntObjectMap <RakDatagramPacket > sent = new IntObjectHashMap <>();
570+ Iterator <RakDatagramPacket > iterator = this .sentDatagrams .values ().iterator ();
571+ while (iterator .hasNext ()) {
572+ RakDatagramPacket datagram = iterator .next ();
559573 if (datagram .getNextSend () <= curTime ) {
560574 int size = datagram .getSize ();
561575 if (transmissionBandwidth < size ) {
@@ -570,9 +584,13 @@ private int sendStaleDatagrams(ChannelHandlerContext ctx, long curTime) {
570584 log .trace ("Stale datagram {} from {}" , datagram .getSequenceIndex (), this .getRemoteAddress ());
571585 }
572586 resendCount ++;
573- this .sendDatagram (ctx , datagram , curTime );
587+ iterator .remove ();
588+ this .sendDatagram (ctx , datagram , curTime , sent );
574589 }
575590 }
591+ for (IntObjectMap .PrimitiveEntry <RakDatagramPacket > entry : sent .entries ()) {
592+ this .sentDatagrams .put (entry .key (), entry .value ());
593+ }
576594
577595 if (hasResent ) {
578596 this .slidingWindow .onResend (curTime );
@@ -602,7 +620,7 @@ private void sendDatagrams(ChannelHandlerContext ctx, long curTime, int mtuSize)
602620
603621 // Send full datagram
604622 if (!datagram .tryAddPacket (packet , mtuSize )) {
605- this .sendDatagram (ctx , datagram , curTime );
623+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
606624
607625 datagram = RakDatagramPacket .newInstance ();
608626 datagram .setSendTime (curTime );
@@ -613,7 +631,7 @@ private void sendDatagrams(ChannelHandlerContext ctx, long curTime, int mtuSize)
613631 }
614632
615633 if (!datagram .getPackets ().isEmpty ()) {
616- this .sendDatagram (ctx , datagram , curTime );
634+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
617635 }
618636 }
619637
@@ -625,19 +643,12 @@ private void sendImmediate(ChannelHandlerContext ctx, EncapsulatedPacket[] packe
625643 if (!datagram .tryAddPacket (packet , this .getMtu ())) {
626644 throw new IllegalArgumentException ("Packet too large to fit in MTU (size: " + packet .getSize () + ", MTU: " + this .getMtu () + ")" );
627645 }
628- this .sendDatagram (ctx , datagram , curTime );
646+ this .sendDatagram (ctx , datagram , curTime , this . sentDatagrams );
629647 }
630648 ctx .flush ();
631649 }
632650
633- private void sendDatagram (ChannelHandlerContext ctx , RakDatagramPacket datagram , long time ) {
634- if (!this .channel .parent ().eventLoop ().inEventLoop ()) {
635- // Make sure this runs on correct thread
636- log .error ("Tried to send datagrams from wrong thread: {}" , Thread .currentThread ().getName (), new Throwable ());
637- this .channel .parent ().eventLoop ().execute (() -> this .sendDatagram (ctx , datagram , time ));
638- return ;
639- }
640-
651+ private void sendDatagram (ChannelHandlerContext ctx , RakDatagramPacket datagram , long time , IntObjectMap <RakDatagramPacket > sent ) {
641652 if (datagram .getPackets ().isEmpty ()) {
642653 throw new IllegalArgumentException ("RakNetDatagram with no packets" );
643654 }
@@ -656,10 +667,8 @@ private void sendDatagram(ChannelHandlerContext ctx, RakDatagramPacket datagram,
656667 datagram .setNextSend (time + this .slidingWindow .getRtoForRetransmission ());
657668 if (oldIndex == -1 ) {
658669 this .slidingWindow .onReliableSend (datagram );
659- } else {
660- this .sentDatagrams .remove (oldIndex , datagram );
661670 }
662- this . sentDatagrams .put (datagram .getSequenceIndex (), datagram .retain ()); // Keep for resending
671+ sent .put (datagram .getSequenceIndex (), datagram .retain ()); // Keep for resending
663672 break ;
664673 }
665674 }
0 commit comments