Skip to content

Commit 0ce89ca

Browse files
Ceng23333zenghua
and authored
[Rust]Fix Flightsql consistency aligned to Flink (lakesoul-io#591)
* fix flightsql consistency for non CDC query/update and CDC query Signed-off-by: zenghua <huazeng@dmetasoul.com> * fix cdc update with flight service Signed-off-by: zenghua <huazeng@dmetasoul.com> * remove upsert_with_io_config Signed-off-by: zenghua <huazeng@dmetasoul.com> * rollback rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * fix ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * disable testFlightData case Signed-off-by: zenghua <huazeng@dmetasoul.com> * fix do_get_tables Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> * update rust ci Signed-off-by: zenghua <huazeng@dmetasoul.com> --------- Signed-off-by: zenghua <huazeng@dmetasoul.com> Co-authored-by: zenghua <huazeng@dmetasoul.com>
1 parent e6099d2 commit 0ce89ca

File tree

26 files changed

+649
-1767
lines changed

26 files changed

+649
-1767
lines changed

.github/workflows/rust-ci.yml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ jobs:
4747

4848
steps:
4949
- uses: actions/checkout@v4
50+
- name: Install Rust toolchain
51+
uses: dtolnay/rust-toolchain@master
52+
with:
53+
toolchain: 1.81.0
54+
components: rustfmt, clippy
5055
- name: Install requirement
5156
uses: ConorMacBride/install-package@v1
5257
with:
@@ -58,6 +63,10 @@ jobs:
5863
uses: arduino/setup-protoc@v2
5964
with:
6065
version: "23.x"
61-
# - name: Run tests
62-
# run: cd rust && sudo cargo test --package lakesoul-datafusion
66+
- name: Build and test
67+
working-directory: ./rust
68+
run: |
69+
rm -f Cargo.lock
70+
cargo clean
71+
cargo test --package lakesoul-datafusion
6372

lakesoul-flink/src/test/java/org/apache/flink/lakesoul/test/flinkSource/DMLSuite.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,4 +474,14 @@ private void createLakeSoulPartitionedCDCSourceTableUser(TableEnvironment tEnvs)
474474
tEnvs.executeSql(createUserSql);
475475
}
476476

477+
// @Test
478+
public void testFlightData() {
479+
TableEnvironment tEnv = TestUtils.createTableEnv(BATCH_TYPE);
480+
StreamTableEnvironment streamEnv = TestUtils.createStreamTableEnv(BATCH_TYPE);
481+
String testSelect = "select * from lakesoul_test_table";
482+
TableImpl flinkTable = (TableImpl) streamEnv.sqlQuery(testSelect);
483+
List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
484+
streamEnv.executeSql("select count(1) from lakesoul_test_table").print();
485+
System.out.println(results);
486+
}
477487
}

