Skip to content

Commit 42cede9

Browse files
authored
Merge pull request #67 from jentfoo/statChanges
Client: Add the ability to disable stats
2 parents a46ae25 + 9bf0901 commit 42cede9

File tree

12 files changed

+190
-86
lines changed

12 files changed

+190
-86
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
group = org.threadly
22
version = 4.10-SNAPSHOT
3-
threadlyVersion = 5.34
3+
threadlyVersion = 5.37

src/main/java/org/threadly/litesockets/Client.java

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.threadly.litesockets.buffers.ReuseableMergedByteBuffers;
1515
import org.threadly.litesockets.utils.IOUtils;
1616
import org.threadly.litesockets.utils.SimpleByteStats;
17-
import org.threadly.util.ArgumentVerifier;
1817
import org.threadly.util.Clock;
1918
import org.threadly.util.ExceptionUtils;
2019

@@ -40,9 +39,9 @@ public abstract class Client implements Closeable {
4039
protected final SocketExecuterCommonBase se;
4140
protected final long startTime = Clock.lastKnownForwardProgressingMillis();
4241
protected final Object readerLock = new Object();
43-
protected final ClientByteStats stats = new ClientByteStats();
4442
protected final AtomicBoolean closed = new AtomicBoolean(false);
4543
protected final ConcurrentLinkedQueue<ClientCloseListener> closerListener = new ConcurrentLinkedQueue<>();
44+
protected volatile ClientByteStats stats;
4645
protected volatile Runnable readerCaller = null;
4746
protected volatile boolean useNativeBuffers = false;
4847
protected volatile boolean keepReadBuffer = true;
@@ -51,12 +50,17 @@ public abstract class Client implements Closeable {
5150
protected volatile int newReadBufferSize = IOUtils.DEFAULT_CLIENT_READ_BUFFER_SIZE;
5251
private ByteBuffer readByteBuffer = IOUtils.EMPTY_BYTEBUFFER;
5352

54-
public Client(final SocketExecuterCommonBase se) {
53+
public Client(final SocketExecuterCommonBase se, final boolean statsEnabled) {
54+
setStatsEnabled(statsEnabled);
55+
5556
this.se = se;
5657
this.clientExecutor = se.getExecutorFor(this);
5758
}
5859

59-
protected Client(final SocketExecuterCommonBase se, final SubmitterExecutor lclientExecutor) {
60+
protected Client(final SocketExecuterCommonBase se, final SubmitterExecutor lclientExecutor,
61+
final boolean statsEnabled) {
62+
setStatsEnabled(statsEnabled);
63+
6064
this.se = se;
6165
this.clientExecutor = lclientExecutor;
6266
}
@@ -120,14 +124,12 @@ protected <T> SettableListenableFuture<T> makeClientSettableListenableFuture() {
120124
*/
121125
public abstract SocketAddress getRemoteSocketAddress();
122126

123-
124127
/**
125128
*
126129
* @return the local {@link SocketAddress} this client is using.
127130
*/
128131
public abstract SocketAddress getLocalSocketAddress();
129132

130-
131133
/**
132134
* Returns true if this client has data pending in its write buffers. False if there is no data pending write.
133135
*
@@ -232,11 +234,11 @@ public void close() {
232234

233235
/*Implemented functions*/
234236

235-
protected void addReadStats(final int size) {
237+
protected void recordReadStats(final int size) {
236238
stats.addRead(size);
237239
}
238240

239-
protected void addWriteStats(final int size) {
241+
protected void recordWriteStats(final int size) {
240242
stats.addWrite(size);
241243
}
242244

@@ -311,8 +313,8 @@ protected void addReadBuffer(final ByteBuffer bb) {
311313
if (! bb.hasRemaining()) {
312314
return;
313315
}
314-
addReadStats(bb.remaining());
315-
se.addReadAmount(bb.remaining());
316+
recordReadStats(bb.remaining());
317+
se.recordReadStats(bb.remaining());
316318
int start;
317319
// synchronize to ensure readBuffers are not modified by non-client thread getRead call
318320
synchronized (readerLock) {
@@ -441,24 +443,49 @@ protected boolean setClose() {
441443
public SimpleByteStats getStats() {
442444
return stats;
443445
}
446+
447+
/**
448+
* Can be invoked with {@code false} to disable stat collection on client. This can reduce
449+
* minor memory and performance overheads.
450+
*
451+
* @param enabled {@code false} to disable stat collection, or {@code true} to enable / reset stats
452+
*/
453+
public void setStatsEnabled(boolean enabled) {
454+
if (enabled) {
455+
ClientByteStats stats = this.stats;
456+
if (stats == ClientByteStats.NO_OP_STATS || stats == null) {
457+
this.stats = new ClientByteStats();
458+
} else {
459+
stats.resetStats();
460+
}
461+
} else {
462+
stats = ClientByteStats.NO_OP_STATS;
463+
}
464+
}
444465

445466
/**
446467
* Implementation of the SimpleByteStats.
447468
*/
448469
private static class ClientByteStats extends SimpleByteStats {
449-
public ClientByteStats() {
450-
super();
451-
}
470+
public static final ClientByteStats NO_OP_STATS = new ClientByteStats() {
471+
@Override
472+
protected void addWrite(final int size) {
473+
// ignored
474+
}
475+
476+
@Override
477+
protected void addRead(final int size) {
478+
// ignored
479+
}
480+
};
452481

453482
@Override
454483
protected void addWrite(final int size) {
455-
ArgumentVerifier.assertNotNegative(size, "size");
456484
super.addWrite(size);
457485
}
458486

459487
@Override
460488
protected void addRead(final int size) {
461-
ArgumentVerifier.assertNotNegative(size, "size");
462489
super.addRead(size);
463490
}
464491
}
@@ -491,7 +518,6 @@ public interface Reader {
491518
public void onRead(Client client);
492519
}
493520

494-
495521
/**
496522
* Used to notify when a Client is closed.
497523
*

src/main/java/org/threadly/litesockets/NoThreadSocketExecuter.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,15 @@ public class NoThreadSocketExecuter extends SocketExecuterCommonBase {
4040
* Constructs a NoThreadSocketExecuter. {@link #start()} must still be called before using it.
4141
*/
4242
public NoThreadSocketExecuter() {
43-
super(new NoThreadScheduler());
43+
this(new NoThreadScheduler());
44+
}
45+
46+
/**
47+
* Constructs a NoThreadSocketExecuter. {@link #start()} must still be called before using it.
48+
*/
49+
public NoThreadSocketExecuter(NoThreadScheduler scheduler) {
50+
super(scheduler);
51+
4452
localNoThreadScheduler = (NoThreadScheduler)schedulerPool;
4553
}
4654

@@ -88,8 +96,6 @@ public void setUDPServerOperations(final UDPServer udpServer, final boolean enab
8896
protected void startupService() {
8997
commonSelector = openSelector();
9098
this.acceptSelector = commonSelector;
91-
this.readSelector = commonSelector;
92-
this.writeSelector = commonSelector;
9399
}
94100

95101
@Override
@@ -219,7 +225,7 @@ public void select(final int delay) {
219225
if(server != null) {
220226
if(server instanceof UDPServer) {
221227
UDPServer us = (UDPServer) server;
222-
stats.addWrite(us.doWrite());
228+
recordWriteStats(us.doWrite());
223229
setUDPServerOperations(us, true);
224230
}
225231
}
Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,69 @@
11
package org.threadly.litesockets;
22

3-
import org.threadly.concurrent.SingleThreadScheduler;
3+
import org.threadly.concurrent.ConfigurableThreadFactory;
4+
import org.threadly.concurrent.NoThreadScheduler;
5+
import org.threadly.util.ExceptionUtils;
46

57
/**
6-
* This is a SingleThreaded implementation of a SocketExecuter.
7-
*
8-
*
9-
*
8+
* This is a SingleThreaded implementation of a SocketExecuter. If single threaded performance is
9+
* desired it should be slightly less overhead than a {@link ThreadedSocketExecuter}.
1010
*/
1111
public class SingleThreadSocketExecuter extends NoThreadSocketExecuter {
1212
private static final int SELECT_TIME_MS = 10000;
13+
private static final ConfigurableThreadFactory THREAD_FACTORY =
14+
new ConfigurableThreadFactory("SingleThreadSocketExecuter-", false, true,
15+
Thread.currentThread().getPriority(), null, null);
1316

14-
private final SingleThreadScheduler sts = new SingleThreadScheduler();
15-
17+
private Thread runningThread = null;
18+
19+
/**
20+
* Constructs a SingleThreadSocketExecuter. {@link #start()} must still be called before using it.
21+
*/
1622
public SingleThreadSocketExecuter() {
1723
super();
1824
}
25+
26+
/**
27+
* Constructs a SingleThreadSocketExecuter. {@link #start()} must still be called before using it.
28+
* <p>
29+
* This accepts a {@link NoThreadScheduler} so that stats can be collected if desired.
30+
*/
31+
public SingleThreadSocketExecuter(NoThreadScheduler scheduler) {
32+
super(scheduler);
33+
}
1934

2035
@Override
2136
protected void startupService() {
2237
super.startupService();
23-
sts.execute(()->{
38+
runningThread = THREAD_FACTORY.newThread(()->{
2439
while(isRunning()) {
25-
super.select(SELECT_TIME_MS);
40+
try {
41+
super.select(SELECT_TIME_MS);
42+
} catch (Throwable t) {
43+
ExceptionUtils.handleException(t);
44+
}
2645
}
2746
});
47+
runningThread.start();
2848
}
2949

50+
@Override
51+
protected void shutdownService() {
52+
super.shutdownService();
53+
54+
try {
55+
runningThread.join();
56+
} catch (InterruptedException e) {
57+
Thread.currentThread().interrupt();
58+
throw new RuntimeException(e);
59+
}
60+
}
61+
62+
@Override
3063
public void select() {
3164
}
3265

66+
@Override
3367
public void select(int delay) {
3468
}
3569
}

src/main/java/org/threadly/litesockets/SocketExecuter.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,30 @@
2121
*
2222
*/
2323
public interface SocketExecuter extends Service {
24+
/**
25+
* By default per-connection stats will be enabled. Stats can always be disabled on a per-Client
26+
* basis, but this allows the default state of the stats to be enabled / disabled.
27+
*
28+
* @param enabled {@code false} if stats should not be collected per-connection by default
29+
*/
30+
public void setPerConnectionStatsEnabled(boolean enabled);
31+
32+
33+
/**
34+
* Check the total number of bytes pending to be sent by clients. This is data which has been
35+
* provided to the client to write, but is waiting on the network and kernel to accept the write.
36+
*
37+
* @return The total number of bytes pending to write by clients
38+
*/
39+
public long getTotalPendingWriteBytes();
40+
41+
/**
42+
* Check the total amount of pending reads across all associated clients. Bytes here indicate
43+
* that the client has been notified a read is available but has not consumed it.
44+
*
45+
* @return The total number of bytes pending to read by clients
46+
*/
47+
public long getTotalPendingReadBytes();
2448

2549
/**
2650
* This will create a UDPServer on the specified {@link SocketExecuter}.

src/main/java/org/threadly/litesockets/SocketExecuterCommonBase.java

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,45 +23,58 @@
2323
* This is a common base class for the Threaded and NoThread SocketExecuters.
2424
*/
2525
abstract class SocketExecuterCommonBase extends AbstractService implements SocketExecuter {
26-
private final Logger log = Logger.getLogger(this.getClass().toString());
27-
protected final SubmitterScheduler acceptScheduler;
28-
protected final SubmitterScheduler readScheduler;
29-
protected final SubmitterScheduler writeScheduler;
26+
protected final Logger log = Logger.getLogger(this.getClass().toString());
3027
protected final SubmitterScheduler schedulerPool;
28+
protected final SubmitterScheduler acceptScheduler;
3129
protected final ConcurrentHashMap<SocketChannel, Client> clients = new ConcurrentHashMap<>();
3230
protected final ConcurrentHashMap<SelectableChannel, Server> servers = new ConcurrentHashMap<>();
3331
protected final SocketExecuterByteStats stats = new SocketExecuterByteStats();
3432
protected final WatchdogCache dogCache;
35-
protected volatile boolean verboseLogging = false;
36-
protected Selector readSelector;
37-
protected Selector writeSelector;
33+
protected volatile boolean perConnectionStatsEnabled = true;
3834
protected Selector acceptSelector;
3935

4036
SocketExecuterCommonBase(final SubmitterScheduler scheduler) {
41-
this(scheduler,scheduler,scheduler,scheduler);
37+
this(scheduler, scheduler);
4238
}
4339

44-
SocketExecuterCommonBase(final SubmitterScheduler acceptScheduler,
45-
final SubmitterScheduler readScheduler,
46-
final SubmitterScheduler writeScheduler,
47-
final SubmitterScheduler ssi) {
40+
SocketExecuterCommonBase(final SubmitterScheduler acceptScheduler, final SubmitterScheduler ssi) {
4841
log.setParent(Logger.getGlobal());
4942
ArgumentVerifier.assertNotNull(ssi, "ThreadScheduler");
5043
ArgumentVerifier.assertNotNull(acceptScheduler, "Accept Scheduler");
51-
ArgumentVerifier.assertNotNull(readScheduler, "Read Scheduler");
52-
ArgumentVerifier.assertNotNull(writeScheduler, "Write Scheduler");
44+
5345
schedulerPool = ssi;
5446
dogCache = new WatchdogCache(ssi, true);
5547
this.acceptScheduler = acceptScheduler;
56-
this.readScheduler = readScheduler;
57-
this.writeScheduler = writeScheduler;
5848
}
5949

60-
protected void addReadAmount(int size) {
50+
@Override
51+
public long getTotalPendingWriteBytes() {
52+
long result = 0;
53+
for (Client c : clients.values()) {
54+
result += c.getWriteBufferSize();
55+
}
56+
return result;
57+
}
58+
59+
@Override
60+
public long getTotalPendingReadBytes() {
61+
long result = 0;
62+
for (Client c : clients.values()) {
63+
result += c.getReadBufferSize();
64+
}
65+
return result;
66+
}
67+
68+
@Override
69+
public void setPerConnectionStatsEnabled(boolean enabled) {
70+
perConnectionStatsEnabled = enabled;
71+
}
72+
73+
protected void recordReadStats(int size) {
6174
stats.addRead(size);
6275
}
6376

64-
protected void addWriteAmount(int size) {
77+
protected void recordWriteStats(int size) {
6578
stats.addWrite(size);
6679
}
6780

@@ -74,7 +87,7 @@ protected void checkRunning() {
7487
@Override
7588
public TCPClient createTCPClient(final String host, final int port) throws IOException {
7689
checkRunning();
77-
TCPClient tc = new TCPClient(this, host, port);
90+
TCPClient tc = new TCPClient(this, host, port, perConnectionStatsEnabled);
7891
clients.put(((Client)tc).getChannel(), tc);
7992
return tc;
8093
}
@@ -83,7 +96,7 @@ public TCPClient createTCPClient(final String host, final int port) throws IOExc
8396
@Override
8497
public TCPClient createTCPClient(final SocketChannel sc) throws IOException {
8598
checkRunning();
86-
final TCPClient tc = new TCPClient(this, sc);
99+
final TCPClient tc = new TCPClient(this, sc, perConnectionStatsEnabled);
87100
clients.put(((Client)tc).getChannel(), tc);
88101
this.setClientOperations(tc);
89102
return tc;
@@ -173,10 +186,6 @@ public SimpleByteStats getStats() {
173186
return stats;
174187
}
175188

176-
protected SocketExecuterByteStats writeableStats() {
177-
return stats;
178-
}
179-
180189
@Override
181190
public void watchFuture(final ListenableFuture<?> lf, final long delay) {
182191
dogCache.watch(lf, delay);

0 commit comments

Comments
 (0)