Skip to content

Commit 05fd114

Browse files
Merge pull request #23 from RelationalAI/ag-list-offset
Add offset to list and prevent checksum generation when using S3 unsigned payload
2 parents 5862386 + 04ce507 commit 05fd114

File tree

8 files changed: 218 additions and 205 deletions.

Cargo.lock

Lines changed: 7 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,10 @@ bench = false
2020
[profile.release]
2121
debug = 1
2222

23+
[features]
24+
default = ["julia"]
25+
julia = []
26+
2327
[dependencies]
2428
tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "signal", "time"] }
2529
tokio-util = { version = "0.7", default-features = false, features = ["io"] }
@@ -28,9 +32,9 @@ tracing = "0.1"
2832
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2933
futures-util = "0.3"
3034
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "hickory-dns"] }
31-
# object_store = { version = "0.8", features = ["azure", "aws"] }
32-
# Pinned to a specific commit while waiting for the next release
33-
object_store = { version = "0.10.1", features = ["azure", "aws"] }
35+
# object_store = { version = "0.10.1", features = ["azure", "aws"] }
36+
# Pinned to a specific commit while waiting for upstream
37+
object_store = { git = "https://github.com/andrebsguedes/arrow-rs.git", branch = "unsigned-payload-and-azure-list-offset", features = ["azure", "aws", "experimental-azure-list-offset"] }
3438
thiserror = "1"
3539
anyhow = { version = "1", features = ["backtrace"] }
3640
once_cell = "1.18"

src/crud_ops.rs

Lines changed: 15 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,11 @@
1-
use crate::{CResult, Config, NotifyGuard, SQ, clients, dyn_connect, static_config, Request, util::cstr_to_path, Context, RawResponse, ResponseGuard};
1+
use crate::{CResult, Client, RawConfig, NotifyGuard, SQ, static_config, Request, util::cstr_to_path, Context, RawResponse, ResponseGuard};
22

33
use object_store::{path::Path, ObjectStore};
44

55
use anyhow::anyhow;
66
use std::ffi::{c_char, c_void};
77
use futures_util::StreamExt;
88
use tokio::io::AsyncWriteExt;
9-
use std::sync::Arc;
109

1110
// The type used to give Julia the result of an async request. It will be allocated
1211
// by Julia as part of the request and filled in by Rust.
@@ -43,15 +42,15 @@ impl RawResponse for Response {
4342
}
4443
}
4544

