Skip to content

Commit 3149383

Browse files
author
Devdutt Shenoi
committed
refactor: ObjectStorage: Send
1 parent dd841ef commit 3149383

File tree

8 files changed

+12
-14
lines changed

8 files changed

+12
-14
lines changed

server/src/catalog/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ async fn create_manifest(
318318
}
319319

320320
pub async fn remove_manifest_from_snapshot(
321-
storage: Arc<dyn ObjectStorage + Send>,
321+
storage: Arc<dyn ObjectStorage>,
322322
stream_name: &str,
323323
dates: Vec<String>,
324324
) -> Result<Option<String>, ObjectStorageError> {
@@ -343,7 +343,7 @@ pub async fn remove_manifest_from_snapshot(
343343
}
344344

345345
pub async fn get_first_event(
346-
storage: Arc<dyn ObjectStorage + Send>,
346+
storage: Arc<dyn ObjectStorage>,
347347
stream_name: &str,
348348
dates: Vec<String>,
349349
) -> Result<Option<String>, ObjectStorageError> {

server/src/hottier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ impl HotTierManager {
289289
stream: &str,
290290
manifest_files_to_download: &mut BTreeMap<String, Vec<String>>,
291291
parquet_file_size: &mut u64,
292-
object_store: Arc<dyn ObjectStorage + Send>,
292+
object_store: Arc<dyn ObjectStorage>,
293293
) -> Result<(), HotTierError> {
294294
if manifest_files_to_download.is_empty() {
295295
return Ok(());

server/src/migration/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> {
297297
}
298298

299299
async fn run_meta_file_migration(
300-
object_store: &Arc<dyn ObjectStorage + Send>,
300+
object_store: &Arc<dyn ObjectStorage>,
301301
old_meta_file_path: RelativePathBuf,
302302
) -> anyhow::Result<()> {
303303
// get the list of all meta files
@@ -328,9 +328,7 @@ async fn run_meta_file_migration(
328328
Ok(())
329329
}
330330

331-
async fn run_stream_files_migration(
332-
object_store: &Arc<dyn ObjectStorage + Send>,
333-
) -> anyhow::Result<()> {
331+
async fn run_stream_files_migration(object_store: &Arc<dyn ObjectStorage>) -> anyhow::Result<()> {
334332
let streams = object_store
335333
.list_old_streams()
336334
.await?

server/src/option.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));
3939
#[derive(Debug)]
4040
pub struct Config {
4141
pub parseable: Cli,
42-
storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
42+
storage: Arc<dyn ObjectStorageProvider>,
4343
pub storage_name: &'static str,
4444
}
4545

server/src/storage/azure_blob.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
163163
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
164164
}
165165

166-
fn get_object_store(&self) -> Arc<dyn super::ObjectStorage + Send> {
166+
fn get_object_store(&self) -> Arc<dyn super::ObjectStorage> {
167167
let azure = self.get_default_builder().build().unwrap();
168168
// limit objectstore to a concurrent request limit
169169
let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);

server/src/storage/localfs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl ObjectStorageProvider for FSConfig {
6767
RuntimeConfig::new()
6868
}
6969

70-
fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
70+
fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
7171
Arc::new(LocalFS::new(self.root.clone()))
7272
}
7373

server/src/storage/object_storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,15 @@ use std::{
5858
time::{Duration, Instant},
5959
};
6060

61-
pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
61+
pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
6262
fn get_datafusion_runtime(&self) -> RuntimeConfig;
63-
fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send>;
63+
fn get_object_store(&self) -> Arc<dyn ObjectStorage>;
6464
fn get_endpoint(&self) -> String;
6565
fn register_store_metrics(&self, handler: &PrometheusMetrics);
6666
}
6767

6868
#[async_trait]
69-
pub trait ObjectStorage: Sync + 'static {
69+
pub trait ObjectStorage: Send + Sync + 'static {
7070
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
7171
// TODO: make the filter function optional as we may want to get all objects
7272
async fn get_objects(

server/src/storage/s3.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ impl ObjectStorageProvider for S3Config {
289289
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
290290
}
291291

292-
fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
292+
fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
293293
let s3 = self.get_default_builder().build().unwrap();
294294

295295
// limit objectstore to a concurrent request limit

0 commit comments

Comments
 (0)