
Commit b9cff3e

WIP: Avoid TaskContext::default, which is necessary because otherwise the metadata cache factory is not used
1 parent ab3afc7 commit b9cff3e

File tree

8 files changed: +52, -13 lines

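The change applies a single pattern across the files below: wherever a physical plan was executed with Arc::new(TaskContext::default()), the context is now derived from the SessionConfig produced by the metadata cache factory, so execution actually sees the configured metadata cache. A minimal before/after sketch, assuming upstream DataFusion's collect/TaskContext API, the helper names visible in the hunks (QueryPlannerImpl::execution_context_helper, MetadataCacheFactory::make_session_config), and CubeError's usual conversion from DataFusion errors; exact import paths may differ in Cube's DataFusion fork:

    use std::sync::Arc;

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::physical_plan::{collect, ExecutionPlan};

    use crate::queryplanner::metadata_cache::MetadataCacheFactory;
    use crate::queryplanner::QueryPlannerImpl;
    use crate::CubeError;

    async fn execute_with_configured_context(
        plan: Arc<dyn ExecutionPlan>,
        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
    ) -> Result<Vec<RecordBatch>, CubeError> {
        // Before: collect(plan, Arc::new(TaskContext::default())).await?
        // -- a throwaway context that ignores the injected metadata cache factory.
        //
        // After: build the TaskContext from the factory's SessionConfig so plan
        // execution runs with the same settings the planner was configured with.
        let config = metadata_cache_factory.make_session_config();
        let task_context = QueryPlannerImpl::execution_context_helper(config).task_ctx();
        Ok(collect(plan, task_context).await?)
    }
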

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -2103,6 +2103,10 @@ impl Config {
                 i.get_service_typed().await,
                 i.get_service_typed().await,
                 i.get_service_typed().await,
+                i.get_service_typed::<dyn CubestoreMetadataCacheFactory>()
+                    .await
+                    .cache_factory()
+                    .clone()
             )
         })
         .await;

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -217,7 +217,7 @@ impl QueryPlanner for QueryPlannerImpl {
         let physical_plan = plan_ctx.state().create_physical_plan(&plan_to_move).await?;
 
         let execution_time = SystemTime::now();
-        let results = collect(physical_plan, Arc::new(TaskContext::default())).await?;
+        let results = collect(physical_plan, ctx.task_ctx()).await?;
         let execution_time = execution_time.elapsed()?;
         app_metrics::META_QUERY_TIME_MS.report(execution_time.as_millis() as i64);
         debug!("Meta query data processing time: {:?}", execution_time,);
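
execution_context_helper itself is not part of this commit; the hunks only call it. A plausible shape, inferred from the call sites and assuming it is a thin wrapper over DataFusion's SessionContext (depending on the DataFusion version the constructor may be SessionContext::with_config instead):

    use datafusion::prelude::{SessionConfig, SessionContext};

    impl QueryPlannerImpl {
        // Hypothetical sketch, not code from this repository: turn the
        // factory-produced SessionConfig into a SessionContext whose task_ctx()
        // carries that configuration into plan execution.
        pub fn execution_context_helper(config: SessionConfig) -> SessionContext {
            SessionContext::new_with_config(config)
        }
    }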

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 20 additions & 7 deletions
@@ -12,6 +12,7 @@ use crate::metastore::{
 use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec;
 use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
+use crate::queryplanner::QueryPlannerImpl;
 use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
 use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE};
 use crate::table::data::{cmp_min_rows, cmp_partition_key};
@@ -190,11 +191,14 @@ impl CompactionServiceImpl {
         let deactivate_res = self
             .deactivate_and_mark_failed_chunks_for_replay(failed)
             .await;
+
+        let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
+
         let in_memory_res = self
-            .compact_chunks_to_memory(mem_chunks, &partition, &index, &table)
+            .compact_chunks_to_memory(mem_chunks, &partition, &index, &table, task_context.clone())
             .await;
         let persistent_res = self
-            .compact_chunks_to_persistent(persistent_chunks, &partition, &index, &table)
+            .compact_chunks_to_persistent(persistent_chunks, &partition, &index, &table, task_context)
             .await;
         deactivate_res?;
         in_memory_res?;
@@ -209,6 +213,7 @@ impl CompactionServiceImpl {
         partition: &IdRow<Partition>,
         index: &IdRow<Index>,
         table: &IdRow<Table>,
+        task_context: Arc<TaskContext>,
     ) -> Result<(), CubeError> {
         if chunks.is_empty() {
             return Ok(());
@@ -290,6 +295,7 @@ impl CompactionServiceImpl {
             in_memory_columns,
             unique_key.clone(),
             aggregate_columns.clone(),
+            task_context.clone(),
         )
         .await?;
         let batches = collect(batches_stream).await?;
@@ -337,6 +343,7 @@ impl CompactionServiceImpl {
         partition: &IdRow<Partition>,
         index: &IdRow<Index>,
         table: &IdRow<Table>,
+        task_context: Arc<TaskContext>,
     ) -> Result<(), CubeError> {
         if chunks.is_empty() {
             return Ok(());
@@ -381,6 +388,7 @@ impl CompactionServiceImpl {
             in_memory_columns,
             unique_key.clone(),
             aggregate_columns.clone(),
+            task_context,
         )
         .await?;
 
@@ -687,8 +695,9 @@ impl CompactionService for CompactionServiceImpl {
             IndexType::Regular => None,
             IndexType::Aggregate => Some(table.get_row().aggregate_columns()),
         };
+        let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
         let records =
-            merge_chunks(key_size, main_table, new, unique_key, aggregate_columns).await?;
+            merge_chunks(key_size, main_table, new, unique_key, aggregate_columns, task_context).await?;
         let count_and_min = write_to_files(
             records,
             total_rows as usize,
@@ -890,6 +899,7 @@
             key_len,
             // TODO should it respect table partition_split_threshold?
             self.config.partition_split_threshold() as usize,
+            QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx(),
         )
         .await?;
         // There is no point if we cannot split the partition.
@@ -988,8 +998,9 @@ async fn find_partition_keys(
     p: AggregateExec,
     key_len: usize,
     rows_per_partition: usize,
+    context: Arc<TaskContext>,
 ) -> Result<Vec<Row>, CubeError> {
-    let mut s = p.execute(0, Arc::new(TaskContext::default()))?;
+    let mut s = p.execute(0, context)?;
     let mut points = Vec::new();
     let mut row_count = 0;
     while let Some(b) = s.next().await.transpose()? {
@@ -1364,6 +1375,7 @@ pub async fn merge_chunks(
     r: Vec<ArrayRef>,
     unique_key_columns: Option<Vec<&crate::metastore::Column>>,
     aggregate_columns: Option<Vec<AggregateColumn>>,
+    task_context: Arc<TaskContext>,
 ) -> Result<SendableRecordBatchStream, CubeError> {
     let schema = l.schema();
     let r = RecordBatch::try_new(schema.clone(), r)?;
@@ -1421,7 +1433,7 @@
         )?);
     }
 
-    Ok(res.execute(0, Arc::new(TaskContext::default()))?)
+    Ok(res.execute(0, task_context)?)
 }
 
 pub async fn merge_replay_handles(
@@ -2331,6 +2343,7 @@ impl MultiSplit {
             ROW_GROUP_SIZE,
             self.metadata_cache_factory.clone(),
         );
+        let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
         let records = if !in_files.is_empty() {
             read_files(
                 &in_files.into_iter().map(|(f, _)| f).collect::<Vec<_>>(),
@@ -2340,10 +2353,10 @@
                 Arc::new(store.arrow_schema()),
             )
             .await?
-            .execute(0, Arc::new(TaskContext::default()))?
+            .execute(0, task_context)?
         } else {
             EmptyExec::new(Arc::new(store.arrow_schema()))
-                .execute(0, Arc::new(TaskContext::default()))?
+                .execute(0, task_context)?
         };
         let row_counts = write_to_files_by_keys(
             records,
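
Note the pattern in the compact_chunks hunk above: the context is built once per compaction pass and the Arc is cloned for the in-memory and persistent paths, while merge_chunks and find_partition_keys now take it as a parameter instead of defaulting internally. A small distillation of that design point (the helper function below is hypothetical; the calls it wraps are the ones shown in the hunks, and import paths vary across DataFusion versions):

    use std::sync::Arc;

    use datafusion::execution::TaskContext;

    use crate::queryplanner::metadata_cache::MetadataCacheFactory;
    use crate::queryplanner::QueryPlannerImpl;

    // Build the TaskContext once per compaction pass; callees receive Arc clones,
    // so every plan execution in the pass shares one session configuration
    // (and with it the configured metadata cache).
    fn shared_compaction_context(factory: &dyn MetadataCacheFactory) -> Arc<TaskContext> {
        QueryPlannerImpl::execution_context_helper(factory.make_session_config()).task_ctx()
    }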

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 7 additions & 1 deletion
@@ -18,6 +18,7 @@ use crate::metastore::{
     deactivate_table_due_to_corrupt_data, deactivate_table_on_corrupt_data, table::Table, Chunk,
     Column, ColumnType, IdRow, Index, IndexType, MetaStore, Partition, WAL,
 };
+use crate::queryplanner::QueryPlannerImpl;
 use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
 use crate::table::{Row, TableValue};
 use crate::util::batch_memory::columns_vec_buffer_size;
@@ -432,12 +433,15 @@ impl ChunkDataStore for ChunkStore {
         if old_chunk_ids.is_empty() {
             return Ok(());
         }
+        let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
+
         let batches_stream = merge_chunks(
             key_size,
             main_table.clone(),
             in_memory_columns,
             unique_key.clone(),
             aggregate_columns.clone(),
+            task_context,
         )
         .await?;
         let batches = common_collect(batches_stream).await?;
@@ -1342,7 +1346,9 @@ impl ChunkStore {
 
         assert!(aggregate.properties().output_ordering().is_some_and(|ordering| ordering.len() == key_size));
 
-        let batches = collect(aggregate, Arc::new(TaskContext::default())).await?;
+        let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.cache_factory().make_session_config()).task_ctx();
+
+        let batches = collect(aggregate, task_context).await?;
         if batches.is_empty() {
             Ok(vec![])
         } else if batches.len() == 1 {

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 5 additions & 3 deletions
@@ -2,6 +2,7 @@ use crate::config::injection::DIService;
 use crate::config::ConfigObj;
 use crate::metastore::table::StreamOffset;
 use crate::metastore::Column;
+use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::streaming::kafka_post_processing::{KafkaPostProcessPlan, KafkaPostProcessPlanner};
 use crate::streaming::traffic_sender::TrafficSender;
 use crate::streaming::{parse_json_payload_and_key, StreamingSource};
@@ -59,6 +60,7 @@ impl KafkaStreamingSource {
         kafka_client: Arc<dyn KafkaClientService>,
         use_ssl: bool,
         trace_obj: Option<String>,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Result<Self, CubeError> {
         let (post_processing_plan, columns, unique_key_columns, seq_column_index) =
             if let Some(select_statement) = select_statement {
@@ -70,7 +72,7 @@
                     source_columns,
                 );
                 let plan = planner
-                    .build(select_statement.clone())
+                    .build(select_statement.clone(), metadata_cache_factory)
                     .await?;
                 let columns = plan.source_columns().clone();
                 let seq_column_index = plan.source_seq_column_index();
@@ -448,7 +450,7 @@ mod tests {
             .await
             .unwrap();
 
-        let batches = collect(phys_plan, Arc::new(TaskContext::default()))
+        let batches = collect(phys_plan, plan_ctx.task_ctx())
             .await
             .unwrap();
         let res = batches_to_dataframe(batches).unwrap();
@@ -487,7 +489,7 @@
             .unwrap();
         let phys_plan = phys_plan.with_new_children(vec![inp]).unwrap();
 
-        let batches = collect(phys_plan, Arc::new(TaskContext::default()))
+        let batches = collect(phys_plan, plan_ctx.task_ctx())
            .await
            .unwrap();
         let res = batches_to_dataframe(batches).unwrap();

rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs

Lines changed: 9 additions & 1 deletion
@@ -1,4 +1,5 @@
 use crate::metastore::Column;
+use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::queryplanner::{QueryPlan, QueryPlannerImpl};
 use crate::sql::MySqlDialectWithBackTicks;
 use crate::streaming::topic_table_provider::TopicTableProvider;
@@ -29,6 +30,7 @@ use std::sync::Arc;
 
 #[derive(Clone)]
 pub struct KafkaPostProcessPlan {
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     projection_plan: Arc<dyn ExecutionPlan>,
     filter_plan: Option<Arc<dyn ExecutionPlan>>,
     source_columns: Vec<Column>,
@@ -44,6 +46,7 @@
         source_columns: Vec<Column>,
         source_unique_columns: Vec<Column>,
         source_seq_column_index: usize,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Self {
         let source_schema = Arc::new(Schema::new(
             source_columns
@@ -58,6 +61,7 @@
             source_unique_columns,
             source_seq_column_index,
             source_schema,
+            metadata_cache_factory,
         }
     }
 
@@ -91,7 +95,9 @@
             .clone()
             .with_new_children(vec![filter_input])?;
 
-        let mut out_batches = collect(projection, Arc::new(TaskContext::default())).await?;
+        let task_context = QueryPlannerImpl::execution_context_helper(self.metadata_cache_factory.make_session_config()).task_ctx();
+
+        let mut out_batches = collect(projection, task_context).await?;
         let res = if out_batches.len() == 1 {
             out_batches.pop().unwrap()
         } else {
@@ -139,6 +145,7 @@ impl KafkaPostProcessPlanner {
     pub async fn build(
         &self,
         select_statement: String,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
    ) -> Result<KafkaPostProcessPlan, CubeError> {
         let target_schema = Arc::new(Schema::new(
             self.columns
@@ -176,6 +183,7 @@
             self.source_columns.clone(),
             source_unique_columns,
             source_seq_column_index,
+            metadata_cache_factory,
         ))
     }
 

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 5 additions & 0 deletions
@@ -11,6 +11,7 @@ use crate::metastore::replay_handle::{ReplayHandle, SeqPointer, SeqPointerForLoc
 use crate::metastore::source::SourceCredentials;
 use crate::metastore::table::{StreamOffset, Table};
 use crate::metastore::{Column, ColumnType, IdRow, MetaStore};
+use crate::queryplanner::metadata_cache::MetadataCacheFactory;
 use crate::sql::timestamp_from_string;
 use crate::store::ChunkDataStore;
 use crate::streaming::kafka::{KafkaClientService, KafkaStreamingSource};
@@ -57,6 +58,7 @@ pub struct StreamingServiceImpl {
     chunk_store: Arc<dyn ChunkDataStore>,
     ksql_client: Arc<dyn KsqlClient>,
     kafka_client: Arc<dyn KafkaClientService>,
+    metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
 }
 
 crate::di_service!(StreamingServiceImpl, [StreamingService]);
@@ -68,13 +70,15 @@ impl StreamingServiceImpl {
         chunk_store: Arc<dyn ChunkDataStore>,
         ksql_client: Arc<dyn KsqlClient>,
         kafka_client: Arc<dyn KafkaClientService>,
+        metadata_cache_factory: Arc<dyn MetadataCacheFactory>,
     ) -> Arc<Self> {
         Arc::new(Self {
             config_obj,
             meta_store,
             chunk_store,
             ksql_client,
             kafka_client,
+            metadata_cache_factory,
         })
     }
 
@@ -165,6 +169,7 @@ impl StreamingServiceImpl {
             self.kafka_client.clone(),
             *use_ssl,
             trace_obj,
+            self.metadata_cache_factory.clone(),
         ).await?)),
         }
     }

rust/cubestore/cubestore/src/table/data.rs

Lines changed: 1 addition & 0 deletions
@@ -241,6 +241,7 @@ pub fn rows_to_columns(cols: &[Column], rows: &[Row]) -> Vec<ArrayRef> {
 
 pub fn to_stream(r: RecordBatch) -> SendableRecordBatchStream {
     let schema = r.schema();
+    // TaskContext::default is OK here because it's a plain memory exec.
     MemoryExec::try_new(&[vec![r]], schema, None)
         .unwrap()
         .execute(0, Arc::new(TaskContext::default()))
