Skip to content

Commit d20f7d8

Browse files
committed
chore(cubestore): Upgrade DF: Construct SessionConfig, update datafusion interfaces
1 parent fdaef44 commit d20f7d8

File tree

12 files changed

+248
-60
lines changed

12 files changed

+248
-60
lines changed

rust/cubestore/Cargo.lock

Lines changed: 156 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2103,6 +2103,10 @@ impl Config {
21032103
i.get_service_typed().await,
21042104
i.get_service_typed().await,
21052105
i.get_service_typed().await,
2106+
i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
2107+
.await
2108+
.cache_factory()
2109+
.clone()
21062110
)
21072111
})
21082112
.await;

rust/cubestore/cubestore/src/queryplanner/metadata_cache.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use bytes::Bytes;
22
use datafusion::datasource::physical_plan::parquet::DefaultParquetFileReaderFactory;
33
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
44
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
5+
use datafusion::parquet::file::encryption::ParquetEncryptionConfig;
56
use datafusion::parquet::file::metadata::ParquetMetaData;
67
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
8+
use datafusion::prelude::SessionConfig;
79
use futures_util::future::BoxFuture;
810
use futures_util::FutureExt;
911
use std::fmt;
@@ -22,6 +24,9 @@ pub trait MetadataCacheFactory: Sync + Send {
2224
max_capacity: u64,
2325
time_to_idle: Duration,
2426
) -> Arc<dyn ParquetFileReaderFactory>;
27+
fn make_session_config(&self) -> SessionConfig {
28+
SessionConfig::new()
29+
}
2530
}
2631
/// Default MetadataCache, does not cache anything
2732
#[derive(Debug)]
@@ -132,6 +137,16 @@ pub struct LruCachingFileReader {
132137
cache: Arc<moka::sync::Cache<object_store::path::Path, Arc<ParquetMetaData>>>,
133138
}
134139

