Skip to content

Commit f26d76f

Browse files
committed
fix query
1 parent e2331a8 commit f26d76f

File tree

8 files changed

+189
-57
lines changed

8 files changed

+189
-57
lines changed

.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
DATABASE_URL=postgres://postgres:postgres@localhost:5432/event_store
1+
DATABASE_URL=postgres://postgres:postgres@localhost:5432/postgres

disintegrate-postgres/src/event_store.rs

Lines changed: 68 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,9 @@ where
106106
<QE as TryFrom<E>>::Error: StdError + Send + Sync + 'static,
107107
{
108108
let sql = format!(
109-
r#"
110-
WITH epoch AS (
111-
SELECT MAX(event_id) as __epoch FROM event
112-
)
113-
SELECT event_id, payload, __epoch
114-
FROM event CROSS JOIN epoch WHERE ({criteria}) AND event_id <= __epoch
109+
r#"SELECT event.event_id, event.payload, epoch.__epoch_id
110+
FROM (SELECT MAX(event_id) AS __epoch_id FROM event) AS epoch
111+
LEFT JOIN event ON event.event_id <= epoch.__epoch_id AND ({criteria})
115112
ORDER BY event_id ASC"#,
116113
criteria = CriteriaBuilder::new(query).build()
117114
);
@@ -121,13 +118,15 @@ where
121118
let mut epoch_id: PgEventId = 0;
122119
while let Some(row) = rows.next().await {
123120
let row = row?;
124-
let event_id = row.get(0);
121+
let event_id: Option<i64> = row.get(0);
125122
epoch_id = row.get(2);
126-
let payload = self.serde.deserialize(row.get(1))?;
127-
let payload: QE = payload
123+
if let Some(event_id) = event_id {
124+
let payload = self.serde.deserialize(row.get(1))?;
125+
let payload: QE = payload
128126
.try_into()
129127
.map_err(|e| Error::QueryEventMapping(Box::new(e)))?;
130-
yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
128+
yield Ok(StreamItem::Event(PersistedEvent::new(event_id, payload)));
129+
}
131130
}
132131
yield Ok(StreamItem::End(epoch_id));
133132
}
@@ -176,20 +175,18 @@ where
176175

177176
/// Appends new events to the event store.
178177
///
179-
/// This function inserts the provided `events` into the PostgreSQL event store by performing
180-
/// two separate inserts. First, it inserts the events into the `event_sequence` table to reclaim
181-
/// a set of IDs for the events. Then, it inserts the events into the `event` table along with
182-
/// their IDs, event types, domain identifiers, and payloads. Finally, it marks the event IDs as `consumed`
183-
/// in the event sequence table. If marking the event IDs as consumed fails (e.g., another process has already consumed the IDs),
184-
/// a conflict error is raised. This conflict indicates that the data retrieved by the query is stale,
185-
/// meaning that the events generated are no longer valid due to being generated from an old version
186-
/// of the event store.
178+
/// This function inserts the provided `events` into the PostgreSQL-backed event store.
179+
/// Before inserting, it queries the `event` table to ensure that no events have been
180+
/// appended since the given `version`. If newer events are found, a concurrency error
181+
/// is returned to prevent invalid state transitions.
182+
///
183+
/// If the concurrency check succeeds, the events are inserted into the `event` table.
187184
///
188185
/// # Arguments
189186
///
190-
/// * `events` - A vector of events to be appended.
191-
/// * `query` - The stream query specifying the criteria for filtering events.
192-
/// * `version` - The ID of the last consumed event.
187+
/// * `events` - The events to append to the event store.
188+
/// * `query` - The stream query that identifies the target event stream.
189+
/// * `version` - The ID of the last consumed event, used for optimistic concurrency control.
193190
///
194191
/// # Returns
195192
///
@@ -206,13 +203,16 @@ where
206203
QE: Event + Clone + Send + Sync,
207204
{
208205
let mut tx = self.pool.begin().await?;
209-
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(&mut *tx).await?;
210-
let inv_events: i64 = sqlx::query_scalar(&format!("SELECT count(*) FROM event WHERE {}", CriteriaBuilder::new(&query.change_origin(version)).build()))
211-
.fetch_one(&mut *tx)
212-
.await
213-
.map_err(map_concurrency_err)?;
206+
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
207+
.execute(&mut *tx)
208+
.await?;
214209

215-
if inv_events > 0 {
210+
if sqlx::query_scalar(&format!(
211+
"SELECT EXISTS (SELECT 1 FROM event WHERE {})",
212+
CriteriaBuilder::new(&query.change_origin(version)).build()
213+
))
214+
.fetch_one(&mut *tx)
215+
.await? {
216216
return Err(Error::Concurrency);
217217
}
218218

@@ -231,16 +231,55 @@ where
231231
.map(|(event_id, event)| PersistedEvent::new(*event_id, event))
232232
.collect::<Vec<_>>();
233233

234-
tx.commit().await?;
234+
tx.commit().await.map_err(map_concurrency_err)?;
235235

236236
Ok(persisted_events)
237237
}
238238

239+
/// Appends a batch of events to the PostgreSQL-backed event store **without** verifying
240+
/// whether new events have been added since the last read.
241+
///
242+
/// # Arguments
243+
///
244+
/// * `events` - A vector of events to be appended.
245+
///
246+
/// # Returns
247+
///
248+
/// A `Result` containing a vector of `PersistedEvent` representing the appended events,
249+
/// or an error of type `Self::Error`.
250+
async fn append_without_validation(
251+
&self,
252+
events: Vec<E>,
253+
) -> Result<Vec<PersistedEvent<PgEventId, E>>, Self::Error>
254+
where
255+
E: Clone + 'async_trait,
256+
{
257+
let mut tx = self.pool.begin().await?;
258+
259+
let mut sequence_insert = InsertEventsBuilder::new(&events, &self.serde);
260+
let event_ids: Vec<PgEventId> = sequence_insert
261+
.build()
262+
.fetch_all(&mut *tx)
263+
.await?
264+
.into_iter()
265+
.map(|r| r.get(0))
266+
.collect();
267+
268+
let persisted_events = event_ids
269+
.iter()
270+
.zip(events)
271+
.map(|(event_id, event)| PersistedEvent::new(*event_id, event))
272+
.collect::<Vec<_>>();
273+
274+
tx.commit().await?;
275+
276+
Ok(persisted_events)
277+
}
239278
}
240279

