Merged
Changes from 3 commits
67 changes: 25 additions & 42 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -43,7 +43,7 @@ futures-util = "0.3"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] }
# object_store = { version = "0.10.1", features = ["azure", "aws"] }
# Pinned to a specific commit while waiting for upstream
object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", tag = "v0.10.2-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] }
object_store = { git = "https://github.com/RelationalAI/arrow-rs.git", branch = "object_store_0.11.3-beta1", features = ["azure", "aws", "experimental-azure-list-offset", "experimental-arbitrary-list-prefix"] }
hickory-resolver = "0.24"
thiserror = "1"
anyhow = { version = "1", features = ["backtrace"] }
143 changes: 142 additions & 1 deletion src/crud_ops.rs
@@ -5,7 +5,7 @@ use ::metrics::counter;
use object_store::{path::Path, ObjectStore};

use tokio_util::io::StreamReader;
use std::{ffi::{c_char, c_void}, sync::Arc};
use std::{ffi::{c_char, c_void, CString}, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
use futures_util::{stream, StreamExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};

@@ -44,6 +44,88 @@ impl RawResponse for Response {
}
}

// ================================================================================================
// Boilerplate code for FFI structs
// Any non-Copy fields of BulkFailedEntry must be properly destroyed in destroy_bulk_failed_entries
#[repr(C)]
pub struct BulkFailedEntry {
path: *const c_char,
error_message: *const c_char
}
unsafe impl Send for BulkFailedEntry {}

// Only stores entries for the paths that the bulk operation failed on
#[repr(C)]
pub struct BulkResponse {
result: CResult,
failed_entries: *const BulkFailedEntry,
failed_count: u64,
error_message: *mut c_char,
context: *const Context
}

unsafe impl Send for BulkResponse {}

impl RawResponse for BulkResponse {
type Payload = Vec<(Path, String)>;
Reviewer comment (Member):
As discussed elsewhere we will fix the ergonomics of this later, but one thing we can do now is defer converting the error to a string until set_payload. That way a Rust user of bulk_delete still has an Error to work with.

Suggested change
type Payload = Vec<(Path, String)>;
type Payload = Vec<(Path, crate::Error)>;

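If Payload becomes Vec<(Path, crate::Error)> as suggested, the String conversion would move into set_payload. A minimal sketch of that direction; entries_from_failures is a hypothetical helper, not part of this PR, and only assumes the error type implements Display:

// Hypothetical helper: keep the typed error in Payload and only render it to a
// String at the FFI boundary when building the C-facing entries.
fn entries_from_failures<E: std::fmt::Display>(failures: Vec<(Path, E)>) -> Vec<BulkFailedEntry> {
    failures
        .into_iter()
        .map(|(path, error)| BulkFailedEntry::new(path, error.to_string()))
        .collect()
}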
fn result_mut(&mut self) -> &mut CResult {
&mut self.result
}
fn context_mut(&mut self) -> &mut *const Context {
&mut self.context
}
fn error_message_mut(&mut self) -> &mut *mut c_char {
&mut self.error_message
}
fn set_payload(&mut self, payload: Option<Self::Payload>) {
match payload {
Some(entries) => {
let entries = entries.into_iter().map(|(path, error_msg)| {
BulkFailedEntry::new(path, error_msg)
}).collect::<Vec<BulkFailedEntry>>();
let entries_slice = entries.into_boxed_slice();
let entry_count = entries_slice.len() as u64;
let entries_ptr = entries_slice.as_ptr();
std::mem::forget(entries_slice);

self.failed_count = entry_count;
self.failed_entries = entries_ptr;
}
None => {
self.failed_entries = std::ptr::null();
self.failed_count = 0;
}
}
}
}

#[no_mangle]
pub extern "C" fn destroy_bulk_failed_entries(
entries: *mut BulkFailedEntry,
entry_count: u64
) -> CResult {
let boxed_slice = unsafe { Box::from_raw(std::slice::from_raw_parts_mut(entries, entry_count as usize)) };
for entry in &*boxed_slice {
// Safety: must properly drop all allocated fields of BulkFailedEntry here
let _ = unsafe { CString::from_raw(entry.path.cast_mut()) };
let _ = unsafe { CString::from_raw(entry.error_message.cast_mut()) };
}
CResult::Ok
}

impl BulkFailedEntry {
pub fn new(path: Path, error_msg: String) -> Self {
BulkFailedEntry {
path: CString::new(path.to_string())
.expect("should not have nulls")
.into_raw(),
error_message: CString::new(error_msg)
.expect("should not have nulls")
.into_raw()
}
}
}
// ================================================================================================
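For reference, a hedged sketch of the ownership contract these structs imply: whoever receives a BulkResponse reads failed_entries/failed_count and then hands the buffer back to destroy_bulk_failed_entries exactly once. In practice the consumer is the foreign caller; log_and_free_failures below is a hypothetical Rust-side illustration, not part of this PR:

// Hypothetical consumer-side illustration of the ownership contract.
// Safety: `response` must hold a pointer produced by BulkResponse::set_payload.
unsafe fn log_and_free_failures(response: &mut BulkResponse) {
    if response.failed_entries.is_null() {
        return;
    }
    let entries = std::slice::from_raw_parts(response.failed_entries, response.failed_count as usize);
    for entry in entries {
        let path = std::ffi::CStr::from_ptr(entry.path);
        let message = std::ffi::CStr::from_ptr(entry.error_message);
        eprintln!("bulk delete failed for {:?}: {:?}", path, message);
    }
    // Hand the buffer back so both CStrings in every entry are freed exactly once
    destroy_bulk_failed_entries(response.failed_entries.cast_mut(), response.failed_count);
    response.failed_entries = std::ptr::null();
    response.failed_count = 0;
}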

async fn read_to_slice(reader: &mut BoxedReader, mut slice: &mut [u8]) -> crate::Result<usize> {
let mut received_bytes = 0;
loop {
@@ -143,6 +225,49 @@ impl Client {
counter!(metrics::total_delete_ops).increment(1);
with_retries!(self, self.delete_impl(path).await)
}

async fn bulk_delete_impl(&self, paths: &Vec<Path>) -> crate::Result<Vec<(Path, String)>> {
let stream = stream::iter(paths.iter().map(|path| Ok(path.clone()))).boxed();
// Counter tracking the index of the path currently being streamed, so a failure can be mapped back to its input path
let counter = Arc::new(AtomicUsize::new(0));
let bulk_failed_entries = self.store.delete_stream(stream)
.filter_map(|result| async {
let counter_clone = Arc::clone(&counter);
let index = counter_clone.fetch_add(1, Ordering::SeqCst);
match result {
Ok(_) => None,
// We treat NotFound as success because AWS S3 does not return an error
// if the object does not exist
Err(object_store::Error::NotFound { .. }) => None,
Err(e) => Some((paths[index].clone(), e.to_string())),
}
})
.collect::<Vec<(Path, String)>>()
.await;
// Guard rail: the stream must visit every path; if it ended early, surface the
// first recorded failure (or a generic error if none was recorded)
let callbacks_called = counter.load(Ordering::SeqCst);
if callbacks_called < paths.len() {
match bulk_failed_entries.first() {
Some((_, error_msg)) => Err(crate::Error::invalid_response(error_msg.clone())),
None => Err(crate::Error::invalid_response("Some paths were not deleted")),
}
} else {
Ok(bulk_failed_entries)
}
}
pub async fn bulk_delete(&self, paths: Vec<Path>) -> crate::Result<Vec<(Path, String)>> {
counter!(metrics::total_bulk_delete_ops).increment(1);
with_retries!(self, self.bulk_delete_impl(&paths).await)
}
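A short usage sketch from the Rust side; the surrounding function and the paths are hypothetical, and objects that no longer exist do not count as failures, so only genuine errors come back as (path, message) pairs:

// Hypothetical caller: delete a small batch and log any per-path failures.
async fn delete_batch(client: &Client) -> crate::Result<()> {
    let paths = vec![Path::from("data/part-0.parquet"), Path::from("data/part-1.parquet")];
    let failures = client.bulk_delete(paths).await?;
    for (path, message) in &failures {
        eprintln!("bulk delete failed for {path}: {message}");
    }
    Ok(())
}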
async fn multipart_get_impl(&self, path: &Path, slice: &mut [u8]) -> crate::Result<usize> {
let _guard = duration_on_drop!(metrics::multipart_get_attempt_duration);
let result = self.store.get_opts(
@@ -253,3 +378,19 @@
},
path: *const c_char
);

export_queued_op!(
bulk_delete,
BulkResponse,
|config, response| {
let mut paths_vec: Vec<Path> = Vec::with_capacity(num_paths);
for i in 0..num_paths as isize {
let path_ptr = unsafe { *path_c_array.offset(i) };
let path = unsafe { std::ffi::CStr::from_ptr(path_ptr) };
let path = unsafe { cstr_to_path(path) };
paths_vec.push(path);
}
Ok(Request::BulkDelete(paths_vec, config, response))
},
path_c_array: *const *const c_char, num_paths: usize
);