rust/lakesoul-datafusion/src/catalog/lakesoul_namespace.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct LakeSoulNamespace {
3131

3232
impl LakeSoulNamespace {
3333
pub fn new(meta_data_client_ref: MetaDataClientRef, context: Arc<SessionContext>, namespace: &str) -> Self {
34-
info!("new namespace: {:?}", namespace);
34+
debug!("LakeSoulNamespace::new - Creating new namespace: {}", namespace);
3535
Self {
3636
metadata_client: meta_data_client_ref,
3737
context,
@@ -40,14 +40,17 @@ impl LakeSoulNamespace {
4040
}
4141

4242
pub fn metadata_client(&self) -> MetaDataClientRef {
43+
debug!("LakeSoulNamespace::metadata_client - Getting metadata client");
4344
self.metadata_client.clone()
4445
}
4546

4647
pub fn context(&self) -> Arc<SessionContext> {
48+
debug!("LakeSoulNamespace::context - Getting session context");
4749
self.context.clone()
4850
}
4951

5052
pub fn namespace(&self) -> &str {
53+
debug!("LakeSoulNamespace::namespace - Getting namespace: {}", &self.namespace);
5154
&self.namespace
5255
}
5356

@@ -62,20 +65,24 @@ impl Debug for LakeSoulNamespace {
6265
#[async_trait]
6366
impl SchemaProvider for LakeSoulNamespace {
6467
fn as_any(&self) -> &dyn Any {
68+
debug!("LakeSoulNamespace::as_any called");
6569
self
6670
}
6771

6872
/// query table_name_id by namespace
6973
fn table_names(&self) -> Vec<String> {
74+
debug!("LakeSoulNamespace::table_names - Getting all tables for namespace: {}", &self.namespace);
7075
let client = self.metadata_client.clone();
7176
let np = self.namespace.clone();
7277
futures::executor::block_on(async move {
7378
Handle::current()
7479
.spawn(async move {
75-
client
80+
let table_name_ids = client
7681
.get_all_table_name_id_by_namespace(&np)
7782
.await
78-
.expect("get all table name failed")
83+
.expect("get all table name failed");
84+
debug!("table_name_ids: {:?}", table_name_ids);
85+
table_name_ids
7986
})
8087
.await
8188
.expect("spawn failed")
@@ -88,20 +95,21 @@ impl SchemaProvider for LakeSoulNamespace {
8895
/// Search table by name
8996
/// return LakeSoulListing table
9097
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
98+
debug!("LakeSoulNamespace::table - Looking up table '{}' in namespace '{}'", name, &self.namespace);
9199
let name = case_fold_table_name(name);
92-
info!("table: {:?} {:?}", name, &self.namespace);
93100
let table = match LakeSoulTable::for_namespace_and_name(&self.namespace, &name).await {
94101
Ok(t) => t,
95102
Err(_) => return Ok(None),
96103
};
104+
debug!("LakeSoulNamespace::table - Table found: {:?}", table.table_info());
97105
Ok(table.as_sink_provider(&self.context.state()).await.ok())
98106
}
99107

100108
/// If supported by the implementation, adds a new table to this schema.
101109
/// If a table of the same name existed before, it returns "Table already exists" error.
102110
#[allow(unused_variables)]
103111
fn register_table(&self, name: String, table: Arc<dyn TableProvider>) -> Result<Option<Arc<dyn TableProvider>>> {
104-
info!("register_table: {:?} {:?}", name, &self.namespace);
112+
debug!("LakeSoulNamespace::register_table - Registering table '{}' in namespace '{}'", name, &self.namespace);
105113
// 获取表的 schema
106114
let schema = table.schema();
107115

@@ -125,6 +133,7 @@ impl SchemaProvider for LakeSoulNamespace {
125133
/// If no table of that name exists, returns Ok(None).
126134
#[allow(unused_variables)]
127135
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
136+
debug!("LakeSoulNamespace::deregister_table - Deregistering table '{}' from namespace '{}'", name, &self.namespace);
128137
let name = case_fold_table_name(name);
129138
info!("deregister_table: {:?} {:?}", name, &self.namespace);
130139
let client = self.metadata_client.clone();
@@ -157,6 +166,7 @@ impl SchemaProvider for LakeSoulNamespace {
157166
}
158167

159168
fn table_exist(&self, name: &str) -> bool {
169+
debug!("LakeSoulNamespace::table_exist - Checking existence of table '{}' in namespace '{}'", name, &self.namespace);
160170
info!("table_exist: {:?} {:?}", name, &self.namespace);
161171
// table name is primary key for `table_name_id`
162172
let client = self.metadata_client.clone();
@@ -169,7 +179,10 @@ impl SchemaProvider for LakeSoulNamespace {
169179
.get_all_table_name_id_by_namespace(&np)
170180
.await
171181
.expect("get table name failed");
172-
table_name_ids.into_iter().map(|v| v.table_name).any(|s| s.eq_ignore_ascii_case(&name))
182+
debug!("table_name_ids: {:?}, target: {}", table_name_ids, &name);
183+
let found = table_name_ids.into_iter().map(|v| v.table_name).any(|s| s.eq_ignore_ascii_case(&name));
184+
debug!("table_exist: {:?} {:?}", name, found);
185+
found
173186
})
174187
.await
175188
.expect("spawn failed")

rust/lakesoul-datafusion/src/catalog/mod.rs

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22
//
33
// SPDX-License-Identifier: Apache-2.0
44

5+
use arrow::datatypes::{Schema, SchemaRef};
56
use datafusion::sql::TableReference;
67
use log::info;
8+
use serde::Deserialize;
79
use std::collections::HashMap;
810
use std::env;
911
use std::fmt::Debug;
1012
use std::sync::Arc;
1113
use std::time::SystemTime;
1214

1315
use lakesoul_io::lakesoul_io_config::{LakeSoulIOConfig, LakeSoulIOConfigBuilder};
14-
use lakesoul_metadata::MetaDataClientRef;
16+
use lakesoul_metadata::{LakeSoulMetaDataError, MetaDataClientRef};
1517
use proto::proto::entity::{CommitOp, DataCommitInfo, DataFileOp, FileOp, TableInfo, Uuid};
1618
use crate::error::{LakeSoulError, Result};
1719
use crate::lakesoul_table::helpers::create_io_config_builder_from_table_info;
@@ -25,12 +27,44 @@ pub use lakesoul_catalog::*;
2527
mod lakesoul_namespace;
2628
pub use lakesoul_namespace::*;
2729

30+
fn deserialize_hash_bucket_num<'de, D>(deserializer: D) -> std::result::Result<Option<usize>, D::Error>
31+
where
32+
D: serde::Deserializer<'de>,
33+
{
34+
#[derive(Deserialize)]
35+
#[serde(untagged)]
36+
enum StringOrNum {
37+
String(String),
38+
Number(usize),
39+
}
40+
41+
let opt = Option::deserialize(deserializer)?;
42+
match opt {
43+
None => Ok(None),
44+
Some(StringOrNum::String(s)) => {
45+
s.parse::<usize>().map(Some).map_err(serde::de::Error::custom)
46+
}
47+
Some(StringOrNum::Number(n)) => Ok(Some(n)),
48+
}
49+
}
50+
2851
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
2952
pub struct LakeSoulTableProperty {
30-
#[serde(rename = "hashBucketNum", skip_serializing_if = "Option::is_none")]
53+
#[serde(
54+
rename = "hashBucketNum",
55+
default,
56+
skip_serializing_if = "Option::is_none",
57+
deserialize_with = "deserialize_hash_bucket_num"
58+
)]
3159
pub hash_bucket_num: Option<usize>,
3260
#[serde(rename = "datafusionProperties", skip_serializing_if = "Option::is_none")]
3361
pub datafusion_properties: Option<HashMap<String, String>>,
62+
#[serde(rename = "partitions", default, skip_serializing_if = "Option::is_none")]
63+
pub partitions: Option<String>,
64+
#[serde(rename = "lakesoul_cdc_change_column", default, skip_serializing_if = "Option::is_none")]
65+
pub cdc_change_column: Option<String>,
66+
#[serde(rename = "use_cdc", default, skip_serializing_if = "Option::is_none")]
67+
pub use_cdc: Option<String>,
3468
}
3569

3670
pub(crate) async fn create_table(client: MetaDataClientRef, table_name: &str, config: LakeSoulIOConfig) -> Result<()> {
@@ -84,12 +118,19 @@ pub(crate) async fn create_io_config_builder(
84118
) -> Result<LakeSoulIOConfigBuilder> {
85119
if let Some(table_name) = table_name {
86120
let table_info = client.get_table_info_by_table_name(table_name, namespace).await?;
87-
let data_files = if fetch_files {
88-
client.get_data_files_by_table_name(table_name, namespace).await?
121+
if let Some(table_info) = table_info {
122+
let data_files = if fetch_files {
123+
client.get_data_files_by_table_name(table_name, namespace).await?
124+
} else {
125+
vec![]
126+
};
127+
create_io_config_builder_from_table_info(Arc::new(table_info), options, object_store_options).map(|builder| builder.with_files(data_files))
89128
} else {
90-
vec![]
91-
};
92-
create_io_config_builder_from_table_info(Arc::new(table_info), options, object_store_options).map(|builder| builder.with_files(data_files))
129+
Err(LakeSoulError::MetaDataError(LakeSoulMetaDataError::NotFound(format!(
130+
"Table '{}' not found",
131+
table_name
132+
))))
133+
}
93134
} else {
94135
Ok(LakeSoulIOConfigBuilder::new())
95136
}
@@ -126,6 +167,7 @@ pub(crate) fn format_table_info_partitions(range_keys: &[String], hash_keys: &[S
126167
)
127168
}
128169

170+
129171
pub(crate) async fn commit_data(
130172
client: MetaDataClientRef,
131173
table_name: &str,

rust/lakesoul-datafusion/src/datasource/file_format/metadata_format.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,25 @@ use async_trait::async_trait;
1313

1414
use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
1515
use datafusion::common::parsers::CompressionTypeVariant;
16-
use datafusion::common::{project_schema, GetExt, Statistics};
16+
use datafusion::common::{project_schema, DFSchema, GetExt, Statistics};
1717
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
1818
use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
1919
use datafusion::datasource::listing::ListingOptions;
2020
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
2121
use datafusion::error::DataFusionError;
2222
use datafusion::execution::TaskContext;
2323
use datafusion::logical_expr::dml::InsertOp;
24-
use datafusion::physical_expr::{EquivalenceProperties, LexOrdering, LexRequirement};
24+
use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties, LexOrdering, LexRequirement};
2525
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
26+
use datafusion::physical_plan::filter::FilterExec;
2627
use datafusion::physical_plan::projection::ProjectionExec;
2728
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2829
use datafusion::physical_plan::union::UnionExec;
2930
use datafusion::physical_plan::{
3031
DisplayAs, DisplayFormatType, Distribution, ExecutionPlanProperties, Partitioning, PlanProperties,
3132
SendableRecordBatchStream,
3233
};
34+
use datafusion::prelude::{ident, lit};
3335
use datafusion::sql::TableReference;
3436
use datafusion::{
3537
datasource::{
@@ -54,7 +56,7 @@ use object_store::{ObjectMeta, ObjectStore};
5456
use proto::proto::entity::TableInfo;
5557
use rand::distributions::DistString;
5658

57-
use log::debug;
59+
use log::{debug, info};
5860
use tokio::sync::Mutex;
5961
use tokio::task::JoinHandle;
6062

@@ -81,6 +83,7 @@ impl LakeSoulMetaDataParquetFormat {
8183
table_info: Arc<TableInfo>,
8284
conf: LakeSoulIOConfig,
8385
) -> crate::error::Result<Self> {
86+
debug!("LakeSoulMetaDataParquetFormat::new, conf: {:?}", conf);
8487
Ok(Self {
8588
parquet_format,
8689
client,
@@ -162,6 +165,7 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
162165
conf: FileScanConfig,
163166
filters: Option<&Arc<dyn PhysicalExpr>>,
164167
) -> Result<Arc<dyn ExecutionPlan>> {
168+
info!("LakeSoulMetaDataParquetFormat::create_physical_plan with conf= {:?}, filters= {:?}", &conf, &filters);
165169
// If enable pruning then combine the filters to build the predicate.
166170
// If disable pruning then set the predicate to None, thus readers
167171
// will not prune data based on the statistics.
@@ -182,6 +186,7 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
182186
table_schema.clone(),
183187
target_schema.clone(),
184188
self.conf.primary_keys_slice(),
189+
&self.conf.cdc_column(),
185190
);
186191
let merged_schema = project_schema(&table_schema, merged_projection.as_ref())?;
187192

@@ -191,6 +196,7 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
191196
self.parquet_format.clone(),
192197
conf,
193198
self.conf.primary_keys_slice(),
199+
&self.conf.cdc_column(),
194200
self.conf.partition_schema(),
195201
target_schema.clone(),
196202
)
@@ -205,6 +211,7 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
205211
let partition_columnar_value = Arc::new(partition_columnar_value);
206212

207213
let parquet_exec = Arc::new({
214+
debug!("create parquet exec with config= {:?}, predicate= {:?}", &config, &predicate);
208215
let mut builder = ParquetExecBuilder::new(config.clone());
209216
if let Some(predicate) = predicate.clone() {
210217
builder = builder.with_predicate(predicate);
@@ -257,6 +264,17 @@ impl FileFormat for LakeSoulMetaDataParquetFormat {
257264
partitioned_exec.first().unwrap().clone()
258265
};
259266

267+
let cdc_column = self.conf.cdc_column();
268+
let exec = if !cdc_column.is_empty() {
269+
let dfschema = DFSchema::try_from(exec.schema().as_ref().clone())?;
270+
let cdc_filter = ident(cdc_column).not_eq(lit("delete"));
271+
let expr = create_physical_expr(&cdc_filter, &dfschema, state.execution_props())?;
272+
273+
Arc::new(FilterExec::try_new(expr, exec)?)
274+
} else {
275+
exec
276+
};
277+
260278
if target_schema.fields().len() < merged_schema.fields().len() {
261279
let mut projection_expr = vec![];
262280
for field in target_schema.fields() {

rust/lakesoul-datafusion/src/datasource/table_factory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ impl TableProviderFactory for LakeSoulTableProviderFactory {
3939
cmd: &CreateExternalTable,
4040
) -> datafusion::error::Result<Arc<dyn TableProvider>> {
4141
info!(
42-
"LakeSoulTableProviderFactory::create: {:?}, {:?}, {:?}, {:?}",
43-
cmd.name, cmd.location, cmd.schema, cmd.constraints
42+
"LakeSoulTableProviderFactory::create: {:?}, {:?}, {:?}, {:?}, {:?}",
43+
cmd.name, cmd.location, cmd.schema, cmd.constraints, cmd.options
4444
);
4545

4646
let mut cmd = cmd.clone();

0 commit comments

Comments (0)