Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ futures-util = "0.3.31"
jsonschema = "0.33.0"
reqwest = { version = "0.12.23", features = ["json", "stream"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
serde_json = { version = "1.0.145", features = ["raw_value"] }
testcontainers = { version = "0.25.0", features = [
"http_wait",
], optional = true }
Expand All @@ -37,6 +37,8 @@ tokio-util = { version = "0.7.16", features = ["io"] }
tokio-stream = { version = "0.1.17", features = ["io-util"] }
typed-builder = "0.22.0"
url = "2.5.4"
sha2 = "0.10.9"
hex = "0.4.3"

[dev-dependencies]
testcontainers = { version = "0.25.0", features = ["http_wait"] }
Expand Down
69 changes: 24 additions & 45 deletions src/client/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use read_event_type::ReadEventTypeRequest;
pub use read_events::ReadEventsRequest;
pub use register_event_schema::RegisterEventSchemaRequest;
pub use run_eventql_query::RunEventqlQueryRequest;
use serde_json::Value;
use serde_json::value::RawValue;
pub use verify_api_token::VerifyApiTokenRequest;
pub use write_events::WriteEventsRequest;

Expand Down Expand Up @@ -67,25 +67,13 @@ pub trait OneShotRequest: ClientRequest {
}
}

/// A line in any json-nd stream coming from the database
/// A line in a json-nd stream coming from the database
/// The body is parsed as a [`RawValue`], because some of the types need the raw string for internal usage.
#[derive(Deserialize, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
enum StreamLineItem<T> {
/// An error occured during the request
Error { error: String },
/// A heardbeat message was sent to keep the connection alive.
/// This is only used when observing events, but it does not hurt to have it everywhere.
Heartbeat(Value),
/// A successful response from the database
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
/// Every request item gets put in here and the type can be checked later on.
/// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point.
#[serde(untagged)]
Ok {
#[serde(rename = "type")]
ty: String,
payload: T,
},
struct StreamLineItem {
#[serde(rename = "type")]
ty: String,
payload: Box<RawValue>,
}

/// Represents a request to the database that expects a stream of responses
Expand All @@ -98,34 +86,25 @@ pub trait StreamingRequest: ClientRequest {
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
Box::pin(
Self::lines_stream(response)
.map(|maybe_line| {
let line = maybe_line?;
// This line does the heavy lifting of parsing the json-nd line into the correct type.
// There's some Rust typesystem glory involved here, so let's break it down:
// First of all `serde_json::from_str` is used to parse any json `&str` into the type we want to have (in this case a `StreamLineItem`).
// `StreamLineItem` in turn is generic over `Self::ItemType`, which is the type that is expected by the exact response implementation and can change.
// This means, that this will throw an error if the line is invalid json or the string cannot be parsed into an error, heartbeat or the expected type.
// Because of this, we can guarantee after this line, that the payload of the `StreamLineItem` is of the correct type and no further checks are needed.
Ok(serde_json::from_str::<StreamLineItem<Self::ItemType>>(
line.as_str(),
)?)
})
.map(|line| Ok(serde_json::from_str::<StreamLineItem>(line?.as_str())?))
.filter_map(|o| async {
match o {
// An error was passed by the database, so we forward it as an error.
Ok(StreamLineItem::Error { error }) => {
Some(Err(ClientError::DBError(error)))
}
// A heartbeat message was sent, which we ignore.
Ok(StreamLineItem::Heartbeat(_value)) => None,
// A successful response was sent with the correct type.
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
Some(Ok(payload))
}
// A successful response was sent, but the type does not match the expected type.
Ok(StreamLineItem::Ok { ty, .. }) => {
Some(Err(ClientError::InvalidResponseType(ty)))
}
// A line was successfully parsed.
Ok(StreamLineItem { payload, ty }) => match ty.as_str() {
// This is the expected type, so we try to parse it.
ty if ty == Self::ITEM_TYPE_NAME => {
Some(serde_json::from_str(payload.get()).map_err(ClientError::from))
}
// Forward Errors from the DB as DBErrors.
"error" => Some(Err(ClientError::DBError(payload.get().to_string()))),
// Ignore heartbeat messages.
"heartbeat" => None,
other => Some(Err(ClientError::InvalidResponseType(format!(
"Expected type {}, but got {}",
Self::ITEM_TYPE_NAME,
other
)))),
},
// An error occured while parsing the line, which we forward as an error.
Err(e) => Some(Err(e)),
}
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,15 @@ pub enum EventError {
#[cfg(feature = "cloudevents")]
#[error("The passed cloudevent is invalid")]
InvalidCloudevent,
/// Hash verification failed
#[error("Hash verification failed")]
HashVerificationFailed {
/// Expected hash as in the DB
expected: String,
/// Actual hash as computed
actual: String,
},
/// Serde error
#[error("Serde error: {0}")]
SerdeError(#[from] serde_json::Error),
}
Loading