|
39 | 39 | import org.apache.druid.msq.querykit.QueryKitSpec; |
40 | 40 | import org.apache.druid.msq.querykit.QueryKitUtils; |
41 | 41 | import org.apache.druid.msq.querykit.ShuffleSpecFactory; |
| 42 | +import org.apache.druid.msq.querykit.WindowOperatorQueryKit; |
42 | 43 | import org.apache.druid.msq.querykit.results.ExportResultsStageProcessor; |
43 | 44 | import org.apache.druid.msq.querykit.results.QueryResultStageProcessor; |
44 | 45 | import org.apache.druid.msq.util.MSQTaskQueryMakerUtils; |
45 | 46 | import org.apache.druid.msq.util.MultiStageQueryContext; |
| 47 | +import org.apache.druid.query.DataSource; |
46 | 48 | import org.apache.druid.query.Query; |
47 | 49 | import org.apache.druid.query.QueryContext; |
| 50 | +import org.apache.druid.query.QueryDataSource; |
| 51 | +import org.apache.druid.query.operator.WindowOperatorQuery; |
48 | 52 | import org.apache.druid.segment.column.ColumnHolder; |
49 | 53 | import org.apache.druid.sql.calcite.planner.ColumnMappings; |
50 | 54 | import org.apache.druid.sql.http.ResultFormat; |
51 | 55 | import org.apache.druid.storage.ExportStorageProvider; |
52 | 56 |
|
| 57 | +import java.util.ArrayList; |
| 58 | +import java.util.List; |
| 59 | +import java.util.Map; |
| 60 | + |
53 | 61 | public class QueryKitBasedMSQPlanner |
54 | 62 | { |
55 | 63 | private final MSQSpec querySpec; |
@@ -122,7 +130,7 @@ private QueryDefinition makeQueryDefinitionInternal(final Query<?> queryToPlan) |
122 | 130 | try { |
123 | 131 | queryDef = queryKitSpec.getQueryKit().makeQueryDefinition( |
124 | 132 | queryKitSpec, |
125 | | - queryToPlan, |
| 133 | + propagateWindowOperatorTransformationContext(queryToPlan), |
126 | 134 | makeResultShuffleSpecFacory(), |
127 | 135 | 0 |
128 | 136 | ); |
@@ -227,4 +235,52 @@ private ShuffleSpecFactory makeResultShuffleSpecFacory() |
227 | 235 | return destination.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(query.context())); |
228 | 236 | } |
229 | 237 | } |
| 238 | + |
| 239 | + /** |
| 240 | + * Propagates {@link MultiStageQueryContext#WINDOW_FUNCTION_OPERATOR_TRANSFORMATION} from the outer query to |
| 241 | + * any inner {@link WindowOperatorQuery}, so {@link WindowOperatorQueryKit} can see it. The original change |
| 242 | + * in https://github.com/apache/druid/pull/17443 only added it to the outer query at the Broker, and since |
| 243 | + * QueryKit became injectable via Guice, {@link WindowOperatorQueryKit} can no longer "see" parameters that |
| 244 | + * come from the context of the outermost query. For compatibility reasons, this propagation must be done here |
| 245 | + * rather than during SQL planning. |
| 246 | + */ |
| 247 | + private static Query<?> propagateWindowOperatorTransformationContext(final Query<?> query) |
| 248 | + { |
| 249 | + final Boolean val = query.context().getBoolean(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION); |
| 250 | + if (val != null) { |
| 251 | + // Update inner queries. |
| 252 | + return query.withDataSource(setWindowOperatorTransformationContext(query.getDataSource(), val)); |
| 253 | + } else { |
| 254 | + return query; |
| 255 | + } |
| 256 | + } |
| 257 | + |
| 258 | + /** |
| 259 | + * Sets {@link MultiStageQueryContext#WINDOW_FUNCTION_OPERATOR_TRANSFORMATION} on all {@link QueryDataSource} |
| 260 | + * found within the provided {@link DataSource}. |
| 261 | + */ |
| 262 | + private static DataSource setWindowOperatorTransformationContext(final DataSource dataSource, final boolean val) |
| 263 | + { |
| 264 | + final List<DataSource> children = dataSource.getChildren(); |
| 265 | + final List<DataSource> newChildren = new ArrayList<>(); |
| 266 | + for (final DataSource child : children) { |
| 267 | + newChildren.add(setWindowOperatorTransformationContext(child, val)); |
| 268 | + } |
| 269 | + if (dataSource instanceof QueryDataSource |
| 270 | + && ((QueryDataSource) dataSource).getQuery() instanceof WindowOperatorQuery) { |
| 271 | + return new QueryDataSource( |
| 272 | + ((QueryDataSource) dataSource) |
| 273 | + .getQuery() |
| 274 | + .withOverriddenContext( |
| 275 | + Map.of( |
| 276 | + MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, |
| 277 | + val |
| 278 | + ) |
| 279 | + ) |
| 280 | + .withDataSource(Iterables.getOnlyElement(newChildren)) |
| 281 | + ); |
| 282 | + } else { |
| 283 | + return dataSource.withChildren(newChildren); |
| 284 | + } |
| 285 | + } |
230 | 286 | } |
0 commit comments