Skip to content

Commit 3816a50

Browse files
committed
non-locking frame windows
1 parent 18115c4 commit 3816a50

File tree

2 files changed

+118
-41
lines changed

2 files changed

+118
-41
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/common/util/concurrent/ThreadPoolUtilizationBenchmark.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,18 @@
1414
import org.openjdk.jmh.annotations.BenchmarkMode;
1515
import org.openjdk.jmh.annotations.Fork;
1616
import org.openjdk.jmh.annotations.Group;
17-
import org.openjdk.jmh.annotations.Measurement;
1817
import org.openjdk.jmh.annotations.Mode;
1918
import org.openjdk.jmh.annotations.OutputTimeUnit;
2019
import org.openjdk.jmh.annotations.Param;
2120
import org.openjdk.jmh.annotations.Scope;
2221
import org.openjdk.jmh.annotations.Setup;
2322
import org.openjdk.jmh.annotations.State;
2423
import org.openjdk.jmh.annotations.Threads;
25-
import org.openjdk.jmh.annotations.Warmup;
2624
import org.openjdk.jmh.infra.Blackhole;
2725

2826
import java.util.concurrent.TimeUnit;
2927

3028
@Threads(Threads.MAX)
31-
@Warmup(iterations = 3, time = 200, timeUnit = TimeUnit.MILLISECONDS)
32-
@Measurement(iterations = 1, time = 60, timeUnit = TimeUnit.SECONDS)
3329
@BenchmarkMode(Mode.SampleTime)
3430
@OutputTimeUnit(TimeUnit.MICROSECONDS)
3531
@State(Scope.Benchmark)
@@ -42,7 +38,7 @@ public class ThreadPoolUtilizationBenchmark {
4238
/**
4339
* This makes very little difference, all the overhead is in the synchronization
4440
*/
45-
@Param({ "100" })
41+
@Param({ "10" })
4642
private int utilizationIntervalMs;
4743
private TaskExecutionTimeTrackingEsThreadPoolExecutor.FramedTimeTracker timeTracker;
4844

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 117 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import java.util.concurrent.RejectedExecutionHandler;
2828
import java.util.concurrent.ThreadFactory;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.atomic.AtomicLong;
32+
import java.util.concurrent.atomic.AtomicReference;
3033
import java.util.concurrent.atomic.LongAccumulator;
3134
import java.util.concurrent.atomic.LongAdder;
3235
import java.util.function.Function;
@@ -258,12 +261,11 @@ public boolean trackingMaxQueueLatency() {
258261
* Can be extended to remember multiple past frames.
259262
*/
260263
public static class FramedTimeTracker {
261-
final long interval;
264+
private final long interval;
262265
private final Supplier<Long> timeNow;
263-
private long ongoingTasks;
264-
private long currentFrame;
265-
private long currentTime;
266-
private long previousTime;
266+
private final AtomicReference<FrameWindow> frameWindowRef = new AtomicReference<>(new FrameWindow());
267+
private final AtomicBoolean updatingFrame = new AtomicBoolean();
268+
private final AtomicLong currentFrameNum = new AtomicLong();
267269

268270
// for testing
269271
public FramedTimeTracker(long intervalNano, Supplier<Long> timeNow) {
@@ -283,56 +285,135 @@ public long interval() {
283285
}
284286

285287
/**
286-
* Update frames to current time. There are no guaranties that it will be invoked frequently.
287-
* For example when there are no tasks and no requests for previousFrameTime.
288-
*
289-
* When it's invoked frequently, at least once per frame, we move currentTime into previousTime.
290-
* That concludes currentTime and it's accurate.
291-
*
292-
* When it's invoked infrequently, once in multiple frames, current and previous frames are going to be stale.
293-
* Which is ok, that means there were no changes in tasks(start/end), all ongoing tasks are still running.
294-
* That means ongoing tasks fully utilized previous frames. And we can accurately tell previous frame usage.
288+
* Returns current FrameWindow. If window is stale, it will slide to current time.
289+
* @param now - current frame
295290
*/
296-
private void updateFrame0(long nowTime) {
297-
var now = nowTime / interval;
298-
if (currentFrame < now) {
299-
if (currentFrame == now - 1) {
300-
previousTime = currentTime; //
291+
private FrameWindow getWindow(long now) {
292+
var current = currentFrameNum.get();
293+
// first time in new frame
294+
if (current < now) {
295+
// only one thread will perform frame update, others spinWait
296+
if (updatingFrame.compareAndSet(false, true)) {
297+
final var moveOffset = now - current;
298+
final var newWindow = frameWindowRef.get().moveBy(moveOffset);
299+
frameWindowRef.set(newWindow);
300+
currentFrameNum.set(now);
301+
updatingFrame.set(false);
301302
} else {
302-
previousTime = ongoingTasks * interval;
303+
while (updatingFrame.get()) {
304+
Thread.onSpinWait();
305+
}
306+
// an edge case when all the following happen:
307+
// 1. window was stale, at least 1 frame
308+
// 2. two or more threads try to update window
309+
// 3. it's happening at the end of the frame, beginning new frame
310+
// for example, lets say interval is 10
311+
// and there are two concurrent calls getWindow(9)->frame0 and getWindow(10)->frame1
312+
// both need to update window, but those are different windows,
313+
// two things might happen:
314+
// 1. getWindow(9) updates window and uses it, but getWindow(10) need to update window again
315+
// 2. getWindow(10) updates window, then getWindow(9) will see a newer window, so we record task in a newer frame,
316+
// basically rounding-up frame when it's happening.
317+
if (currentFrameNum.get() < now) {
318+
return getWindow(now);
319+
}
303320
}
304-
currentTime = ongoingTasks * interval;
305-
currentFrame = now;
306321
}
322+
return frameWindowRef.get();
307323
}
308324

309325
/**
310326
* Start tracking new task, assume that task runs indefinitely, or at least till end of frame.
311327
* If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time.
312328
*/
313-
public synchronized void startTask() {
314-
var now = timeNow.get();
315-
updateFrame0(now);
316-
currentTime += (currentFrame + 1) * interval - now;
317-
++ongoingTasks;
329+
public void startTask() {
330+
final var nowTime = timeNow.get();
331+
final var now = nowTime / interval;
332+
final var frameWindow = getWindow(now);
333+
frameWindow.now().ongoingTasks.increment();
334+
frameWindow.now().startEndDiff.add((now + 1) * interval - nowTime);
318335
}
319336

320337
/**
321338
* Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time.
322339
*/
323-
public synchronized void endTask() {
324-
var now = timeNow.get();
325-
updateFrame0(now);
326-
currentTime -= (currentFrame + 1) * interval - now;
327-
--ongoingTasks;
340+
public void endTask() {
341+
final var nowTime = timeNow.get();
342+
final var now = nowTime / interval;
343+
final var frameWindow = getWindow(now);
344+
frameWindow.now().ongoingTasks.decrement();
345+
frameWindow.now().startEndDiff.add(-((now + 1) * interval - nowTime));
328346
}
329347

330348
/**
331-
* Returns previous frame total execution time.
349+
* Returns previous frame total execution time
332350
*/
333-
public synchronized long previousFrameTime() {
334-
updateFrame0(timeNow.get());
335-
return previousTime;
351+
public long previousFrameTime() {
352+
final var now = timeNow.get() / interval;
353+
final var frameWindow = getWindow(now);
354+
// total time is sum of ongoing tasks in frame N-1 and all starts and ends in N frame
355+
// so for the previous frame (now-1), it would be (now-2) ongoing tasks + (now -1) start/end tasks
356+
final var ongoingTasks = frameWindow.now(-2).ongoingTasks.sum();
357+
final var startEndDiff = frameWindow.now(-1).startEndDiff.sum();
358+
return ongoingTasks * interval + startEndDiff;
359+
}
360+
361+
/**
362+
* A single frame that tracks how many tasks are still running at the end of frame
363+
* and diffs from task start and end.
364+
*/
365+
record Frame(LongAdder ongoingTasks, LongAdder startEndDiff) {
366+
Frame() {
367+
this(new LongAdder(), new LongAdder());
368+
}
369+
}
370+
371+
/**
372+
* A frame window represent 3 consecutive frames. frames[0] is now, frames[1] is now-1.
373+
*/
374+
record FrameWindow(Frame[] frames) {
375+
FrameWindow() {
376+
this(new Frame(), new Frame(), new Frame());
377+
}
378+
379+
FrameWindow(Frame past2, Frame past1, Frame now) {
380+
this(new Frame[] { now, past1, past2 });
381+
}
382+
383+
FrameWindow {
384+
assert frames.length == 3;
385+
}
386+
387+
/**
388+
* Creates a new window by sliding current by moveFrames. If new window overlaps with current Frames are reused.
389+
* So there is no risk of losing data when start/end update frame in a past window.
390+
*/
391+
FrameWindow moveBy(long moveFrames) {
392+
// a new frame always starts with previous ongoing tasks
393+
final var ongoingTasks = now().ongoingTasks.sum();
394+
final FrameWindow newWindow;
395+
if (moveFrames == 1) {
396+
newWindow = new FrameWindow(now(-1), now(), new Frame());
397+
} else if (moveFrames == 2) {
398+
newWindow = new FrameWindow(now(), new Frame(), new Frame());
399+
} else {
400+
newWindow = new FrameWindow();
401+
}
402+
// propagate ongoing tasks to all new frames
403+
for (var newFrame = 0; newFrame < Math.min(moveFrames, 3); newFrame++) {
404+
newWindow.frames[newFrame].ongoingTasks.add(ongoingTasks);
405+
}
406+
return newWindow;
407+
}
408+
409+
Frame now() {
410+
return frames[0];
411+
}
412+
413+
Frame now(int offset) {
414+
assert offset >= -2 && offset <= 0;
415+
return frames[-offset];
416+
}
336417
}
337418
}
338419
}

0 commit comments

Comments
 (0)