Skip to content

Commit 1936d72

Browse files
jgodlewbpronan
andauthored
Fix endpoint propagation (#13)
Co-authored-by: Brian Ronan <[email protected]>
1 parent d819d0b commit 1936d72

File tree

4 files changed

+194
-37
lines changed

4 files changed

+194
-37
lines changed

cas_client/src/remote_client.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::io::{BufWriter, Cursor, Write};
1+
use std::io::{Cursor, Write};
22

33
use anyhow::anyhow;
44
use bytes::Buf;
@@ -20,14 +20,11 @@ use merklehash::MerkleHash;
2020
use tracing::debug;
2121

2222
use crate::Client;
23-
24-
pub const CAS_ENDPOINT: &str = "localhost:8080";
25-
pub const SCHEME: &str = "http:/";
23+
pub const CAS_ENDPOINT: &str = "http://localhost:8080";
2624
pub const PREFIX_DEFAULT: &str = "default";
2725

2826
#[derive(Debug)]
2927
pub struct RemoteClient {
30-
endpoint: String,
3128
client: CASAPIClient,
3229
}
3330

@@ -88,34 +85,32 @@ impl Client for RemoteClient {
8885

8986
impl RemoteClient {
9087
pub async fn from_config(endpoint: String) -> Self {
91-
Self { endpoint, client: CASAPIClient::default() }
88+
Self { client: CASAPIClient::new(&endpoint) }
9289
}
9390
}
9491

9592
#[derive(Debug)]
9693
pub struct CASAPIClient {
9794
client: reqwest::Client,
98-
scheme: String,
9995
endpoint: String,
10096
}
10197

10298
impl Default for CASAPIClient {
10399
fn default() -> Self {
104-
Self::new(SCHEME, CAS_ENDPOINT)
100+
Self::new(CAS_ENDPOINT)
105101
}
106102
}
107103

108104
impl CASAPIClient {
109-
pub fn new(scheme: &str, endpoint: &str) -> Self {
105+
pub fn new(endpoint: &str) -> Self {
110106
let client = reqwest::Client::builder()
111-
.http2_prior_knowledge()
112107
.build()
113108
.unwrap();
114-
Self { client, scheme: scheme.to_string(), endpoint: endpoint.to_string() }
109+
Self { client, endpoint: endpoint.to_string() }
115110
}
116111

117112
pub async fn exists(&self, key: &Key) -> Result<bool> {
118-
let url = Url::parse(&format!("{0}/{1}/xorb/{key}", self.scheme, self.endpoint))?;
113+
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
119114
let response = self.client.head(url).send().await?;
120115
match response.status() {
121116
StatusCode::OK => Ok(true),
@@ -127,7 +122,7 @@ impl CASAPIClient {
127122
}
128123

129124
pub async fn get_length(&self, key: &Key) -> Result<Option<u64>> {
130-
let url = Url::parse(&format!("{0}/{1}/xorb/{key}", self.scheme, self.endpoint))?;
125+
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
131126
let response = self.client.head(url).send().await?;
132127
let status = response.status();
133128
if status == StatusCode::NOT_FOUND {
@@ -163,21 +158,22 @@ impl CASAPIClient {
163158
contents: &[u8],
164159
chunk_boundaries: Vec<u64>,
165160
) -> Result<bool> {
166-
let url = Url::parse(&format!("{0}/{1}/xorb/{key}", self.scheme, self.endpoint))?;
161+
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;
167162

168-
let writer = Cursor::new(Vec::new());
169-
let mut buf = BufWriter::new(writer);
163+
let mut writer = Cursor::new(Vec::new());
170164

171165
let (_,_) = CasObject::serialize(
172-
&mut buf,
166+
&mut writer,
173167
&key.hash,
174168
contents,
175169
&chunk_boundaries.into_iter().map(|x| x as u32).collect()
176170
)?;
177171

178172
debug!("Upload: POST to {url:?} for {key:?}");
173+
writer.set_position(0);
174+
let data = writer.into_inner();
179175

180-
let response = self.client.post(url).body(buf.buffer().to_vec()).send().await?;
176+
let response = self.client.post(url).body(data).send().await?;
181177
let response_body = response.bytes().await?;
182178
let response_parsed: UploadXorbResponse = serde_json::from_reader(response_body.reader())?;
183179

@@ -194,7 +190,6 @@ impl CASAPIClient {
194190
}
195191

196192
async fn reconstruct<W: Write>(&self, reconstruction_response: QueryReconstructionResponse, writer: &mut W) -> Result<usize> {
197-
198193
let info = reconstruction_response.reconstruction;
199194
let total_len = info.iter().fold(0, |acc, x| acc + x.unpacked_length);
200195
let futs = info.into_iter().map(|term| {
@@ -217,7 +212,7 @@ impl CASAPIClient {
217212

218213
/// Reconstruct the file
219214
async fn reconstruct_file(&self, file_id: &MerkleHash) -> Result<QueryReconstructionResponse> {
220-
let url = Url::parse(&format!("{0}/{1}/reconstruction/{2}", self.scheme, self.endpoint, file_id.hex()))?;
215+
let url = Url::parse(&format!("{}/reconstruction/{}", self.endpoint, file_id.hex()))?;
221216

222217
let response = self.client.get(url).send().await?;
223218
let response_body = response.bytes().await?;
@@ -230,7 +225,7 @@ impl CASAPIClient {
230225
&self,
231226
key: &Key,
232227
) -> Result<QueryChunkResponse> {
233-
let url = Url::parse(&format!("{0}/{1}/chunk/{key}", self.scheme, self.endpoint))?;
228+
let url = Url::parse(&format!("{}/chunk/{key}", self.endpoint))?;
234229
let response = self.client.get(url).send().await?;
235230
let response_body = response.bytes().await?;
236231
let response_parsed: QueryChunkResponse = serde_json::from_reader(response_body.reader())?;
@@ -331,4 +326,4 @@ mod tests {
331326
rng.fill(&mut data[..]);
332327
data
333328
}
334-
}
329+
}

data/src/data_processing.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::path::Path;
2121
use std::sync::Arc;
2222
use tokio::sync::Mutex;
2323
use tracing::error;
24-
use url::Url;
2524

2625
#[derive(Default, Debug)]
2726
pub struct CASDataAggregator {
@@ -330,9 +329,7 @@ impl PointerFileTranslator {
330329
Endpoint::FileSystem(_) => panic!("aaaaaaaa no server"),
331330
};
332331

333-
let url = Url::parse(&endpoint).unwrap();
334-
335-
let rc = CASAPIClient::new(url.scheme(), url.domain().unwrap());
332+
let rc = CASAPIClient::new(&endpoint);
336333

337334
rc.write_file(file_id, writer).await?;
338335

0 commit comments

Comments
 (0)