Skip to content

Commit d90c674

Browse files
committed
EventDrivenUpdateGraph: Allow two concurrent threads to safely requestRefresh()
1 parent 5f62c5b commit d90c674

File tree

3 files changed

+59
-9
lines changed

3 files changed

+59
-9
lines changed

engine/table/src/main/java/io/deephaven/engine/updategraph/impl/BaseUpdateGraph.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ private void flushTerminalNotifications() {
629629
runNotification(notificationForThisThread);
630630
}
631631

632-
// We can not proceed until all of the terminal notifications have executed.
632+
// We can not proceed until all the terminal notifications have executed.
633633
notificationProcessor.doAllWork();
634634
}
635635

@@ -836,6 +836,10 @@ private void computeStatsAndLogCycle(final long cycleTimeNanos) {
836836
}
837837
}
838838

839+
void reportLockWaitNanos(final long lockWaitNanos) {
840+
currentCycleLockWaitTotalNanos += lockWaitNanos;
841+
}
842+
839843
/**
840844
* Is the provided cycle time on budget?
841845
*
@@ -907,7 +911,7 @@ void refreshAllTables() {
907911
private void doRefresh(@NotNull final Runnable refreshFunction) {
908912
final long lockStartTimeNanos = System.nanoTime();
909913
exclusiveLock().doLocked(() -> {
910-
currentCycleLockWaitTotalNanos += System.nanoTime() - lockStartTimeNanos;
914+
reportLockWaitNanos(System.nanoTime() - lockStartTimeNanos);
911915
if (!running) {
912916
return;
913917
}

engine/table/src/main/java/io/deephaven/engine/updategraph/impl/EventDrivenUpdateGraph.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,23 @@ public int parallelismFactor() {
5454
@Override
5555
public void requestRefresh() {
5656
maybeStart();
57-
// do the work to refresh everything, on this thread
58-
isUpdateThread.set(true);
59-
try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
60-
refreshAllTables();
61-
} finally {
62-
isUpdateThread.remove();
63-
}
57+
// Do the work to refresh everything, driven by this thread. Note that we acquire the lock "early" in order to
58+
// avoid any inconsistencies w.r.t. assumptions about clock, lock, and update-thread state.
59+
final long lockStartTimeNanos = System.nanoTime();
60+
exclusiveLock().doLocked(() -> {
61+
reportLockWaitNanos(System.nanoTime() - lockStartTimeNanos);
62+
final boolean wasUpdateThread = isUpdateThread.get();
63+
if (!wasUpdateThread) {
64+
isUpdateThread.set(true);
65+
}
66+
try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
67+
refreshAllTables();
68+
} finally {
69+
if (!wasUpdateThread) {
70+
isUpdateThread.remove();
71+
}
72+
}
73+
});
6474
final long nowNanos = System.nanoTime();
6575
synchronized (this) {
6676
maybeFlushUpdatePerformance(nowNanos, nowNanos);

engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import io.deephaven.util.SafeCloseable;
2323
import io.deephaven.util.annotations.ReflexiveUse;
2424
import junit.framework.TestCase;
25+
import org.apache.commons.lang3.mutable.MutableInt;
2526
import org.junit.*;
2627

2728
import java.nio.file.Path;
2829
import java.util.Collections;
30+
import java.util.concurrent.*;
2931

3032
import static io.deephaven.engine.context.TestExecutionContext.OPERATION_INITIALIZATION;
3133
import static io.deephaven.engine.util.TableTools.*;
@@ -165,6 +167,40 @@ public void testSimpleModify() {
165167
}
166168
}
167169

170+
@Test
171+
public void testRefreshRace() throws ExecutionException, InterruptedException, TimeoutException {
172+
final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build();
173+
174+
final MutableInt sourceRefreshCount = new MutableInt(0);
175+
final Runnable sleepingSource = () -> {
176+
try {
177+
Thread.sleep(100);
178+
sourceRefreshCount.increment();
179+
} catch (InterruptedException e) {
180+
Assert.fail("Interrupted while sleeping");
181+
}
182+
};
183+
eventDrivenUpdateGraph.addSource(sleepingSource);
184+
185+
final int numConcurrentRefreshes = 10;
186+
final Future<?>[] refreshFutures = new Future[numConcurrentRefreshes];
187+
final ExecutorService executor = Executors.newFixedThreadPool(numConcurrentRefreshes);
188+
try {
189+
for (int cri = 0; cri < numConcurrentRefreshes; ++cri) {
190+
refreshFutures[cri] = executor.submit(eventDrivenUpdateGraph::requestRefresh);
191+
Thread.sleep(10);
192+
}
193+
for (final Future<?> refreshFuture : refreshFutures) {
194+
refreshFuture.get();// 1, TimeUnit.SECONDS);
195+
}
196+
} finally {
197+
executor.shutdown();
198+
Assert.assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
199+
}
200+
201+
Assert.assertEquals(numConcurrentRefreshes, sourceRefreshCount.intValue());
202+
}
203+
168204
@Test
169205
public void testUpdatePerformanceTracker() {
170206
final Table upt = UpdatePerformanceTracker.getQueryTable();

0 commit comments

Comments
 (0)