Skip to content

Commit d9fef94

Browse files
authored
Merge pull request #782 from bergerkiller/master
Replace ThreadLocal scheduleProcessPackets with queue, fixes #763
2 parents 4baa4aa + ab5fb40 commit d9fef94

File tree

2 files changed

+91
-9
lines changed

2 files changed

+91
-9
lines changed

src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector {
126126
private PacketEvent finalEvent;
127127

128128
/**
129-
* A flag set by the main thread to indiciate that a packet should not be processed.
129+
* A queue of packets that were sent with filtered=false
130130
*/
131-
private final ThreadLocal<Boolean> scheduleProcessPackets = ThreadLocal.withInitial(() -> true);
131+
private final PacketFilterQueue unfilteredProcessedPackets = new PacketFilterQueue();
132132

133133
// Other handlers
134134
private ByteToMessageDecoder vanillaDecoder;
@@ -328,7 +328,7 @@ PacketEvent handleScheduled(Object instance, FieldAccessor accessor) {
328328
Object original = accessor.get(instance);
329329

330330
// See if we've been instructed not to process packets
331-
if (!scheduleProcessPackets.get()) {
331+
if (unfilteredProcessedPackets.contains(original)) {
332332
NetworkMarker marker = getMarker(original);
333333

334334
if (marker != null) {
@@ -416,7 +416,7 @@ private void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) th
416416

417417
try {
418418
// Skip every kind of non-filtered packet
419-
if (!scheduleProcessPackets.get()) {
419+
if (unfilteredProcessedPackets.remove(packet)) {
420420
return;
421421
}
422422

@@ -663,12 +663,11 @@ private void disconnect(String message) {
663663
public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
664664
saveMarker(packet, marker);
665665

666-
try {
667-
scheduleProcessPackets.set(filtered);
668-
invokeSendPacket(packet);
669-
} finally {
670-
scheduleProcessPackets.set(true);
666+
if (!filtered) {
667+
unfilteredProcessedPackets.add(packet);
671668
}
669+
670+
invokeSendPacket(packet);
672671
}
673672

674673
/**
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
3+
* Copyright (C) 2015 dmulloy2
4+
*
5+
* This program is free software; you can redistribute it and/or modify it under the terms of the
6+
* GNU General Public License as published by the Free Software Foundation; either version 2 of
7+
* the License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
10+
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11+
* See the GNU General Public License for more details.
12+
*
13+
* You should have received a copy of the GNU General Public License along with this program;
14+
* if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
15+
* 02111-1307 USA
16+
*/
17+
package com.comphenix.protocol.injector.netty;
18+
19+
import java.util.ArrayDeque;
20+
import java.util.Queue;
21+
22+
/**
23+
* Stores packets that need to be sent without being handled by the listeners (filtered=false).
24+
* When other packets sent after sending the packet are removed, the packet is removed as well
25+
* to prevent a memory leak, assuming a consistent send order is in place.
26+
*
27+
* @author bergerkiller
28+
*/
29+
public class PacketFilterQueue {
30+
private Queue<Object> queue = new ArrayDeque<>();
31+
32+
/**
33+
* Adds a packet to this queue, indicating further on that it should not be filtered.
34+
*
35+
* @param packet
36+
*/
37+
public synchronized void add(Object packet) {
38+
queue.add(packet);
39+
}
40+
41+
/**
42+
* Checks whether a packet is contained inside this queue, indicating
43+
* it should not be filtered.
44+
*
45+
* @param packet
46+
* @return True if contained and packet should not be filtered (filtered=false)
47+
*/
48+
public synchronized boolean contains(Object packet) {
49+
return queue.contains(packet);
50+
}
51+
52+
/**
53+
* Checks whether a packet is contained inside this queue and removes it if so.
54+
* Other packets marked in this queue that were sent before this packet are
55+
* removed from the queue also, avoiding memory leaks because of dropped packets.
56+
*
57+
* @param packet
58+
* @return True if contained and packet should not be filtered (filtered=false)
59+
*/
60+
public synchronized boolean remove(Object packet) {
61+
if (queue.isEmpty()) {
62+
// Nothing in the queue
63+
return false;
64+
} else if (queue.peek() == packet) {
65+
// First in the queue (expected)
66+
queue.poll();
67+
return true;
68+
} else if (!queue.contains(packet)) {
69+
// There are unfiltered packets, but this one is not
70+
return false;
71+
} else {
72+
// We have skipped over some packets (unexpected)
73+
// Poll packets until we find it
74+
while (queue.poll() != packet) {
75+
if (queue.isEmpty()) {
76+
// This should never happen! But to avoid infinite loop.
77+
return false;
78+
}
79+
}
80+
return true;
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)