Commit af22ff9

chore(cubestore): Upgrade DF: Fix usage of MetadataCacheFactory and CubestoreParquetMetadataCache

Parent: 6634da5

7 files changed: +16 -43 lines changed

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 3 additions & 11 deletions
@@ -49,11 +49,7 @@ use crate::util::memory::{MemoryHandler, MemoryHandlerImpl};
 use crate::CubeError;
 use cuberockstore::rocksdb::{Options, DB};
 use datafusion::cube_ext;
-// use datafusion::physical_plan::parquet::BasicMetadataCacheFactory;
-use crate::queryplanner::metadata_cache::{
-    BasicMetadataCacheFactory, LruParquetMetadataCacheFactory, MetadataCacheFactory,
-    NoopParquetMetadataCache,
-};
+use crate::queryplanner::metadata_cache::BasicMetadataCacheFactory;
 use futures::future::join_all;
 use log::Level;
 use log::{debug, error};
@@ -2034,8 +2030,8 @@ impl Config {
 let metadata_cache_factory: &_ = cubestore_metadata_cache_factory.cache_factory();
 CubestoreParquetMetadataCacheImpl::new(
     match c.metadata_cache_max_capacity_bytes() {
-        0 => NoopParquetMetadataCache::new(),
-        max_cached_metadata => LruParquetMetadataCacheFactory::new(
+        0 => metadata_cache_factory.make_noop_cache(),
+        max_cached_metadata => metadata_cache_factory.make_lru_cache(
             max_cached_metadata,
             Duration::from_secs(c.metadata_cache_time_to_idle_secs()),
         ),
@@ -2093,10 +2089,6 @@ impl Config {
 i.get_service_typed().await,
 i.get_service_typed().await,
 i.get_service_typed().await,
-i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
-    .await
-    .cache_factory()
-    .clone(),
 )
 })
 .await;
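Config now obtains its parquet reader factories through the injected MetadataCacheFactory instead of constructing NoopParquetMetadataCache or LruParquetMetadataCacheFactory directly. A minimal sketch of the trait shape these call sites imply; the method names, the Duration parameter, and the return type come from the metadata_cache.rs diff below, while the parameter names and the u64 capacity type are assumptions:

use std::sync::Arc;
use std::time::Duration;
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;

pub trait MetadataCacheFactory: Sync + Send {
    // Pass-through factory: no parquet metadata is cached.
    fn make_noop_cache(&self) -> Arc<dyn ParquetFileReaderFactory>;

    // LRU-backed factory bounded by a byte capacity and a time-to-idle
    // eviction window. Parameter names and the u64 type are assumptions.
    fn make_lru_cache(
        &self,
        max_cached_metadata: u64,
        time_to_idle: Duration,
    ) -> Arc<dyn ParquetFileReaderFactory>;
}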

rust/cubestore/cubestore/src/queryplanner/metadata_cache.rs

Lines changed: 9 additions & 12 deletions
@@ -8,7 +8,6 @@ use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
 use std::fmt;
 use std::fmt::{Debug, Formatter};
-use std::fs::File;
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::Duration;
@@ -24,20 +23,19 @@ pub trait MetadataCacheFactory: Sync + Send {
         time_to_idle: Duration,
     ) -> Arc<dyn ParquetFileReaderFactory>;
 }
-
 /// Default MetadataCache, does not cache anything
 #[derive(Debug)]
 pub struct NoopParquetMetadataCache {
-    default_factory: Arc<dyn ParquetFileReaderFactory>,
+    default_factory: DefaultParquetFileReaderFactory,
 }

 impl NoopParquetMetadataCache {
-    /// Creates a new DefaultMetadataCache
+    /// Creates a new DefaultMetadataCache
     pub fn new() -> Arc<Self> {
         Arc::new(NoopParquetMetadataCache {
-            default_factory: Arc::new(DefaultParquetFileReaderFactory::new(Arc::new(
-                object_store::local::LocalFileSystem::new(),
-            ))),
+            default_factory: DefaultParquetFileReaderFactory::new(Arc::new(
+                object_store::local::LocalFileSystem::new(),
+            )),
         })
     }
 }
@@ -52,8 +50,9 @@ impl ParquetFileReaderFactory for NoopParquetMetadataCache {
     ) -> datafusion::common::Result<Box<dyn AsyncFileReader + Send>> {
         self.default_factory
             .create_reader(partition_index, file_meta, metadata_size_hint, metrics)
-    }
-}
+    }
+}
+

 /// LruMetadataCache, caches parquet metadata.
 pub struct LruParquetMetadataCacheFactory {
@@ -115,9 +114,7 @@ impl BasicMetadataCacheFactory {

 impl MetadataCacheFactory for BasicMetadataCacheFactory {
     fn make_noop_cache(&self) -> Arc<dyn ParquetFileReaderFactory> {
-        Arc::new(DefaultParquetFileReaderFactory::new(Arc::new(
-            object_store::local::LocalFileSystem::new(),
-        )))
+        NoopParquetMetadataCache::new()
     }

     fn make_lru_cache(
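With this change, BasicMetadataCacheFactory::make_noop_cache reuses NoopParquetMetadataCache, which now holds a concrete DefaultParquetFileReaderFactory and simply delegates create_reader to it. A hedged usage sketch of selecting a cache through the factory; the BasicMetadataCacheFactory constructor and the surrounding variables are assumptions for illustration:

use std::time::Duration;

// Hypothetical call site mirroring the Config change above; `new()`,
// `max_capacity_bytes`, and `ttl_secs` are assumed names, not confirmed API.
let factory = BasicMetadataCacheFactory::new();
let reader_factory = match max_capacity_bytes {
    0 => factory.make_noop_cache(),
    n => factory.make_lru_cache(n, Duration::from_secs(ttl_secs)),
};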

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 1 addition & 2 deletions
@@ -50,8 +50,7 @@ use crate::queryplanner::topk::ClusterAggregateTopK;
 use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
 use crate::table::{cmp_same_types, Row};
 use crate::CubeError;
-// use datafusion::physical_plan::parquet::NoopParquetMetadataCache;
-use crate::queryplanner::metadata_cache::{MetadataCacheFactory, NoopParquetMetadataCache};
+use crate::queryplanner::metadata_cache::NoopParquetMetadataCache;
 use datafusion::common;
 use datafusion::common::DFSchemaRef;
 use datafusion::datasource::DefaultTableSource;

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 1 addition & 3 deletions
@@ -1464,9 +1464,7 @@ mod tests {
 use crate::metastore::{
     BaseRocksStoreFs, Column, ColumnType, IndexDef, IndexType, RocksMetaStore,
 };
-use crate::queryplanner::metadata_cache::{
-    BasicMetadataCacheFactory, NoopParquetMetadataCache,
-};
+use crate::queryplanner::metadata_cache::BasicMetadataCacheFactory;
 use crate::remotefs::LocalDirRemoteFs;
 use crate::store::MockChunkDataStore;
 use crate::table::data::rows_to_columns;

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 1 addition & 4 deletions
@@ -2,7 +2,6 @@ use crate::config::injection::DIService;
 use crate::config::ConfigObj;
 use crate::metastore::table::StreamOffset;
 use crate::metastore::Column;
-use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::streaming::kafka_post_processing::{KafkaPostProcessPlan, KafkaPostProcessPlanner};
 use crate::streaming::traffic_sender::TrafficSender;
 use crate::streaming::{parse_json_payload_and_key, StreamingSource};
@@ -12,7 +11,6 @@ use async_std::stream;
 use async_trait::async_trait;
 use datafusion::arrow::array::ArrayRef;
 use datafusion::cube_ext;
-use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
 use futures::Stream;
 use json::object::Object;
 use json::JsonValue;
@@ -61,7 +59,6 @@ impl KafkaStreamingSource {
     kafka_client: Arc<dyn KafkaClientService>,
     use_ssl: bool,
     trace_obj: Option<String>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 ) -> Result<Self, CubeError> {
     let (post_processing_plan, columns, unique_key_columns, seq_column_index) =
         if let Some(select_statement) = select_statement {
@@ -73,7 +70,7 @@ impl KafkaStreamingSource {
         source_columns,
     );
     let plan = planner
-        .build(select_statement.clone(), metadata_cache_factory)
+        .build(select_statement.clone())
         .await?;
     let columns = plan.source_columns().clone();
     let seq_column_index = plan.source_seq_column_index();

rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs

Lines changed: 1 addition & 5 deletions
@@ -1,5 +1,4 @@
 use crate::metastore::Column;
-use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::sql::MySqlDialectWithBackTicks;
 use crate::streaming::topic_table_provider::TopicTableProvider;
 use crate::CubeError;
@@ -9,7 +8,6 @@ use datafusion::arrow::datatypes::{Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common;
 use datafusion::common::{DFSchema, DFSchemaRef};
-use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
 use datafusion::execution::TaskContext;
 use datafusion::logical_expr::expr::Alias;
 use datafusion::logical_expr::{Expr, Filter, LogicalPlan, Projection};
@@ -138,7 +136,6 @@ impl KafkaPostProcessPlanner {
     pub async fn build(
         &self,
         select_statement: String,
-        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Result<KafkaPostProcessPlan, CubeError> {
         let target_schema = Arc::new(Schema::new(
             self.columns
@@ -150,7 +147,7 @@ impl KafkaPostProcessPlanner {
         let source_unique_columns = self.extract_source_unique_columns(&logical_plan)?;

         let (projection_plan, filter_plan) = self
-            .make_projection_and_filter_physical_plans(&logical_plan, metadata_cache_factory)
+            .make_projection_and_filter_physical_plans(&logical_plan)
             .await?;
         if target_schema != projection_plan.schema() {
             return Err(CubeError::user(format!(
@@ -406,7 +403,6 @@ impl KafkaPostProcessPlanner {
     async fn make_projection_and_filter_physical_plans(
         &self,
         plan: &LogicalPlan,
-        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Result<(Arc<dyn ExecutionPlan>, Option<Arc<dyn ExecutionPlan>>), CubeError> {
         let source_schema = Arc::new(Schema::new(
             self.source_columns
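Since the metadata_cache_factory argument was unused along this path, dropping it shortens the whole chain from KafkaStreamingSource down to the physical-plan builder. A minimal sketch of the updated call, as already visible in the kafka.rs hunk above:

// The factory argument is gone from the planner entry point.
let plan = planner.build(select_statement.clone()).await?;
let columns = plan.source_columns().clone();
let seq_column_index = plan.source_seq_column_index();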

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 0 additions & 6 deletions
@@ -11,7 +11,6 @@ use crate::metastore::replay_handle::{ReplayHandle, SeqPointer, SeqPointerForLoc
 use crate::metastore::source::SourceCredentials;
 use crate::metastore::table::{StreamOffset, Table};
 use crate::metastore::{Column, ColumnType, IdRow, MetaStore};
-use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::sql::timestamp_from_string;
 use crate::store::ChunkDataStore;
 use crate::streaming::kafka::{KafkaClientService, KafkaStreamingSource};
@@ -24,7 +23,6 @@ use buffered_stream::BufferedStream;
 use chrono::Utc;
 use datafusion::arrow::array::ArrayBuilder;
 use datafusion::arrow::array::ArrayRef;
-use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
 use futures::future::join_all;
 use futures::stream::StreamExt;
 use futures::Stream;
@@ -59,7 +57,6 @@ pub struct StreamingServiceImpl {
     chunk_store: Arc<dyn ChunkDataStore>,
     ksql_client: Arc<dyn KsqlClient>,
     kafka_client: Arc<dyn KafkaClientService>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 }

 crate::di_service!(StreamingServiceImpl, [StreamingService]);
@@ -71,15 +68,13 @@ impl StreamingServiceImpl {
     chunk_store: Arc<dyn ChunkDataStore>,
     ksql_client: Arc<dyn KsqlClient>,
     kafka_client: Arc<dyn KafkaClientService>,
-    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 ) -> Arc<Self> {
     Arc::new(Self {
         config_obj,
         meta_store,
         chunk_store,
         ksql_client,
         kafka_client,
-        metadata_cache_factory,
     })
 }

@@ -170,7 +165,6 @@ impl StreamingServiceImpl {
     self.kafka_client.clone(),
     *use_ssl,
     trace_obj,
-    self.metadata_cache_factory.clone(),
 ).await?)),
 }
 }
