|
| 1 | +use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext}; |
| 2 | +use crate::logical_plan::{ |
| 3 | + pretty_print, pretty_print_rc, FullKeyAggregate, ResolvedMultipliedMeasures, SimpleQuery, |
| 4 | + SimpleQuerySource, |
| 5 | +}; |
| 6 | +use crate::physical_plan_builder::PhysicalPlanBuilder; |
| 7 | +use crate::plan::{ |
| 8 | + Expr, Filter, From, JoinBuilder, JoinCondition, MemberExpression, QualifiedColumnName, |
| 9 | + QueryPlan, Select, SelectBuilder, Union, |
| 10 | +}; |
| 11 | +use crate::planner::query_tools::QueryTools; |
| 12 | +use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; |
| 13 | +use crate::planner::sql_evaluator::{MemberSymbol, ReferencesBuilder}; |
| 14 | +use crate::planner::sql_templates::PlanSqlTemplates; |
| 15 | +use crate::planner::{BaseMember, MemberSymbolRef}; |
| 16 | +use cubenativeutils::CubeError; |
| 17 | +use itertools::Itertools; |
| 18 | +use std::collections::HashMap; |
| 19 | +use std::rc::Rc; |
| 20 | + |
| 21 | +trait FullKeyAggregateStrategy { |
| 22 | + fn process( |
| 23 | + &self, |
| 24 | + full_key_aggregate: &FullKeyAggregate, |
| 25 | + context: &PushDownBuilderContext, |
| 26 | + ) -> Result<Rc<From>, CubeError>; |
| 27 | +} |
| 28 | + |
| 29 | +struct KeysFullKeyAggregateStrategy<'a> { |
| 30 | + builder: &'a PhysicalPlanBuilder, |
| 31 | +} |
| 32 | + |
| 33 | +impl<'a> KeysFullKeyAggregateStrategy<'a> { |
| 34 | + pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc<Self> { |
| 35 | + Rc::new(Self { builder }) |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +impl<'a> FullKeyAggregateStrategy for KeysFullKeyAggregateStrategy<'a> { |
| 40 | + fn process( |
| 41 | + &self, |
| 42 | + full_key_aggregate: &FullKeyAggregate, |
| 43 | + context: &PushDownBuilderContext, |
| 44 | + ) -> Result<Rc<From>, CubeError> { |
| 45 | + let query_tools = self.builder.query_tools(); |
| 46 | + let mut keys_queries = vec![]; |
| 47 | + let mut data_queries = vec![]; |
| 48 | + let mut keys_context = context.clone(); |
| 49 | + keys_context.dimensions_query = true; |
| 50 | + if let Some(resolved_multiplied_measures) = &full_key_aggregate.multiplied_measures_resolver |
| 51 | + { |
| 52 | + match resolved_multiplied_measures { |
| 53 | + ResolvedMultipliedMeasures::ResolveMultipliedMeasures( |
| 54 | + resolve_multiplied_measures, |
| 55 | + ) => { |
| 56 | + for regular_measure_query in resolve_multiplied_measures |
| 57 | + .regular_measure_subqueries |
| 58 | + .iter() |
| 59 | + { |
| 60 | + let keys_query = self |
| 61 | + .builder |
| 62 | + .process_node(regular_measure_query.as_ref(), &keys_context)?; |
| 63 | + keys_queries.push(keys_query); |
| 64 | + let query = self |
| 65 | + .builder |
| 66 | + .process_node(regular_measure_query.as_ref(), &context)?; |
| 67 | + data_queries.push(query); |
| 68 | + } |
| 69 | + for multiplied_measure_query in resolve_multiplied_measures |
| 70 | + .aggregate_multiplied_subqueries |
| 71 | + .iter() |
| 72 | + { |
| 73 | + let keys_query = self |
| 74 | + .builder |
| 75 | + .process_node(multiplied_measure_query.as_ref(), &keys_context)?; |
| 76 | + keys_queries.push(keys_query); |
| 77 | + let query = self |
| 78 | + .builder |
| 79 | + .process_node(multiplied_measure_query.as_ref(), &context)?; |
| 80 | + data_queries.push(query); |
| 81 | + } |
| 82 | + } |
| 83 | + ResolvedMultipliedMeasures::PreAggregation(simple_query) => todo!(), |
| 84 | + } |
| 85 | + } |
| 86 | + for cte_souce in full_key_aggregate.cte_sources.iter() { |
| 87 | + } |
| 88 | + if data_queries.is_empty() { |
| 89 | + return Err(CubeError::internal(format!( |
| 90 | + "FullKeyAggregate should have at least one source: {}", |
| 91 | + pretty_print(full_key_aggregate) |
| 92 | + ))); |
| 93 | + } |
| 94 | + |
| 95 | + if data_queries.len() == 1 { |
| 96 | + let select = data_queries[0].clone(); |
| 97 | + let result = From::new_from_subselect(select, "fk_aggregate".to_string()); |
| 98 | + return Ok(result); |
| 99 | + } |
| 100 | + |
| 101 | + let keys_from = From::new_from_union( |
| 102 | + Rc::new(Union::new_from_subselects(&keys_queries)), |
| 103 | + "pk_aggregate_keys_source".to_string(), |
| 104 | + ); |
| 105 | + let references_builder = ReferencesBuilder::new(keys_from.clone()); |
| 106 | + let mut keys_select_builder = SelectBuilder::new(keys_from); |
| 107 | + |
| 108 | + for member in full_key_aggregate.schema.all_dimensions() { |
| 109 | + let alias = references_builder.resolve_alias_for_member(&member.full_name(), &None); |
| 110 | + if alias.is_none() { |
| 111 | + return Err(CubeError::internal(format!( |
| 112 | + "Source for {} not found in full key aggregate subqueries", |
| 113 | + member.full_name() |
| 114 | + ))); |
| 115 | + } |
| 116 | + let reference = QualifiedColumnName::new(None, alias.unwrap()); |
| 117 | + let member_ref = member.clone().as_base_member(query_tools.clone())?; |
| 118 | + keys_select_builder.add_projection_member_reference(&member_ref, reference); |
| 119 | + } |
| 120 | + keys_select_builder.set_distinct(); |
| 121 | + |
| 122 | + let sql_context = SqlNodesFactory::new(); |
| 123 | + let keys_select = Rc::new(keys_select_builder.build(sql_context)); |
| 124 | + |
| 125 | + let keys_alias = "fk_aggregate_keys".to_string(); |
| 126 | + |
| 127 | + let mut join_builder = |
| 128 | + JoinBuilder::new_from_subselect(keys_select.clone(), keys_alias.clone()); |
| 129 | + |
| 130 | + for (i, query) in data_queries.into_iter().enumerate() { |
| 131 | + let query_alias = format!("q_{}", i); |
| 132 | + let conditions = full_key_aggregate |
| 133 | + .schema |
| 134 | + .all_dimensions() |
| 135 | + .map(|dim| -> Result<_, CubeError> { |
| 136 | + let member_ref = dim.clone().as_base_member(query_tools.clone())?; |
| 137 | + let alias_in_keys_query = |
| 138 | + keys_select.schema().resolve_member_alias(&member_ref); |
| 139 | + let keys_query_ref = Expr::Reference(QualifiedColumnName::new( |
| 140 | + Some(keys_alias.clone()), |
| 141 | + alias_in_keys_query, |
| 142 | + )); |
| 143 | + let alias_in_data_query = query.schema().resolve_member_alias(&member_ref); |
| 144 | + let data_query_ref = Expr::Reference(QualifiedColumnName::new( |
| 145 | + Some(query_alias.clone()), |
| 146 | + alias_in_data_query, |
| 147 | + )); |
| 148 | + |
| 149 | + Ok(vec![(keys_query_ref, data_query_ref)]) |
| 150 | + }) |
| 151 | + .collect::<Result<Vec<_>, _>>()?; |
| 152 | + |
| 153 | + join_builder.left_join_subselect( |
| 154 | + query, |
| 155 | + query_alias.clone(), |
| 156 | + JoinCondition::new_dimension_join(conditions, true), |
| 157 | + ); |
| 158 | + } |
| 159 | + |
| 160 | + let result = join_builder.build(); |
| 161 | + Ok(From::new_from_join(result)) |
| 162 | + } |
| 163 | +} |
| 164 | + |
| 165 | +struct InnerJoinFullKeyAggregateStrategy<'a> { |
| 166 | + builder: &'a PhysicalPlanBuilder, |
| 167 | +} |
| 168 | + |
| 169 | +impl<'a> InnerJoinFullKeyAggregateStrategy<'a> { |
| 170 | + pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc<Self> { |
| 171 | + Rc::new(Self { builder }) |
| 172 | + } |
| 173 | +} |
| 174 | + |
| 175 | +impl<'a> FullKeyAggregateStrategy for InnerJoinFullKeyAggregateStrategy<'a> { |
| 176 | + fn process( |
| 177 | + &self, |
| 178 | + full_key_aggregate: &FullKeyAggregate, |
| 179 | + context: &PushDownBuilderContext, |
| 180 | + ) -> Result<Rc<From>, CubeError> { |
| 181 | + let query_tools = self.builder.query_tools(); |
| 182 | + let mut data_queries = vec![]; |
| 183 | + if let Some(resolved_multiplied_measures) = &full_key_aggregate.multiplied_measures_resolver |
| 184 | + { |
| 185 | + match resolved_multiplied_measures { |
| 186 | + ResolvedMultipliedMeasures::ResolveMultipliedMeasures( |
| 187 | + resolve_multiplied_measures, |
| 188 | + ) => { |
| 189 | + for regular_measure_query in resolve_multiplied_measures |
| 190 | + .regular_measure_subqueries |
| 191 | + .iter() |
| 192 | + { |
| 193 | + let query = self |
| 194 | + .builder |
| 195 | + .process_node(regular_measure_query.as_ref(), &context)?; |
| 196 | + data_queries.push(query); |
| 197 | + } |
| 198 | + for multiplied_measure_query in resolve_multiplied_measures |
| 199 | + .aggregate_multiplied_subqueries |
| 200 | + .iter() |
| 201 | + { |
| 202 | + let query = self |
| 203 | + .builder |
| 204 | + .process_node(multiplied_measure_query.as_ref(), &context)?; |
| 205 | + data_queries.push(query); |
| 206 | + } |
| 207 | + } |
| 208 | + ResolvedMultipliedMeasures::PreAggregation(simple_query) => todo!(), |
| 209 | + } |
| 210 | + } |
| 211 | + if data_queries.is_empty() { |
| 212 | + return Err(CubeError::internal(format!( |
| 213 | + "FullKeyAggregate should have at least one source: {}", |
| 214 | + pretty_print(full_key_aggregate) |
| 215 | + ))); |
| 216 | + } |
| 217 | + |
| 218 | + if data_queries.len() == 1 { |
| 219 | + let select = data_queries[0].clone(); |
| 220 | + let result = From::new_from_subselect(select, "fk_aggregate".to_string()); |
| 221 | + return Ok(result); |
| 222 | + } |
| 223 | + |
| 224 | + let mut join_builder = |
| 225 | + JoinBuilder::new_from_subselect(data_queries[0].clone(), "q_0".to_string()); |
| 226 | + |
| 227 | + for (i, query) in data_queries.iter().skip(1).enumerate() { |
| 228 | + let prev_alias = format!("q_{}", i); |
| 229 | + let query_alias = format!("q_{}", i + 1); |
| 230 | + let conditions = full_key_aggregate |
| 231 | + .schema |
| 232 | + .all_dimensions() |
| 233 | + .map(|dim| -> Result<_, CubeError> { |
| 234 | + let member_ref = dim.clone().as_base_member(query_tools.clone())?; |
| 235 | + let alias_in_prev_query = |
| 236 | + data_queries[i].schema().resolve_member_alias(&member_ref); |
| 237 | + let prev_query_ref = Expr::Reference(QualifiedColumnName::new( |
| 238 | + Some(prev_alias.clone()), |
| 239 | + alias_in_prev_query, |
| 240 | + )); |
| 241 | + let alias_in_data_query = query.schema().resolve_member_alias(&member_ref); |
| 242 | + let data_query_ref = Expr::Reference(QualifiedColumnName::new( |
| 243 | + Some(query_alias.clone()), |
| 244 | + alias_in_data_query, |
| 245 | + )); |
| 246 | + |
| 247 | + Ok(vec![(prev_query_ref, data_query_ref)]) |
| 248 | + }) |
| 249 | + .collect::<Result<Vec<_>, _>>()?; |
| 250 | + |
| 251 | + join_builder.inner_join_subselect( |
| 252 | + query.clone(), |
| 253 | + query_alias.clone(), |
| 254 | + JoinCondition::new_dimension_join(conditions, true), |
| 255 | + ); |
| 256 | + } |
| 257 | + |
| 258 | + let result = join_builder.build(); |
| 259 | + Ok(From::new_from_join(result)) |
| 260 | + } |
| 261 | +} |
| 262 | + |
| 263 | +pub struct FullKeyAggregateProcessor<'a> { |
| 264 | + builder: &'a PhysicalPlanBuilder, |
| 265 | +} |
| 266 | + |
| 267 | +impl<'a> LogicalNodeProcessor<'a, FullKeyAggregate> for FullKeyAggregateProcessor<'a> { |
| 268 | + type PhysycalNode = Rc<From>; |
| 269 | + fn new(builder: &'a PhysicalPlanBuilder) -> Self { |
| 270 | + Self { builder } |
| 271 | + } |
| 272 | + |
| 273 | + fn process( |
| 274 | + &self, |
| 275 | + full_key_aggregate: &FullKeyAggregate, |
| 276 | + context: &PushDownBuilderContext, |
| 277 | + ) -> Result<Self::PhysycalNode, CubeError> { |
| 278 | + let strategy: Rc<dyn FullKeyAggregateStrategy> = |
| 279 | + if full_key_aggregate.schema.has_dimensions() { |
| 280 | + KeysFullKeyAggregateStrategy::new(self.builder) |
| 281 | + } else { |
| 282 | + InnerJoinFullKeyAggregateStrategy::new(self.builder) |
| 283 | + }; |
| 284 | + strategy.process(full_key_aggregate, context) |
| 285 | + } |
| 286 | +} |
| 287 | + |
| 288 | +impl ProcessableNode for FullKeyAggregate { |
| 289 | + type ProcessorType<'a> = FullKeyAggregateProcessor<'a>; |
| 290 | +} |
0 commit comments