Skip to content

Commit 664c402

Browse files
lbooker42jmao-denvermargaretkennedy
authored
fix: DH-20625: SelectAndViewAnalyzer track barriers for real and aliased columns (#7328)
This PR fixes issue [DH-20625](https://deephaven.atlassian.net/browse/DH-20625) by ensuring that the SelectAndViewAnalyzer properly tracks barriers for both real and aliased columns. Previously, PreserveColumnLayer instances were added directly to the context without registering their declared barriers in the barrierToLayerIndex map, which could lead to incorrect barrier tracking. [DH-20625]: https://deephaven.atlassian.net/browse/DH-20625?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --------- Co-authored-by: Jianfeng Mao <[email protected]> Co-authored-by: margaretkennedy <[email protected]>
1 parent 56ad116 commit 664c402

File tree

3 files changed

+72
-3
lines changed

3 files changed

+72
-3
lines changed

engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectAndViewAnalyzer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,15 @@ public static AnalyzerContext createContext(
240240
final SourceColumn realColumn = sc.maybeGetSourceColumn().orElse(null);
241241
if (realColumn != null) {
242242
if (shouldPreserve(sc.getDataView())) {
243-
context.addLayer(new PreserveColumnLayer(context, sc, sc.getDataView(), distinctDeps, mcsBuilder));
243+
addDeclaredBarriersToMap(sc, barrierToLayerIndex,
244+
new PreserveColumnLayer(context, sc, sc.getDataView(), distinctDeps, mcsBuilder), context);
244245
continue;
245246
}
246247
// look for an existing alias that can be preserved instead
247248
final ColumnSource<?> alias = resultAlias.get(realColumn.getSourceName());
248249
if (alias != null) {
249-
context.addLayer(new PreserveColumnLayer(context, sc, alias, distinctDeps, mcsBuilder));
250+
addDeclaredBarriersToMap(sc, barrierToLayerIndex,
251+
new PreserveColumnLayer(context, sc, alias, distinctDeps, mcsBuilder), context);
250252
continue;
251253
}
252254
}

engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableSelectBarrierTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,74 @@ public void testMergingSelectable() {
648648
assertEquals(ss2.respectedBarriers().length, ss2.withRespectedBarriers(c, c).respectedBarriers().length);
649649
}
650650

651+
@Test
652+
public void testDH20625() {
653+
final Table source = TableTools.emptyTable(1_000)
654+
.update("a = ii % 5", "b = ii % 11", "c = ii % 17", "d = ii");
655+
656+
final Object b1 = new Object();
657+
final Object b2 = new Object();
658+
659+
Table result;
660+
661+
// update with a, c passed as real columns
662+
result = source.update(List.of(
663+
Selectable.parse("a").withDeclaredBarriers(b1, b2),
664+
Selectable.parse("c").withRespectedBarriers(b2).withSerial(),
665+
Selectable.parse("Sum = a + b + c").withRespectedBarriers(b1)));
666+
// simple test, verify we have the same number of rows
667+
assertEquals(source.size(), result.size());
668+
669+
// select with a, c passed as real columns
670+
result = source.select(List.of(
671+
Selectable.parse("a").withDeclaredBarriers(b1, b2),
672+
Selectable.parse("c").withRespectedBarriers(b2).withSerial(),
673+
Selectable.parse("Sum = a + b + c").withRespectedBarriers(b1)));
674+
// simple test, verify we have the same number of rows
675+
assertEquals(source.size(), result.size());
676+
}
677+
678+
@Test
679+
public void testDH20625_proxy() {
680+
// Create a partitioned table with 4 partitions
681+
final Table source = TableTools.emptyTable(1_000)
682+
.update("a = ii % 5", "b = ii % 11", "c = ii % 17", "d = ii");
683+
final PartitionedTable pt = source.partitionBy("a");
684+
685+
final Object b1 = new Object();
686+
final Object b2 = new Object();
687+
688+
PartitionedTable.Proxy result_proxy;
689+
// update test
690+
result_proxy = pt.proxy().update(List.of(
691+
Selectable.parse("a").withDeclaredBarriers(b1, b2),
692+
Selectable.parse("c").withRespectedBarriers(b2).withSerial(),
693+
Selectable.parse("Sum = a + b + c").withRespectedBarriers(b1)));
694+
assertEquals(pt.constituents().length, result_proxy.target().constituents().length);
695+
for (int i = 0; i < pt.constituents().length; i++) {
696+
final Table ct = pt.constituents()[i];
697+
final Table rct = result_proxy.target().constituents()[i];
698+
699+
// simple test, verify we have the same number of rows
700+
assertEquals(ct.size(), rct.size());
701+
}
702+
703+
// select test
704+
result_proxy = pt.proxy().select(List.of(
705+
Selectable.parse("a").withDeclaredBarriers(b1, b2),
706+
Selectable.parse("c").withRespectedBarriers(b2).withSerial(),
707+
Selectable.parse("Sum = a + b + c").withRespectedBarriers(b1)));
708+
assertEquals(pt.constituents().length, result_proxy.target().constituents().length);
709+
for (int i = 0; i < pt.constituents().length; i++) {
710+
final Table ct = pt.constituents()[i];
711+
final Table rct = result_proxy.target().constituents()[i];
712+
713+
// simple test, verify we have the same number of rows
714+
assertEquals(ct.size(), rct.size());
715+
}
716+
}
717+
718+
651719
private static void checkIndividualSums(int size, Table u) {
652720
final long expectedSumA = (((long) size - 1) * size) / 2;
653721
final Table us = u.aggBy(AggSum("A", "B"));

py/server/tests/test_pt_proxy.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,6 @@ def my_fn(vals):
365365
ptp = agg_by_formula()
366366
self.assertIsNotNone(ptp)
367367

368-
@unittest.skip("https://deephaven.atlassian.net/browse/DH-20625")
369368
def test_update_select_concurrency_control(self):
370369
ops = [
371370
PartitionedTableProxy.update,

0 commit comments

Comments
 (0)