Skip to content

Commit 2f11d20

Browse files
authored
feat: Initial support for grouped join pushdown (#9032)
* Load request extended with list of grouped subqueries to join with. Each subquery comes with SQL string, alias, condition and join type. SQL is treated as static query, fully processed by SQL API. Condition - as member expression, because it can contain references to dimensions * `BaseQuery` will add grouped subqueries to SQL after subquery dimensions and join tree parts * Wrapper rewrite rules handle `wrapper_pullup_replacer` in `join` position of `wrapped_select`, but still mostly with empty list inside. Notable exception is aggregation flattening - it is allowed to try to flatten aggregation into WrappedSelect with non-empty joins. * Wrapper pull up rules now handle trivial pulling up over single `wrapped_select_join` and over the `WrappedSelectJoins` list * Wrapper replacers now have `grouped_subqueries` in a context, it is used to track qualifiers of grouped subqueries, to allow rewriting of column expressions referencing grouped subquery with push-to-cube active * Converter from egraph to logical plan reconstructs schema for joins * Penalize joins harder than wrappers. For ungrouped queries with grouped join, like `SELECT * FROM (ungrouped) JOIN (grouped);`, this would allow to prefer representation without join, but with wrapper. Without this cost change, join representation would have higher `ast_size_outside_wrapper` component, but it is less important than `wrapper_nodes`, so wrapper representation would be more expensive. * SQL generation for `WrappedSelectNode` will generate each grouped subquery separately, generate member expressions for join conditions and push all that to Cube * Rewrite rules are too relaxed for now: they allow any `push_to_cube=false` query as grouped, which is incorrect, as we may drop push-to-Cube when wrapping `WrappedSelect` with another layer, even if inner `WrappedSelect` is projection. This is covered by check in SQL generation: only `CubeScan(grouped=true)` are allowed. 
This requires a new flag in wrapper replacers to fix. Supporting changes: * Extract `CubeScanWrappedSqlNode` from `CubeScanWrapperNode`. Both are logical plan nodes for now, `Wrapper` is generated during rewrites, and `WrappedSql` stores SQL, generated during `evaluate_wrapped_sql` * Extracted `__cubeJoinField` join check to function, and made it stricter, only `left.__cubeJoinField = right.__cubeJoinField` is allowed * Avoid clone calls in `find_column_by_alias` and `__cubeJoinField` join check * Add `PushToCubeContext` in SQL generation code, used to track everything necessary to generate SQL for push-to-Cube case. For now it contains ungrouped `CubeScanNode` that would be pushed into, and qualifiers and generated SQL for all grouped subqueries. Qualifiers are necessary to generate column expressions on top of ungrouped-grouped join. * New `copy_value!` macro, to extend `copy_flag!`, allows to copy single value between different unary language variants * Prettify extracted best plan and cost in logs
1 parent aaa4364 commit 2f11d20

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2784
-718
lines changed

packages/cubejs-api-gateway/openspec.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,24 @@ components:
329329
- $ref: "#/components/schemas/V1LoadRequestQueryFilterBase"
330330
- $ref: "#/components/schemas/V1LoadRequestQueryFilterLogicalOr"
331331
- $ref: "#/components/schemas/V1LoadRequestQueryFilterLogicalAnd"
332+
V1LoadRequestQueryJoinSubquery:
333+
type: "object"
334+
properties:
335+
sql:
336+
type: "string"
337+
# TODO This is _always_ a member expression, maybe pass as parsed, without intermediate string?
338+
"on":
339+
type: "string"
340+
# TODO why string? it's enum
341+
joinType:
342+
type: "string"
343+
alias:
344+
type: "string"
345+
required:
346+
- sql
347+
- "on"
348+
- joinType
349+
- alias
332350
V1LoadRequestQuery:
333351
type: "object"
334352
properties:
@@ -366,6 +384,12 @@ components:
366384
$ref: "#/components/schemas/V1LoadRequestQueryFilterItem"
367385
ungrouped:
368386
type: "boolean"
387+
# vector of (subquery sql: string, join condition: member expression, join type: enum)
388+
# they will be added to end of joinQuery in BaseQuery, in same order as here
389+
subqueryJoins:
390+
type: "array"
391+
items:
392+
$ref: "#/components/schemas/V1LoadRequestQueryJoinSubquery"
369393
V1LoadRequest:
370394
type: "object"
371395
properties:

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1312,7 +1312,12 @@ class ApiGateway {
13121312
}
13131313

13141314
private hasExpressionsInQuery(query: Query): boolean {
1315-
const arraysToCheck = [query.measures, query.dimensions, query.segments];
1315+
const arraysToCheck = [
1316+
query.measures,
1317+
query.dimensions,
1318+
query.segments,
1319+
(query.subqueryJoins ?? []).map(join => join.on),
1320+
];
13161321

13171322
return arraysToCheck.some(array => array?.some(item => typeof item === 'string' && item.startsWith('{')));
13181323
}
@@ -1323,6 +1328,10 @@ class ApiGateway {
13231328
measures: (query.measures || []).map(m => (typeof m === 'string' ? this.parseMemberExpression(m) : m)),
13241329
dimensions: (query.dimensions || []).map(m => (typeof m === 'string' ? this.parseMemberExpression(m) : m)),
13251330
segments: (query.segments || []).map(m => (typeof m === 'string' ? this.parseMemberExpression(m) : m)),
1331+
subqueryJoins: (query.subqueryJoins ?? []).map(join => (typeof join.on === 'string' ? {
1332+
...join,
1333+
on: this.parseMemberExpression(join.on),
1334+
} : join)),
13261335
};
13271336
}
13281337

