Skip to content

Commit 82d4f51

Browse files
committed
Add queue length check
1 parent 1e26b20 commit 82d4f51

File tree

7 files changed

+72
-2
lines changed

7 files changed

+72
-2
lines changed

transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/RakDisconnectReason.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ public enum RakDisconnectReason {
2626
NO_FREE_INCOMING_CONNECTIONS,
2727
INCOMPATIBLE_PROTOCOL_VERSION,
2828
IP_RECENTLY_CONNECTED,
29-
BAD_PACKET
29+
BAD_PACKET,
30+
QUEUE_TOO_LONG
3031
}

transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/DefaultRakSessionConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DefaultRakSessionConfig extends DefaultChannelConfig implements Rak
3838
private volatile long sessionTimeout = SESSION_TIMEOUT_MS;
3939
private volatile boolean autoFlush = true;
4040
private volatile int flushInterval = 10;
41+
private volatile int maxQueuedBytes = 64 * 1024 * 1024; // 64 MB
4142

4243
public DefaultRakSessionConfig(Channel channel) {
4344
super(channel);
@@ -78,6 +79,9 @@ public <T> T getOption(ChannelOption<T> option) {
7879
if (option == RakChannelOption.RAK_FLUSH_INTERVAL) {
7980
return (T) Integer.valueOf(this.getFlushInterval());
8081
}
82+
if (option == RakChannelOption.RAK_MAX_QUEUED_BYTES) {
83+
return (T) Integer.valueOf(this.getMaxQueuedBytes());
84+
}
8185
return this.channel.parent().config().getOption(option);
8286
}
8387

@@ -102,6 +106,8 @@ public <T> boolean setOption(ChannelOption<T> option, T value) {
102106
this.setAutoFlush((Boolean) value);
103107
} else if (option == RakChannelOption.RAK_FLUSH_INTERVAL) {
104108
this.setFlushInterval((Integer) value);
109+
} else if (option == RakChannelOption.RAK_MAX_QUEUED_BYTES) {
110+
this.setMaxQueuedBytes((Integer) value);
105111
} else {
106112
return this.channel.parent().config().setOption(option, value);
107113
}
@@ -194,4 +200,14 @@ public int getFlushInterval() {
194200
public void setFlushInterval(int flushInterval) {
195201
this.flushInterval = flushInterval;
196202
}
203+
204+
@Override
205+
public void setMaxQueuedBytes(int maxQueuedBytes) {
206+
this.maxQueuedBytes = maxQueuedBytes;
207+
}
208+
209+
@Override
210+
public int getMaxQueuedBytes() {
211+
return maxQueuedBytes;
212+
}
197213
}

transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,8 @@ public interface RakChannelConfig extends ChannelConfig {
5151
int getFlushInterval();
5252

5353
void setFlushInterval(int intervalMillis);
54+
55+
void setMaxQueuedBytes(int maxQueuedBytes);
56+
57+
int getMaxQueuedBytes();
5458
}

transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelMetrics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ default void nackIn(int count) {
5555

5656
default void stateChange(RakState state) {
5757
}
58+
59+
default void queuedPacketBytes(int count) {
60+
}
5861
}

transport-raknet/src/main/java/org/cloudburstmc/netty/channel/raknet/config/RakChannelOption.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,12 @@ public class RakChannelOption<T> extends ChannelOption<T> {
189189
public static final ChannelOption<Integer> RAK_TIME_BETWEEN_SEND_CONNECTION_ATTEMPTS_MS =
190190
valueOf(RakChannelOption.class, "RAK_TIME_BETWEEN_SEND_CONNECTION_ATTEMPTS_MS");
191191

192+
/**
193+
* Maximum amount of bytes that can be queued in a single RakNet session.
194+
*/
195+
public static final ChannelOption<Integer> RAK_MAX_QUEUED_BYTES =
196+
valueOf(RakChannelOption.class, "RAK_MAX_QUEUED_BYTES");
197+
192198
@SuppressWarnings("deprecation")
193199
protected RakChannelOption() {
194200
super(null);

transport-raknet/src/main/java/org/cloudburstmc/netty/handler/codec/raknet/common/RakSessionCodec.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.ArrayDeque;
3939
import java.util.Queue;
4040
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.function.Consumer;
4143

4244
import static org.cloudburstmc.netty.channel.raknet.RakConstants.*;
4345

@@ -410,6 +412,23 @@ private void tryTick() {
410412
private void onTick() {
411413
long curTime = System.currentTimeMillis();
412414

415+
int maxQueuedBytes = this.channel.config().getOption(RakChannelOption.RAK_MAX_QUEUED_BYTES);
416+
417+
if (maxQueuedBytes > 0) {
418+
int queuedBytes = 0;
419+
try {
420+
for (EncapsulatedPacket packet : this.outgoingPackets) {
421+
queuedBytes += packet.getBuffer().readableBytes();
422+
if (queuedBytes > maxQueuedBytes) {
423+
this.disconnect(RakDisconnectReason.QUEUE_TOO_LONG);
424+
return;
425+
}
426+
}
427+
} finally {
428+
getMetrics().queuedPacketBytes(queuedBytes);
429+
}
430+
}
431+
413432
if (this.state == RakState.UNCONNECTED) {
414433
if (this.isTimedOut(curTime)) {
415434
this.close(RakDisconnectReason.TIMED_OUT);

transport-raknet/src/main/java/org/cloudburstmc/netty/util/FastBinaryMinHeap.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import io.netty.util.internal.ObjectPool;
2121

2222
import java.util.Arrays;
23+
import java.util.Iterator;
2324
import java.util.NoSuchElementException;
2425
import java.util.Objects;
26+
import java.util.function.Consumer;
2527

26-
public class FastBinaryMinHeap<E> extends AbstractReferenceCounted {
28+
public class FastBinaryMinHeap<E> extends AbstractReferenceCounted implements Iterable<E> {
2729

2830
private static final Entry INFIMUM = new Entry(Long.MAX_VALUE);
2931
private static final Entry SUPREMUM = new Entry(Long.MIN_VALUE);
@@ -205,6 +207,11 @@ public FastBinaryMinHeap<E> touch(Object hint) {
205207
return this;
206208
}
207209

210+
@Override
211+
public Iterator<E> iterator() {
212+
return new SimpleIterator();
213+
}
214+
208215
private static class Entry extends AbstractReferenceCounted {
209216
private final ObjectPool.Handle<Entry> handle;
210217
private Object element;
@@ -233,4 +240,18 @@ public Entry touch(Object hint) {
233240
return this;
234241
}
235242
}
243+
244+
private class SimpleIterator implements Iterator<E> {
245+
private int index = 1;
246+
247+
@Override
248+
public boolean hasNext() {
249+
return heap.length > index && heap[index] != INFIMUM;
250+
}
251+
252+
@Override
253+
public E next() {
254+
return (E) heap[index++].element;
255+
}
256+
}
236257
}

0 commit comments

Comments
 (0)