Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ arrow-ord = { version = "57" }
arrow-row = { version = "57" }
arrow-schema = { version = "57" }
arrow-select = { version = "57" }
object_store = { version = "0.12.1" }
object_store = { version = "0.13" }
parquet = { version = "57" }

# datafusion 52.1
Expand Down
60 changes: 2 additions & 58 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,14 +1575,6 @@ mod tests {
// Currently only read operations are recorded. Extend as necessary.
#[async_trait::async_trait]
impl ObjectStore for RecordingObjectStore {
async fn put(
&self,
location: &Path,
payload: PutPayload,
) -> object_store::Result<PutResult> {
self.inner.put(location, payload).await
}

async fn put_opts(
&self,
location: &Path,
Expand All @@ -1592,13 +1584,6 @@ mod tests {
self.inner.put_opts(location, payload, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}

async fn put_multipart_opts(
&self,
location: &Path,
Expand All @@ -1607,13 +1592,6 @@ mod tests {
self.inner.put_multipart_opts(location, opts).await
}

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
self.operations
.send(ObjectStoreOperation::Get(location.into()))
.unwrap();
self.inner.get(location).await
}

async fn get_opts(
&self,
location: &Path,
Expand All @@ -1625,20 +1603,6 @@ mod tests {
self.inner.get_opts(location, options).await
}

async fn get_range(
&self,
location: &Path,
range: Range<u64>,
) -> object_store::Result<Bytes> {
self.operations
.send(ObjectStoreOperation::GetRange(
location.into(),
range.clone(),
))
.unwrap();
self.inner.get_range(location, range).await
}

async fn get_ranges(
&self,
location: &Path,
Expand All @@ -1653,14 +1617,6 @@ mod tests {
self.inner.get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
self.inner.head(location).await
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner.delete(location).await
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, object_store::Result<Path>>,
Expand Down Expand Up @@ -1690,20 +1646,8 @@ mod tests {
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy(from, to).await
}

async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy_if_not_exists(from, to).await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename_if_not_exists(from, to).await
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
self.inner.copy_opts(from, to, options).await
}
}
}
2 changes: 1 addition & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use delta_kernel::path::{LogPathFileType, ParsedLogPath};
use delta_kernel::{AsAny, Engine};
use futures::StreamExt;
use object_store::ObjectStoreScheme;
use object_store::{Error as ObjectStoreError, ObjectStore, path::Path};
use object_store::{Error as ObjectStoreError, ObjectStore, ObjectStoreExt, path::Path};
use regex::Regex;
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/logstore/storage/retry_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Retry extension for [`ObjectStore`]

use ::object_store::path::Path;
use ::object_store::{Error, ObjectStore, PutPayload, PutResult, Result};
use ::object_store::{Error, ObjectStore, ObjectStoreExt, PutPayload, PutResult, Result};
use tracing::*;

#[cfg(feature = "cloud")]
Expand Down
105 changes: 22 additions & 83 deletions crates/core/src/logstore/storage/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::ops::Range;

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.11

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / test-minimal

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.10

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.12

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 LakeFS Integration tests)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 PyArrow latest)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / PySpark Integration Tests

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / check

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (LakeFS v1.48)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / cloud (gcp)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `std::ops::Range`

Check warning on line 1 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (HDFS)

unused import: `std::ops::Range`
use std::sync::OnceLock;

use bytes::Bytes;

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.11

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / test-minimal

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.10

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Running with Python 3.12

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 Unity Catalog Integration tests)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 LakeFS Integration tests)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Python Build (Python 3.10 PyArrow latest)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / PySpark Integration Tests

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / check

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (LakeFS v1.48)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / cloud (gcp)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `bytes::Bytes`

Check warning on line 4 in crates/core/src/logstore/storage/runtime.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (HDFS)

unused import: `bytes::Bytes`
Expand All @@ -7,6 +7,8 @@
use futures::TryFutureExt;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

use object_store::CopyOptions;
use object_store::path::Path;
use object_store::{
Error as ObjectStoreError, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore,
Expand Down Expand Up @@ -185,15 +187,6 @@

#[async_trait::async_trait]
impl<T: ObjectStore + Clone> ObjectStore for DeltaIOStorageBackend<T> {
async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
self.spawn_io_rt(
|store, path| store.put(path, bytes),
&self.inner,
location.clone(),
)
.await
}

async fn put_opts(
&self,
location: &Path,
Expand All @@ -208,41 +201,22 @@
.await
}

async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
self.spawn_io_rt(|store, path| store.get(path), &self.inner, location.clone())
.await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
self.spawn_io_rt(
|store, path| store.get_opts(path, options),
&self.inner,
location.clone(),
)
.await
}

async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
self.spawn_io_rt(
|store, path| store.get_range(path, range),
&self.inner,
location.clone(),
)
.await
}

async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
async fn put_multipart_opts(
&self,
location: &Path,
options: PutMultipartOptions,
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.spawn_io_rt(
|store, path| store.head(path),
|store, path| store.put_multipart_opts(path, options),
&self.inner,
location.clone(),
)
.await
}

async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
self.spawn_io_rt(
|store, path| store.delete(path),
|store, path| store.get_opts(path, options),
&self.inner,
location.clone(),
)
Expand All @@ -265,56 +239,21 @@
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.spawn_io_rt_from_to(
|store, from_path, to_path| store.copy(from_path, to_path),
&self.inner,
from.clone(),
to.clone(),
)
.await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.spawn_io_rt_from_to(
|store, from_path, to_path| store.copy_if_not_exists(from_path, to_path),
&self.inner,
from.clone(),
to.clone(),
)
.await
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.spawn_io_rt_from_to(
|store, from_path, to_path| store.rename_if_not_exists(from_path, to_path),
&self.inner,
from.clone(),
to.clone(),
)
.await
}

async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.spawn_io_rt(
|store, path| store.put_multipart(path),
&self.inner,
location.clone(),
)
.await
/// Copy an object from one path to another in the same object store.
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: CopyOptions,
) -> ObjectStoreResult<()> {
self.inner.copy_opts(from, to, options).await
}

async fn put_multipart_opts(
fn delete_stream(
&self,
location: &Path,
options: PutMultipartOptions,
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.spawn_io_rt(
|store, path| store.put_multipart_opts(path, options),
&self.inner,
location.clone(),
)
.await
locations: BoxStream<'static, ObjectStoreResult<Path>>,
) -> BoxStream<'static, ObjectStoreResult<Path>> {
self.inner.delete_stream(locations)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use tracing::log::*;
use super::{CustomExecuteHandler, Operation};
use crate::kernel::{EagerSnapshot, resolve_snapshot};
use crate::logstore::LogStoreRef;
use crate::logstore::object_store::PutPayload;
use crate::logstore::object_store::{ObjectStoreExt, PutPayload};
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use object_store::path::Path;
use object_store::{ObjectStore, ObjectStoreExt};
use serde::Serialize;
use uuid::Uuid;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use chrono::{DateTime, Utc};
use futures::future::ready;
use futures::stream::{BoxStream, once};
use futures::{StreamExt, TryStreamExt};
use object_store::{ObjectStore, path::Path};
use object_store::{ObjectStore, ObjectStoreExt, path::Path};
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
Expand Down
Loading