Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ private void flushTerminalNotifications() {
runNotification(notificationForThisThread);
}

// We can not proceed until all of the terminal notifications have executed.
// We can not proceed until all the terminal notifications have executed.
notificationProcessor.doAllWork();
}

Expand Down Expand Up @@ -836,6 +836,10 @@ private void computeStatsAndLogCycle(final long cycleTimeNanos) {
}
}

void reportLockWaitNanos(final long lockWaitNanos) {
currentCycleLockWaitTotalNanos += lockWaitNanos;
}

/**
* Is the provided cycle time on budget?
*
Expand Down Expand Up @@ -907,7 +911,7 @@ void refreshAllTables() {
private void doRefresh(@NotNull final Runnable refreshFunction) {
final long lockStartTimeNanos = System.nanoTime();
exclusiveLock().doLocked(() -> {
currentCycleLockWaitTotalNanos += System.nanoTime() - lockStartTimeNanos;
reportLockWaitNanos(System.nanoTime() - lockStartTimeNanos);
if (!running) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,22 @@ public int parallelismFactor() {
*/
@Override
public void requestRefresh() {
maybeStart();
// do the work to refresh everything, on this thread
isUpdateThread.set(true);
try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
refreshAllTables();
} finally {
isUpdateThread.remove();
if (isUpdateThread.get()) {
throw new IllegalStateException("Cannot request a refresh from an update thread");
}
maybeStart();
// Do the work to refresh everything, driven by this thread. Note that we acquire the lock "early" in order to
// avoid any inconsistencies w.r.t. assumptions about clock, lock, and update-thread state.
final long lockStartTimeNanos = System.nanoTime();
exclusiveLock().doLocked(() -> {
reportLockWaitNanos(System.nanoTime() - lockStartTimeNanos);
isUpdateThread.set(true);
try (final SafeCloseable ignored = ExecutionContext.newBuilder().setUpdateGraph(this).build().open()) {
refreshAllTables();
} finally {
isUpdateThread.remove();
}
});
final long nowNanos = System.nanoTime();
synchronized (this) {
maybeFlushUpdatePerformance(nowNanos, nowNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.engine.table.impl.sources.LongSingleValueSource;
import io.deephaven.engine.testutil.TstUtils;
import io.deephaven.engine.updategraph.TerminalNotification;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.util.TableTools;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ReflexiveUse;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableInt;
import org.junit.*;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

import static io.deephaven.engine.context.TestExecutionContext.OPERATION_INITIALIZATION;
import static io.deephaven.engine.util.TableTools.*;
Expand Down Expand Up @@ -165,6 +170,57 @@ public void testSimpleModify() {
}
}

@Test
public void testRefreshRace() throws ExecutionException, InterruptedException, TimeoutException {
final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build();
final List<Runnable> retainedReferences = new ArrayList<>();

final MutableInt sourceRefreshCount = new MutableInt(0);
final Runnable sleepingSource = () -> {
try {
Thread.sleep(100);
sourceRefreshCount.increment();
} catch (InterruptedException e) {
Assert.fail("Interrupted while sleeping");
}
};
retainedReferences.add(sleepingSource);
eventDrivenUpdateGraph.addSource(sleepingSource);

final int numConcurrentRefreshes = 10;
final Future<?>[] refreshFutures = new Future[numConcurrentRefreshes];
final ExecutorService executor = Executors.newFixedThreadPool(numConcurrentRefreshes);
try {
for (int cri = 0; cri < numConcurrentRefreshes; ++cri) {
refreshFutures[cri] = executor.submit(eventDrivenUpdateGraph::requestRefresh);
Thread.sleep(10);
}
for (final Future<?> refreshFuture : refreshFutures) {
refreshFuture.get(10, TimeUnit.SECONDS);
}
} finally {
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
}

Assert.assertEquals(numConcurrentRefreshes, sourceRefreshCount.intValue());
Assert.assertEquals(sleepingSource, retainedReferences.get(0));
}

@Test
public void testIllegalRefresh() {
final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build();

eventDrivenUpdateGraph.addNotification(new TerminalNotification() {
@Override
public void run() {
Assert.assertThrows(IllegalStateException.class, eventDrivenUpdateGraph::requestRefresh);
}
});

eventDrivenUpdateGraph.requestRefresh();
}

@Test
public void testUpdatePerformanceTracker() {
final Table upt = UpdatePerformanceTracker.getQueryTable();
Expand Down