Skip to content

Commit 596056d

Browse files
committed
implement transformData and related helpers
1 parent 3c207b7 commit 596056d

File tree

4 files changed

+361
-14
lines changed

4 files changed

+361
-14
lines changed

rust/cubeorchestrator/Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubeorchestrator/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ chrono = "0.4.38"
88
cubeshared = { path = "../cubeshared" }
99
serde = { version = "1.0.215", features = ["derive"] }
1010
serde_json = "1.0.133"
11+
anyhow = "1.0"
1112

1213
[dependencies.neon]
1314
version = "=1"

rust/cubeorchestrator/src/cubestore_result_transform.rs

Lines changed: 314 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1+
use crate::types::{
2+
ConfigItem, DBResponsePrimitive, DBResponseValue, MemberOrMemberExpression, MembersMap,
3+
NormalizedQuery, QueryTimeDimension, QueryType, ResultType, TransformedData,
4+
BLENDING_QUERY_KEY_PREFIX, BLENDING_QUERY_RES_SEPARATOR, COMPARE_DATE_RANGE_FIELD,
5+
COMPARE_DATE_RANGE_SEPARATOR, MEMBER_SEPARATOR,
6+
};
7+
use anyhow::{bail, Context, Result};
18
use chrono::SecondsFormat;
2-
use crate::types::{DBResponsePrimitive, DBResponseValue};
9+
use std::collections::HashMap;
310

