Skip to content

Commit dcc83e1

Browse files
authored
fix create stream error response (#502)
fixes #487
1 parent fdd9ab7 commit dcc83e1

File tree

2 files changed

+40
-27
lines changed

2 files changed

+40
-27
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KE
3333
use crate::metadata::STREAM_INFO;
3434
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3535

36+
use super::logstream::error::CreateStreamError;
37+
3638
// Handler for POST /api/v1/ingest
3739
// ingests events by extracting stream name from header
3840
// creates if stream does not exist
@@ -43,9 +45,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
4345
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
4446
{
4547
let stream_name = stream_name.to_str().unwrap().to_owned();
46-
if let Err(e) = super::logstream::create_stream_if_not_exists(&stream_name).await {
47-
return Err(PostError::CreateStream(e.into()));
48-
}
48+
create_stream_if_not_exists(&stream_name).await?;
4949
push_logs(stream_name, req, body).await?;
5050
Ok(HttpResponse::Ok().finish())
5151
} else {
@@ -104,6 +104,15 @@ fn into_event_batch(
104104
Ok((size, rb, is_first))
105105
}
106106

107+
// Check if the stream exists and create a new stream if doesn't exist
108+
pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
109+
if STREAM_INFO.stream_exists(stream_name) {
110+
return Ok(());
111+
}
112+
super::logstream::create_stream(stream_name.to_string()).await?;
113+
Ok(())
114+
}
115+
107116
#[derive(Debug, thiserror::Error)]
108117
pub enum PostError {
109118
#[error("Stream {0} not found")]
@@ -116,8 +125,8 @@ pub enum PostError {
116125
Event(#[from] EventError),
117126
#[error("Invalid Request: {0}")]
118127
Invalid(#[from] anyhow::Error),
119-
#[error("Failed to create stream due to {0}")]
120-
CreateStream(Box<dyn std::error::Error + Send + Sync>),
128+
#[error("{0}")]
129+
CreateStream(#[from] CreateStreamError),
121130
}
122131

123132
impl actix_web::ResponseError for PostError {
@@ -127,6 +136,9 @@ impl actix_web::ResponseError for PostError {
127136
PostError::Header(_) => StatusCode::BAD_REQUEST,
128137
PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
129138
PostError::Invalid(_) => StatusCode::BAD_REQUEST,
139+
PostError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
140+
StatusCode::BAD_REQUEST
141+
}
130142
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
131143
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
132144
}

server/src/handlers/http/logstream.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,16 @@ use crate::storage::{LogStream, StorageDir};
3131
use crate::{event, stats};
3232
use crate::{metadata, validator};
3333

34-
use self::error::StreamError;
34+
use self::error::{CreateStreamError, StreamError};
3535

3636
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
3737
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
38-
validator::stream_name(&stream_name)?;
39-
40-
let objectstore = CONFIG.storage().get_object_store();
4138

4239
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
4340
return Err(StreamError::StreamNotFound(stream_name));
4441
}
4542

43+
let objectstore = CONFIG.storage().get_object_store();
4644
objectstore.delete_stream(&stream_name).await?;
4745
metadata::STREAM_INFO.delete_stream(&stream_name);
4846
event::STREAM_WRITERS.delete_stream(&stream_name);
@@ -269,27 +267,14 @@ fn remove_id_from_alerts(value: &mut Value) {
269267
}
270268
}
271269

272-
// Check if the stream exists and create a new stream if doesn't exist
273-
pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), StreamError> {
274-
if metadata::STREAM_INFO.stream_exists(stream_name) {
275-
return Ok(());
276-
}
277-
278-
create_stream(stream_name.to_string()).await
279-
}
280-
281-
pub async fn create_stream(stream_name: String) -> Result<(), StreamError> {
270+
pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> {
282271
// fail to proceed if invalid stream name
283272
validator::stream_name(&stream_name)?;
284273

285274
// Proceed to create log stream if it doesn't exist
286275
let storage = CONFIG.storage().get_object_store();
287-
if let Err(e) = storage.create_stream(&stream_name).await {
288-
// Fail if unable to create log stream on object store backend
289-
return Err(StreamError::Custom {
290-
msg: format!("failed to create log stream {stream_name} due to err: {e}"),
291-
status: StatusCode::INTERNAL_SERVER_ERROR,
292-
});
276+
if let Err(err) = storage.create_stream(&stream_name).await {
277+
return Err(CreateStreamError::Storage { stream_name, err });
293278
}
294279
metadata::STREAM_INFO.add_stream(stream_name.to_string());
295280

@@ -308,9 +293,20 @@ pub mod error {
308293
};
309294

310295
#[derive(Debug, thiserror::Error)]
311-
pub enum StreamError {
296+
pub enum CreateStreamError {
312297
#[error("Stream name validation failed due to {0}")]
313298
StreamNameValidation(#[from] StreamNameValidationError),
299+
#[error("failed to create log stream {stream_name} due to err: {err}")]
300+
Storage {
301+
stream_name: String,
302+
err: ObjectStorageError,
303+
},
304+
}
305+
306+
#[derive(Debug, thiserror::Error)]
307+
pub enum StreamError {
308+
#[error("{0}")]
309+
CreateStream(#[from] CreateStreamError),
314310
#[error("Log stream {0} does not exist")]
315311
StreamNotFound(String),
316312
#[error("Log stream is not initialized, send an event to this logstream and try again")]
@@ -341,7 +337,12 @@ pub mod error {
341337
impl actix_web::ResponseError for StreamError {
342338
fn status_code(&self) -> http::StatusCode {
343339
match self {
344-
StreamError::StreamNameValidation(_) => StatusCode::BAD_REQUEST,
340+
StreamError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
341+
StatusCode::BAD_REQUEST
342+
}
343+
StreamError::CreateStream(CreateStreamError::Storage { .. }) => {
344+
StatusCode::INTERNAL_SERVER_ERROR
345+
}
345346
StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
346347
StreamError::Custom { status, .. } => *status,
347348
StreamError::UninitializedLogstream => StatusCode::METHOD_NOT_ALLOWED,

0 commit comments

Comments
 (0)