Skip to content

Commit b8f301c

Browse files
authored
fix: DH-18395: Prefer bulk-unmanage whenever LivenessManager.unmanage is called on multiple referents (#6557)
* Make all looped LivenessManager.unmanage calls use the bulk variant, and improve documentation for the single variant to prompt use of the bulk variant * Document TableLocationSubscriptionBuffer.LocationUpdate a bit better
1 parent 75187f9 commit b8f301c

File tree

6 files changed

+62
-19
lines changed

6 files changed

+62
-19
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
9797
@SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes.
9898
private final Runnable processNewLocationsUpdateRoot;
9999

100-
private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsComitter;
100+
private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsCommitter;
101101
private List<Table> removedConstituents = null;
102102

103103
private UnderlyingTableMaintainer(
@@ -156,12 +156,12 @@ protected void instrumentedRefresh() {
156156
};
157157
refreshCombiner.addSource(processNewLocationsUpdateRoot);
158158

159-
this.removedLocationsComitter = new UpdateCommitter<>(
159+
this.removedLocationsCommitter = new UpdateCommitter<>(
160160
this,
161161
result.getUpdateGraph(),
162162
ignored -> {
163163
Assert.neqNull(removedConstituents, "removedConstituents");
164-
removedConstituents.forEach(result::unmanage);
164+
result.unmanage(removedConstituents.stream());
165165
removedConstituents = null;
166166
});
167167
processPendingLocations(false);
@@ -170,7 +170,7 @@ protected void instrumentedRefresh() {
170170
pendingLocationStates = null;
171171
readyLocationStates = null;
172172
processNewLocationsUpdateRoot = null;
173-
removedLocationsComitter = null;
173+
removedLocationsCommitter = null;
174174
tableLocationProvider.refresh();
175175

176176
final Collection<TableLocation> locations = new ArrayList<>();
@@ -203,7 +203,8 @@ private QueryTable result() {
203203
private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> locations) {
204204
final long initialLastRowKey = resultRows.lastRowKey();
205205
final MutableLong lastInsertedRowKey = new MutableLong(initialLastRowKey);
206-
locations.sorted(Comparator.comparing(TableLocation::getKey)).forEach(tl -> {
206+
// Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
207+
unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
207208
final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
208209
final Table constituentTable = makeConstituentTable(tl);
209210

@@ -216,7 +217,7 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
216217
if (result.isRefreshing()) {
217218
result.manage(constituentTable);
218219
}
219-
});
220+
}));
220221
return initialLastRowKey == lastInsertedRowKey.get()
221222
? RowSetFactory.empty()
222223
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
@@ -235,7 +236,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
235236
// Transfer management to the constituent CSM. NOTE: this is likely to end up double-managed
236237
// after the CSM adds the location to the table, but that's acceptable.
237238
constituent.columnSourceManager.manage(tableLocation);
238-
unmanage(tableLocation);
239+
// Note that the caller is now responsible for unmanaging tableLocation on behalf of this.
239240

240241
// Be careful to propagate the systemic attribute properly to child tables
241242
constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject());
@@ -293,8 +294,12 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
293294
readyLocationStates.offer(pendingLocationState);
294295
}
295296
}
296-
final RowSet added = sortAndAddLocations(readyLocationStates.stream()
297-
.map(PendingLocationState::release));
297+
298+
if (readyLocationStates.isEmpty()) {
299+
return RowSetFactory.empty();
300+
}
301+
302+
final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
298303
readyLocationStates.clearFast();
299304
return added;
300305
}
@@ -312,14 +317,23 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
312317
}
313318

314319
// Iterate through the pending locations and remove any that are in the removed set.
320+
List<LivenessReferent> toUnmanage = null;
315321
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
316322
final PendingLocationState pendingLocationState = iter.next();
317323
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
318324
iter.remove();
319-
// Release the state and unmanage the location
320-
unmanage(pendingLocationState.release());
325+
// Release the state and plan to unmanage the location
326+
if (toUnmanage == null) {
327+
toUnmanage = new ArrayList<>();
328+
}
329+
toUnmanage.add(pendingLocationState.release());
321330
}
322331
}
332+
if (toUnmanage != null) {
333+
unmanage(toUnmanage.stream());
334+
// noinspection UnusedAssignment
335+
toUnmanage = null;
336+
}
323337

324338
// At the end of the cycle we need to make sure we unmanage any removed constituents.
325339
this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size());
@@ -350,7 +364,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
350364
removedConstituents = null;
351365
return RowSetFactory.empty();
352366
}
353-
this.removedLocationsComitter.maybeActivate();
367+
this.removedLocationsCommitter.maybeActivate();
354368

355369
final WritableRowSet deletedRows = deleteBuilder.build();
356370
resultTableLocationKeys.setNull(deletedRows);

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocationProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,9 @@ protected void endTransaction(@NotNull final Object token) {
331331
}
332332
// Release the keys that were removed after we have delivered the notifications and the
333333
// subscribers have had a chance to process them
334-
removedKeys.forEach(livenessManager::unmanage);
334+
if (!removedKeys.isEmpty()) {
335+
livenessManager.unmanage(removedKeys.stream());
336+
}
335337
}
336338