11+
/// Transform specified `value` with specified `type` to the network protocol type.
412
pub fn transform_value(value: DBResponseValue, type_: &str) -> DBResponsePrimitive {
513
match value {
614
DBResponseValue::DateTime(dt) if type_ == "time" || type_.is_empty() => {
@@ -13,3 +21,308 @@ pub fn transform_value(value: DBResponseValue, type_: &str) -> DBResponsePrimiti
1321
}
1422
}
1523

24+
/// Parse date range value from time dimension.
25+
pub fn get_date_range_value(time_dimensions: Option<&Vec<QueryTimeDimension>>) -> Result<String> {
26+
let time_dimensions = match time_dimensions {
27+
Some(time_dimensions) => time_dimensions,
28+
None => bail!("QueryTimeDimension should be specified for the compare date range query."),
29+
};
30+
31+
let dim = match time_dimensions.get(0) {
32+
Some(dim) => dim,
33+
None => bail!("No time dimension provided."),
34+
};
35+
36+
let date_range: &Vec<String> = match &dim.date_range {
37+
Some(date_range) => date_range.as_ref(),
38+
None => bail!("Inconsistent QueryTimeDimension configuration: dateRange required."),
39+
};
40+
41+
if date_range.len() == 1 {
42+
bail!(
43+
"Inconsistent dateRange configuration for the compare date range query: {}",
44+
date_range[0]
45+
);
46+
}
47+
48+
Ok(date_range.join(COMPARE_DATE_RANGE_SEPARATOR))
49+
}
50+
51+
/// Parse blending query key from time dimension for query.
52+
pub fn get_blending_query_key(time_dimensions: Option<&Vec<QueryTimeDimension>>) -> Result<String> {
53+
let dim = time_dimensions
54+
.and_then(|dims| dims.first().cloned())
55+
.context("QueryTimeDimension should be specified for the blending query.")?;
56+
57+
let granularity = dim
58+
.granularity.clone()
59+
.context(format!(
60+
"Inconsistent QueryTimeDimension configuration for the blending query, granularity required: {:?}",
61+
dim
62+
))?;
63+
64+
Ok(format!("{}{}", BLENDING_QUERY_KEY_PREFIX, granularity))
65+
}
66+
67+
/// Parse blending query key from time dimension for response.
68+
pub fn get_blending_response_key(
69+
time_dimensions: Option<&Vec<QueryTimeDimension>>,
70+
) -> Result<String> {
71+
let dim = time_dimensions
72+
.and_then(|dims| dims.first().cloned())
73+
.context("QueryTimeDimension should be specified for the blending query.")?;
74+
75+
let granularity = dim
76+
.granularity.clone()
77+
.context(format!(
78+
"Inconsistent QueryTimeDimension configuration for the blending query, granularity required: {:?}",
79+
dim
80+
))?;
81+
82+
let dimension = dim.dimension.clone();
83+
84+
Ok(format!(
85+
"{}{}{}",
86+
dimension, BLENDING_QUERY_RES_SEPARATOR, granularity
87+
))
88+
}
89+
90+
/// Parse member names from request/response.
91+
pub fn get_members(
92+
query_type: &QueryType,
93+
query: &NormalizedQuery,
94+
db_data: &Vec<HashMap<String, DBResponseValue>>,
95+
alias_to_member_name_map: &HashMap<String, String>,
96+
annotation: &HashMap<String, ConfigItem>,
97+
) -> Result<MembersMap> {
98+
let mut members: MembersMap = HashMap::new();
99+
100+
if db_data.is_empty() {
101+
return Ok(members);
102+
}
103+
104+
let columns = db_data[0].keys().collect::<Vec<_>>();
105+
106+
for column in columns {
107+
let member_name = alias_to_member_name_map
108+
.get(column)
109+
.context(format!("Member name not found for alias: '{}'", column))?;
110+
111+
if !annotation.contains_key(member_name) {
112+
bail!(
113+
concat!(
114+
"You requested hidden member: '{}'. Please make it visible using `shown: true`. ",
115+
"Please note primaryKey fields are `shown: false` by default: ",
116+
"https://cube.dev/docs/schema/reference/joins#setting-a-primary-key."
117+
),
118+
column
119+
);
120+
}
121+
122+
members.insert(member_name.clone(), column.clone());
123+
124+
let path = member_name.split(MEMBER_SEPARATOR).collect::<Vec<&str>>();
125+
let calc_member = format!("{}{}{}", path[0], MEMBER_SEPARATOR, path[1]);
126+
127+
if path.len() == 3
128+
&& query.dimensions.as_ref().map_or(true, |dims| {
129+
!dims.iter().any(|dim| match dim {
130+
MemberOrMemberExpression::Member(name) => *name == calc_member,
131+
MemberOrMemberExpression::MemberExpression(expr) => expr.name == calc_member,
132+
})
133+
})
134+
{
135+
members.insert(calc_member, column.clone());
136+
}
137+
}
138+
139+
match query_type {
140+
QueryType::CompareDateRangeQuery => {
141+
members.insert(
142+
COMPARE_DATE_RANGE_FIELD.to_string(),
143+
QueryType::CompareDateRangeQuery.to_string(),
144+
);
145+
}
146+
QueryType::BlendingQuery => {
147+
let blending_key = get_blending_query_key(query.time_dimensions.as_ref())
148+
.context("Failed to generate blending query key")?;
149+
if let Some(dim) = query.time_dimensions.as_ref().and_then(|dims| dims.first().cloned()) {
150+
members.insert(blending_key, dim.dimension.clone());
151+
}
152+
}
153+
_ => {}
154+
}
155+
156+
Ok(members)
157+
}
158+
159+
/// Convert DB response object to the compact output format.
160+
pub fn get_compact_row(
161+
members_to_alias_map: &HashMap<String, String>,
162+
annotation: &HashMap<String, ConfigItem>,
163+
query_type: &QueryType,
164+
members: &[String],
165+
time_dimensions: Option<&Vec<QueryTimeDimension>>,
166+
db_row: &HashMap<String, DBResponseValue>,
167+
) -> Result<Vec<DBResponsePrimitive>> {
168+
let mut row: Vec<DBResponsePrimitive> = Vec::with_capacity(members.len());
169+
170+
for m in members {
171+
if let Some(annotation_item) = annotation.get(m) {
172+
if let Some(alias) = members_to_alias_map.get(m) {
173+
if let Some(value) = db_row.get(alias) {
174+
row.push(transform_value(value.clone(), &annotation_item.member_type));
175+
}
176+
}
177+
}
178+
}
179+
180+
match query_type {
181+
QueryType::CompareDateRangeQuery => {
182+
row.push(DBResponsePrimitive::String(get_date_range_value(
183+
time_dimensions,
184+
)?));
185+
}
186+
QueryType::BlendingQuery => {
187+
let blending_key = get_blending_response_key(time_dimensions)?;
188+
189+
if let Some(alias) = members_to_alias_map.get(&blending_key) {
190+
if let Some(value) = db_row.get(alias) {
191+
let member_type = annotation
192+
.get(alias)
193+
.map_or("", |annotation_item| &annotation_item.member_type);
194+
195+
row.push(transform_value(value.clone(), member_type));
196+
}
197+
}
198+
}
199+
_ => {}
200+
}
201+
202+
Ok(row)
203+
}
204+
205+
/// Convert DB response object to the vanilla output format.
206+
pub fn get_vanilla_row(
207+
alias_to_member_name_map: &HashMap<String, String>,
208+
annotation: &HashMap<String, ConfigItem>,
209+
query_type: &QueryType,
210+
query: &NormalizedQuery,
211+
db_row: &HashMap<String, DBResponseValue>,
212+
) -> Result<HashMap<String, DBResponsePrimitive>> {
213+
let mut row = HashMap::new();
214+
215+
for (alias, value) in db_row {
216+
let member_name = match alias_to_member_name_map.get(alias) {
217+
Some(m) => m,
218+
None => {
219+
bail!("Missing member name for alias: {}", alias);
220+
}
221+
};
222+
223+
let annotation_for_member = match annotation.get(member_name) {
224+
Some(am) => am,
225+
None => {
226+
bail!(
227+
concat!(
228+
"You requested hidden member: '{}'. Please make it visible using `shown: true`. ",
229+
"Please note primaryKey fields are `shown: false` by default: ",
230+
"https://cube.dev/docs/schema/reference/joins#setting-a-primary-key."
231+
),
232+
alias
233+
)
234+
}
235+
};
236+
237+
let transformed_value = transform_value(value.clone(), &annotation_for_member.member_type);
238+
239+
// Handle deprecated time dimensions without granularity
240+
let path: Vec<&str> = member_name.split(MEMBER_SEPARATOR).collect();
241+
let member_name_without_granularity = format!("{}{}{}", path[0], MEMBER_SEPARATOR, path[1]);
242+
if path.len() == 3
243+
&& query.dimensions.as_ref().map_or(true, |dims| {
244+
!dims.iter().any(|dim| match dim {
245+
MemberOrMemberExpression::Member(name) => {
246+
*name == member_name_without_granularity
247+
}
248+
MemberOrMemberExpression::MemberExpression(expr) => {
249+
expr.name == member_name_without_granularity
250+
}
251+
})
252+
})
253+
{
254+
row.insert(member_name_without_granularity, transformed_value);
255+
} else {
256+
row.insert(member_name.clone(), transformed_value);
257+
}
258+
}
259+
260+
match query_type {
261+
QueryType::CompareDateRangeQuery => {
262+
let date_range_value = get_date_range_value(query.time_dimensions.as_ref())?;
263+
row.insert(
264+
"compareDateRange".to_string(),
265+
DBResponsePrimitive::String(date_range_value),
266+
);
267+
}
268+
QueryType::BlendingQuery => {
269+
let blending_key = get_blending_query_key(query.time_dimensions.as_ref())?;
270+
let response_key = get_blending_response_key(query.time_dimensions.as_ref())?;
271+
272+
if let Some(value) = row.get(&response_key) {
273+
row.insert(blending_key, value.clone());
274+
}
275+
}
276+
_ => {}
277+
}
278+
279+
Ok(row)
280+
}
281+
282+
/// Transforms queried data array to the output format.
283+
pub fn transform_data(
284+
alias_to_member_name_map: &HashMap<String, String>,
285+
annotation: &HashMap<String, ConfigItem>,
286+
data: Vec<HashMap<String, DBResponseValue>>,
287+
query: &NormalizedQuery,
288+
query_type: &QueryType,
289+
res_type: Option<ResultType>,
290+
) -> Result<TransformedData> {
291+
let members_to_alias_map = get_members(
292+
query_type,
293+
query,
294+
&data,
295+
alias_to_member_name_map,
296+
annotation,
297+
)?;
298+
let members: Vec<String> = members_to_alias_map.keys().cloned().collect();
299+
300+
match res_type {
301+
Some(ResultType::Compact) => {
302+
let dataset: Vec<_> = data
303+
.into_iter()
304+
.map(|row| get_compact_row(
305+
&members_to_alias_map,
306+
annotation,
307+
query_type,
308+
&members,
309+
query.time_dimensions.as_ref(),
310+
&row))
311+
.collect::<Result<Vec<_>>>()?;
312+
Ok(TransformedData::Compact { members, dataset })
313+
}
314+
_ => {
315+
let dataset: Vec<_> = data
316+
.into_iter()
317+
.map(|row| get_vanilla_row(
318+
alias_to_member_name_map,
319+
annotation,
320+
query_type,
321+
query,
322+
&row,
323+
))
324+
.collect::<Result<Vec<_>>>()?;
325+
Ok(TransformedData::Vanilla(dataset))
326+
}
327+
}
328+
}

0 commit comments

Comments
 (0)