Skip to content

Commit 9b4f05f

Browse files
committed
feat(cubestore): Add ExtendedRemoteFs interface and make S3 use priority queue based implementation
1 parent cf4b6b3 commit 9b4f05f

File tree

11 files changed

+305
-73
lines changed

11 files changed

+305
-73
lines changed

rust/cubestore/Cargo.lock

Lines changed: 24 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubestore/cubestore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ chrono-tz = "0.8.2"
5252
lazy_static = "1.4.0"
5353
mockall = "0.8.1"
5454
async-std = "0.99"
55+
async-stream = "0.3.6"
5556
itertools = "0.11.0"
5657
bigdecimal = { version = "0.2.0", features = ["serde"] }
5758
# Right now, it's not possible to use the 0.33 release because it has bugs

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::remotefs::gcs::GCSRemoteFs;
2828
use crate::remotefs::minio::MINIORemoteFs;
2929
use crate::remotefs::queue::QueueRemoteFs;
3030
use crate::remotefs::s3::S3RemoteFs;
31-
use crate::remotefs::{LocalDirRemoteFs, RemoteFs};
31+
use crate::remotefs::{ExtendedRemoteFs, LocalDirRemoteFs, RemoteFs};
3232
use crate::scheduler::SchedulerImpl;
3333
use crate::sql::cache::SqlResultCache;
3434
use crate::sql::{SqlService, SqlServiceImpl};
@@ -1907,7 +1907,8 @@ impl Config {
19071907
self.injector
19081908
.register("cachestore_fs", async move |i| {
19091909
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
1910-
let original_remote_fs = i.get_service("original_remote_fs").await;
1910+
let original_remote_fs: Arc<dyn ExtendedRemoteFs> =
1911+
i.get_service("original_remote_fs").await;
19111912
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new_for_cachestore(
19121913
original_remote_fs,
19131914
i.get_service_typed().await,
@@ -1982,7 +1983,8 @@ impl Config {
19821983
self.injector
19831984
.register("metastore_fs", async move |i| {
19841985
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
1985-
let original_remote_fs = i.get_service("original_remote_fs").await;
1986+
let original_remote_fs: Arc<dyn ExtendedRemoteFs> =
1987+
i.get_service("original_remote_fs").await;
19861988
let arc: Arc<dyn DIService> = BaseRocksStoreFs::new_for_metastore(
19871989
original_remote_fs,
19881990
i.get_service_typed().await,

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6546,8 +6546,9 @@ mod tests {
65466546

65476547
#[tokio::test]
65486548
async fn delete_old_snapshots() {
6549+
let metastore_snapshots_lifetime_secs = 1;
65496550
let config = Config::test("delete_old_snapshots").update_config(|mut obj| {
6550-
obj.metastore_snapshots_lifetime = 1;
6551+
obj.metastore_snapshots_lifetime = metastore_snapshots_lifetime_secs;
65516552
obj.minimum_metastore_snapshots_count = 2;
65526553
obj
65536554
});
@@ -6616,14 +6617,22 @@ mod tests {
66166617
.await
66176618
.unwrap();
66186619

6619-
assert_eq!(uploaded3.len(), 3);
6620+
assert_eq!(
6621+
uploaded3.len(),
6622+
3,
6623+
"uploaded3 keys: {}",
6624+
uploaded3.keys().join(", ")
6625+
);
66206626

66216627
meta_store
66226628
.create_schema("foo4".to_string(), false)
66236629
.await
66246630
.unwrap();
66256631

6626-
tokio::time::sleep(Duration::from_millis(1100)).await;
6632+
tokio::time::sleep(Duration::from_millis(
6633+
metastore_snapshots_lifetime_secs * 1000 + 100,
6634+
))
6635+
.await;
66276636
meta_store.upload_check_point().await.unwrap();
66286637

66296638
let uploaded4 =

0 commit comments

Comments
 (0)