241280
fn map_concurrency_err(err: sqlx::Error) -> Error {
242281
if let sqlx::Error::Database(ref description) = err {
243-
if description.code().as_deref() == Some("23514") {
282+
if description.code().as_deref() == Some("40001") {
244283
return Error::Concurrency;
245284
}
246285
}

disintegrate-postgres/src/event_store/append.rs

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::BTreeSet;
22

3-
use disintegrate::{Event, Identifier, PersistedEvent};
3+
use disintegrate::{Event, Identifier};
44
use disintegrate_serde::Serde;
55
use sqlx::postgres::PgArguments;
66
use sqlx::query::Query;
@@ -83,4 +83,95 @@ where
8383

8484
#[cfg(test)]
8585
mod tests {
86+
use disintegrate::{
87+
domain_identifiers, ident, DomainIdentifierInfo, DomainIdentifierSet, EventInfo,
88+
EventSchema, IdentifierType,
89+
};
90+
use serde::{Deserialize, Serialize};
91+
use sqlx::Execute;
92+
93+
use super::*;
94+
95+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
96+
#[serde(tag = "event_type", rename_all = "snake_case")]
97+
enum ShoppingCartEvent {
98+
Added {
99+
product_id: String,
100+
cart_id: String,
101+
quantity: i64,
102+
},
103+
Removed {
104+
product_id: String,
105+
cart_id: String,
106+
quantity: i64,
107+
},
108+
}
109+
110+
impl Event for ShoppingCartEvent {
111+
const SCHEMA: EventSchema = EventSchema {
112+
events: &["ShoppingCartAdded", "ShoppingCartRemoved"],
113+
events_info: &[
114+
&EventInfo {
115+
name: "ShoppingCartAdded",
116+
domain_identifiers: &[&ident!(#product_id), &ident!(#cart_id)],
117+
},
118+
&EventInfo {
119+
name: "ShoppingCartRemoved",
120+
domain_identifiers: &[&ident!(#product_id), &ident!(#cart_id)],
121+
},
122+
],
123+
domain_identifiers: &[
124+
&DomainIdentifierInfo {
125+
ident: ident!(#cart_id),
126+
type_info: IdentifierType::String,
127+
},
128+
&DomainIdentifierInfo {
129+
ident: ident!(#product_id),
130+
type_info: IdentifierType::String,
131+
},
132+
],
133+
};
134+
fn name(&self) -> &'static str {
135+
match self {
136+
ShoppingCartEvent::Added { .. } => "ShoppingCartAdded",
137+
ShoppingCartEvent::Removed { .. } => "ShoppingCartRemoved",
138+
}
139+
}
140+
fn domain_identifiers(&self) -> DomainIdentifierSet {
141+
match self {
142+
ShoppingCartEvent::Added {
143+
product_id,
144+
cart_id,
145+
..
146+
} => domain_identifiers! {product_id: product_id, cart_id: cart_id},
147+
ShoppingCartEvent::Removed {
148+
product_id,
149+
cart_id,
150+
..
151+
} => domain_identifiers! {product_id: product_id, cart_id: cart_id},
152+
}
153+
}
154+
}
155+
156+
#[test]
157+
fn it_builds_event_insert() {
158+
let events = &[
159+
ShoppingCartEvent::Added {
160+
product_id: "product_1".into(),
161+
cart_id: "cart_1".into(),
162+
quantity: 10,
163+
},
164+
ShoppingCartEvent::Removed {
165+
product_id: "product_1".into(),
166+
cart_id: "cart_1".into(),
167+
quantity: 10,
168+
},
169+
];
170+
let serde = disintegrate_serde::serde::json::Json::default();
171+
let mut insert_query = InsertEventsBuilder::new(events, &serde);
172+
assert_eq!(
173+
insert_query.build().sql(),
174+
"INSERT INTO event (event_type,payload,cart_id,product_id) VALUES ($1, $2, $3, $4), ($5, $6, $7, $8) RETURNING (event_id)"
175+
);
176+
}
86177
}

disintegrate-postgres/src/event_store/tests.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use super::append::{InsertEventSequenceBuilder, InsertEventsBuilder};
1+
use super::append::InsertEventsBuilder;
22
use crate::{Error, PgEventId, PgEventStore};
33
use disintegrate::{
44
domain_identifiers, ident, query, DomainIdentifierInfo, DomainIdentifierSet, Event, EventInfo,
5-
EventSchema, EventStore, IdentifierType, PersistedEvent,
5+
EventSchema, EventStore, IdentifierType,
66
};
77
use disintegrate_serde::serde::json::Json;
88
use disintegrate_serde::Deserializer;
@@ -270,24 +270,8 @@ pub async fn insert_events<E: Event + Clone + Serialize + DeserializeOwned>(
270270
pool: &PgPool,
271271
events: &[E],
272272
) {
273-
let mut event_sequence_insert = InsertEventSequenceBuilder::new(events)
274-
.with_consumed(true)
275-
.with_committed(true);
276-
let event_ids: Vec<PgEventId> = event_sequence_insert
277-
.build()
278-
.fetch_all(pool)
279-
.await
280-
.unwrap()
281-
.into_iter()
282-
.map(|r| r.get(0))
283-
.collect();
284-
let persisted_events = event_ids
285-
.into_iter()
286-
.zip(events.iter())
287-
.map(|(id, event)| PersistedEvent::new(id, event.clone()))
288-
.collect::<Vec<_>>();
289273
let serde = disintegrate_serde::serde::json::Json::default();
290-
InsertEventsBuilder::new(persisted_events.as_slice(), &serde)
274+
InsertEventsBuilder::new(events, &serde)
291275
.build()
292276
.execute(pool)
293277
.await

disintegrate-postgres/src/listener.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,6 @@ where
539539
};
540540

541541
let result = self.handle_events_from(last_processed_id, &mut tx).await;
542-
543542
self.release_listener(result, tx).await
544543
}
545544

disintegrate-postgres/src/listener/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ async fn it_runs_event_listeners(pool: PgPool) {
216216

217217
assert!(append_result.is_ok());
218218
let carts = Cart::carts(&pool).await.unwrap();
219+
dbg!(&carts);
219220
assert_eq!(carts.len(), 1);
220221
let first_row = carts.first().unwrap();
221222
assert_eq!("cart_1", &first_row.cart_id);

disintegrate-postgres/src/migrator.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,7 @@ where
4343

4444
/// Init `PgEventStore` database
4545
pub async fn init_event_store(&self) -> Result<(), Error> {
46-
const RESERVED_NAMES: &[&str] = &[
47-
"event_id",
48-
"payload",
49-
"event_type",
50-
"inserted_at",
51-
];
46+
const RESERVED_NAMES: &[&str] = &["event_id", "payload", "event_type", "inserted_at"];
5247

5348
sqlx::query(include_str!("event_store/sql/table_event.sql"))
5449
.execute(&self.event_store.pool)

disintegrate/src/event_store.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,29 @@ where
6969
where
7070
E: Clone + 'async_trait,
7171
QE: Event + 'static + Clone + Send + Sync;
72+
73+
/// Appends a batch of events to the event store **without** verifying if
74+
/// new events have been added since the last read.
75+
///
76+
/// This method is useful when you are certain that no other process
77+
/// has modified the event store in a way that would make your logic stale.
78+
///
79+
/// If you need to guarantee that no duplicate events are added,
80+
/// use the `append` method instead, providing a query that ensures uniqueness.
81+
///
82+
/// # Arguments
83+
///
84+
/// * `events` - A vector of events to append to the event store.
85+
///
86+
/// # Returns
87+
///
88+
/// A `Result` containing a vector of `PersistedEvent` representing the appended events, or an error.
89+
async fn append_without_validation(
90+
&self,
91+
events: Vec<E>,
92+
) -> Result<Vec<PersistedEvent<ID, E>>, Self::Error>
93+
where
94+
E: Clone + 'async_trait;
7295
}
7396

7497
/// An item in the event stream.

0 commit comments

Comments (0)