Skip to content

Commit e59f8a6

Browse files
committed
feat(client): #54 add verify hash function
1 parent 42da0e6 commit e59f8a6

File tree

5 files changed

+201
-17
lines changed

5 files changed

+201
-17
lines changed

Cargo.lock

Lines changed: 67 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ futures-util = "0.3.31"
2727
jsonschema = "0.33.0"
2828
reqwest = { version = "0.12.23", features = ["json", "stream"] }
2929
serde = { version = "1.0.219", features = ["derive"] }
30-
serde_json = "1.0.143"
30+
serde_json = { version = "1.0.143", features = ["raw_value"] }
3131
testcontainers = { version = "0.25.0", features = [
3232
"http_wait",
3333
], optional = true }
@@ -37,6 +37,8 @@ tokio-util = { version = "0.7.16", features = ["io"] }
3737
tokio-stream = { version = "0.1.17", features = ["io-util"] }
3838
typed-builder = "0.21.2"
3939
url = "2.5.4"
40+
sha2 = "0.10.9"
41+
hex = "0.4.3"
4042

4143
[dev-dependencies]
4244
testcontainers = { version = "0.25.0", features = ["http_wait"] }

src/client/client_request.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,25 +69,30 @@ pub trait OneShotRequest: ClientRequest {
6969

7070
/// A line in any json-nd stream coming from the database
7171
#[derive(Deserialize, Debug)]
72-
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
72+
#[serde(untagged)]
7373
enum StreamLineItem<T> {
74-
/// An error occured during the request
75-
Error { error: String },
76-
/// A heardbeat message was sent to keep the connection alive.
77-
/// This is only used when observing events, but it does not hurt to have it everywhere.
78-
Heartbeat(Value),
74+
Predefined(PredefinedStreamLineItem),
7975
/// A successful response from the database
8076
/// Since the exact type of the payload is not known at this point, we use this as a fallback case.
8177
/// Every request item gets put in here and the type can be checked later on.
8278
/// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point.
83-
#[serde(untagged)]
8479
Ok {
8580
#[serde(rename = "type")]
8681
ty: String,
8782
payload: T,
8883
},
8984
}
9085

86+
#[derive(Deserialize, Debug)]
87+
#[serde(tag = "type", content = "payload", rename_all = "camelCase")]
88+
enum PredefinedStreamLineItem {
89+
/// An error occured during the request
90+
Error { error: String },
91+
/// A heardbeat message was sent to keep the connection alive.
92+
/// This is only used when observing events, but it does not hurt to have it everywhere.
93+
Heartbeat(Value),
94+
}
95+
9196
/// Represents a request to the database that expects a stream of responses
9297
pub trait StreamingRequest: ClientRequest {
9398
type ItemType: DeserializeOwned;
@@ -113,11 +118,13 @@ pub trait StreamingRequest: ClientRequest {
113118
.filter_map(|o| async {
114119
match o {
115120
// An error was passed by the database, so we forward it as an error.
116-
Ok(StreamLineItem::Error { error }) => {
117-
Some(Err(ClientError::DBError(error)))
118-
}
121+
Ok(StreamLineItem::Predefined(PredefinedStreamLineItem::Error {
122+
error,
123+
})) => Some(Err(ClientError::DBError(error))),
119124
// A heartbeat message was sent, which we ignore.
120-
Ok(StreamLineItem::Heartbeat(_value)) => None,
125+
Ok(StreamLineItem::Predefined(PredefinedStreamLineItem::Heartbeat(
126+
_value,
127+
))) => None,
121128
// A successful response was sent with the correct type.
122129
Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => {
123130
Some(Ok(payload))

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,12 @@ pub enum EventError {
7070
#[cfg(feature = "cloudevents")]
7171
#[error("The passed cloudevent is invalid")]
7272
InvalidCloudevent,
73+
/// Hash verification failed
74+
#[error("Hash verification failed")]
75+
HashVerificationFailed {
76+
/// Expected hash as in the DB
77+
expected: String,
78+
/// Actual hash as computed
79+
actual: String,
80+
},
7381
}

src/event/event_types/event.rs

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,53 @@
11
use chrono::{DateTime, Utc};
22
use serde::{Deserialize, Serialize};
3-
use serde_json::Value;
3+
use serde_json::value::{RawValue, Value};
44

5-
use crate::event::{EventCandidate, trace_info::TraceInfo};
5+
use crate::{
6+
error::EventError,
7+
event::{EventCandidate, trace_info::TraceInfo},
8+
};
69
#[cfg(feature = "cloudevents")]
710
use cloudevents::EventBuilder;
11+
use sha2::{Digest, Sha256};
12+
13+
#[derive(Debug, Clone)]
14+
pub struct CustomValue {
15+
parsed: Value,
16+
raw: Box<RawValue>,
17+
}
18+
19+
impl PartialEq for CustomValue {
20+
fn eq(&self, other: &Self) -> bool {
21+
self.parsed == other.parsed
22+
}
23+
}
24+
25+
impl Eq for CustomValue {}
26+
27+
impl<'de> Deserialize<'de> for CustomValue {
28+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
29+
where
30+
D: serde::Deserializer<'de>,
31+
{
32+
let raw = Box::<RawValue>::deserialize(deserializer)?;
33+
let parsed: Value = serde_json::from_str(raw.get()).map_err(serde::de::Error::custom)?;
34+
Ok(Self { parsed, raw })
35+
}
36+
}
37+
38+
impl Serialize for CustomValue {
39+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
40+
where
41+
S: serde::Serializer,
42+
{
43+
self.raw.serialize(serializer)
44+
}
45+
}
846

947
/// Represents an event that has been received from the DB.
1048
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1149
pub struct Event {
12-
data: Value,
50+
data: CustomValue,
1351
datacontenttype: String,
1452
hash: String,
1553
id: String,
@@ -28,7 +66,7 @@ impl Event {
2866
/// Get the data of an event.
2967
#[must_use]
3068
pub fn data(&self) -> &Value {
31-
&self.data
69+
&self.data.parsed
3270
}
3371
/// Get the data content type of an event.
3472
#[must_use]
@@ -92,12 +130,74 @@ impl Event {
92130
pub fn ty(&self) -> &str {
93131
&self.ty
94132
}
133+
134+
/// Verify the hash of an event.
135+
///
136+
/// ```
137+
/// use eventsourcingdb::event::EventCandidate;
138+
/// # use serde_json::json;
139+
/// # tokio_test::block_on(async {
140+
/// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap();
141+
/// let db_url = "http://localhost:3000/";
142+
/// let api_token = "secrettoken";
143+
/// # let db_url = container.get_base_url().await.unwrap();
144+
/// # let api_token = container.get_api_token();
145+
/// let client = eventsourcingdb::client::Client::new(db_url, api_token);
146+
/// let candidates = vec![
147+
/// EventCandidate::builder()
148+
/// .source("https://www.eventsourcingdb.io".to_string())
149+
/// .data(json!({"value": 1}))
150+
/// .subject("/test".to_string())
151+
/// .ty("io.eventsourcingdb.test".to_string())
152+
/// .build()
153+
/// ];
154+
/// let written_events = client.write_events(candidates, vec![]).await.expect("Failed to write events");
155+
/// let event = &written_events[0];
156+
/// event.verify_hash().expect("Hash verification failed");
157+
/// # })
158+
/// ```
159+
///
160+
/// # Errors
161+
/// Returns an error if the hash verification fails.
162+
pub fn verify_hash(&self) -> Result<(), EventError> {
163+
let metadata = format!(
164+
"{}|{}|{}|{}|{}|{}|{}|{}",
165+
self.specversion,
166+
self.id,
167+
self.predecessorhash,
168+
self.time
169+
.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
170+
self.source,
171+
self.subject,
172+
self.ty,
173+
self.datacontenttype,
174+
);
175+
176+
let metadata_hash = Sha256::digest(metadata.as_bytes());
177+
let metadata_hash_hex = hex::encode(metadata_hash);
178+
179+
let data_hash = Sha256::digest(self.data.raw.get());
180+
let data_hash_hex = hex::encode(data_hash);
181+
182+
let final_hash_input = format!("{metadata_hash_hex}{data_hash_hex}");
183+
let final_hash = Sha256::digest(final_hash_input.as_bytes());
184+
let final_hash_hex = hex::encode(final_hash);
185+
186+
if final_hash_hex == self.hash {
187+
Ok(())
188+
} else {
189+
Err(EventError::HashVerificationFailed {
190+
expected: self.hash.clone(),
191+
actual: final_hash_hex,
192+
})
193+
}
194+
}
95195
}
96196

97197
impl From<Event> for EventCandidate {
98198
fn from(event: Event) -> Self {
99199
Self {
100-
data: event.data,
200+
data: event.data.parsed,
101201
source: event.source,
102202
subject: event.subject,
103203
ty: event.ty,

0 commit comments

Comments
 (0)