Skip to content

Commit c2ec86e

Browse files
gianmkgyrtkirk
authored andcommitted
MSQ: Fix WindowOperatorQuery planning. (#18911)
PR #18875 caused a regression in WindowOperatorQuery planning, by causing non-outermost WindowOperatorQuery to be planned without windowFunctionOperatorTransformation (see #17443). This happened because only the outermost query has the windowFunctionOperatorTransformation parameter. This patch fixes the problem by propagating the context parameter from the outermost queries to any inner WindowOperatorQuery instances. (cherry picked from commit 07de2ff)
1 parent 55dfcf9 commit c2ec86e

File tree

1 file changed

+57
-1
lines changed

1 file changed

+57
-1
lines changed

multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryKitBasedMSQPlanner.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,25 @@
3939
import org.apache.druid.msq.querykit.QueryKitSpec;
4040
import org.apache.druid.msq.querykit.QueryKitUtils;
4141
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
42+
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
4243
import org.apache.druid.msq.querykit.results.ExportResultsStageProcessor;
4344
import org.apache.druid.msq.querykit.results.QueryResultStageProcessor;
4445
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
4546
import org.apache.druid.msq.util.MultiStageQueryContext;
47+
import org.apache.druid.query.DataSource;
4648
import org.apache.druid.query.Query;
4749
import org.apache.druid.query.QueryContext;
50+
import org.apache.druid.query.QueryDataSource;
51+
import org.apache.druid.query.operator.WindowOperatorQuery;
4852
import org.apache.druid.segment.column.ColumnHolder;
4953
import org.apache.druid.sql.calcite.planner.ColumnMappings;
5054
import org.apache.druid.sql.http.ResultFormat;
5155
import org.apache.druid.storage.ExportStorageProvider;
5256

57+
import java.util.ArrayList;
58+
import java.util.List;
59+
import java.util.Map;
60+
5361
public class QueryKitBasedMSQPlanner
5462
{
5563
private final MSQSpec querySpec;
@@ -122,7 +130,7 @@ private QueryDefinition makeQueryDefinitionInternal(final Query<?> queryToPlan)
122130
try {
123131
queryDef = queryKitSpec.getQueryKit().makeQueryDefinition(
124132
queryKitSpec,
125-
queryToPlan,
133+
propagateWindowOperatorTransformationContext(queryToPlan),
126134
makeResultShuffleSpecFacory(),
127135
0
128136
);
@@ -227,4 +235,52 @@ private ShuffleSpecFactory makeResultShuffleSpecFacory()
227235
return destination.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(query.context()));
228236
}
229237
}
238+
239+
/**
240+
* Propagates {@link MultiStageQueryContext#WINDOW_FUNCTION_OPERATOR_TRANSFORMATION} from the outer query to
241+
* any inner {@link WindowOperatorQuery}, so {@link WindowOperatorQueryKit} can see it. The original change
242+
* in https://github.com/apache/druid/pull/17443 only added it to the outer query at the Broker, and since
243+
* QueryKit became injectable via Guice, {@link WindowOperatorQueryKit} can no longer "see" parameters that
244+
* come from the context of the outermost query. For compatibility reasons, this propagation must be done here
245+
* rather than during SQL planning.
246+
*/
247+
private static Query<?> propagateWindowOperatorTransformationContext(final Query<?> query)
248+
{
249+
final Boolean val = query.context().getBoolean(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION);
250+
if (val != null) {
251+
// Update inner queries.
252+
return query.withDataSource(setWindowOperatorTransformationContext(query.getDataSource(), val));
253+
} else {
254+
return query;
255+
}
256+
}
257+
258+
/**
259+
* Sets {@link MultiStageQueryContext#WINDOW_FUNCTION_OPERATOR_TRANSFORMATION} on all {@link QueryDataSource}
260+
* found within the provided {@link DataSource}.
261+
*/
262+
private static DataSource setWindowOperatorTransformationContext(final DataSource dataSource, final boolean val)
263+
{
264+
final List<DataSource> children = dataSource.getChildren();
265+
final List<DataSource> newChildren = new ArrayList<>();
266+
for (final DataSource child : children) {
267+
newChildren.add(setWindowOperatorTransformationContext(child, val));
268+
}
269+
if (dataSource instanceof QueryDataSource
270+
&& ((QueryDataSource) dataSource).getQuery() instanceof WindowOperatorQuery) {
271+
return new QueryDataSource(
272+
((QueryDataSource) dataSource)
273+
.getQuery()
274+
.withOverriddenContext(
275+
Map.of(
276+
MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
277+
val
278+
)
279+
)
280+
.withDataSource(Iterables.getOnlyElement(newChildren))
281+
);
282+
} else {
283+
return dataSource.withChildren(newChildren);
284+
}
285+
}
230286
}

0 commit comments

Comments
 (0)