Skip to content

Commit 5df5ca2

Browse files
authored
s3 range propagation (#23)
* fix * fixes * please our overload, the linter * fix pr comment
1 parent b83054b commit 5df5ca2

File tree

3 files changed

+34
-13
lines changed

3 files changed

+34
-13
lines changed

cas_client/src/remote_client.rs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@ use anyhow::anyhow;
44
use bytes::Buf;
55
use cas::key::Key;
66
use cas_types::{QueryChunkResponse, QueryReconstructionResponse, UploadXorbResponse};
7-
use reqwest::{header::{HeaderMap, HeaderValue}, StatusCode, Url};
7+
use reqwest::{
8+
header::{HeaderMap, HeaderValue},
9+
StatusCode, Url,
10+
};
811
use serde::{de::DeserializeOwned, Serialize};
912

1013
use bytes::Bytes;
1114
use cas_object::CasObject;
1215
use cas_types::CASReconstructionTerm;
13-
use tracing::warn;
16+
use tracing::{debug, warn};
1417

1518
use crate::{error::Result, CasClientError};
1619

1720
use merklehash::MerkleHash;
18-
use tracing::debug;
1921

2022
use crate::Client;
2123
pub const CAS_ENDPOINT: &str = "http://localhost:8080";
@@ -83,8 +85,8 @@ impl Client for RemoteClient {
8385

8486
impl RemoteClient {
8587
pub async fn from_config(endpoint: String, token: Option<String>) -> Self {
86-
Self {
87-
client: CASAPIClient::new(&endpoint, token)
88+
Self {
89+
client: CASAPIClient::new(&endpoint, token),
8890
}
8991
}
9092
}
@@ -108,7 +110,7 @@ impl CASAPIClient {
108110
Self {
109111
client,
110112
endpoint: endpoint.to_string(),
111-
token
113+
token,
112114
}
113115
}
114116

@@ -170,7 +172,7 @@ impl CASAPIClient {
170172
&key.hash,
171173
contents,
172174
&chunk_boundaries.into_iter().map(|x| x as u32).collect(),
173-
cas_object::CompressionScheme::LZ4
175+
cas_object::CompressionScheme::LZ4,
174176
)?;
175177

176178
debug!("Upload: POST to {url:?} for {key:?}");
@@ -224,14 +226,17 @@ impl CASAPIClient {
224226
/// Reconstruct the file
225227
async fn reconstruct_file(&self, file_id: &MerkleHash) -> Result<QueryReconstructionResponse> {
226228
let url = Url::parse(&format!(
227-
"{}/reconstruction/{}",
228-
self.endpoint,
229+
"{}/reconstruction/{}",
230+
self.endpoint,
229231
file_id.hex()
230232
))?;
231233

232234
let mut headers = HeaderMap::new();
233235
if let Some(tok) = &self.token {
234-
headers.insert("Authorization", HeaderValue::from_str(&format!("Bearer {}", tok)).unwrap());
236+
headers.insert(
237+
"Authorization",
238+
HeaderValue::from_str(&format!("Bearer {}", tok)).unwrap(),
239+
);
235240
}
236241

237242
let response = self.client.get(url).headers(headers).send().await?;
@@ -264,9 +269,21 @@ impl CASAPIClient {
264269
}
265270

266271
async fn get_one(term: &CASReconstructionTerm) -> Result<Bytes> {
272+
debug!("term: {term:?}");
273+
274+
if term.range.end < term.range.start || term.url_range.end < term.url_range.start {
275+
return Err(CasClientError::InternalError(anyhow!(
276+
"invalid range in reconstruction"
277+
)));
278+
}
279+
267280
let url = Url::parse(term.url.as_str())?;
268281
let response = reqwest::Client::new()
269282
.request(hyper::Method::GET, url)
283+
.header(
284+
reqwest::header::RANGE,
285+
format!("bytes={}-{}", term.url_range.start, term.url_range.end),
286+
)
270287
.send()
271288
.await?
272289
.error_for_status()?;
@@ -275,11 +292,13 @@ async fn get_one(term: &CASReconstructionTerm) -> Result<Bytes> {
275292
.await
276293
.map_err(CasClientError::ReqwestError)?;
277294
let mut readseek = Cursor::new(xorb_bytes.to_vec());
295+
let data = cas_object::deserialize_chunks(&mut readseek)?;
296+
let len = (term.range.end - term.range.start) as usize;
297+
let offset = term.range_start_offset as usize;
278298

279-
let cas_object = CasObject::deserialize(&mut readseek)?;
280-
let data = cas_object.get_range(&mut readseek, term.range.start, term.range.end)?;
299+
let sliced = data[offset..offset + len].to_vec();
281300

282-
Ok(Bytes::from(data))
301+
Ok(Bytes::from(sliced))
283302
}
284303

285304
#[cfg(test)]

cas_types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub struct CASReconstructionTerm {
2424
pub range: Range,
2525
pub range_start_offset: u32,
2626
pub url: String,
27+
pub url_range: Range,
2728
}
2829

2930
#[derive(Debug, Serialize, Deserialize, Clone)]

hf_xet/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)