Skip to content

Commit f3c81db

Browse files
committed
rwlock
1 parent 33173fb commit f3c81db

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import java.util.concurrent.RejectedExecutionHandler;
2828
import java.util.concurrent.ThreadFactory;
2929
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.concurrent.atomic.AtomicLong;
3231
import java.util.concurrent.atomic.LongAccumulator;
3332
import java.util.concurrent.atomic.LongAdder;
33+
import java.util.concurrent.locks.ReentrantReadWriteLock;
3434
import java.util.function.Function;
3535
import java.util.function.Supplier;
3636

@@ -260,13 +260,13 @@ public boolean trackingMaxQueueLatency() {
260260
* Can be extended to remember multiple past frames.
261261
*/
262262
public static class FramedTimeTracker {
263-
final long interval;
263+
private final long interval;
264264
private final Supplier<Long> timeNow;
265+
private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
265266
private final AtomicLong ongoingTasks = new AtomicLong();
266267
private final AtomicLong currentFrame = new AtomicLong();
267268
private final AtomicLong currentTime = new AtomicLong();
268269
private final AtomicLong previousTime = new AtomicLong();
269-
private final AtomicBoolean updatingFrame = new AtomicBoolean();
270270

271271
// for testing
272272
public FramedTimeTracker(long intervalNano, Supplier<Long> timeNow) {
@@ -281,6 +281,10 @@ public FramedTimeTracker(long intervalNano, Supplier<Long> timeNow) {
281281
this.timeNow = System::nanoTime;
282282
}
283283

284+
public long interval() {
285+
return interval;
286+
}
287+
284288
/**
285289
* Update frames to current time. There are no guaranties that it will be invoked frequently.
286290
* For example when there are no tasks and no requests for previousFrameTime.
@@ -296,7 +300,10 @@ private void updateFrame0(long nowTime) {
296300
var now = nowTime / interval;
297301
var current = currentFrame.get();
298302
if (current < now) {
299-
if (updatingFrame.compareAndSet(false, true)) {
303+
rwlock.readLock().unlock();
304+
rwlock.writeLock().lock();
305+
current = currentFrame.get(); // make sure it didnt change during lock acquisition
306+
if (current < now) {
300307
var tasks = ongoingTasks.get();
301308
if (current == now - 1) {
302309
previousTime.set(currentTime.get());
@@ -305,12 +312,9 @@ private void updateFrame0(long nowTime) {
305312
}
306313
currentTime.set(tasks * interval);
307314
currentFrame.set(now);
308-
updatingFrame.set(false);
309-
} else {
310-
while (currentFrame.get() != now) {
311-
Thread.onSpinWait();
312-
}
313315
}
316+
rwlock.readLock().lock();
317+
rwlock.writeLock().unlock();
314318
}
315319
}
316320

@@ -319,28 +323,37 @@ private void updateFrame0(long nowTime) {
319323
* If task finishes sooner than end of interval {@link FramedTimeTracker#endTask()} will deduct remaining time.
320324
*/
321325
public void startTask() {
326+
rwlock.readLock().lock();
322327
var now = timeNow.get();
323328
updateFrame0(now);
324329
ongoingTasks.incrementAndGet();
325330
currentTime.updateAndGet((t) -> t + (currentFrame.get() + 1) * interval - now);
331+
rwlock.readLock().unlock();
326332
}
327333

328334
/**
329335
* Stop task tracking. We already assumed that task runs till end of frame, here we deduct not used time.
330336
*/
331337
public void endTask() {
338+
rwlock.readLock().lock();
332339
var now = timeNow.get();
333340
updateFrame0(now);
334341
ongoingTasks.decrementAndGet();
335342
currentTime.updateAndGet((t) -> t - (currentFrame.get() + 1) * interval + now);
343+
rwlock.readLock().unlock();
336344
}
337345

338346
/**
339347
* Returns previous frame total execution time.
340348
*/
341349
public long previousFrameTime() {
342-
updateFrame0(timeNow.get());
343-
return previousTime.get();
350+
try {
351+
rwlock.readLock().lock();
352+
updateFrame0(timeNow.get());
353+
return previousTime.get();
354+
} finally {
355+
rwlock.readLock().unlock();
356+
}
344357
}
345358
}
346359
}

server/src/test/java/org/elasticsearch/common/util/concurrent/FramedTimeTrackerTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,19 @@ public void testSingleFrameTask() {
4444
tracker.startTask();
4545
fakeTime.time += 10;
4646
tracker.endTask();
47-
fakeTime.time += tracker.interval;
47+
fakeTime.time += tracker.interval();
4848
assertEquals(10, tracker.previousFrameTime());
4949
}
5050

5151
public void testTwoFrameTask() {
5252
var tracker = newTracker(100);
5353
var startTime = between(0, 100);
54-
var taskDuration = tracker.interval;
54+
var taskDuration = tracker.interval();
5555
fakeTime.time += startTime;
5656
tracker.startTask();
5757
fakeTime.time += taskDuration;
5858
tracker.endTask();
59-
assertEquals(tracker.interval - startTime, tracker.previousFrameTime());
59+
assertEquals(tracker.interval() - startTime, tracker.previousFrameTime());
6060
}
6161

6262
public void testMultiFrameTask() {
@@ -66,16 +66,16 @@ public void testMultiFrameTask() {
6666
var taskDuration = between(3, 100) * interval;
6767
fakeTime.time += taskDuration;
6868
tracker.endTask();
69-
assertEquals(tracker.interval, tracker.previousFrameTime());
69+
assertEquals(tracker.interval(), tracker.previousFrameTime());
7070
}
7171

7272
public void testOngoingTask() {
7373
var interval = 10;
7474
var tracker = newTracker(interval);
7575
tracker.startTask();
7676
for (int i = 0; i < between(10, 100); i++) {
77-
fakeTime.time += tracker.interval;
78-
assertEquals(tracker.interval, tracker.previousFrameTime());
77+
fakeTime.time += tracker.interval();
78+
assertEquals(tracker.interval(), tracker.previousFrameTime());
7979
}
8080
}
8181

0 commit comments

Comments
 (0)