Skip to content

Commit bcc863f

Browse files
committed
[test] fixing test to account for sharding
1 parent 183d368 commit bcc863f

File tree

5 files changed

+81
-23
lines changed

5 files changed

+81
-23
lines changed

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
324324

325325
// similar settings, but a single worker and non-blocking.
326326
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
327-
poolSize, 1, false, 0, aggregationShards);
327+
poolSize, 1, 1, false, 0, aggregationShards);
328328
}
329329

330330
this.telemetry = new Telemetry(telemetrytags, telemetryStatsDProcessor);
@@ -1001,7 +1001,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
10011001

10021002
this(prefix, queueSize, constantTags, errorHandler, addressLookup, addressLookup, timeout,
10031003
bufferSize, maxPacketSizeBytes, entityID, poolSize, processorWorkers, senderWorkers,
1004-
blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
1004+
DEFAULT_LOCK_SHARD_GRAIN, blocking, enableTelemetry, telemetryFlushInterval, 0, 0);
10051005
}
10061006

10071007
protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,

src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,12 @@ public void run() {
107107
final int aggregatorFlushInterval, final int aggregatorShards) throws Exception {
108108

109109
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards);
110+
110111
this.messages = new ArrayBlockingQueue[lockShardGrain];
111112
for (int i = 0 ; i < lockShardGrain ; i++) {
112113
this.messages[i] = new ArrayBlockingQueue<Message>(queueSize);
113114
}
115+
114116
this.processorWorkQueue = new ArrayBlockingQueue[workers];
115117
for (int i = 0 ; i < workers ; i++) {
116118
this.processorWorkQueue[i] = new ArrayBlockingQueue<Integer>(queueSize);
@@ -126,7 +128,16 @@ protected ProcessingTask createProcessingTask(int id) {
126128
throws Exception {
127129

128130
super(processor);
129-
this.messages = new ArrayBlockingQueue<>(processor.getQcapacity());
131+
132+
this.messages = new ArrayBlockingQueue[lockShardGrain];
133+
for (int i = 0 ; i < lockShardGrain ; i++) {
134+
this.messages[i] = new ArrayBlockingQueue<Message>(getQcapacity());
135+
}
136+
137+
this.processorWorkQueue = new ArrayBlockingQueue[workers];
138+
for (int i = 0 ; i < workers ; i++) {
139+
this.processorWorkQueue[i] = new ArrayBlockingQueue<Integer>(getQcapacity());
140+
}
130141
}
131142

132143
@Override

src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ public class StatsDNonBlockingProcessor extends StatsDProcessor {
1717

1818
private class ProcessingTask extends StatsDProcessor.ProcessingTask {
1919

20-
private final int processorQueueId;
21-
2220
public ProcessingTask(int id) {
2321
super(id);
2422
}
@@ -28,6 +26,7 @@ public void run() {
2826
ByteBuffer sendBuffer;
2927
boolean empty = true;
3028
boolean emptyHighPrio = true;
29+
int messageQueueIdx = 0;
3130

3231
try {
3332
sendBuffer = bufferPool.borrow();
@@ -57,7 +56,7 @@ public void run() {
5756
message = highPrioMessages.poll();
5857
} else {
5958

60-
final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
59+
messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
6160
message = messages[messageQueueIdx].poll();
6261
}
6362

@@ -126,7 +125,6 @@ public void run() {
126125
final int aggregatorShards) throws Exception {
127126

128127
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain, aggregatorFlushInterval, aggregatorShards);
129-
this.qsize = new AtomicInteger(0);
130128

131129
this.qsize = new AtomicInteger[lockShardGrain];
132130
this.messages = new ConcurrentLinkedQueue[lockShardGrain];
@@ -149,8 +147,19 @@ protected ProcessingTask createProcessingTask(int id) {
149147

150148
StatsDNonBlockingProcessor(final StatsDNonBlockingProcessor processor) throws Exception {
151149
super(processor);
152-
this.qsize = new AtomicInteger(0);
153-
this.messages = new ConcurrentLinkedQueue<>();
150+
151+
this.qsize = new AtomicInteger[lockShardGrain];
152+
this.messages = new ConcurrentLinkedQueue[lockShardGrain];
153+
for (int i = 0 ; i < lockShardGrain ; i++) {
154+
this.qsize[i] = new AtomicInteger();
155+
this.messages[i] = new ConcurrentLinkedQueue<Message>();
156+
this.qsize[i].set(0);
157+
}
158+
159+
this.processorWorkQueue = new ConcurrentLinkedQueue[workers];
160+
for (int i = 0 ; i < workers ; i++) {
161+
this.processorWorkQueue[i] = new ConcurrentLinkedQueue<Integer>();
162+
}
154163
}
155164

156165
@Override

src/main/java/com/timgroup/statsd/StatsDProcessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public Thread newThread(final Runnable runnable) {
116116

117117
this.handler = processor.handler;
118118
this.workers = processor.workers;
119+
this.lockShardGrain = processor.lockShardGrain;
119120
this.qcapacity = processor.getQcapacity();
120121

121122
this.executor = Executors.newFixedThreadPool(workers, new ThreadFactory() {

src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Queue;
2020
import java.util.Random;
2121
import java.util.concurrent.atomic.AtomicInteger;
22+
import java.util.concurrent.ArrayBlockingQueue;
2223
import java.util.concurrent.Callable;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -81,22 +82,42 @@ protected void writeTo(StringBuilder builder){}
8182
// fakeProcessor store messages from the telemetry only
8283
public static class FakeProcessor extends StatsDProcessor {
8384

84-
private final Queue<Message> messages;
85+
private final Queue<Message>[] messages;
86+
private final Queue<Integer>[] processorWorkQueue;
87+
private final AtomicInteger[] qsize; // qSize will not reflect actual size, but a close estimate.
8588
private final AtomicInteger messageSent = new AtomicInteger(0);
8689
private final AtomicInteger messageAggregated = new AtomicInteger(0);
8790

8891
FakeProcessor(final StatsDClientErrorHandler handler) throws Exception {
89-
super(0, handler, 0, 1, 1, 0, 0);
90-
this.messages = new ConcurrentLinkedQueue<>();
91-
}
92+
super(50, handler, 0, 1, 1, 1, 0, StatsDAggregator.DEFAULT_SHARDS);
93+
94+
// 1 queue (lockShardGrain = 1)
95+
this.qsize = new AtomicInteger[lockShardGrain];
96+
this.messages = new ArrayBlockingQueue[lockShardGrain];
97+
for (int i = 0 ; i < lockShardGrain ; i++) {
98+
this.qsize[i] = new AtomicInteger();
99+
this.messages[i] = new ArrayBlockingQueue<Message>(getQcapacity());
100+
this.qsize[i].set(0);
101+
}
92102

103+
// 1 worker
104+
this.processorWorkQueue = new ArrayBlockingQueue[workers];
105+
for (int i = 0 ; i < workers ; i++) {
106+
this.processorWorkQueue[i] = new ArrayBlockingQueue<Integer>(getQcapacity());
107+
}
108+
}
93109

94110
private class FakeProcessingTask extends StatsDProcessor.ProcessingTask {
111+
112+
public FakeProcessingTask(int id) {
113+
super(id);
114+
}
115+
95116
@Override
96117
public void run() {
97118

98119
while (!shutdown) {
99-
final Message message = messages.poll();
120+
final Message message = messages[0].poll();
100121
if (message == null) {
101122

102123
try{
@@ -106,6 +127,7 @@ public void run() {
106127
continue;
107128
}
108129

130+
qsize[0].decrementAndGet();
109131
if (aggregator.aggregateMessage(message)) {
110132
messageAggregated.incrementAndGet();
111133
continue;
@@ -118,23 +140,38 @@ public void run() {
118140
}
119141

120142
@Override
121-
protected StatsDProcessor.ProcessingTask createProcessingTask() {
122-
return new FakeProcessingTask();
143+
protected StatsDProcessor.ProcessingTask createProcessingTask(int id) {
144+
return new FakeProcessingTask(id);
123145
}
124146

125147
@Override
126148
public boolean send(final Message msg) {
127-
messages.offer(msg);
128-
return true;
149+
if (!shutdown) {
150+
int threadId = getThreadId();
151+
int shard = threadId % lockShardGrain;
152+
int processQueue = threadId % workers;
153+
154+
if (qsize[shard].get() < qcapacity) {
155+
messages[shard].offer(msg);
156+
qsize[shard].incrementAndGet();
157+
processorWorkQueue[processQueue].offer(shard);
158+
return true;
159+
}
160+
}
161+
162+
return false;
129163
}
130164

131-
public Queue<Message> getMessages() {
132-
return messages;
165+
public Queue<Message> getMessages(int id) {
166+
return messages[id];
133167
}
134168

135169
public void clear() {
136170
try {
137-
messages.clear();
171+
172+
for (int i = 0 ; i < lockShardGrain ; i++) {
173+
messages[i].clear();
174+
}
138175
highPrioMessages.clear();
139176
} catch (Exception e) {}
140177
}
@@ -181,7 +218,7 @@ public void aggregate_messages() throws Exception {
181218
fakeProcessor.send(new FakeAlphaMessage("some.set", Message.Type.SET, "value"));
182219
}
183220

184-
waitForQueueSize(fakeProcessor.messages, 0);
221+
waitForQueueSize(fakeProcessor.messages[0], 0);
185222

186223
// 10 gauges, 10 counts, 10 sets
187224
assertEquals(30, fakeProcessor.messageAggregated.get());
@@ -206,7 +243,7 @@ public void aggregation_sharding() throws Exception {
206243
fakeProcessor.send(gauge);
207244
}
208245

209-
waitForQueueSize(fakeProcessor.messages, 0);
246+
waitForQueueSize(fakeProcessor.messages[0], 0);
210247

211248
for (int i=0 ; i<StatsDAggregator.DEFAULT_SHARDS ; i++) {
212249
Map<Message, Message> map = fakeProcessor.aggregator.aggregateMetrics.get(i);

0 commit comments

Comments
 (0)