From e59f8a6d494bc9bb70315e93f487bf66604942d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20H=C3=B6ser?= Date: Sat, 6 Sep 2025 14:42:37 +0200 Subject: [PATCH] feat(client): #54 add verify hash function --- Cargo.lock | 67 ++++++++++++++++++++ Cargo.toml | 4 +- src/client/client_request.rs | 29 +++++---- src/error.rs | 8 +++ src/event/event_types/event.rs | 110 +++++++++++++++++++++++++++++++-- 5 files changed, 201 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09f6f16..a69e67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,6 +205,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bollard" version = "0.19.1" @@ -390,6 +399,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "critical-section" version = "1.2.0" @@ -420,6 +438,16 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "darling" version = "0.20.11" @@ -482,6 +510,16 @@ dependencies = [ "serde", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -575,10 +613,12 @@ dependencies = [ "cloudevents-sdk", "futures", "futures-util", + "hex", "jsonschema", "reqwest", "serde", "serde_json", + "sha2", "testcontainers", "thiserror 2.0.16", "tokio", @@ -772,6 +812,16 @@ dependencies = [ "windows", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -2331,6 +2381,17 @@ dependencies = [ "syn", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2927,6 +2988,12 @@ dependencies = [ "syn", ] +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "ulid" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index e8b01b5..f77b367 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ futures-util = "0.3.31" jsonschema = "0.33.0" reqwest = { version = "0.12.23", features = ["json", "stream"] } serde = { version = "1.0.219", features = ["derive"] } -serde_json = "1.0.143" +serde_json = { version = "1.0.143", features = ["raw_value"] } testcontainers = { version = "0.25.0", features = [ "http_wait", ], optional = true } @@ -37,6 +37,8 @@ tokio-util = { version = "0.7.16", features = ["io"] } tokio-stream = { version = "0.1.17", features = ["io-util"] } typed-builder = "0.21.2" url = "2.5.4" +sha2 = "0.10.9" +hex = "0.4.3" [dev-dependencies] testcontainers = { version = "0.25.0", features = ["http_wait"] } diff --git a/src/client/client_request.rs b/src/client/client_request.rs index a0fbd6f..a1119c3 100644 --- a/src/client/client_request.rs +++ b/src/client/client_request.rs @@ -69,18 +69,13 @@ pub trait OneShotRequest: ClientRequest { /// A line in any json-nd stream coming from the database #[derive(Deserialize, Debug)] -#[serde(tag = "type", content = "payload", rename_all = "camelCase")] +#[serde(untagged)] enum StreamLineItem { - /// An error occured during the request - Error { error: String }, - /// A heardbeat message was sent to keep the connection alive. - /// This is only used when observing events, but it does not hurt to have it everywhere. - Heartbeat(Value), + Predefined(PredefinedStreamLineItem), /// A successful response from the database /// Since the exact type of the payload is not known at this point, we use this as a fallback case. /// Every request item gets put in here and the type can be checked later on. /// The type name checking is only for semantic reasons, as the payload is already parsed as the correct type at this point. - #[serde(untagged)] Ok { #[serde(rename = "type")] ty: String, @@ -88,6 +83,16 @@ enum StreamLineItem { }, } +#[derive(Deserialize, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "camelCase")] +enum PredefinedStreamLineItem { + /// An error occured during the request + Error { error: String }, + /// A heardbeat message was sent to keep the connection alive. + /// This is only used when observing events, but it does not hurt to have it everywhere. + Heartbeat(Value), +} + /// Represents a request to the database that expects a stream of responses pub trait StreamingRequest: ClientRequest { type ItemType: DeserializeOwned; @@ -113,11 +118,13 @@ pub trait StreamingRequest: ClientRequest { .filter_map(|o| async { match o { // An error was passed by the database, so we forward it as an error. - Ok(StreamLineItem::Error { error }) => { - Some(Err(ClientError::DBError(error))) - } + Ok(StreamLineItem::Predefined(PredefinedStreamLineItem::Error { + error, + })) => Some(Err(ClientError::DBError(error))), // A heartbeat message was sent, which we ignore. - Ok(StreamLineItem::Heartbeat(_value)) => None, + Ok(StreamLineItem::Predefined(PredefinedStreamLineItem::Heartbeat( + _value, + ))) => None, // A successful response was sent with the correct type. Ok(StreamLineItem::Ok { payload, ty }) if ty == Self::ITEM_TYPE_NAME => { Some(Ok(payload)) diff --git a/src/error.rs b/src/error.rs index 85b3b9d..4841aa1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -70,4 +70,12 @@ pub enum EventError { #[cfg(feature = "cloudevents")] #[error("The passed cloudevent is invalid")] InvalidCloudevent, + /// Hash verification failed + #[error("Hash verification failed")] + HashVerificationFailed { + /// Expected hash as in the DB + expected: String, + /// Actual hash as computed + actual: String, + }, } diff --git a/src/event/event_types/event.rs b/src/event/event_types/event.rs index 1524d33..f4c6ae0 100644 --- a/src/event/event_types/event.rs +++ b/src/event/event_types/event.rs @@ -1,15 +1,53 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::value::{RawValue, Value}; -use crate::event::{EventCandidate, trace_info::TraceInfo}; +use crate::{ + error::EventError, + event::{EventCandidate, trace_info::TraceInfo}, +}; #[cfg(feature = "cloudevents")] use cloudevents::EventBuilder; +use sha2::{Digest, Sha256}; + +#[derive(Debug, Clone)] +pub struct CustomValue { + parsed: Value, + raw: Box, +} + +impl PartialEq for CustomValue { + fn eq(&self, other: &Self) -> bool { + self.parsed == other.parsed + } +} + +impl Eq for CustomValue {} + +impl<'de> Deserialize<'de> for CustomValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let raw = Box::::deserialize(deserializer)?; + let parsed: Value = serde_json::from_str(raw.get()).map_err(serde::de::Error::custom)?; + Ok(Self { parsed, raw }) + } +} + +impl Serialize for CustomValue { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.raw.serialize(serializer) + } +} /// Represents an event that has been received from the DB. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Event { - data: Value, + data: CustomValue, datacontenttype: String, hash: String, id: String, @@ -28,7 +66,7 @@ impl Event { /// Get the data of an event. #[must_use] pub fn data(&self) -> &Value { - &self.data + &self.data.parsed } /// Get the data content type of an event. #[must_use] @@ -92,12 +130,74 @@ impl Event { pub fn ty(&self) -> &str { &self.ty } + + /// Verify the hash of an event. + /// + /// ``` + /// use eventsourcingdb::event::EventCandidate; + /// # use serde_json::json; + /// # tokio_test::block_on(async { + /// # let container = eventsourcingdb::container::Container::start_preview().await.unwrap(); + /// let db_url = "http://localhost:3000/"; + /// let api_token = "secrettoken"; + /// # let db_url = container.get_base_url().await.unwrap(); + /// # let api_token = container.get_api_token(); + /// let client = eventsourcingdb::client::Client::new(db_url, api_token); + /// let candidates = vec![ + /// EventCandidate::builder() + /// .source("https://www.eventsourcingdb.io".to_string()) + /// .data(json!({"value": 1})) + /// .subject("/test".to_string()) + /// .ty("io.eventsourcingdb.test".to_string()) + /// .build() + /// ]; + /// let written_events = client.write_events(candidates, vec![]).await.expect("Failed to write events"); + /// let event = &written_events[0]; + /// event.verify_hash().expect("Hash verification failed"); + /// # }) + /// ``` + /// + /// # Errors + /// Returns an error if the hash verification fails. + pub fn verify_hash(&self) -> Result<(), EventError> { + let metadata = format!( + "{}|{}|{}|{}|{}|{}|{}|{}", + self.specversion, + self.id, + self.predecessorhash, + self.time + .to_rfc3339_opts(chrono::SecondsFormat::Nanos, true), + self.source, + self.subject, + self.ty, + self.datacontenttype, + ); + + let metadata_hash = Sha256::digest(metadata.as_bytes()); + let metadata_hash_hex = hex::encode(metadata_hash); + + let data_hash = Sha256::digest(self.data.raw.get()); + let data_hash_hex = hex::encode(data_hash); + + let final_hash_input = format!("{metadata_hash_hex}{data_hash_hex}"); + let final_hash = Sha256::digest(final_hash_input.as_bytes()); + let final_hash_hex = hex::encode(final_hash); + + if final_hash_hex == self.hash { + Ok(()) + } else { + Err(EventError::HashVerificationFailed { + expected: self.hash.clone(), + actual: final_hash_hex, + }) + } + } } impl From for EventCandidate { fn from(event: Event) -> Self { Self { - data: event.data, + data: event.data.parsed, source: event.source, subject: event.subject, ty: event.ty,