46-
async fn multipart_get(slice: &mut [u8], path: &Path, client: &dyn ObjectStore) -> anyhow::Result<usize> {
47-
let result = client.head(&path).await?;
45+
async fn multipart_get(slice: &mut [u8], path: &Path, client: &Client) -> anyhow::Result<usize> {
46+
let result = client.store.head(&path).await?;
4847
if result.size > slice.len() {
4948
return Err(anyhow!("Supplied buffer was too small"));
5049
}
5150

5251
let part_ranges = crate::util::size_to_ranges(result.size);
5352

54-
let result_vec = client.get_ranges(&path, &part_ranges).await?;
53+
let result_vec = client.store.get_ranges(&path, &part_ranges).await?;
5554
let mut accum: usize = 0;
5655
for i in 0..result_vec.len() {
5756
slice[accum..accum + result_vec[i].len()].copy_from_slice(&result_vec[i]);
@@ -61,9 +60,9 @@ async fn multipart_get(slice: &mut [u8], path: &Path, client: &dyn ObjectStore)
6160
return Ok(accum);
6261
}
6362

64-
async fn multipart_put(slice: &[u8], path: &Path, client: Arc<dyn ObjectStore>) -> anyhow::Result<()> {
63+
async fn multipart_put(slice: &[u8], path: &Path, client: Client) -> anyhow::Result<()> {
6564
let mut writer = object_store::buffered::BufWriter::with_capacity(
66-
client,
65+
client.store,
6766
path.clone(),
6867
10 * 1024 * 1024
6968
)
@@ -88,19 +87,15 @@ async fn multipart_put(slice: &[u8], path: &Path, client: Arc<dyn ObjectStore>)
8887
};
8988
}
9089

91-
pub(crate) async fn handle_get(slice: &mut [u8], path: &Path, config: &Config) -> anyhow::Result<usize> {
92-
let (client, _) = clients()
93-
.try_get_with(config.get_hash(), dyn_connect(config)).await
94-
.map_err(|e| anyhow!(e))?;
95-
90+
pub(crate) async fn handle_get(client: Client, slice: &mut [u8], path: &Path) -> anyhow::Result<usize> {
9691
// Multipart Get
9792
if slice.len() > static_config().multipart_get_threshold as usize {
9893
let accum = multipart_get(slice, path, &client).await?;
9994
return Ok(accum);
10095
}
10196

10297
// Single part Get
103-
let body = client.get(path).await?;
98+
let body = client.store.get(path).await?;
10499
let mut batch_stream = body.into_stream().chunks(8);
105100

106101
let mut received_bytes = 0;
@@ -129,27 +124,19 @@ pub(crate) async fn handle_get(slice: &mut [u8], path: &Path, config: &Config) -
129124
Ok(received_bytes)
130125
}
131126

132-
pub(crate) async fn handle_put(slice: &'static [u8], path: &Path, config: &Config) -> anyhow::Result<usize> {
133-
let (client, _) = clients()
134-
.try_get_with(config.get_hash(), dyn_connect(config)).await
135-
.map_err(|e| anyhow!(e))?;
136-
127+
pub(crate) async fn handle_put(client: Client, slice: &'static [u8], path: &Path) -> anyhow::Result<usize> {
137128
let len = slice.len();
138129
if len < static_config().multipart_put_threshold as usize {
139-
let _ = client.put(path, slice.into()).await?;
130+
let _ = client.store.put(path, slice.into()).await?;
140131
} else {
141132
let _ = multipart_put(slice, path, client).await?;
142133
}
143134

144135
Ok(len)
145136
}
146137

147-
pub(crate) async fn handle_delete(path: &Path, config: &Config) -> anyhow::Result<usize> {
148-
let (client, _) = clients()
149-
.try_get_with(config.get_hash(), dyn_connect(config)).await
150-
.map_err(|e| anyhow!(e))?;
151-
152-
client.delete(path).await?;
138+
pub(crate) async fn handle_delete(client: Client, path: &Path) -> anyhow::Result<usize> {
139+
client.store.delete(path).await?;
153140

154141
Ok(0)
155142
}
@@ -159,7 +146,7 @@ pub extern "C" fn get(
159146
path: *const c_char,
160147
buffer: *mut u8,
161148
size: usize,
162-
config: *const Config,
149+
config: *const RawConfig,
163150
response: *mut Response,
164151
handle: *const c_void
165152
) -> CResult {
@@ -195,7 +182,7 @@ pub extern "C" fn put(
195182
path: *const c_char,
196183
buffer: *const u8,
197184
size: usize,
198-
config: *const Config,
185+
config: *const RawConfig,
199186
response: *mut Response,
200187
handle: *const c_void
201188
) -> CResult {
@@ -229,7 +216,7 @@ pub extern "C" fn put(
229216
#[no_mangle]
230217
pub extern "C" fn delete(
231218
path: *const c_char,
232-
config: *const Config,
219+
config: *const RawConfig,
233220
response: *mut Response,
234221
handle: *const c_void
235222
) -> CResult {

src/error.rs

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
use std::time::Duration;
22
use backoff::backoff::Backoff;
3+
use object_store::RetryConfig;
34
use once_cell::sync::Lazy;
4-
use crate::ConfigMeta;
55
use std::error::Error as StdError;
66
use anyhow::anyhow;
77

@@ -122,15 +122,15 @@ pub(crate) fn extract_error_info(error: &anyhow::Error) -> ErrorInfo {
122122
pub(crate) struct RetryState {
123123
pub(crate) start: std::time::Instant,
124124
pub(crate) attempts: Vec<ErrorInfo>,
125-
pub(crate) config_meta: ConfigMeta
125+
pub(crate) retry_config: RetryConfig
126126
}
127127

128128
impl RetryState {
129-
pub(crate) fn new(config_meta: ConfigMeta) -> RetryState {
129+
pub(crate) fn new(retry_config: RetryConfig) -> RetryState {
130130
RetryState {
131131
start: std::time::Instant::now(),
132132
attempts: vec![],
133-
config_meta
133+
retry_config
134134
}
135135
}
136136

@@ -145,8 +145,8 @@ impl RetryState {
145145
// We try to use the same settings as the object_store backoff but the implementation is
146146
// different so this is best effort.
147147
let mut backoff = backoff::ExponentialBackoff {
148-
initial_interval: self.config_meta.retry_config.backoff.init_backoff,
149-
max_interval: self.config_meta.retry_config.backoff.max_backoff,
148+
initial_interval: self.retry_config.backoff.init_backoff,
149+
max_interval: self.retry_config.backoff.max_backoff,
150150
..Default::default()
151151
};
152152

@@ -155,16 +155,16 @@ impl RetryState {
155155
let _ = backoff.next_backoff();
156156
}
157157

158-
backoff.next_backoff().unwrap_or(self.config_meta.retry_config.backoff.max_backoff)
158+
backoff.next_backoff().unwrap_or(self.retry_config.backoff.max_backoff)
159159
}
160160

161161
fn log_attempt(&mut self, info: ErrorInfo) {
162162
self.attempts.push(info);
163163
}
164164

165165
pub(crate) fn should_retry_logic(&self) -> bool {
166-
let max_retries = self.config_meta.retry_config.max_retries;
167-
let retry_timeout = self.config_meta.retry_config.retry_timeout;
166+
let max_retries = self.retry_config.max_retries;
167+
let retry_timeout = self.retry_config.retry_timeout;
168168
let elapsed = self.start.elapsed();
169169
let all_retries = self.retries();
170170

0 commit comments

Comments (0)