Skip to content

Commit 5a3fa99

Browse files
authored
fix issue with projection __time equivalent column mapping (#18215)
* fix issue with projection __time equivalent column mapping * better names
1 parent e5469a0 commit 5a3fa99

File tree

3 files changed

+92
-31
lines changed

3 files changed

+92
-31
lines changed

processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,6 @@ public Projections.ProjectionMatch matches(
303303
}
304304
Projections.ProjectionMatchBuilder matchBuilder = new Projections.ProjectionMatchBuilder();
305305

306-
if (timeColumnName != null) {
307-
matchBuilder.remapColumn(timeColumnName, ColumnHolder.TIME_COLUMN_NAME)
308-
.addReferencedPhysicalColumn(ColumnHolder.TIME_COLUMN_NAME);
309-
}
310306
final List<String> queryGrouping = queryCursorBuildSpec.getGroupingColumns();
311307
if (queryGrouping != null) {
312308
for (String queryColumn : queryGrouping) {
@@ -398,13 +394,19 @@ private Projections.ProjectionMatchBuilder matchRequiredColumn(
398394
// check to see if we have an equivalent virtual column defined in the projection, if so we can
399395
final VirtualColumn projectionEquivalent = virtualColumns.findEquivalent(buildSpecVirtualColumn);
400396
if (projectionEquivalent != null) {
401-
if (!buildSpecVirtualColumn.getOutputName().equals(projectionEquivalent.getOutputName())) {
397+
final String remapColumnName;
398+
if (Objects.equals(projectionEquivalent.getOutputName(), timeColumnName)) {
399+
remapColumnName = ColumnHolder.TIME_COLUMN_NAME;
400+
} else {
401+
remapColumnName = projectionEquivalent.getOutputName();
402+
}
403+
if (!buildSpecVirtualColumn.getOutputName().equals(remapColumnName)) {
402404
matchBuilder.remapColumn(
403405
buildSpecVirtualColumn.getOutputName(),
404-
projectionEquivalent.getOutputName()
406+
remapColumnName
405407
);
406408
}
407-
return matchBuilder.addReferencedPhysicalColumn(projectionEquivalent.getOutputName());
409+
return matchBuilder.addReferencedPhysicalColumn(remapColumnName);
408410
}
409411

410412
matchBuilder.addReferenceedVirtualColumn(buildSpecVirtualColumn);
@@ -417,8 +419,12 @@ private Projections.ProjectionMatchBuilder matchRequiredColumn(
417419
if (virtualGranularity.isFinerThan(granularity)) {
418420
return null;
419421
}
420-
return matchBuilder.remapColumn(column, ColumnHolder.TIME_COLUMN_NAME)
421-
.addReferencedPhysicalColumn(ColumnHolder.TIME_COLUMN_NAME);
422+
// same granularity, replace virtual column directly by remapping it to the physical column
423+
if (granularity.equals(virtualGranularity)) {
424+
return matchBuilder.remapColumn(column, ColumnHolder.TIME_COLUMN_NAME)
425+
.addReferencedPhysicalColumn(ColumnHolder.TIME_COLUMN_NAME);
426+
}
427+
return matchBuilder.addReferencedPhysicalColumn(ColumnHolder.TIME_COLUMN_NAME);
422428
} else {
423429
// anything else with __time requires none granularity
424430
if (Granularities.NONE.equals(granularity)) {

processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public Cursor asCursor()
7373
spec.getQueryMetrics().vectorized(false);
7474
}
7575

76-
IncrementalIndexRowHolder currentRow = new IncrementalIndexRowHolder();
76+
final IncrementalIndexRowHolder currentRow = new IncrementalIndexRowHolder();
7777
return new IncrementalIndexCursor(
7878
rowSelector,
7979
currentRow,

processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public static List<InputRow> makeRows(List<String> dimensions)
212212
}
213213
),
214214
new AggregateProjectionSpec(
215-
"ab_daily",
215+
"ab",
216216
null,
217217
Arrays.asList(
218218
new StringDimensionSchema("a"),
@@ -221,7 +221,7 @@ public static List<InputRow> makeRows(List<String> dimensions)
221221
null
222222
),
223223
new AggregateProjectionSpec(
224-
"abfoo_daily",
224+
"abfoo",
225225
VirtualColumns.create(
226226
new ExpressionVirtualColumn(
227227
"bfoo",
@@ -278,7 +278,7 @@ public static List<InputRow> makeRows(List<String> dimensions)
278278
}
279279
),
280280
new AggregateProjectionSpec(
281-
"afoo_daily",
281+
"afoo",
282282
VirtualColumns.create(
283283
new ExpressionVirtualColumn(
284284
"afoo",
@@ -322,7 +322,7 @@ public static List<InputRow> makeRows(List<String> dimensions)
322322
))
323323
.collect(Collectors.toList());
324324

325-
@Parameterized.Parameters(name = "name: {0}, sortByDim: {5}, autoSchema: {6}")
325+
@Parameterized.Parameters(name = "name: {0}, segmentTimeOrdered: {5}, autoSchema: {6}")
326326
public static Collection<?> constructorFeeder()
327327
{
328328
final List<Object[]> constructors = new ArrayList<>();
@@ -405,7 +405,7 @@ public static Collection<?> constructorFeeder()
405405
new IncrementalIndexTimeBoundaryInspector(index),
406406
new IncrementalIndexCursorFactory(rollupIndex),
407407
new IncrementalIndexTimeBoundaryInspector(rollupIndex),
408-
sortByDim,
408+
!sortByDim,
409409
autoSchema
410410
});
411411
} else {
@@ -419,7 +419,7 @@ public static Collection<?> constructorFeeder()
419419
QueryableIndexTimeBoundaryInspector.create(index),
420420
new QueryableIndexCursorFactory(rollupIndex),
421421
QueryableIndexTimeBoundaryInspector.create(rollupIndex),
422-
sortByDim,
422+
!sortByDim,
423423
autoSchema
424424
});
425425
}
@@ -446,7 +446,7 @@ public static void cleanup() throws IOException
446446
private final TimeseriesQueryEngine timeseriesEngine;
447447

448448
private final NonBlockingPool<ByteBuffer> nonBlockingPool;
449-
public final boolean sortByDim;
449+
public final boolean segmentSortedByTime;
450450
public final boolean autoSchema;
451451

452452
@Rule
@@ -458,15 +458,15 @@ public CursorFactoryProjectionTest(
458458
TimeBoundaryInspector projectionsTimeBoundaryInspector,
459459
CursorFactory rollupProjectionsCursorFactory,
460460
TimeBoundaryInspector rollupProjectionsTimeBoundaryInspector,
461-
boolean sortByDim,
461+
boolean segmentSortedByTime,
462462
boolean autoSchema
463463
)
464464
{
465465
this.projectionsCursorFactory = projectionsCursorFactory;
466466
this.projectionsTimeBoundaryInspector = projectionsTimeBoundaryInspector;
467467
this.rollupProjectionsCursorFactory = rollupProjectionsCursorFactory;
468468
this.rollupProjectionsTimeBoundaryInspector = rollupProjectionsTimeBoundaryInspector;
469-
this.sortByDim = sortByDim;
469+
this.segmentSortedByTime = segmentSortedByTime;
470470
this.autoSchema = autoSchema;
471471
this.nonBlockingPool = closer.closeLater(
472472
new CloseableStupidPool<>(
@@ -584,7 +584,7 @@ public void testProjectionSelectionTwoDimsVirtual()
584584
10
585585
)
586586
)
587-
.setContext(ImmutableMap.of(QueryContexts.USE_PROJECTION, "abfoo_daily"))
587+
.setContext(ImmutableMap.of(QueryContexts.USE_PROJECTION, "abfoo"))
588588
.build();
589589

590590
final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null);
@@ -964,16 +964,16 @@ public void testQueryGranularityFinerThanProjectionGranularity()
964964
.setDataSource("test")
965965
.setInterval(Intervals.ETERNITY)
966966
.addAggregator(new LongSumAggregatorFactory("c_sum", "c"));
967-
if (sortByDim) {
967+
if (segmentSortedByTime) {
968+
queryBuilder.addDimension("a")
969+
.setGranularity(Granularities.MINUTE);
970+
} else {
968971
queryBuilder.setVirtualColumns(Granularities.toVirtualColumn(Granularities.MINUTE, "__gran"))
969972
.setDimensions(
970973
DefaultDimensionSpec.of("__gran", ColumnType.LONG),
971974
DefaultDimensionSpec.of("a")
972975
)
973976
.setGranularity(Granularities.ALL);
974-
} else {
975-
queryBuilder.addDimension("a")
976-
.setGranularity(Granularities.MINUTE);
977977
}
978978
final GroupByQuery query = queryBuilder.build();
979979
final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null);
@@ -998,7 +998,7 @@ public void testQueryGranularityFinerThanProjectionGranularity()
998998
final List<ResultRow> results = resultRows.toList();
999999
Assert.assertEquals(8, results.size());
10001000

1001-
if (sortByDim && projectionsCursorFactory instanceof QueryableIndexCursorFactory) {
1001+
if (!segmentSortedByTime && projectionsCursorFactory instanceof QueryableIndexCursorFactory) {
10021002
// this sorts funny when not time ordered
10031003
Set<Object[]> resultsInNoParticularOrder = makeArrayResultSet();
10041004
resultsInNoParticularOrder.addAll(
@@ -1053,16 +1053,16 @@ public void testQueryGranularityFitsProjectionGranularity()
10531053
.setDataSource("test")
10541054
.setInterval(Intervals.ETERNITY)
10551055
.addAggregator(new LongSumAggregatorFactory("c_sum", "c"));
1056-
if (sortByDim) {
1056+
if (segmentSortedByTime) {
1057+
queryBuilder.addDimension("a")
1058+
.setGranularity(Granularities.HOUR);
1059+
} else {
10571060
queryBuilder.setGranularity(Granularities.ALL)
10581061
.setDimensions(
10591062
DefaultDimensionSpec.of("__gran", ColumnType.LONG),
10601063
DefaultDimensionSpec.of("a")
10611064
)
10621065
.setVirtualColumns(Granularities.toVirtualColumn(Granularities.HOUR, "__gran"));
1063-
} else {
1064-
queryBuilder.addDimension("a")
1065-
.setGranularity(Granularities.HOUR);
10661066
}
10671067
final GroupByQuery query = queryBuilder.build();
10681068
final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null);
@@ -1086,7 +1086,7 @@ public void testQueryGranularityFitsProjectionGranularity()
10861086

10871087
final List<ResultRow> results = resultRows.toList();
10881088
Assert.assertEquals(3, results.size());
1089-
if (sortByDim && projectionsCursorFactory instanceof QueryableIndexCursorFactory) {
1089+
if (!segmentSortedByTime && projectionsCursorFactory instanceof QueryableIndexCursorFactory) {
10901090
Set<Object[]> resultsInNoParticularOrder = makeArrayResultSet(
10911091
new Object[]{TIMESTAMP.getMillis(), "a", 4L},
10921092
new Object[]{TIMESTAMP.getMillis(), "b", 12L},
@@ -1102,6 +1102,61 @@ public void testQueryGranularityFitsProjectionGranularity()
11021102
}
11031103
}
11041104

1105+
@Test
1106+
public void testQueryGranularityLargerProjectionGranularity()
1107+
{
1108+
final GroupByQuery.Builder queryBuilder =
1109+
GroupByQuery.builder()
1110+
.setDataSource("test")
1111+
.setInterval(Intervals.ETERNITY)
1112+
.addAggregator(new LongSumAggregatorFactory("c_sum", "c"));
1113+
if (segmentSortedByTime) {
1114+
queryBuilder.addDimension("a")
1115+
.setGranularity(Granularities.DAY);
1116+
} else {
1117+
queryBuilder.setGranularity(Granularities.ALL)
1118+
.setDimensions(
1119+
DefaultDimensionSpec.of("__gran", ColumnType.LONG),
1120+
DefaultDimensionSpec.of("a")
1121+
)
1122+
.setVirtualColumns(Granularities.toVirtualColumn(Granularities.DAY, "__gran"));
1123+
}
1124+
final GroupByQuery query = queryBuilder.build();
1125+
final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null);
1126+
try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) {
1127+
final Cursor cursor = cursorHolder.asCursor();
1128+
int rowCount = 0;
1129+
while (!cursor.isDone()) {
1130+
rowCount++;
1131+
cursor.advance();
1132+
}
1133+
Assert.assertEquals(3, rowCount);
1134+
}
1135+
1136+
final Sequence<ResultRow> resultRows = groupingEngine.process(
1137+
query,
1138+
projectionsCursorFactory,
1139+
projectionsTimeBoundaryInspector,
1140+
nonBlockingPool,
1141+
null
1142+
);
1143+
1144+
final List<ResultRow> results = resultRows.toList();
1145+
Assert.assertEquals(2, results.size());
1146+
if (!segmentSortedByTime && projectionsCursorFactory instanceof QueryableIndexCursorFactory) {
1147+
Set<Object[]> resultsInNoParticularOrder = makeArrayResultSet(
1148+
new Object[]{TIMESTAMP.getMillis(), "a", 7L},
1149+
new Object[]{TIMESTAMP.getMillis(), "b", 12L}
1150+
);
1151+
for (ResultRow row : results) {
1152+
Assert.assertTrue(resultsInNoParticularOrder.contains(row.getArray()));
1153+
}
1154+
} else {
1155+
Assert.assertArrayEquals(new Object[]{TIMESTAMP.getMillis(), "a", 7L}, results.get(0).getArray());
1156+
Assert.assertArrayEquals(new Object[]{TIMESTAMP.getMillis(), "b", 12L}, results.get(1).getArray());
1157+
}
1158+
}
1159+
11051160
@Test
11061161
public void testProjectionSelectionMissingAggregatorButHasAggregatorInput()
11071162
{
@@ -1327,7 +1382,7 @@ public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithNoGrou
13271382
@Test
13281383
public void testTimeseriesQueryGranularityFinerThanProjectionGranularity()
13291384
{
1330-
Assume.assumeFalse(sortByDim);
1385+
Assume.assumeTrue(segmentSortedByTime);
13311386
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
13321387
.dataSource("test")
13331388
.intervals(ImmutableList.of(Intervals.ETERNITY))

0 commit comments

Comments
 (0)