Skip to content

Commit 32d1cd2

Browse files
committed
Client: Synchronize addReadBuffer to ensure it's not run concurrently with getRead()
getRead() typically is invoked on the client thread, and when used that way this should not be a concern. However if you are not on the reader thread there is the potential for it to try and read while a write is being added. This is a use case we want to continue to support, so this fixes it by locking while the read is being added.
1 parent 7b4679f commit 32d1cd2

File tree

4 files changed

+12
-8
lines changed

4 files changed

+12
-8
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Include the litesockets library into your project from maven central:
99
<dependency>
1010
<groupId>org.threadly</groupId>
1111
<artifactId>litesockets</artifactId>
12-
<version>4.4</version>
12+
<version>4.5</version>
1313
</dependency>
1414
```
1515

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
group = org.threadly
2-
version = 4.4
2+
version = 4.5
33
threadlyVersion = 5.24

src/main/java/org/threadly/litesockets/Client.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public abstract class Client implements Closeable {
4040
protected final SocketExecuterCommonBase se;
4141
protected final long startTime = Clock.lastKnownForwardProgressingMillis();
4242
protected final Object readerLock = new Object();
43-
protected final Object writerLock = new Object();
4443
protected final ClientByteStats stats = new ClientByteStats();
4544
protected final AtomicBoolean closed = new AtomicBoolean(false);
4645
protected final ConcurrentLinkedQueue<ClientCloseListener> closerListener = new ConcurrentLinkedQueue<>();
@@ -308,14 +307,18 @@ private void runListener(Runnable listener, boolean invokedOnClientThread) {
308307
* @param bb the {@link ByteBuffer} to add to the clients readBuffer.
309308
*/
310309
protected void addReadBuffer(final ByteBuffer bb) {
310+
if (! bb.hasRemaining()) {
311+
return;
312+
}
311313
addReadStats(bb.remaining());
312314
se.addReadAmount(bb.remaining());
313315
int start;
314-
int end;
315-
start = readBuffers.remaining();
316-
readBuffers.add(bb);
317-
end = readBuffers.remaining();
318-
if(end > 0 && start == 0){
316+
// synchronize to ensure readBuffers are not modified by non-client thread getRead call
317+
synchronized (readerLock) {
318+
start = readBuffers.remaining();
319+
readBuffers.add(bb);
320+
}
321+
if(start == 0){
319322
callReader(true); // we assume all buffers are added from the clients thread
320323
}
321324
}

src/main/java/org/threadly/litesockets/TCPClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class TCPClient extends Client {
4040
private final ReuseableMergedByteBuffers writeBuffers = new ReuseableMergedByteBuffers();
4141
private final Deque<Pair<Long, SettableListenableFuture<Long>>> writeFutures = new ArrayDeque<>(8);
4242
private final TCPSocketOptions tso = new TCPSocketOptions();
43+
protected final Object writerLock = new Object();
4344
protected final AtomicBoolean startedConnection = new AtomicBoolean(false);
4445
protected final SettableListenableFuture<Boolean> connectionFuture;
4546
protected final SocketChannel channel;

0 commit comments

Comments
 (0)