Skip to content

Commit 33173fb

Browse files
committed
non-locking counting
1 parent 872d7cd commit 33173fb

File tree

4 files changed

+43
-57
lines changed

4 files changed

+43
-57
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,20 @@
3030
@Threads(12)
3131
@Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS)
3232
@Measurement(iterations = 5, time = 600, timeUnit = TimeUnit.MILLISECONDS)
33-
@BenchmarkMode(Mode.AverageTime)
33+
@BenchmarkMode(Mode.SampleTime)
3434
@OutputTimeUnit(TimeUnit.MICROSECONDS)
3535
@State(Scope.Benchmark)
3636
@Fork(1)
3737
public class ThreadPoolUtilizationBenchmark {
3838

39-
@Param({ "0", "10000", "100000" })
39+
@Param({ "10000" })
4040
private int callIntervalTicks;
4141

4242
/**
4343
* This makes very little difference, all the overhead is in the synchronization
4444
*/
4545
@Param({ "10" })
4646
private int utilizationIntervalMs;
47-
48-
@State(Scope.Thread)
49-
public static class TaskState {
50-
boolean running = false;
51-
52-
boolean shouldStart() {
53-
return (running = running == false);
54-
}
55-
}
56-
5747
private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker;
5848

5949
@Setup
@@ -69,32 +59,20 @@ public void baseline() {
6959
Blackhole.consumeCPU(callIntervalTicks);
7060
}
7161

72-
@Group("ReadAndWrite")
62+
@Group("StartAndEnd")
7363
@Benchmark
7464
public void startAndStopTasks(TaskState state) {
65+
timeTracker.startTask();
7566
Blackhole.consumeCPU(callIntervalTicks);
76-
if (state.shouldStart()) {
77-
timeTracker.startTask();
78-
} else {
79-
timeTracker.endTask();
80-
}
67+
timeTracker.endTask();
8168
}
8269

83-
@Benchmark
84-
@Group("ReadAndWrite")
85-
public void readPrevious(Blackhole blackhole) {
86-
Blackhole.consumeCPU(callIntervalTicks);
87-
blackhole.consume(timeTracker.previousFrameTime());
88-
}
70+
@State(Scope.Thread)
71+
public static class TaskState {
72+
boolean running = false;
8973

90-
@Benchmark
91-
@Group("JustWrite")
92-
public void startAndStopTasksOnly(TaskState state) {
93-
Blackhole.consumeCPU(callIntervalTicks);
94-
if (state.shouldStart()) {
95-
timeTracker.startTask();
96-
} else {
97-
timeTracker.endTask();
74+
boolean shouldStart() {
75+
return (running = running == false);
9876
}
9977
}
10078
}

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.concurrent.RejectedExecutionHandler;
2828
import java.util.concurrent.ThreadFactory;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicLong;
3032
import java.util.concurrent.atomic.LongAccumulator;
3133
import java.util.concurrent.atomic.LongAdder;
3234
import java.util.function.Function;
@@ -260,10 +262,11 @@ public boolean trackingMaxQueueLatency() {
260262
public static class FramedTimeTracker {
261263
final long interval;
262264
private final Supplier<Long> timeNow;
263-
long ongoingTasks;
264-
long currentFrame;
265-
long currentTime;
266-
long previousTime;
265+
private final AtomicLong ongoingTasks = new AtomicLong();
266+
private final AtomicLong currentFrame = new AtomicLong();
267+
private final AtomicLong currentTime = new AtomicLong();
268+
private final AtomicLong previousTime = new AtomicLong();
269+
private final AtomicBoolean updatingFrame = new AtomicBoolean();
267270

268271
// for testing
269272
public FramedTimeTracker(long intervalNano, Supplier<Long> timeNow) {
@@ -278,10 +281,6 @@ public FramedTimeTracker(long intervalNano, Supplier<Long> timeNow) {
278281
this.timeNow = System::nanoTime;
279282
}
280283

281-
public synchronized void updateFrame() {
282-
updateFrame0(timeNow.get());
283-
}
284-
285284
/**
286285
* Update frames to current time. There are no guaranties that it will be invoked frequently.
287286
* For example when there are no tasks and no requests for previousFrameTime.
@@ -295,44 +294,53 @@ public synchronized void updateFrame() {
295294
*/
296295
private void updateFrame0(long nowTime) {
297296
var now = nowTime / interval;
298-
if (currentFrame < now) {
299-
if (currentFrame == now - 1) {
300-
previousTime = currentTime; //
297+
var current = currentFrame.get();
298+
if (current < now) {
299+
if (updatingFrame.compareAndSet(false, true)) {
300+
var tasks = ongoingTasks.get();
301+
if (current == now - 1) {
302+
previousTime.set(currentTime.get());
303+
} else {
304+
previousTime.set(tasks * interval);
305+
}
306+
currentTime.set(tasks * interval);
307+
currentFrame.set(now);
308+
updatingFrame.set(false);
301309
} else {
302-
previousTime = ongoingTasks * interval;
310+
while (currentFrame.get() != now) {
311+
Thread.onSpinWait();
312+
}
303313
}
304-
currentTime = ongoingTasks * interval;
305-
currentFrame = now;
306314
}
307315
}
308316

309317
/**
310318
* Start tracking new task, assume that task runs indefinitely, or at least till end of frame.
311319
* If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time.
312320
*/
313-
public synchronized void startTask() {
321+
public void startTask() {
314322
var now = timeNow.get();
315323
updateFrame0(now);
316-
currentTime += (currentFrame + 1) * interval - now;
317-
++ongoingTasks;
324+
ongoingTasks.incrementAndGet();
325+
currentTime.updateAndGet((t) -> t + (currentFrame.get() + 1) * interval - now);
318326
}
319327

320328
/**
321329
* Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time.
322330
*/
323-
public synchronized void endTask() {
331+
public void endTask() {
324332
var now = timeNow.get();
325333
updateFrame0(now);
326-
currentTime -= (currentFrame + 1) * interval - now;
327-
--ongoingTasks;
334+
ongoingTasks.decrementAndGet();
335+
currentTime.updateAndGet((t) -> t - (currentFrame.get() + 1) * interval + now);
328336
}
329337

330338
/**
331339
* Returns previous frame total execution time.
332340
*/
333-
public synchronized long previousFrameTime() {
341+
public long previousFrameTime() {
334342
updateFrame0(timeNow.get());
335-
return previousTime;
343+
return previousTime.get();
336344
}
337345
}
338346
}

server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void setup() {
3232

3333
public void testNoTasks() {
3434
var tracker = newTracker(1);
35-
tracker.updateFrame();
35+
tracker.previousFrameTime();
3636
assertEquals(0, tracker.previousFrameTime());
3737
fakeTime.time += between(1, 100);
3838
assertEquals(0, tracker.previousFrameTime());

server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public void testUtilization() throws InterruptedException {
244244

245245
final Runnable waitTillNextFrame = () -> {
246246
var now = System.nanoTime();
247-
var waitTillNext = (now / interval.getNano() + 1) * interval.getNano() - now;
247+
var waitTillNext = (now / interval.toNanos() + 1) * interval.toNanos() - now;
248248
trySleep.accept(Duration.ofNanos(waitTillNext));
249249
};
250250

@@ -274,7 +274,7 @@ public void testUtilization() throws InterruptedException {
274274
waitTillNextFrame.run();
275275

276276
DoubleStream.of(0.1, 0.3, 0.5, 3.).forEach(loadFactor -> {
277-
var sleepTime = (long) (interval.getNano() * loadFactor);
277+
var sleepTime = (long) (interval.toNanos() * loadFactor);
278278
executor.submit(sleepTaskFn.apply(Duration.ofNanos(sleepTime)));
279279
});
280280

0 commit comments

Comments
 (0)