Skip to content

Commit 73ce04b

Browse files
authored
Introduce a new error type for ObjectStorage api (#45)
Create a separate error type for all the methods defined by ObjectStorage. This ObjectStorageError should contain those failure variants over which we want to handle error in future. AwsSdkError and SdkError in AWS S3 crate covers all the variants for failure when calling S3 APIs but parseable needs to have its own Error type that can be used for error handling in the application itself. Added ObjectStorageError type which contains some variants through which we can tell what went wrong when using any of trait methods on S3 object. Though the error variants need to expand in future so that more complex cases could be handled correctly. Fixes #42
1 parent bcc2dd0 commit 73ce04b

File tree

8 files changed

+144
-68
lines changed

8 files changed

+144
-68
lines changed

server/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub enum Error {
3333
#[error("JSON provided to query api doesn't contain {0}")]
3434
JsonQuery(&'static str),
3535
#[error("Storage error: {0}")]
36-
Storage(Box<dyn ObjectStorageError>),
36+
Storage(ObjectStorageError),
3737
#[error("Event error: {0}")]
3838
Event(#[from] EventError),
3939
#[error("Parquet error: {0}")]

server/src/handlers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub async fn liveness() -> HttpResponse {
3737
}
3838

3939
pub async fn readiness() -> HttpResponse {
40-
if S3::new().is_available().await {
40+
if let Ok(()) = S3::new().check().await {
4141
return HttpResponse::new(StatusCode::OK);
4242
}
4343

server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ async fn main() -> anyhow::Result<()> {
5959
CONFIG.print();
6060
CONFIG.validate();
6161
let storage = S3::new();
62+
CONFIG.validate_storage(&storage).await;
6263
if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
6364
warn!("could not populate local metadata. {:?}", e);
6465
}

server/src/metadata.rs

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
use bytes::Bytes;
2020
use lazy_static::lazy_static;
21-
use log::warn;
2221
use serde::{Deserialize, Serialize};
2322
use std::collections::HashMap;
2423
use std::sync::RwLock;
@@ -126,17 +125,28 @@ impl STREAM_INFO {
126125
// to load the stream metadata based on whatever is available.
127126
//
128127
// TODO: ignore failure(s) if any and skip to next stream
129-
let alert_config = parse_string(storage.get_alert(&stream.name).await)
130-
.map_err(|_| Error::AlertNotInStore(stream.name.to_owned()))?;
131-
let schema = parse_string(storage.get_schema(&stream.name).await)
132-
.map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
128+
let alert_config = storage
129+
.get_alert(&stream.name)
130+
.await
131+
.map_err(|e| e.into())
132+
.and_then(parse_string)
133+
.map_err(|_| Error::AlertNotInStore(stream.name.to_owned()));
134+
135+
let schema = storage
136+
.get_schema(&stream.name)
137+
.await
138+
.map_err(|e| e.into())
139+
.and_then(parse_string)
140+
.map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()));
141+
133142
let metadata = LogStreamMetadata {
134-
schema,
135-
alert_config,
143+
schema: schema.unwrap_or_default(),
144+
alert_config: alert_config.unwrap_or_default(),
136145
..Default::default()
137146
};
147+
138148
let mut map = self.write().unwrap();
139-
map.insert(stream.name.to_owned(), metadata);
149+
map.insert(stream.name.clone(), metadata);
140150
}
141151

142152
Ok(())
@@ -159,21 +169,8 @@ impl STREAM_INFO {
159169
}
160170
}
161171

162-
fn parse_string(result: Result<Bytes, Error>) -> Result<String, Error> {
163-
let mut string = String::new();
164-
let bytes = match result {
165-
Ok(bytes) => bytes,
166-
Err(e) => {
167-
warn!("Storage error: {}", e);
168-
return Ok(string);
169-
}
170-
};
171-
172-
if !bytes.is_empty() {
173-
string = String::from_utf8(bytes.to_vec())?;
174-
}
175-
176-
Ok(string)
172+
fn parse_string(bytes: Bytes) -> Result<String, Error> {
173+
String::from_utf8(bytes.to_vec()).map_err(|e| e.into())
177174
}
178175

179176
#[cfg(test)]
@@ -214,14 +211,14 @@ mod tests {
214211
#[case::empty_string("")]
215212
fn test_parse_string(#[case] string: String) {
216213
let bytes = Bytes::from(string);
217-
assert!(parse_string(Ok(bytes)).is_ok())
214+
assert!(parse_string(bytes).is_ok())
218215
}
219216

220217
#[test]
221218
fn test_bad_parse_string() {
222219
let bad: Vec<u8> = vec![195, 40];
223220
let bytes = Bytes::from(bad);
224-
assert!(parse_string(Ok(bytes)).is_err());
221+
assert!(parse_string(bytes).is_err());
225222
}
226223

227224
#[rstest]

server/src/option.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use structopt::StructOpt;
2323

2424
use crate::banner;
2525
use crate::s3::S3Config;
26+
use crate::storage::{ObjectStorage, ObjectStorageError};
2627

2728
lazy_static::lazy_static! {
2829
#[derive(Debug)]
@@ -72,6 +73,23 @@ impl Config {
7273
}
7374
}
7475

76+
pub async fn validate_storage(&self, storage: &impl ObjectStorage) {
77+
match storage.check().await {
78+
Ok(_) => (),
79+
Err(ObjectStorageError::NoSuchBucket(name)) => panic!(
80+
"Could not start because the bucket doesn't exist. Please ensure bucket {bucket} exists on {url}",
81+
bucket = name,
82+
url = self.storage.endpoint_url()
83+
),
84+
Err(ObjectStorageError::ConnectionError(inner)) => panic!(
85+
"Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}",
86+
url = self.storage.endpoint_url(),
87+
cause = inner
88+
),
89+
Err(error) => { panic!("{error}") }
90+
}
91+
}
92+
7593
fn status_info(&self, scheme: &str) {
7694
let url = format!("{}://{}", scheme, CONFIG.parseable.address).underlined();
7795
eprintln!(

server/src/s3.rs

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use async_trait::async_trait;
2+
use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind};
23
use aws_sdk_s3::model::{Delete, ObjectIdentifier};
3-
use aws_sdk_s3::types::ByteStream;
4+
use aws_sdk_s3::types::{ByteStream, SdkError};
45
use aws_sdk_s3::Error as AwsSdkError;
56
use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
67
use aws_types::credentials::SharedCredentialsProvider;
@@ -18,7 +19,6 @@ use std::sync::Arc;
1819
use structopt::StructOpt;
1920
use tokio_stream::StreamExt;
2021

21-
use crate::error::Error;
2222
use crate::metadata::Stats;
2323
use crate::option::{StorageOpt, CONFIG};
2424
use crate::query::Query;
@@ -96,14 +96,6 @@ impl StorageOpt for S3Config {
9696
}
9797
}
9898

99-
impl ObjectStorageError for AwsSdkError {}
100-
101-
impl From<AwsSdkError> for Error {
102-
fn from(e: AwsSdkError) -> Self {
103-
Self::Storage(Box::new(e))
104-
}
105-
}
106-
10799
struct S3Options {
108100
endpoint: Endpoint,
109101
region: Region,
@@ -304,70 +296,83 @@ impl S3 {
304296

305297
#[async_trait]
306298
impl ObjectStorage for S3 {
307-
async fn is_available(&self) -> bool {
299+
async fn check(&self) -> Result<(), ObjectStorageError> {
308300
self.client
309301
.head_bucket()
310302
.bucket(&S3_CONFIG.s3_bucket_name)
311303
.send()
312304
.await
313-
.is_ok()
305+
.map(|_| ())
306+
.map_err(|err| err.into())
314307
}
315308

316-
async fn put_schema(&self, stream_name: String, body: String) -> Result<(), Error> {
309+
async fn put_schema(
310+
&self,
311+
stream_name: String,
312+
body: String,
313+
) -> Result<(), ObjectStorageError> {
317314
self._put_schema(stream_name, body).await?;
318315

319316
Ok(())
320317
}
321318

322-
async fn create_stream(&self, stream_name: &str) -> Result<(), Error> {
319+
async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
323320
self._create_stream(stream_name).await?;
324321

325322
Ok(())
326323
}
327324

328-
async fn delete_stream(&self, stream_name: &str) -> Result<(), Error> {
325+
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
329326
self._delete_stream(stream_name).await?;
330327

331328
Ok(())
332329
}
333330

334-
async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error> {
331+
async fn create_alert(
332+
&self,
333+
stream_name: &str,
334+
body: String,
335+
) -> Result<(), ObjectStorageError> {
335336
self._create_alert(stream_name, body).await?;
336337

337338
Ok(())
338339
}
339340

340-
async fn get_schema(&self, stream_name: &str) -> Result<Bytes, Error> {
341+
async fn get_schema(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError> {
341342
let body_bytes = self._get_schema(stream_name).await?;
342343

343344
Ok(body_bytes)
344345
}
345346

346-
async fn get_alert(&self, stream_name: &str) -> Result<Bytes, Error> {
347+
async fn get_alert(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError> {
347348
let body_bytes = self._alert_exists(stream_name).await?;
348349

349350
Ok(body_bytes)
350351
}
351352

352-
async fn get_stats(&self, stream_name: &str) -> Result<Stats, Error> {
353+
async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
353354
let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
354355

355356
Ok(stats)
356357
}
357358

358-
async fn list_streams(&self) -> Result<Vec<LogStream>, Error> {
359+
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
359360
let streams = self._list_streams().await?;
360361

361362
Ok(streams)
362363
}
363364

364-
async fn upload_file(&self, key: &str, path: &str) -> Result<(), Error> {
365+
async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError> {
365366
self._upload_file(key, path).await?;
366367

367368
Ok(())
368369
}
369370

370-
async fn query(&self, query: &Query, results: &mut Vec<RecordBatch>) -> Result<(), Error> {
371+
async fn query(
372+
&self,
373+
query: &Query,
374+
results: &mut Vec<RecordBatch>,
375+
) -> Result<(), ObjectStorageError> {
371376
let s3_file_system = Arc::new(
372377
S3FileSystem::new(
373378
Some(SharedCredentialsProvider::new(self.options.creds.clone())),
@@ -397,9 +402,39 @@ impl ObjectStorage for S3 {
397402

398403
// execute the query and collect results
399404
let df = ctx.sql(query.query.as_str()).await?;
400-
results.extend(df.collect().await.map_err(Error::DataFusion)?);
405+
results.extend(df.collect().await?);
401406
}
402407

403408
Ok(())
404409
}
405410
}
411+
412+
impl From<AwsSdkError> for ObjectStorageError {
413+
fn from(error: AwsSdkError) -> Self {
414+
ObjectStorageError::UnhandledError(error.into())
415+
}
416+
}
417+
418+
impl From<SdkError<HeadBucketError>> for ObjectStorageError {
419+
fn from(error: SdkError<HeadBucketError>) -> Self {
420+
match error {
421+
SdkError::ServiceError {
422+
err:
423+
HeadBucketError {
424+
kind: HeadBucketErrorKind::NotFound(_),
425+
..
426+
},
427+
..
428+
} => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()),
429+
SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()),
430+
SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
431+
err => ObjectStorageError::UnhandledError(err.into()),
432+
}
433+
}
434+
}
435+
436+
impl From<serde_json::Error> for ObjectStorageError {
437+
fn from(error: serde_json::Error) -> Self {
438+
ObjectStorageError::UnhandledError(error.into())
439+
}
440+
}

0 commit comments

Comments
 (0)