Skip to content

Commit e7cf108

Browse files
authored
refactor(cubesql): Transport - hide cubeclient with own types (#8582)
1 parent df3334c commit e7cf108

File tree

13 files changed

+165
-148
lines changed

13 files changed

+165
-148
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ cubenativeutils = { path = "../../rust/cubenativeutils" }
2121
async-channel = { version = "2" }
2222
async-trait = "0.1.36"
2323
convert_case = "0.6.0"
24-
cubeclient = { path = "../../rust/cubesql/cubeclient" }
2524
pin-project = "1.1.5"
2625
cubesql = { path = "../../rust/cubesql/cubesql" }
2726
findshlibs = "0.10.2"

packages/cubejs-backend-native/src/transport.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ use std::collections::HashMap;
44
use std::fmt::Display;
55

66
use async_trait::async_trait;
7-
use cubeclient::models::{V1Error, V1LoadRequestQuery, V1LoadResponse, V1MetaResponse};
87
use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
98
use cubesql::compile::engine::df::wrapper::SqlQuery;
10-
use cubesql::transport::{SpanId, SqlGenerator, SqlResponse};
9+
use cubesql::transport::{
10+
SpanId, SqlGenerator, SqlResponse, TransportError, TransportLoadRequestQuery,
11+
TransportLoadResponse, TransportMetaResponse,
12+
};
1113
use cubesql::{
1214
di_service,
1315
sql::AuthContextRef,
@@ -76,7 +78,7 @@ struct CanSwitchUserForSessionRequest {
7678
#[derive(Debug, Serialize)]
7779
struct LoadRequest {
7880
request: TransportRequest,
79-
query: V1LoadRequestQuery,
81+
query: TransportLoadRequestQuery,
8082
#[serde(rename = "sqlQuery", skip_serializing_if = "Option::is_none")]
8183
sql_query: Option<(String, Vec<Option<String>>)>,
8284
session: SessionContext,
@@ -129,7 +131,7 @@ impl TransportService for NodeBridgeTransport {
129131
only_compiler_id: false,
130132
})?;
131133

132-
let response = call_js_with_channel_as_callback::<V1MetaResponse>(
134+
let response = call_js_with_channel_as_callback::<TransportMetaResponse>(
133135
self.channel.clone(),
134136
self.on_meta.clone(),
135137
Some(extra.clone()),
@@ -229,7 +231,7 @@ impl TransportService for NodeBridgeTransport {
229231
},
230232
only_compiler_id: true,
231233
})?;
232-
let response = call_js_with_channel_as_callback::<V1MetaResponse>(
234+
let response = call_js_with_channel_as_callback::<TransportMetaResponse>(
233235
self.channel.clone(),
234236
self.on_meta.clone(),
235237
Some(extra.clone()),
@@ -251,7 +253,7 @@ impl TransportService for NodeBridgeTransport {
251253
async fn sql(
252254
&self,
253255
span_id: Option<Arc<SpanId>>,
254-
query: V1LoadRequestQuery,
256+
query: TransportLoadRequestQuery,
255257
ctx: AuthContextRef,
256258
meta: LoadRequestMeta,
257259
member_to_alias: Option<HashMap<String, String>>,
@@ -328,11 +330,11 @@ impl TransportService for NodeBridgeTransport {
328330
async fn load(
329331
&self,
330332
span_id: Option<Arc<SpanId>>,
331-
query: V1LoadRequestQuery,
333+
query: TransportLoadRequestQuery,
332334
sql_query: Option<SqlQuery>,
333335
ctx: AuthContextRef,
334336
meta: LoadRequestMeta,
335-
) -> Result<V1LoadResponse, CubeError> {
337+
) -> Result<TransportLoadResponse, CubeError> {
336338
trace!("[transport] Request ->");
337339

338340
let native_auth = ctx
@@ -381,14 +383,14 @@ impl TransportService for NodeBridgeTransport {
381383
#[cfg(not(debug_assertions))]
382384
trace!("[transport] Request <- <hidden>");
383385

384-
let load_err = match serde_json::from_value::<V1LoadResponse>(response.clone()) {
386+
let load_err = match serde_json::from_value::<TransportLoadResponse>(response.clone()) {
385387
Ok(r) => {
386388
return Ok(r);
387389
}
388390
Err(err) => err,
389391
};
390392

391-
if let Ok(res) = serde_json::from_value::<V1Error>(response) {
393+
if let Ok(res) = serde_json::from_value::<TransportError>(response) {
392394
if res.error.to_lowercase() == *"continue wait" {
393395
debug!(
394396
"[transport] load - retrying request (continue wait) requestId: {}",
@@ -413,7 +415,7 @@ impl TransportService for NodeBridgeTransport {
413415
async fn load_stream(
414416
&self,
415417
span_id: Option<Arc<SpanId>>,
416-
query: V1LoadRequestQuery,
418+
query: TransportLoadRequestQuery,
417419
sql_query: Option<SqlQuery>,
418420
ctx: AuthContextRef,
419421
meta: LoadRequestMeta,

rust/cubesql/cubesql/src/compile/engine/context.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{any::Any, collections::HashMap, sync::Arc};
22

33
use async_trait::async_trait;
4-
use cubeclient::models::V1CubeMeta;
54
use datafusion::{
65
arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
76
datasource::{self, TableProvider},
@@ -15,7 +14,7 @@ use datafusion::{
1514
use crate::{
1615
compile::{DatabaseProtocolDetails, MetaContext},
1716
sql::{ColumnType, SessionManager, SessionState},
18-
transport::V1CubeMetaExt,
17+
transport::{CubeMeta, V1CubeMetaExt},
1918
CubeError,
2019
};
2120

@@ -92,11 +91,11 @@ pub trait TableName {
9291
}
9392

9493
pub struct CubeTableProvider {
95-
cube: V1CubeMeta,
94+
cube: CubeMeta,
9695
}
9796

9897
impl CubeTableProvider {
99-
pub fn new(cube: V1CubeMeta) -> Self {
98+
pub fn new(cube: CubeMeta) -> Self {
10099
Self { cube }
101100
}
102101
}

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@ use crate::{
1414
sql::AuthContextRef,
1515
transport::{
1616
AliasedColumn, LoadRequestMeta, MetaContext, SpanId, SqlGenerator, SqlTemplates,
17-
TransportService,
17+
TransportLoadRequestQuery, TransportService,
1818
},
1919
CubeError,
2020
};
2121
use chrono::{Days, NaiveDate, SecondsFormat, TimeZone, Utc};
22-
use cubeclient::models::V1LoadRequestQuery;
2322
use datafusion::{
2423
error::{DataFusionError, Result},
2524
logical_plan::{
@@ -200,7 +199,7 @@ pub struct CubeScanWrapperNode {
200199
pub meta: Arc<MetaContext>,
201200
pub auth_context: AuthContextRef,
202201
pub wrapped_sql: Option<SqlQuery>,
203-
pub request: Option<V1LoadRequestQuery>,
202+
pub request: Option<TransportLoadRequestQuery>,
204203
pub member_fields: Option<Vec<MemberField>>,
205204
pub span_id: Option<Arc<SpanId>>,
206205
pub config_obj: Arc<dyn ConfigObj>,
@@ -229,7 +228,7 @@ impl CubeScanWrapperNode {
229228
pub fn with_sql_and_request(
230229
&self,
231230
sql: SqlQuery,
232-
request: V1LoadRequestQuery,
231+
request: TransportLoadRequestQuery,
233232
member_fields: Vec<MemberField>,
234233
) -> Self {
235234
Self {
@@ -258,7 +257,7 @@ pub struct SqlGenerationResult {
258257
pub from_alias: Option<String>,
259258
pub column_remapping: Option<HashMap<Column, Column>>,
260259
pub sql: SqlQuery,
261-
pub request: V1LoadRequestQuery,
260+
pub request: TransportLoadRequestQuery,
262261
}
263262

264263
lazy_static! {
@@ -925,7 +924,7 @@ impl CubeScanWrapperNode {
925924
from_alias: None,
926925
sql: SqlQuery::new("".to_string(), values.clone()),
927926
column_remapping: None,
928-
request: V1LoadRequestQuery::new(),
927+
request: TransportLoadRequestQuery::new(),
929928
}),
930929
// LogicalPlan::Distinct(_) => {}
931930
x => {

rust/cubesql/cubesql/src/compile/engine/information_schema/postgres/columns.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{any::Any, sync::Arc};
22

33
use async_trait::async_trait;
4-
use cubeclient::models::V1CubeMeta;
54
use datafusion::{
65
arrow::{
76
array::{Array, ArrayRef, StringBuilder, UInt32Builder},
@@ -14,7 +13,7 @@ use datafusion::{
1413
physical_plan::{memory::MemoryExec, ExecutionPlan},
1514
};
1615

17-
use crate::transport::{CubeColumn, V1CubeMetaExt};
16+
use crate::transport::{CubeColumn, CubeMeta, V1CubeMetaExt};
1817

1918
use super::{
2019
ext::CubeColumnPostgresExt,
@@ -312,7 +311,7 @@ pub struct InfoSchemaColumnsProvider {
312311
}
313312

314313
impl InfoSchemaColumnsProvider {
315-
pub fn new(db_name: &str, cubes: &Vec<V1CubeMeta>) -> Self {
314+
pub fn new(db_name: &str, cubes: &Vec<CubeMeta>) -> Self {
316315
let mut builder = InformationSchemaColumnsBuilder::new();
317316

318317
for cube in cubes {

rust/cubesql/cubesql/src/compile/engine/information_schema/redshift/svv_tables.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{any::Any, sync::Arc};
22

3+
use crate::transport::CubeMeta;
34
use async_trait::async_trait;
4-
use cubeclient::models::V1CubeMeta;
55
use datafusion::{
66
arrow::{
77
array::{Array, ArrayRef, StringBuilder},
@@ -68,7 +68,7 @@ pub struct RedshiftSvvTablesTableProvider {
6868
}
6969

7070
impl RedshiftSvvTablesTableProvider {
71-
pub fn new(db_name: &str, cubes: &Vec<V1CubeMeta>) -> Self {
71+
pub fn new(db_name: &str, cubes: &Vec<CubeMeta>) -> Self {
7272
let mut builder = RedshiftSvvTablesBuilder::new(cubes.len());
7373

7474
for cube in cubes {

0 commit comments

Comments
 (0)