Skip to content

Commit b97268f

Browse files
authored
fix(cubesql): Remove excessive limit on inner wrapped queries (#7864)
1 parent 00a6ec0 commit b97268f

File tree

15 files changed

+180
-31
lines changed

15 files changed

+180
-31
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,7 +1180,8 @@ class ApiGateway {
11801180
exportAnnotatedSql,
11811181
memberExpressions,
11821182
expressionParams,
1183-
disableExternalPreAggregations
1183+
disableExternalPreAggregations,
1184+
disableLimitEnforcing,
11841185
}: QueryRequest) {
11851186
const requestStarted = new Date();
11861187

@@ -1193,7 +1194,7 @@ class ApiGateway {
11931194
query = this.parseMemberExpressionsInQueries(query);
11941195
}
11951196

1196-
const [queryType, normalizedQueries] = await this.getNormalizedQueries(query, context);
1197+
const [queryType, normalizedQueries] = await this.getNormalizedQueries(query, context, disableLimitEnforcing);
11971198

11981199
const sqlQueries = await Promise.all<any>(
11991200
normalizedQueries.map(async (normalizedQuery) => (await this.getCompilerApi(context)).getSql(

packages/cubejs-api-gateway/src/sql-server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ export class SQLServer {
159159
memberExpressions: true,
160160
disableExternalPreAggregations: true,
161161
queryType: 'multi',
162+
disableLimitEnforcing: true,
162163
context,
163164
res: (message) => {
164165
resolve(message);

packages/cubejs-api-gateway/src/types/request.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ type QueryRequest = BaseRequest & {
127127
exportAnnotatedSql?: boolean;
128128
memberExpressions?: boolean;
129129
disableExternalPreAggregations?: boolean;
130+
disableLimitEnforcing?: boolean;
130131
};
131132

132133
type SqlApiRequest = BaseRequest & {

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2500,10 +2500,12 @@ export class BaseQuery {
25002500
},
25012501
statements: {
25022502
select: 'SELECT {{ select_concat | map(attribute=\'aliased\') | join(\', \') }} \n' +
2503-
'FROM (\n {{ from }}\n) AS {{ from_alias }} \n' +
2504-
'{% if filter %} WHERE {{ filter }}{% endif %}' +
2505-
'{% if group_by %} GROUP BY {{ group_by | map(attribute=\'index\') | join(\', \') }}{% endif %}' +
2506-
'{% if order_by %} ORDER BY {{ order_by | map(attribute=\'expr\') | join(\', \') }}{% endif %}' +
2503+
'FROM (\n' +
2504+
'{{ from | indent(2, true) }}\n' +
2505+
') AS {{ from_alias }}' +
2506+
'{% if filter %}\nWHERE {{ filter }}{% endif %}' +
2507+
'{% if group_by %}\nGROUP BY {{ group_by | map(attribute=\'index\') | join(\', \') }}{% endif %}' +
2508+
'{% if order_by %}\nORDER BY {{ order_by | map(attribute=\'expr\') | join(\', \') }}{% endif %}' +
25072509
'{% if limit %}\nLIMIT {{ limit }}{% endif %}' +
25082510
'{% if offset %}\nOFFSET {{ offset }}{% endif %}',
25092511
},
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
cube(`BigOrders`, {
2+
sql: `
3+
select * from (
4+
select 1 as id, 100 as amount, 'new' status, '2024-01-01'::timestamptz created_at
5+
UNION ALL
6+
select 2 as id, 200 as amount, 'new' status, '2024-01-02'::timestamptz created_at
7+
UNION ALL
8+
select 3 as id, 300 as amount, 'processed' status, '2024-01-03'::timestamptz created_at
9+
UNION ALL
10+
select 4 as id, 500 as amount, 'processed' status, '2024-01-04'::timestamptz created_at
11+
UNION ALL
12+
select 5 as id, 600 as amount, 'shipped' status, '2024-01-05'::timestamptz created_at
13+
) data
14+
CROSS JOIN GENERATE_SERIES(1, 20000) value
15+
`,
16+
measures: {
17+
totalAmount: {
18+
sql: `amount`,
19+
type: `sum`,
20+
},
21+
toRemove: {
22+
type: `count`,
23+
},
24+
},
25+
dimensions: {
26+
id: {
27+
sql: `id`,
28+
type: `number`,
29+
primaryKey: true,
30+
},
31+
32+
status: {
33+
sql: `status`,
34+
type: `string`,
35+
},
36+
37+
createdAt: {
38+
sql: `created_at`,
39+
type: `time`
40+
}
41+
},
42+
});

packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@ Array [
1717
]
1818
`;
1919

20+
exports[`SQL API Postgres (Data) no limit for non matching count push down: no limit for non matching count push down 1`] = `
21+
Array [
22+
Object {
23+
"a0": 2024-01-05T00:00:00.000Z,
24+
"a1": 2024-01-01T00:00:00.000Z,
25+
"a2": "100000",
26+
},
27+
]
28+
`;
29+
2030
exports[`SQL API Postgres (Data) powerbi min max push down: powerbi_min_max_push_down 1`] = `
2131
Array [
2232
Object {

packages/cubejs-testing/test/smoke-cubesql.test.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,17 @@ from
180180
`);
181181
expect(res.rows).toMatchSnapshot('powerbi_min_max_push_down');
182182
});
183+
184+
test('no limit for non matching count push down', async () => {
185+
const res = await connection.query(`
186+
select
187+
max("rows"."createdAt") as "a0",
188+
min("rows"."createdAt") as "a1",
189+
count(*) as "a2"
190+
from
191+
"public"."BigOrders" "rows"
192+
`);
193+
expect(res.rows).toMatchSnapshot('no limit for non matching count push down');
194+
});
183195
});
184196
});

rust/cubesql/cubesql/src/compile/engine/df/planner.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use crate::config::ConfigObj;
34
use async_trait::async_trait;
45
use datafusion::{
56
error::Result,
@@ -15,11 +16,20 @@ use super::scan::CubeScanExtensionPlanner;
1516
pub struct CubeQueryPlanner {
1617
pub transport: Arc<dyn TransportService>,
1718
pub meta: LoadRequestMeta,
19+
pub config_obj: Arc<dyn ConfigObj>,
1820
}
1921

2022
impl CubeQueryPlanner {
21-
pub fn new(transport: Arc<dyn TransportService>, meta: LoadRequestMeta) -> Self {
22-
Self { transport, meta }
23+
pub fn new(
24+
transport: Arc<dyn TransportService>,
25+
meta: LoadRequestMeta,
26+
config_obj: Arc<dyn ConfigObj>,
27+
) -> Self {
28+
Self {
29+
transport,
30+
meta,
31+
config_obj,
32+
}
2333
}
2434
}
2535

@@ -36,6 +46,7 @@ impl QueryPlanner for CubeQueryPlanner {
3646
CubeScanExtensionPlanner {
3747
transport: self.transport.clone(),
3848
meta: self.meta.clone(),
49+
config_obj: self.config_obj.clone(),
3950
},
4051
)]);
4152
// Delegate most work of physical planning to the default physical planner

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::{
3434
find_cube_scans_deep_search,
3535
rewrite::WrappedSelectType,
3636
},
37+
config::ConfigObj,
3738
sql::AuthContextRef,
3839
transport::{CubeStreamReceiver, LoadRequestMeta, SpanId, TransportService},
3940
CubeError,
@@ -341,6 +342,7 @@ impl UserDefinedLogicalNode for WrappedSelectNode {
341342
pub struct CubeScanExtensionPlanner {
342343
pub transport: Arc<dyn TransportService>,
343344
pub meta: LoadRequestMeta,
345+
pub config_obj: Arc<dyn ConfigObj>,
344346
}
345347

346348
impl ExtensionPlanner for CubeScanExtensionPlanner {
@@ -369,6 +371,7 @@ impl ExtensionPlanner for CubeScanExtensionPlanner {
369371
options: scan_node.options.clone(),
370372
meta: self.meta.clone(),
371373
span_id: scan_node.span_id.clone(),
374+
config_obj: self.config_obj.clone(),
372375
}))
373376
} else if let Some(wrapper_node) = node.as_any().downcast_ref::<CubeScanWrapperNode>() {
374377
// TODO
@@ -404,6 +407,7 @@ impl ExtensionPlanner for CubeScanExtensionPlanner {
404407
options: scan_node.options.clone(),
405408
meta: self.meta.clone(),
406409
span_id: scan_node.span_id.clone(),
410+
config_obj: self.config_obj.clone(),
407411
}))
408412
} else {
409413
None
@@ -426,6 +430,7 @@ struct CubeScanExecutionPlan {
426430
// injected by extension planner
427431
meta: LoadRequestMeta,
428432
span_id: Option<Arc<SpanId>>,
433+
config_obj: Arc<dyn ConfigObj>,
429434
}
430435

431436
#[derive(Debug)]
@@ -582,14 +587,8 @@ impl ExecutionPlan for CubeScanExecutionPlan {
582587
_context: Arc<TaskContext>,
583588
) -> Result<SendableRecordBatchStream> {
584589
// TODO: move envs to config
585-
let stream_mode = std::env::var("CUBESQL_STREAM_MODE")
586-
.ok()
587-
.map(|v| v.parse::<bool>().unwrap())
588-
.unwrap_or(false);
589-
let query_limit = std::env::var("CUBEJS_DB_QUERY_LIMIT")
590-
.ok()
591-
.map(|v| v.parse::<i32>().unwrap())
592-
.unwrap_or(50000);
590+
let stream_mode = self.config_obj.stream_mode();
591+
let query_limit = self.config_obj.non_streaming_query_max_row_limit();
593592

594593
let stream_mode = match (stream_mode, self.request.limit) {
595594
(true, None) => true,
@@ -1406,6 +1405,7 @@ mod tests {
14061405
transport: get_test_transport(),
14071406
meta: get_test_load_meta(DatabaseProtocol::PostgreSQL),
14081407
span_id: None,
1408+
config_obj: crate::config::Config::test().config_obj(),
14091409
};
14101410

14111411
let runtime = Arc::new(

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::{
33
engine::df::scan::{CubeScanNode, DataType, MemberField, WrappedSelectNode},
44
rewrite::WrappedSelectType,
55
},
6+
config::ConfigObj,
67
sql::AuthContextRef,
78
transport::{
89
AliasedColumn, LoadRequestMeta, MetaContext, SpanId, SqlGenerator, SqlTemplates,
@@ -107,6 +108,7 @@ pub struct CubeScanWrapperNode {
107108
pub request: Option<V1LoadRequestQuery>,
108109
pub member_fields: Option<Vec<MemberField>>,
109110
pub span_id: Option<Arc<SpanId>>,
111+
pub config_obj: Arc<dyn ConfigObj>,
110112
}
111113

112114
impl CubeScanWrapperNode {
@@ -115,6 +117,7 @@ impl CubeScanWrapperNode {
115117
meta: Arc<MetaContext>,
116118
auth_context: AuthContextRef,
117119
span_id: Option<Arc<SpanId>>,
120+
config_obj: Arc<dyn ConfigObj>,
118121
) -> Self {
119122
Self {
120123
wrapped_plan,
@@ -124,6 +127,7 @@ impl CubeScanWrapperNode {
124127
request: None,
125128
member_fields: None,
126129
span_id,
130+
config_obj,
127131
}
128132
}
129133

@@ -141,6 +145,7 @@ impl CubeScanWrapperNode {
141145
request: Some(request),
142146
member_fields: Some(member_fields),
143147
span_id: self.span_id.clone(),
148+
config_obj: self.config_obj.clone(),
144149
}
145150
}
146151
}
@@ -214,11 +219,12 @@ impl CubeScanWrapperNode {
214219
load_request_meta: Arc<LoadRequestMeta>,
215220
) -> result::Result<Self, CubeError> {
216221
let schema = self.schema();
222+
let wrapped_plan = self.wrapped_plan.clone();
217223
let (sql, request, member_fields) = Self::generate_sql_for_node(
218224
Arc::new(self.clone()),
219225
transport,
220226
load_request_meta,
221-
self.wrapped_plan.clone(),
227+
self.clone().set_max_limit_for_node(wrapped_plan),
222228
true,
223229
)
224230
.await
@@ -256,6 +262,45 @@ impl CubeScanWrapperNode {
256262
Ok(self.with_sql_and_request(sql, request, member_fields))
257263
}
258264

265+
pub fn set_max_limit_for_node(self, node: Arc<LogicalPlan>) -> Arc<LogicalPlan> {
266+
let stream_mode = self.config_obj.stream_mode();
267+
if stream_mode {
268+
return node;
269+
}
270+
271+
let query_limit = self.config_obj.non_streaming_query_max_row_limit();
272+
match node.as_ref() {
273+
LogicalPlan::Extension(Extension {
274+
node: extension_node,
275+
}) => {
276+
let cube_scan_node = extension_node
277+
.as_any()
278+
.downcast_ref::<CubeScanNode>()
279+
.cloned();
280+
let wrapped_select_node = extension_node
281+
.as_any()
282+
.downcast_ref::<WrappedSelectNode>()
283+
.cloned();
284+
if let Some(node) = cube_scan_node {
285+
let mut new_node = node.clone();
286+
new_node.request.limit = Some(query_limit);
287+
Arc::new(LogicalPlan::Extension(Extension {
288+
node: Arc::new(new_node),
289+
}))
290+
} else if let Some(node) = wrapped_select_node {
291+
let mut new_node = node.clone();
292+
new_node.limit = Some(query_limit as usize);
293+
Arc::new(LogicalPlan::Extension(Extension {
294+
node: Arc::new(new_node),
295+
}))
296+
} else {
297+
node.clone()
298+
}
299+
}
300+
_ => node.clone(),
301+
}
302+
}
303+
259304
pub fn generate_sql_for_node(
260305
plan: Arc<Self>,
261306
transport: Arc<dyn TransportService>,
@@ -1787,6 +1832,7 @@ impl UserDefinedLogicalNode for CubeScanWrapperNode {
17871832
request: self.request.clone(),
17881833
member_fields: self.member_fields.clone(),
17891834
span_id: self.span_id.clone(),
1835+
config_obj: self.config_obj.clone(),
17901836
})
17911837
}
17921838
}

0 commit comments

Comments
 (0)