337339
/**

engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/TableLocationSubscriptionBuffer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ private LocationUpdate() {
5454

5555
private void processAdd(@NotNull final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier) {
5656
final ImmutableTableLocationKey addedKey = addedKeySupplier.get();
57+
// Note that we might have a remove for this key if it previously existed and is being replaced. Hence, we
58+
// don't look for an existing remove, which is apparently asymmetric w.r.t. processRemove but still correct.
59+
// Consumers of a LocationUpdate must process removes before adds.
60+
5761
// Need to verify that we don't have stacked adds (without intervening removes).
5862
if (added.containsKey(addedKey)) {
5963
throw new IllegalStateException("TableLocationKey " + addedKey
@@ -99,10 +103,16 @@ private void processTransaction(
99103
}
100104
}
101105

106+
/**
107+
* @return The pending location keys to add. <em>Note that removes should be processed before adds.</em>
108+
*/
102109
public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingAddedLocationKeys() {
103110
return added.values();
104111
}
105112

113+
/**
114+
* @return The pending location keys to remove. <em>Note that removes should be processed before adds.</em>
115+
*/
106116
public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingRemovedLocationKeys() {
107117
return removed.values();
108118
}

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import io.deephaven.base.verify.Assert;
77
import io.deephaven.engine.context.ExecutionContext;
8+
import io.deephaven.engine.liveness.LivenessReferent;
89
import io.deephaven.engine.rowset.*;
910
import io.deephaven.engine.rowset.RowSetFactory;
1011
import io.deephaven.engine.table.*;
@@ -323,12 +324,16 @@ private final class ChangeProcessingContext implements SafeCloseable {
323324
* {@link #resultRows}. The truncating constituent and following will need to insert their entire shifted row
324325
* set, and must update the next slot in {@link #currFirstRowKeys}.
325326
*/
326-
boolean slotAllocationChanged;
327+
private boolean slotAllocationChanged;
327328
/**
328329
* The first key after which we began inserting shifted constituent row sets instead of trying for piecemeal
329330
* updates.
330331
*/
331-
long firstTruncatedResultKey;
332+
private long firstTruncatedResultKey;
333+
/**
334+
* Removed constituent listeners to bulk-unmanage.
335+
*/
336+
private List<LivenessReferent> toUnmanage;
332337

333338
private ChangeProcessingContext(@NotNull final TableUpdate constituentChanges) {
334339
modifiedColumnSet.clear();
@@ -388,7 +393,10 @@ public void close() {
388393
final SafeCloseable ignored3 = removedValues;
389394
final SafeCloseable ignored4 = addedKeys;
390395
final SafeCloseable ignored5 = modifiedKeys;
391-
final SafeCloseable ignored6 = modifiedPreviousValues) {
396+
final SafeCloseable ignored6 = modifiedPreviousValues;
397+
final SafeCloseable ignored7 = toUnmanage == null
398+
? null
399+
: () -> mergedListener.unmanage(toUnmanage.stream())) {
392400
}
393401
// @formatter:on
394402
}
@@ -504,7 +512,10 @@ private void processRemove(@NotNull final Table removedConstituent) {
504512
listeners.remove();
505513
}
506514
removedConstituent.removeUpdateListener(nextListener);
507-
mergedListener.unmanage(nextListener);
515+
if (toUnmanage == null) {
516+
toUnmanage = new ArrayList<>();
517+
}
518+
toUnmanage.add(nextListener);
508519
advanceListener();
509520
}
510521
final long firstRemovedKey = prevFirstRowKeys[nextPreviousSlot];

engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,10 @@ protected void destroy() {
239239
private synchronized void invalidateAndRelease() {
240240
invalidatedLocations.forEach(IncludedTableLocationEntry::invalidate);
241241
invalidatedLocations.clear();
242-
releasedLocations.forEach(this::unmanage);
243-
releasedLocations.clear();
242+
if (!releasedLocations.isEmpty()) {
243+
unmanage(releasedLocations.stream());
244+
releasedLocations.clear();
245+
}
244246
}
245247

246248
@Override

engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ default void manage(@NotNull final LivenessReferent referent) {
4242
/**
4343
* If this manager manages {@code referent} one or more times, drop one such reference. If this manager is also a
4444
* {@link LivenessReferent}, then it must also be live.
45+
* <p>
46+
* <em>Strongly prefer using {@link #unmanage(Stream)} when multiple referents should be unmanaged.</em>
4547
*
4648
* @param referent The referent to drop
4749
*/
@@ -55,6 +57,8 @@ default void unmanage(@NotNull LivenessReferent referent) {
5557
/**
5658
* If this manager manages referent one or more times, drop one such reference. If this manager is also a
5759
* {@link LivenessReferent}, then this method is a no-op if {@code this} is not live.
60+
* <p>
61+
* <em>Strongly prefer using {@link #tryUnmanage(Stream)}} when multiple referents should be unmanaged.</em>
5862
*
5963
* @param referent The referent to drop
6064
* @return If this node is also a {@link LivenessReferent}, whether this node was live and thus in fact tried to

0 commit comments

Comments
 (0)