140+
impl LruCachingFileReader {
141+
pub fn new(path: object_store::path::Path, reader: Box<dyn AsyncFileReader>, cache: Arc<moka::sync::Cache<object_store::path::Path, Arc<ParquetMetaData>>>) -> LruCachingFileReader {
142+
LruCachingFileReader {
143+
path,
144+
reader,
145+
cache,
146+
}
147+
}
148+
}
149+
135150
impl AsyncFileReader for LruCachingFileReader {
136151
fn get_bytes(
137152
&mut self,
@@ -149,14 +164,16 @@ impl AsyncFileReader for LruCachingFileReader {
149164

150165
fn get_metadata(
151166
&mut self,
167+
encryption_config: &Option<ParquetEncryptionConfig>
152168
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
153169
let cache = self.cache.clone();
154170
let path = self.path.clone();
171+
let encryption_config = encryption_config.clone();
155172
async move {
156173
match cache.get(&path) {
157174
Some(metadata) => Ok(metadata),
158175
None => {
159-
let metadata = self.reader.get_metadata().await?;
176+
let metadata = self.reader.get_metadata(&encryption_config).await?;
160177
cache.insert(path, metadata.clone());
161178
Ok(metadata)
162179
}

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ use datafusion::physical_plan::{
8787
collect, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
8888
PlanProperties, SendableRecordBatchStream,
8989
};
90-
use datafusion::prelude::SessionContext;
90+
use datafusion::prelude::{SessionConfig, SessionContext};
9191
use datafusion::sql::parser::Statement;
9292
use datafusion::sql::planner::{ContextProvider, SqlToRel};
9393
use datafusion::{cube_ext, datasource::TableProvider};
@@ -217,7 +217,7 @@ impl QueryPlanner for QueryPlannerImpl {
217217
let physical_plan = plan_ctx.state().create_physical_plan(&plan_to_move).await?;
218218

219219
let execution_time = SystemTime::now();
220-
let results = collect(physical_plan, Arc::new(TaskContext::default())).await?;
220+
let results = collect(physical_plan, ctx.task_ctx()).await?;
221221
let execution_time = execution_time.elapsed()?;
222222
app_metrics::META_QUERY_TIME_MS.report(execution_time.as_millis() as i64);
223223
debug!("Meta query data processing time: {:?}", execution_time,);
@@ -245,8 +245,8 @@ impl QueryPlannerImpl {
245245
}
246246

247247
impl QueryPlannerImpl {
248-
pub fn make_execution_context() -> SessionContext {
249-
let context = SessionContext::new();
248+
pub fn execution_context_helper(config: SessionConfig) -> SessionContext {
249+
let context = SessionContext::new_with_config(config);
250250
// TODO upgrade DF: build SessionContexts consistently -- that now means check that all appropriate SessionContext constructors go through this execution_context_helper function (directly, or via make_execution_context or execution_context).
251251
for udaf in registerable_aggregate_udfs() {
252252
context.register_udaf(udaf);
@@ -266,8 +266,13 @@ impl QueryPlannerImpl {
266266
context
267267
}
268268

269+
pub fn make_execution_context() -> SessionContext {
270+
Self::execution_context_helper(SessionConfig::new())
271+
}
272+
273+
// TODO upgrade DF: Don't be async -- nothing is awaited in this function's body.
269274
async fn execution_context(&self) -> Result<Arc<SessionContext>, CubeError> {
270-
Ok(Arc::new(Self::make_execution_context()))
275+
Ok(Arc::new(Self::execution_context_helper(self.metadata_cache_factory.make_session_config())))
271276
}
272277
}
273278

rust/cubestore/cubestore/src/queryplanner/partition_filter.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ impl PartitionFilter {
1919
const SIZE_LIMIT: usize = 50;
2020

2121
pub fn extract(s: &Schema, filters: &[Expr]) -> PartitionFilter {
22-
println!("Calling extract on filters {:?}", filters);
2322
let builder = Builder { schema: s };
2423

2524
let mut r = vec![];
2625
for f in filters {
2726
r = builder.extract_filter(f, r);
28-
println!("Extracted. r = {:?}", r);
2927
}
3028

3129
PartitionFilter { min_max: r }

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ use super::udfs::{
9797
aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs,
9898
registerable_arc_scalar_udfs, CubeAggregateUDFKind,
9999
};
100+
use super::QueryPlannerImpl;
100101

101102
#[automock]
102103
#[async_trait]
@@ -139,14 +140,21 @@ pub trait QueryExecutor: DIService + Send + Sync {
139140
crate::di_service!(MockQueryExecutor, [QueryExecutor]);
140141

141142
pub struct QueryExecutorImpl {
142-
// TODO: Why do we need a MetadataCacheFactory when we have a ParquetMetadataCache?
143+
// TODO: Why do we need a MetadataCacheFactory when we have a ParquetMetadataCache? (We use its make_session_config() now, TODO rename stuff)
143144
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
144145
parquet_metadata_cache: Arc<dyn CubestoreParquetMetadataCache>,
145146
memory_handler: Arc<dyn MemoryHandler>,
146147
}
147148

148149
crate::di_service!(QueryExecutorImpl, [QueryExecutor]);
149150

151+
impl QueryExecutorImpl {
152+
fn execution_context(&self) -> Result<Arc<SessionContext>, CubeError> {
153+
// This is supposed to be identical to QueryPlannerImpl::execution_context.
154+
Ok(Arc::new(QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.make_session_config())))
155+
}
156+
}
157+
150158
#[async_trait]
151159
impl QueryExecutor for QueryExecutorImpl {
152160
#[instrument(level = "trace", skip(self, plan, cluster))]
@@ -174,7 +182,8 @@ impl QueryExecutor for QueryExecutorImpl {
174182

175183
let execution_time = SystemTime::now();
176184

177-
let results = collect(split_plan.clone(), Arc::new(TaskContext::default()))
185+
let session_context = self.execution_context()?;
186+
let results = collect(split_plan.clone(), session_context.task_ctx())
178187
.instrument(collect_span)
179188
.await;
180189
let execution_time = execution_time.elapsed()?;
@@ -241,8 +250,9 @@ impl QueryExecutor for QueryExecutorImpl {
241250
);
242251

243252
let execution_time = SystemTime::now();
253+
let session_context = self.execution_context()?;
244254
// TODO context
245-
let results = collect(worker_plan.clone(), Arc::new(TaskContext::default()))
255+
let results = collect(worker_plan.clone(), session_context.task_ctx())
246256
.instrument(tracing::span!(
247257
tracing::Level::TRACE,
248258
"collect_physical_plan"

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::metastore::{
1212
use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec;
1313
use crate::queryplanner::metadata_cache::MetadataCacheFactory;
1414
use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
15+
use crate::queryplanner::QueryPlannerImpl;
1516
use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
1617
use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE};
1718
use crate::table::data::{cmp_min_rows, cmp_partition_key};
@@ -190,11 +191,14 @@ impl CompactionServiceImpl {
190191
let deactivate_res = self
191192
.deactivate_and_mark_failed_chunks_for_replay(failed)
192193
.await;
194+
195+
let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
196+
193197
let in_memory_res = self
194-
.compact_chunks_to_memory(mem_chunks, &partition, &index, &table)
198+
.compact_chunks_to_memory(mem_chunks, &partition, &index, &table, task_context.clone())
195199
.await;
196200
let persistent_res = self
197-
.compact_chunks_to_persistent(persistent_chunks, &partition, &index, &table)
201+
.compact_chunks_to_persistent(persistent_chunks, &partition, &index, &table, task_context)
198202
.await;
199203
deactivate_res?;
200204
in_memory_res?;
@@ -209,6 +213,7 @@ impl CompactionServiceImpl {
209213
partition: &IdRow<Partition>,
210214
index: &IdRow<Index>,
211215
table: &IdRow<Table>,
216+
task_context: Arc<TaskContext>,
212217
) -> Result<(), CubeError> {
213218
if chunks.is_empty() {
214219
return Ok(());
@@ -290,6 +295,7 @@ impl CompactionServiceImpl {
290295
in_memory_columns,
291296
unique_key.clone(),
292297
aggregate_columns.clone(),
298+
task_context.clone(),
293299
)
294300
.await?;
295301
let batches = collect(batches_stream).await?;
@@ -337,6 +343,7 @@ impl CompactionServiceImpl {
337343
partition: &IdRow<Partition>,
338344
index: &IdRow<Index>,
339345
table: &IdRow<Table>,
346+
task_context: Arc<TaskContext>,
340347
) -> Result<(), CubeError> {
341348
if chunks.is_empty() {
342349
return Ok(());
@@ -381,6 +388,7 @@ impl CompactionServiceImpl {
381388
in_memory_columns,
382389
unique_key.clone(),
383390
aggregate_columns.clone(),
391+
task_context,
384392
)
385393
.await?;
386394

@@ -687,8 +695,9 @@ impl CompactionService for CompactionServiceImpl {
687695
IndexType::Regular => None,
688696
IndexType::Aggregate => Some(table.get_row().aggregate_columns()),
689697
};
698+
let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
690699
let records =
691-
merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?;
700+
merge_chunks(key_size, main_table, new, unique_key, aggregate_columns, task_context).await?;
692701
let count_and_min = write_to_files(
693702
records,
694703
total_rows as usize,
@@ -890,6 +899,7 @@ impl CompactionService for CompactionServiceImpl {
890899
key_len,
891900
// TODO should it respect table partition_split_threshold?
892901
self.config.partition_split_threshold() as usize,
902+
QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx(),
893903
)
894904
.await?;
895905
// There is no point if we cannot split the partition.
@@ -988,8 +998,9 @@ async fn find_partition_keys(
988998
p: AggregateExec,
989999
key_len: usize,
9901000
rows_per_partition: usize,
1001+
context: Arc<TaskContext>,
9911002
) -> Result<Vec<Row>, CubeError> {
992-
let mut s = p.execute(0, Arc::new(TaskContext::default()))?;
1003+
let mut s = p.execute(0, context)?;
9931004
let mut points = Vec::new();
9941005
let mut row_count = 0;
9951006
while let Some(b) = s.next().await.transpose()? {
@@ -1364,6 +1375,7 @@ pub async fn merge_chunks(
13641375
r: Vec<ArrayRef>,
13651376
unique_key_columns: Option<Vec<&crate::metastore::Column>>,
13661377
aggregate_columns: Option<Vec<AggregateColumn>>,
1378+
task_context: Arc<TaskContext>,
13671379
) -> Result<SendableRecordBatchStream, CubeError> {
13681380
let schema = l.schema();
13691381
let r = RecordBatch::try_new(schema.clone(), r)?;
@@ -1421,7 +1433,7 @@ pub async fn merge_chunks(
14211433
)?);
14221434
}
14231435

1424-
Ok(res.execute(0, Arc::new(TaskContext::default()))?)
1436+
Ok(res.execute(0, task_context)?)
14251437
}
14261438

14271439
pub async fn merge_replay_handles(
@@ -2331,6 +2343,7 @@ impl MultiSplit {
23312343
ROW_GROUP_SIZE,
23322344
self.metadata_cache_factory.clone(),
23332345
);
2346+
let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
23342347
let records = if !in_files.is_empty() {
23352348
read_files(
23362349
&in_files.into_iter().map(|(f, _)| f).collect::<Vec<_>>(),
@@ -2340,10 +2353,10 @@ impl MultiSplit {
23402353
Arc::new(store.arrow_schema()),
23412354
)
23422355
.await?
2343-
.execute(0, Arc::new(TaskContext::default()))?
2356+
.execute(0, task_context)?
23442357
} else {
23452358
EmptyExec::new(Arc::new(store.arrow_schema()))
2346-
.execute(0, Arc::new(TaskContext::default()))?
2359+
.execute(0, task_context)?
23472360
};
23482361
let row_counts = write_to_files_by_keys(
23492362
records,

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::metastore::{
1818
deactivate_table_due_to_corrupt_data, deactivate_table_on_corrupt_data, table::Table, Chunk,
1919
Column, ColumnType, IdRow, Index, IndexType, MetaStore, Partition, WAL,
2020
};
21+
use crate::queryplanner::QueryPlannerImpl;
2122
use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
2223
use crate::table::{Row, TableValue};
2324
use crate::util::batch_memory::columns_vec_buffer_size;
@@ -432,12 +433,15 @@ impl ChunkDataStore for ChunkStore {
432433
if old_chunk_ids.is_empty() {
433434
return Ok(());
434435
}
436+
let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
437+
435438
let batches_stream = merge_chunks(
436439
key_size,
437440
main_table.clone(),
438441
in_memory_columns,
439442
unique_key.clone(),
440443
aggregate_columns.clone(),
444+
task_context,
441445
)
442446
.await?;
443447
let batches = common_collect(batches_stream).await?;
@@ -1342,7 +1346,9 @@ impl ChunkStore {
13421346

13431347
assert!(aggregate.properties().output_ordering().is_some_and(|ordering| ordering.len() == key_size));
13441348

1345-
let batches = collect(aggregate, Arc::new(TaskContext::default())).await?;
1349+
let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
1350+
1351+
let batches = collect(aggregate, task_context).await?;
13461352
if batches.is_empty() {
13471353
Ok(vec![])
13481354
} else if batches.len() == 1 {

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::config::injection::DIService;
22
use crate::config::ConfigObj;
33
use crate::metastore::table::StreamOffset;
44
use crate::metastore::Column;
5+
use crate::queryplanner::metadata_cache::MetadataCacheFactory;
56
use crate::streaming::kafka_post_processing::{KafkaPostProcessPlan, KafkaPostProcessPlanner};
67
use crate::streaming::traffic_sender::TrafficSender;
78
use crate::streaming::{parse_json_payload_and_key, StreamingSource};
@@ -59,6 +60,7 @@ impl KafkaStreamingSource {
5960
kafka_client: Arc<dyn KafkaClientService>,
6061
use_ssl: bool,
6162
trace_obj: Option<String>,
63+
metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
6264
) -> Result<Self, CubeError> {
6365
let (post_processing_plan, columns, unique_key_columns, seq_column_index) =
6466
if let Some(select_statement) = select_statement {
@@ -70,7 +72,7 @@ impl KafkaStreamingSource {
7072
source_columns,
7173
);
7274
let plan = planner
73-
.build(select_statement.clone())
75+
.build(select_statement.clone(), metadata_cache_factory)
7476
.await?;
7577
let columns = plan.source_columns().clone();
7678
let seq_column_index = plan.source_seq_column_index();
@@ -448,7 +450,7 @@ mod tests {
448450
.await
449451
.unwrap();
450452

451-
let batches = collect(phys_plan, Arc::new(TaskContext::default()))
453+
let batches = collect(phys_plan, plan_ctx.task_ctx())
452454
.await
453455
.unwrap();
454456
let res = batches_to_dataframe(batches).unwrap();
@@ -487,7 +489,7 @@ mod tests {
487489
.unwrap();
488490
let phys_plan = phys_plan.with_new_children(vec![inp]).unwrap();
489491

490-
let batches = collect(phys_plan, Arc::new(TaskContext::default()))
492+
let batches = collect(phys_plan, plan_ctx.task_ctx())
491493
.await
492494
.unwrap();
493495
let res = batches_to_dataframe(batches).unwrap();

0 commit comments

Comments
 (0)