Skip to content

Commit 80c774c

Browse files
committed
chore(client): add comments to stream parsing to make complex parts easier to understand especially for new rustaceans
Signed-off-by: Raphael Höser <[email protected]>
1 parent 0b06d1c commit 80c774c

File tree

1 file changed

+41
-24
lines changed

1 file changed

+41
-24
lines changed

src/client/client_request.rs

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,20 @@ pub trait OneShotRequest: ClientRequest {
6161
Ok(())
6262
}
6363
}
64+
65+
/// A line in any json-nd stream coming from the database
6466
#[derive(Deserialize, Debug)]
6567
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
6668
enum StreamLineItem<T> {
67-
Error {
68-
error: String,
69-
},
69+
/// An error occured during the request
70+
Error { error: String },
71+
/// A heardbeat message was sent to keep the connection alive.
72+
/// This is only used when observing events, but it does not hurt to have it everywhere.
7073
Heartbeat,
74+
/// A successful response from the database
75+
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
76+
/// Every request item gets put in here and the type can be checked later on.
77+
/// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point.
7178
#[serde(untagged)]
7279
Ok {
7380
#[serde(rename = "type")]
@@ -76,22 +83,6 @@ enum StreamLineItem<T> {
7683
},
7784
}
7885

79-
impl<T> StreamLineItem<T> {
80-
pub fn into_result_option(self, expected_type: &str) -> Result<Option<T>, ClientError> {
81-
match self {
82-
StreamLineItem::Error { error } => Err(ClientError::DBError(error)),
83-
StreamLineItem::Heartbeat => Ok(None),
84-
StreamLineItem::Ok { ty, payload } => {
85-
if ty == expected_type {
86-
Ok(Some(payload))
87-
} else {
88-
Err(ClientError::InvalidResponseType(ty))
89-
}
90-
}
91-
}
92-
}
93-
}
94-
9586
/// Represents a request to the database that expects a stream of responses
9687
pub trait StreamingRequest: ClientRequest {
9788
type ItemType: DeserializeOwned;
@@ -102,12 +93,38 @@ pub trait StreamingRequest: ClientRequest {
10293
) -> impl Stream<Item = Result<Self::ItemType, ClientError>> {
10394
Box::pin(
10495
Self::lines_stream(response)
105-
.map(|line| {
106-
let line = line?;
107-
let item: StreamLineItem<Self::ItemType> = serde_json::from_str(line.as_str())?;
108-
item.into_result_option(Self::ITEM_TYPE_NAME)
96+
.map(|maybe_line| {
97+
let line = maybe_line?;
98+
// This line does the heavy lifting of parsing the json-nd line into the correct type.
99+
// There's some Rust typesystem glory involved here, so let's break it down:
100+
// First of all `serde_json::from_str` is used to parse any json `&str` into the type we want to have (in this case a `StreamLineItem`).
101+
// `StreamLineItem` in turn is generic over `Self::ItemType`, which is the type that is expected by the exact response implementation and can change.
102+
// This means, that this will throw an error if the line is invalid json or the string cannot be parsed into an error, heartbeat or the expected type.
103+
// Because of this, we can guarantee after this line, that the payload of the `StreamLineItem` is of the correct type and no further checks are needed.
104+
Ok(serde_json::from_str::<StreamLineItem<Self::ItemType>>(
105+
line.as_str(),
106+
)?)
109107
})
110-
.filter_map(|o| async { o.transpose() }),
108+
.filter_map(|o| async {
109+
match o {
110+
// An error was passed by the database, so we forward it as an error.
111+
Ok(StreamLineItem::Error { error }) => {
112+
Some(Err(ClientError::DBError(error)))
113+
}
114+
// A heartbeat message was sent, which we ignore.
115+
Ok(StreamLineItem::Heartbeat) => None,
116+
// A successful response was sent with the correct type.
117+
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
118+
Some(Ok(payload))
119+
}
120+
// A successful response was sent, but the type does not match the expected type.
121+
Ok(StreamLineItem::Ok { ty, .. }) => {
122+
Some(Err(ClientError::InvalidResponseType(ty)))
123+
}
124+
// An error occured while parsing the line, which we forward as an error.
125+
Err(e) => Some(Err(e)),
126+
}
127+
}),
111128
)
112129
}
113130

0 commit comments

Comments
 (0)