@@ -1361,6 +1370,10 @@ class ApiGateway {
13611370
measures: (query.measures || []).map(m => (typeof m !== 'string' ? this.evalMemberExpression(m as ParsedMemberExpression) : m)),
13621371
dimensions: (query.dimensions || []).map(m => (typeof m !== 'string' ? this.evalMemberExpression(m as ParsedMemberExpression) : m)),
13631372
segments: (query.segments || []).map(m => (typeof m !== 'string' ? this.evalMemberExpression(m as ParsedMemberExpression) : m)),
1373+
subqueryJoins: (query.subqueryJoins ?? []).map(join => (typeof join.on !== 'string' ? {
1374+
...join,
1375+
on: this.evalMemberExpression(join.on as ParsedMemberExpression)
1376+
} : join)),
13641377
};
13651378
}
13661379

packages/cubejs-api-gateway/src/query.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ const oneCondition = Joi.object().keys({
9696
and: Joi.array().items(oneFilter, Joi.link('...').description('oneCondition schema')),
9797
}).xor('or', 'and');
9898

99+
const subqueryJoin = Joi.object().keys({
100+
sql: Joi.string(),
101+
// TODO This is _always_ a member expression, maybe pass as parsed, without intermediate string?
102+
// TODO there are three different types instead of alternatives for this actually
103+
on: Joi.alternatives(Joi.string(), memberExpression, parsedMemberExpression),
104+
joinType: Joi.string().valid('LEFT', 'INNER'),
105+
alias: Joi.string(),
106+
});
107+
99108
const querySchema = Joi.object().keys({
100109
// TODO add member expression alternatives only for SQL API queries?
101110
measures: Joi.array().items(Joi.alternatives(id, memberExpression, parsedMemberExpression)),
@@ -122,6 +131,7 @@ const querySchema = Joi.object().keys({
122131
renewQuery: Joi.boolean(),
123132
ungrouped: Joi.boolean(),
124133
responseFormat: Joi.valid('default', 'compact'),
134+
subqueryJoins: Joi.array().items(subqueryJoin),
125135
});
126136

127137
const normalizeQueryOrder = order => {

packages/cubejs-api-gateway/src/types/query.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ interface QueryTimeDimension {
6767
granularity?: QueryTimeDimensionGranularity;
6868
}
6969

70+
type SubqueryJoins = {
71+
sql: string,
72+
// TODO This is _always_ a member expression, maybe pass as parsed, without intermediate string?
73+
// TODO there are three different types instead of alternatives for this actually
74+
on: string | ParsedMemberExpression | MemberExpression,
75+
joinType: 'LEFT' | 'INNER',
76+
alias: string,
77+
};
78+
7079
/**
7180
* Incoming network query data type.
7281
*/
@@ -85,6 +94,9 @@ interface Query {
8594
renewQuery?: boolean;
8695
ungrouped?: boolean;
8796
responseFormat?: ResultType;
97+
98+
// TODO incoming query, query with parsed exprs and query with evaluated exprs are all different types
99+
subqueryJoins?: Array<SubqueryJoins>,
88100
}
89101

90102
/**

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,23 @@ const SecondsDurations = {
6060
* @property {*} headCommitId
6161
*/
6262

63+
/**
64+
* @typedef {Object} JoinRoot
65+
* @property {string} sql
66+
* @property {string} alias
67+
*/
68+
69+
/**
70+
* @typedef {Object} JoinItem
71+
* @property {string} sql
72+
* @property {string} alias
73+
* @property {string} on
74+
*/
75+
76+
/**
77+
* @typedef {[JoinRoot, ...JoinItem]} JoinChain
78+
*/
79+
6380
/**
6481
* BaseQuery class. BaseQuery object encapsulates the logic of
6582
* transforming an incoming to a specific cube request to the
@@ -224,6 +241,7 @@ export class BaseQuery {
224241
multiStageQuery: this.options.multiStageQuery,
225242
multiStageDimensions: this.options.multiStageDimensions,
226243
multiStageTimeDimensions: this.options.multiStageTimeDimensions,
244+
subqueryJoins: this.options.subqueryJoins,
227245
});
228246
this.from = this.options.from;
229247
this.multiStageQuery = this.options.multiStageQuery;
@@ -269,6 +287,11 @@ export class BaseQuery {
269287
this.preAggregationsSchemaOption = this.options.preAggregationsSchema ?? DEFAULT_PREAGGREGATIONS_SCHEMA;
270288
this.externalQueryClass = this.options.externalQueryClass;
271289

290+
/**
291+
* @type {Array<{sql: string, on: {expression: Function}, joinType: 'LEFT' | 'INNER', alias: string}>}
292+
*/
293+
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
294+
272295
// Set the default order only when options.order is not provided at all
273296
// if options.order is set (empty array [] or with data) - use it as is
274297
this.order = this.options.order ?? this.defaultOrder();
@@ -1604,19 +1627,44 @@ export class BaseQuery {
16041627
return this.joinSql([
16051628
{ sql: cubeSql, alias: cubeAlias },
16061629
...(subQueryDimensionsByCube[join.root] || []).map(d => this.subQueryJoin(d)),
1607-
...joins
1630+
...joins,
1631+
...this.customSubQueryJoins.map((customJoin) => this.customSubQueryJoin(customJoin)),
16081632
]);
16091633
}
16101634

16111635
joinSql(toJoin) {
16121636
const [root, ...rest] = toJoin;
16131637
const joins = rest.map(
1614-
j => `LEFT JOIN ${j.sql} ${this.asSyntaxJoin} ${j.alias} ON ${j.on}`
1638+
j => {
1639+
const joinType = j.joinType ?? 'LEFT';
1640+
return `${joinType} JOIN ${j.sql} ${this.asSyntaxJoin} ${j.alias} ON ${j.on}`;
1641+
}
16151642
);
16161643

16171644
return [`${root.sql} ${this.asSyntaxJoin} ${root.alias}`, ...joins].join('\n');
16181645
}
16191646

1647+
/**
1648+
*
1649+
* @param {{sql: string, on: {cubeName: string, expression: Function}, joinType: 'LEFT' | 'INNER', alias: string}} customJoin
1650+
* @returns {JoinItem}
1651+
*/
1652+
customSubQueryJoin(customJoin) {
1653+
const on = this.evaluateSql(customJoin.on.cubeName, customJoin.on.expression);
1654+
1655+
return {
1656+
sql: `(${customJoin.sql})`,
1657+
alias: customJoin.alias,
1658+
on,
1659+
joinType: customJoin.joinType,
1660+
};
1661+
}
1662+
1663+
/**
1664+
*
1665+
* @param {string} dimension
1666+
* @returns {JoinItem}
1667+
*/
16201668
subQueryJoin(dimension) {
16211669
const { prefix, subQuery, cubeName } = this.subQueryDescription(dimension);
16221670
const primaryKeys = this.cubeEvaluator.primaryKeys[cubeName];

packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,32 @@ Array [
5757
]
5858
`;
5959

60+
exports[`SQL API Postgres (Data) join with filtered grouped query: join grouped with filter 1`] = `
61+
Array [
62+
Object {
63+
"count": "2",
64+
"status": "processed",
65+
},
66+
Object {
67+
"count": "2",
68+
"status": "new",
69+
},
70+
]
71+
`;
72+
73+
exports[`SQL API Postgres (Data) join with grouped query: join grouped 1`] = `
74+
Array [
75+
Object {
76+
"count": "2",
77+
"status": "processed",
78+
},
79+
Object {
80+
"count": "1",
81+
"status": "shipped",
82+
},
83+
]
84+
`;
85+
6086
exports[`SQL API Postgres (Data) metabase max number: metabase max number 1`] = `
6187
Array [
6288
Object {

packages/cubejs-testing/test/smoke-cubesql.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,65 @@ filter_subq AS (
476476
expect(res.rows).toMatchSnapshot('select __user and literal in wrapper');
477477
});
478478

479+
test('join with grouped query', async () => {
480+
const query = `
481+
SELECT
482+
"Orders".status AS status,
483+
COUNT(*) AS count
484+
FROM
485+
"Orders"
486+
INNER JOIN
487+
(
488+
SELECT
489+
status,
490+
SUM(totalAmount)
491+
FROM
492+
"Orders"
493+
GROUP BY 1
494+
ORDER BY 2 DESC
495+
LIMIT 2
496+
) top_orders
497+
ON
498+
"Orders".status = top_orders.status
499+
GROUP BY 1
500+
ORDER BY 1
501+
`;
502+
503+
const res = await connection.query(query);
504+
// Expect only top statuses 2 by total amount: processed and shipped
505+
expect(res.rows).toMatchSnapshot('join grouped');
506+
});
507+
508+
test('join with filtered grouped query', async () => {
509+
const query = `
510+
SELECT
511+
"Orders".status AS status,
512+
COUNT(*) AS count
513+
FROM
514+
"Orders"
515+
INNER JOIN
516+
(
517+
SELECT
518+
status,
519+
SUM(totalAmount)
520+
FROM
521+
"Orders"
522+
WHERE
523+
status NOT IN ('shipped')
524+
GROUP BY 1
525+
ORDER BY 2 DESC
526+
LIMIT 2
527+
) top_orders
528+
ON
529+
"Orders".status = top_orders.status
530+
GROUP BY 1
531+
`;
532+
533+
const res = await connection.query(query);
534+
// Expect only top statuses 2 by total amount, with shipped filtered out: processed and new
535+
expect(res.rows).toMatchSnapshot('join grouped with filter');
536+
});
537+
479538
test('where segment is false', async () => {
480539
const query =
481540
'SELECT value AS val, * FROM "SegmentTest" WHERE segment_eq_1 IS FALSE ORDER BY value;';

rust/cubesql/cubeclient/.openapi-generator/FILES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ src/models/v1_load_request_query_filter_base.rs
1616
src/models/v1_load_request_query_filter_item.rs
1717
src/models/v1_load_request_query_filter_logical_and.rs
1818
src/models/v1_load_request_query_filter_logical_or.rs
19+
src/models/v1_load_request_query_join_subquery.rs
1920
src/models/v1_load_request_query_time_dimension.rs
2021
src/models/v1_load_response.rs
2122
src/models/v1_load_result.rs

rust/cubesql/cubeclient/src/models/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub mod v1_load_request_query_filter_logical_and;
2828
pub use self::v1_load_request_query_filter_logical_and::V1LoadRequestQueryFilterLogicalAnd;
2929
pub mod v1_load_request_query_filter_logical_or;
3030
pub use self::v1_load_request_query_filter_logical_or::V1LoadRequestQueryFilterLogicalOr;
31+
pub mod v1_load_request_query_join_subquery;
32+
pub use self::v1_load_request_query_join_subquery::V1LoadRequestQueryJoinSubquery;
3133
pub mod v1_load_request_query_time_dimension;
3234
pub use self::v1_load_request_query_time_dimension::V1LoadRequestQueryTimeDimension;
3335
pub mod v1_load_response;

rust/cubesql/cubeclient/src/models/v1_load_request_query.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub struct V1LoadRequestQuery {
2828
pub filters: Option<Vec<crate::models::V1LoadRequestQueryFilterItem>>,
2929
#[serde(rename = "ungrouped", skip_serializing_if = "Option::is_none")]
3030
pub ungrouped: Option<bool>,
31+
#[serde(rename = "subqueryJoins", skip_serializing_if = "Option::is_none")]
32+
pub subquery_joins: Option<Vec<crate::models::V1LoadRequestQueryJoinSubquery>>,
3133
}
3234

3335
impl V1LoadRequestQuery {
@@ -42,6 +44,7 @@ impl V1LoadRequestQuery {
4244
offset: None,
4345
filters: None,
4446
ungrouped: None,
47+
subquery_joins: None,
4548
}
4649
}
4750
}

0 commit comments

Comments
 (0)