
Commit 79c5e92

feat(cubestore): CREATE TABLE IF NOT EXISTS support (#7451)
1 parent 40726ba commit 79c5e92

File tree: 11 files changed, +795 −441 lines

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 5 additions & 0 deletions

@@ -173,6 +173,7 @@ crate::di_service!(MockCluster, [Cluster]);
 pub enum JobEvent {
     Started(RowKey, JobType),
     Success(RowKey, JobType),
+    Orphaned(RowKey, JobType),
     Error(RowKey, JobType, String),
 }

@@ -820,6 +821,10 @@ impl JobResultListener {
                 new.get_row().job_type().clone(),
                 "Job timed out".to_string(),
             )),
+            JobStatus::Orphaned => Some(JobEvent::Orphaned(
+                new.get_row().row_reference().clone(),
+                new.get_row().job_type().clone(),
+            )),
             JobStatus::Error(e) => Some(JobEvent::Error(
                 new.get_row().row_reference().clone(),
                 new.get_row().job_type().clone(),
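For context, here is a minimal, self-contained sketch of how a consumer of JobEvent might branch on the new Orphaned variant. The types below are simplified stand-ins, not the actual cubestore RowKey/JobType structs, and the handler is hypothetical.

// Toy stand-in for the JobEvent enum extended in this commit.
#[derive(Debug, Clone)]
enum JobEvent {
    Started(String),
    Success(String),
    Orphaned(String),
    Error(String, String),
}

fn handle(event: JobEvent) {
    match event {
        JobEvent::Started(key) => println!("job {key} started"),
        JobEvent::Success(key) => println!("job {key} finished"),
        // New in this commit: jobs whose work was abandoned are reported as
        // Orphaned instead of disappearing without a terminal event.
        JobEvent::Orphaned(key) => println!("job {key} was orphaned and removed"),
        JobEvent::Error(key, e) => eprintln!("job {key} failed: {e}"),
    }
}

fn main() {
    handle(JobEvent::Success("table-41".to_string()));
    handle(JobEvent::Orphaned("table-42".to_string()));
}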

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 9 additions & 0 deletions

@@ -535,6 +535,8 @@ pub trait ConfigObj: DIService {
     fn remote_files_cleanup_delay_secs(&self) -> u64;

     fn remote_files_cleanup_batch_size(&self) -> u64;
+
+    fn create_table_max_retries(&self) -> u64;
 }

 #[derive(Debug, Clone)]

@@ -631,6 +633,7 @@ pub struct ConfigObjImpl {
     pub local_files_cleanup_delay_secs: u64,
     pub remote_files_cleanup_delay_secs: u64,
     pub remote_files_cleanup_batch_size: u64,
+    pub create_table_max_retries: u64,
 }

 crate::di_service!(ConfigObjImpl, [ConfigObj]);

@@ -987,6 +990,10 @@ impl ConfigObj for ConfigObjImpl {
         self.remote_files_cleanup_batch_size
     }

+    fn create_table_max_retries(&self) -> u64 {
+        self.create_table_max_retries
+    }
+
     fn cachestore_cache_eviction_below_threshold(&self) -> u8 {
         self.cachestore_cache_eviction_below_threshold
     }

@@ -1496,6 +1503,7 @@ impl Config {
                     "CUBESTORE_REMOTE_FILES_CLEANUP_BATCH_SIZE",
                     50000,
                 ),
+                create_table_max_retries: env_parse("CUBESTORE_CREATE_TABLE_MAX_RETRIES", 3),
            }),
        }
    }

@@ -1604,6 +1612,7 @@ impl Config {
                local_files_cleanup_delay_secs: 600,
                remote_files_cleanup_delay_secs: 3600,
                remote_files_cleanup_batch_size: 50000,
+               create_table_max_retries: 3,
            }),
        }
    }
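The new knob is read with env_parse("CUBESTORE_CREATE_TABLE_MAX_RETRIES", 3). Below is a minimal sketch of that read-with-default pattern using only the standard library; env_parse itself is a cubestore helper and is not reproduced here.

use std::env;

// Read the variable if set and parseable, otherwise fall back to the default
// of 3, mirroring env_parse("CUBESTORE_CREATE_TABLE_MAX_RETRIES", 3).
fn create_table_max_retries() -> u64 {
    env::var("CUBESTORE_CREATE_TABLE_MAX_RETRIES")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(3)
}

fn main() {
    println!("create_table_max_retries = {}", create_table_max_retries());
}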

rust/cubestore/cubestore/src/metastore/job.rs

Lines changed: 1 addition & 0 deletions

@@ -61,6 +61,7 @@ pub enum JobStatus {
     Completed,
     Timeout,
     Error(String),
+    Orphaned,
 }

 #[derive(Clone, Serialize, Deserialize, Debug, Hash)]

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 51 additions & 26 deletions

@@ -845,6 +845,7 @@ pub trait MetaStore: DIService + Send + Sync {
         aggregates: Option<Vec<(String, String)>>,
         partition_split_threshold: Option<u64>,
         trace_obj: Option<String>,
+        drop_if_exists: bool,
     ) -> Result<IdRow<Table>, CubeError>;
     async fn table_ready(&self, id: u64, is_ready: bool) -> Result<IdRow<Table>, CubeError>;
     async fn seal_table(&self, id: u64) -> Result<IdRow<Table>, CubeError>;

@@ -1490,6 +1491,37 @@ impl RocksMetaStore {
     {
         self.store.write_operation(f).await
     }
+
+    fn drop_table_impl(
+        table_id: u64,
+        db_ref: DbTableRef,
+        batch_pipe: &mut BatchPipe,
+    ) -> Result<IdRow<Table>, CubeError> {
+        let tables_table = TableRocksTable::new(db_ref.clone());
+        let indexes_table = IndexRocksTable::new(db_ref.clone());
+        let replay_handles_table = ReplayHandleRocksTable::new(db_ref.clone());
+        let trace_objects_table = TraceObjectRocksTable::new(db_ref.clone());
+        let indexes = indexes_table
+            .get_row_ids_by_index(&IndexIndexKey::TableId(table_id), &IndexRocksIndex::TableID)?;
+        let trace_objects = trace_objects_table.get_rows_by_index(
+            &TraceObjectIndexKey::ByTableId(table_id),
+            &TraceObjectRocksIndex::ByTableId,
+        )?;
+        for trace_object in trace_objects {
+            trace_objects_table.delete(trace_object.get_id(), batch_pipe)?;
+        }
+        let replay_handles = replay_handles_table.get_rows_by_index(
+            &ReplayHandleIndexKey::ByTableId(table_id),
+            &ReplayHandleRocksIndex::ByTableId,
+        )?;
+        for replay_handle in replay_handles {
+            replay_handles_table.delete(replay_handle.get_id(), batch_pipe)?;
+        }
+        for index in indexes {
+            RocksMetaStore::drop_index(db_ref.clone(), batch_pipe, index, true)?;
+        }
+        Ok(tables_table.delete(table_id, batch_pipe)?)
+    }
 }

 impl RocksMetaStore {

@@ -2033,9 +2065,15 @@ impl MetaStore for RocksMetaStore {
         aggregates: Option<Vec<(String, String)>>,
         partition_split_threshold: Option<u64>,
         trace_obj: Option<String>,
+        drop_if_exists: bool,
     ) -> Result<IdRow<Table>, CubeError> {
         self.write_operation(move |db_ref, batch_pipe| {
             batch_pipe.invalidate_tables_cache();
+            if drop_if_exists {
+                if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) {
+                    RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?;
+                }
+            }
             let rocks_table = TableRocksTable::new(db_ref.clone());
             let rocks_index = IndexRocksTable::new(db_ref.clone());
             let rocks_schema = SchemaRocksTable::new(db_ref.clone());

@@ -2379,32 +2417,7 @@ impl MetaStore for RocksMetaStore {
     async fn drop_table(&self, table_id: u64) -> Result<IdRow<Table>, CubeError> {
         self.write_operation(move |db_ref, batch_pipe| {
             batch_pipe.invalidate_tables_cache();
-            let tables_table = TableRocksTable::new(db_ref.clone());
-            let indexes_table = IndexRocksTable::new(db_ref.clone());
-            let replay_handles_table = ReplayHandleRocksTable::new(db_ref.clone());
-            let trace_objects_table = TraceObjectRocksTable::new(db_ref.clone());
-            let indexes = indexes_table.get_row_ids_by_index(
-                &IndexIndexKey::TableId(table_id),
-                &IndexRocksIndex::TableID,
-            )?;
-            let trace_objects = trace_objects_table.get_rows_by_index(
-                &TraceObjectIndexKey::ByTableId(table_id),
-                &TraceObjectRocksIndex::ByTableId,
-            )?;
-            for trace_object in trace_objects {
-                trace_objects_table.delete(trace_object.get_id(), batch_pipe)?;
-            }
-            let replay_handles = replay_handles_table.get_rows_by_index(
-                &ReplayHandleIndexKey::ByTableId(table_id),
-                &ReplayHandleRocksIndex::ByTableId,
-            )?;
-            for replay_handle in replay_handles {
-                replay_handles_table.delete(replay_handle.get_id(), batch_pipe)?;
-            }
-            for index in indexes {
-                RocksMetaStore::drop_index(db_ref.clone(), batch_pipe, index, true)?;
-            }
-            Ok(tables_table.delete(table_id, batch_pipe)?)
+            RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe)
         })
         .await
     }

@@ -5108,6 +5121,7 @@ mod tests {
             None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -5130,6 +5144,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -5251,6 +5266,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -5275,6 +5291,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .is_err());

@@ -5363,6 +5380,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -5452,6 +5470,7 @@ mod tests {
            ]),
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -5524,6 +5543,7 @@ mod tests {
            ]),
            None,
            None,
+           false,
        )
        .await
        .is_err());

@@ -5546,6 +5566,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .is_err());

@@ -5571,6 +5592,7 @@ mod tests {
            ]),
            None,
            None,
+           false,
        )
        .await
        .is_err());

@@ -6047,6 +6069,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -6269,6 +6292,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();

@@ -6409,6 +6433,7 @@ mod tests {
            None,
            None,
            None,
+           false,
        )
        .await
        .unwrap();
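Below is a simplified, hypothetical model of what the new drop_if_exists flag does inside create_table: if a table with the same schema and name already exists, it is dropped within the same write operation before the new row is inserted, while a plain create still fails on a duplicate. The in-memory types are toy stand-ins, not RocksMetaStore.

use std::collections::HashMap;

// Toy metastore keyed by (schema, table name); values are table row ids.
struct MetaStore {
    tables: HashMap<(String, String), u64>,
    next_id: u64,
}

impl MetaStore {
    fn create_table(&mut self, schema: &str, name: &str, drop_if_exists: bool) -> Result<u64, String> {
        let key = (schema.to_string(), name.to_string());
        if self.tables.contains_key(&key) {
            if !drop_if_exists {
                return Err(format!("table {schema}.{name} already exists"));
            }
            // Mirrors RocksMetaStore::drop_table_impl: the existing row (and,
            // in the real code, its indexes, replay handles and trace objects)
            // is removed before the table is recreated.
            self.tables.remove(&key);
        }
        let id = self.next_id;
        self.next_id += 1;
        self.tables.insert(key, id);
        Ok(id)
    }
}

fn main() {
    let mut store = MetaStore { tables: HashMap::new(), next_id: 1 };
    let first = store.create_table("foo", "bar", false).unwrap();
    // A second create with drop_if_exists replaces the first row.
    let second = store.create_table("foo", "bar", true).unwrap();
    assert_ne!(first, second);
    // Without the flag, a duplicate create still fails.
    assert!(store.create_table("foo", "bar", false).is_err());
}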

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 1 addition & 0 deletions

@@ -103,6 +103,7 @@ impl MetaStore for MetaStoreMock {
         _aggregates: Option<Vec<(String, String)>>,
         _partition_split_threshold: Option<u64>,
         _trace_obj: Option<String>,
+        _drop_if_exists: bool,
     ) -> Result<IdRow<Table>, CubeError> {
         panic!("MetaStore mock!")
     }

rust/cubestore/cubestore/src/scheduler/mod.rs

Lines changed: 3 additions & 0 deletions

@@ -686,6 +686,9 @@ impl SchedulerImpl {
             .await?;
         for job in orphaned_jobs {
             log::info!("Removing orphaned job: {:?}", job);
+            self.meta_store
+                .update_status(job.get_id(), JobStatus::Orphaned)
+                .await?;
             self.meta_store.delete_job(job.get_id()).await?;
         }
         Ok(())
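The status is presumably flipped to Orphaned before the job row is deleted so that JobResultListener (see cluster/mod.rs above) observes a terminal JobStatus::Orphaned and emits JobEvent::Orphaned, rather than watching the job silently vanish. A toy sketch of that mark-then-delete ordering follows; the types are hypothetical stand-ins.

#[derive(Debug, Clone, PartialEq)]
enum JobStatus {
    Scheduled,
    Orphaned,
}

struct Job {
    id: u64,
    status: JobStatus,
}

// Step 1: record the terminal status so listeners are notified with a definite
// outcome; step 2: delete the job rows themselves.
fn remove_orphaned(jobs: &mut Vec<Job>, notifications: &mut Vec<(u64, JobStatus)>) {
    for job in jobs.iter_mut() {
        job.status = JobStatus::Orphaned;
        notifications.push((job.id, job.status.clone()));
    }
    jobs.clear();
}

fn main() {
    let mut jobs = vec![Job { id: 7, status: JobStatus::Scheduled }];
    let mut notifications = Vec::new();
    remove_orphaned(&mut jobs, &mut notifications);
    assert_eq!(notifications, vec![(7, JobStatus::Orphaned)]);
    assert!(jobs.is_empty());
}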

rust/cubestore/cubestore/src/sql/cache.rs

Lines changed: 55 additions & 1 deletion

@@ -1,3 +1,4 @@
+use crate::metastore::{table::Table, IdRow};
 use crate::queryplanner::serialized_plan::SerializedPlan;
 use crate::sql::InlineTables;
 use crate::sql::SqlQueryContext;

@@ -7,7 +8,7 @@ use deepsize::DeepSizeOf;
 use futures::Future;
 use log::trace;
 use moka::future::{Cache, ConcurrentCacheExt, Iter};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{watch, Mutex};

@@ -69,6 +70,8 @@ pub struct SqlResultCache {
         lru::LruCache<SqlQueueCacheKey, watch::Receiver<Option<Result<Arc<DataFrame>, CubeError>>>>,
     >,
     result_cache: Cache<SqlResultCacheKey, Arc<DataFrame>>,
+    create_table_cache:
+        Mutex<HashMap<(String, String), watch::Receiver<Option<Result<IdRow<Table>, CubeError>>>>>,
 }

 pub fn sql_result_cache_sizeof(key: &SqlResultCacheKey, df: &Arc<DataFrame>) -> u32 {

@@ -91,6 +94,7 @@ impl SqlResultCache {
                 .max_capacity(capacity_bytes)
                 .weigher(sql_result_cache_sizeof)
                 .build(),
+            create_table_cache: Mutex::new(HashMap::new()),
         }
     }

@@ -188,6 +192,56 @@ impl SqlResultCache {
         self.wait_for_queue(receiver, query).await
     }

+    pub async fn create_table<F>(
+        &self,
+        schema_name: String,
+        table_name: String,
+        exec: impl FnOnce() -> F,
+    ) -> Result<IdRow<Table>, CubeError>
+    where
+        F: Future<Output = Result<IdRow<Table>, CubeError>> + Send + 'static,
+    {
+        let key = (schema_name.clone(), table_name.clone());
+        let (sender, mut receiver) = {
+            let mut cache = self.create_table_cache.lock().await;
+            let key = key.clone();
+            if !cache.contains_key(&key) {
+                let (tx, rx) = watch::channel(None);
+                cache.insert(key, rx);
+
+                (Some(tx), None)
+            } else {
+                (None, cache.get(&key).cloned())
+            }
+        };
+
+        if let Some(sender) = sender {
+            let result = exec().await;
+            if let Err(e) = sender.send(Some(result.clone())) {
+                trace!(
+                    "Failed to set cached query result, possibly flushed from LRU cache: {}",
+                    e
+                );
+            }
+
+            self.create_table_cache.lock().await.remove(&key);
+
+            return result;
+        }
+
+        if let Some(receiver) = &mut receiver {
+            loop {
+                receiver.changed().await?;
+                let x = receiver.borrow();
+                let value = x.as_ref();
+                if let Some(value) = value {
+                    return value.clone();
+                }
+            }
+        }
+        panic!("Unexpected state: wait receiver expected but cache was empty")
+    }
+
     #[tracing::instrument(level = "trace", skip(self, receiver))]
     async fn wait_for_queue(
         &self,
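The create_table_cache above deduplicates concurrent CREATE TABLE executions per (schema, table) key. Below is a self-contained sketch of the same watch-channel pattern with a plain u64 payload instead of IdRow<Table>; the names and the standalone function are illustrative, not the cubestore API.

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::{watch, Mutex};

type Key = (String, String);
type Shared = Arc<Mutex<HashMap<Key, watch::Receiver<Option<u64>>>>>;

// The first caller for a key runs `work` and broadcasts the result; concurrent
// callers for the same key wait on the watch receiver instead of re-running it.
async fn create_once(cache: Shared, key: Key, work: impl Future<Output = u64>) -> u64 {
    let (sender, receiver) = {
        let mut map = cache.lock().await;
        if let Some(rx) = map.get(&key) {
            (None, Some(rx.clone()))
        } else {
            let (tx, rx) = watch::channel(None);
            map.insert(key.clone(), rx);
            (Some(tx), None)
        }
    };
    if let Some(sender) = sender {
        let result = work.await;
        // Waiters may already be gone; ignore send errors like the real code does.
        let _ = sender.send(Some(result));
        cache.lock().await.remove(&key);
        return result;
    }
    let mut receiver = receiver.expect("either a sender or a receiver must exist");
    loop {
        if let Some(value) = *receiver.borrow() {
            return value;
        }
        receiver.changed().await.expect("sender dropped without a value");
    }
}

#[tokio::main]
async fn main() {
    let cache: Shared = Arc::new(Mutex::new(HashMap::new()));
    let key = ("foo".to_string(), "bar".to_string());
    let id = create_once(cache.clone(), key.clone(), async { 42 }).await;
    assert_eq!(id, 42);
    // The entry is removed after completion, so a later call runs its own work.
    let id = create_once(cache, key, async { 43 }).await;
    assert_eq!(id, 43);
}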
