Skip to content

Commit 708bb81

Browse files
dcadenasyukibtc
authored andcommitted
lmdb: refactor ingester to use transaction-aware method
Pull-Request: #1004 Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent f91379a commit 708bb81

File tree

2 files changed

+129
-143
lines changed

2 files changed

+129
-143
lines changed

database/nostr-lmdb/src/store/ingester.rs

Lines changed: 14 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
use std::sync::mpsc::{Receiver, Sender};
66
use std::thread;
77

8-
use heed::RwTxn;
9-
use nostr::nips::nip01::Coordinate;
10-
use nostr::{Event, Kind, Timestamp};
11-
use nostr_database::{FlatBufferBuilder, RejectedReason, SaveEventStatus};
8+
use nostr::Event;
9+
use nostr_database::{FlatBufferBuilder, SaveEventStatus};
1210
use tokio::sync::oneshot;
1311

1412
use super::error::Error;
@@ -97,150 +95,24 @@ impl Ingester {
9795
event: Event,
9896
fbb: &mut FlatBufferBuilder,
9997
) -> nostr::Result<SaveEventStatus, Error> {
100-
if event.kind.is_ephemeral() {
101-
return Ok(SaveEventStatus::Rejected(RejectedReason::Ephemeral));
102-
}
103-
104-
// Initial read txn checks
105-
{
106-
// Acquire read txn
107-
let read_txn = self.db.read_txn()?;
108-
109-
// Already exists
110-
if self.db.has_event(&read_txn, event.id.as_bytes())? {
111-
return Ok(SaveEventStatus::Rejected(RejectedReason::Duplicate));
112-
}
113-
114-
// Reject event if ID was deleted
115-
if self.db.is_deleted(&read_txn, &event.id)? {
116-
return Ok(SaveEventStatus::Rejected(RejectedReason::Deleted));
117-
}
118-
119-
// Reject event if ADDR was deleted after it's created_at date
120-
// (non-parameterized or parameterized)
121-
if let Some(coordinate) = event.coordinate() {
122-
if let Some(time) = self.db.when_is_coordinate_deleted(&read_txn, &coordinate)? {
123-
if event.created_at <= time {
124-
return Ok(SaveEventStatus::Rejected(RejectedReason::Deleted));
125-
}
126-
}
127-
}
128-
129-
read_txn.commit()?;
130-
}
131-
132-
// Acquire write transaction
133-
let mut txn = self.db.write_txn()?;
134-
135-
// Remove replaceable events being replaced
136-
if event.kind.is_replaceable() {
137-
// Find replaceable event
138-
if let Some(stored) = self
139-
.db
140-
.find_replaceable_event(&txn, &event.pubkey, event.kind)?
141-
{
142-
if stored.created_at > event.created_at {
143-
txn.abort();
144-
return Ok(SaveEventStatus::Rejected(RejectedReason::Replaced));
145-
}
146-
147-
// Acquire read txn
148-
let read_txn = self.db.read_txn()?;
149-
150-
let coordinate: Coordinate = Coordinate::new(event.kind, event.pubkey);
151-
self.db
152-
.remove_replaceable(&read_txn, &mut txn, &coordinate, event.created_at)?;
153-
154-
read_txn.commit()?;
155-
}
156-
}
157-
158-
// Remove parameterized replaceable events being replaced
159-
if event.kind.is_addressable() {
160-
if let Some(identifier) = event.tags.identifier() {
161-
let coordinate: Coordinate =
162-
Coordinate::new(event.kind, event.pubkey).identifier(identifier);
163-
164-
// Find param replaceable event
165-
if let Some(stored) = self.db.find_addressable_event(&txn, &coordinate)? {
166-
if stored.created_at > event.created_at {
167-
txn.abort();
168-
return Ok(SaveEventStatus::Rejected(RejectedReason::Replaced));
169-
}
170-
171-
// Acquire read txn
172-
let read_txn = self.db.read_txn()?;
173-
174-
self.db.remove_addressable(
175-
&read_txn,
176-
&mut txn,
177-
&coordinate,
178-
Timestamp::max(),
179-
)?;
180-
181-
read_txn.commit()?;
182-
}
183-
}
184-
}
185-
186-
// Handle deletion events
187-
if let Kind::EventDeletion = event.kind {
188-
let invalid: bool = self.handle_deletion_event(&mut txn, &event)?;
189-
190-
if invalid {
191-
txn.abort();
192-
return Ok(SaveEventStatus::Rejected(RejectedReason::InvalidDelete));
193-
}
194-
}
195-
196-
// Store and index the event
197-
self.db.store(&mut txn, fbb, &event)?;
198-
199-
// Commit
200-
txn.commit()?;
201-
202-
Ok(SaveEventStatus::Success)
203-
}
204-
205-
fn handle_deletion_event(&self, txn: &mut RwTxn, event: &Event) -> nostr::Result<bool, Error> {
206-
// Acquire read txn
20798
let read_txn = self.db.read_txn()?;
99+
let mut write_txn = self.db.write_txn()?;
208100

209-
for id in event.tags.event_ids() {
210-
if let Some(target) = self.db.get_event_by_id(&read_txn, id.as_bytes())? {
211-
// Author must match
212-
if target.pubkey != event.pubkey.as_bytes() {
213-
return Ok(true);
214-
}
215-
216-
// Mark as deleted and remove event
217-
self.db.mark_deleted(txn, id)?;
218-
self.db.remove(txn, &target)?;
219-
}
220-
}
101+
let status: SaveEventStatus =
102+
self.db
103+
.save_event_with_txn(&read_txn, &mut write_txn, fbb, &event)?;
221104

222-
for coordinate in event.tags.coordinates() {
223-
// Author must match
224-
if coordinate.public_key != event.pubkey {
225-
return Ok(true);
105+
match &status {
106+
SaveEventStatus::Success => {
107+
write_txn.commit()?;
108+
read_txn.commit()?;
226109
}
227-
228-
// Mark deleted
229-
self.db
230-
.mark_coordinate_deleted(txn, &coordinate.borrow(), event.created_at)?;
231-
232-
// Remove events (up to the created_at of the deletion event)
233-
if coordinate.kind.is_replaceable() {
234-
self.db
235-
.remove_replaceable(&read_txn, txn, coordinate, event.created_at)?;
236-
} else if coordinate.kind.is_addressable() {
237-
self.db
238-
.remove_addressable(&read_txn, txn, coordinate, event.created_at)?;
110+
SaveEventStatus::Rejected(_) => {
111+
write_txn.abort();
112+
read_txn.commit()?;
239113
}
240114
}
241115

242-
read_txn.commit()?;
243-
244-
Ok(false)
116+
Ok(status)
245117
}
246118
}

database/nostr-lmdb/src/store/lmdb/mod.rs

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ use std::path::Path;
1111
use heed::byteorder::NativeEndian;
1212
use heed::types::{Bytes, Unit, U64};
1313
use heed::{Database, Env, EnvFlags, EnvOpenOptions, RoRange, RoTxn, RwTxn};
14+
use nostr::nips::nip01::Coordinate;
1415
use nostr::prelude::*;
1516
use nostr_database::flatbuffers::FlatBufferDecodeBorrowed;
16-
use nostr_database::{FlatBufferBuilder, FlatBufferEncode};
17+
use nostr_database::{FlatBufferBuilder, FlatBufferEncode, RejectedReason, SaveEventStatus};
1718

1819
mod index;
1920

@@ -284,6 +285,80 @@ impl Lmdb {
284285
Ok(self.get_event_by_id(txn, event_id)?.is_some())
285286
}
286287

288+
/// Save event with transaction support
289+
pub(crate) fn save_event_with_txn(
290+
&self,
291+
read_txn: &RoTxn,
292+
txn: &mut RwTxn,
293+
fbb: &mut FlatBufferBuilder,
294+
event: &Event,
295+
) -> Result<SaveEventStatus, Error> {
296+
if event.kind.is_ephemeral() {
297+
return Ok(SaveEventStatus::Rejected(RejectedReason::Ephemeral));
298+
}
299+
300+
// Already exists
301+
if self.has_event(read_txn, event.id.as_bytes())? {
302+
return Ok(SaveEventStatus::Rejected(RejectedReason::Duplicate));
303+
}
304+
305+
// Reject event if ID was deleted
306+
if self.is_deleted(read_txn, &event.id)? {
307+
return Ok(SaveEventStatus::Rejected(RejectedReason::Deleted));
308+
}
309+
310+
// Reject event if ADDR was deleted after it's created_at date
311+
// (non-parameterized or parameterized)
312+
if let Some(coordinate) = event.coordinate() {
313+
if let Some(time) = self.when_is_coordinate_deleted(read_txn, &coordinate)? {
314+
if event.created_at <= time {
315+
return Ok(SaveEventStatus::Rejected(RejectedReason::Deleted));
316+
}
317+
}
318+
}
319+
320+
// Remove replaceable events being replaced
321+
if event.kind.is_replaceable() {
322+
if let Some(stored) =
323+
self.find_replaceable_event(read_txn, &event.pubkey, event.kind)?
324+
{
325+
if stored.created_at > event.created_at {
326+
return Ok(SaveEventStatus::Rejected(RejectedReason::Replaced));
327+
}
328+
329+
let coordinate = Coordinate::new(event.kind, event.pubkey);
330+
self.remove_replaceable(read_txn, txn, &coordinate, event.created_at)?;
331+
}
332+
}
333+
334+
// Remove addressable events being replaced
335+
if event.kind.is_addressable() {
336+
if let Some(identifier) = event.tags.identifier() {
337+
let coordinate = Coordinate::new(event.kind, event.pubkey).identifier(identifier);
338+
339+
if let Some(stored) = self.find_addressable_event(read_txn, &coordinate)? {
340+
if stored.created_at > event.created_at {
341+
return Ok(SaveEventStatus::Rejected(RejectedReason::Replaced));
342+
}
343+
344+
self.remove_addressable(read_txn, txn, &coordinate, Timestamp::max())?;
345+
}
346+
}
347+
}
348+
349+
// Handle deletion events
350+
if event.kind == Kind::EventDeletion {
351+
let invalid: bool = self.handle_deletion_event(read_txn, txn, event)?;
352+
if invalid {
353+
return Ok(SaveEventStatus::Rejected(RejectedReason::InvalidDelete));
354+
}
355+
}
356+
357+
self.store(txn, fbb, event)?;
358+
359+
Ok(SaveEventStatus::Success)
360+
}
361+
287362
#[inline]
288363
pub(crate) fn get_event_by_id<'a>(
289364
&self,
@@ -803,6 +878,45 @@ impl Lmdb {
803878
Ok(self.atc_index.range(txn, &range)?)
804879
}
805880

881+
fn handle_deletion_event(
882+
&self,
883+
read_txn: &RoTxn,
884+
txn: &mut RwTxn,
885+
event: &Event,
886+
) -> Result<bool, Error> {
887+
for id in event.tags.event_ids() {
888+
if let Some(target) = self.get_event_by_id(read_txn, id.as_bytes())? {
889+
// Author must match
890+
if target.pubkey != event.pubkey.as_bytes() {
891+
return Ok(true);
892+
}
893+
894+
// Mark as deleted and remove event
895+
self.mark_deleted(txn, id)?;
896+
self.remove(txn, &target)?;
897+
}
898+
}
899+
900+
for coordinate in event.tags.coordinates() {
901+
// Author must match
902+
if coordinate.public_key != event.pubkey {
903+
return Ok(true);
904+
}
905+
906+
// Mark deleted
907+
self.mark_coordinate_deleted(txn, &coordinate.borrow(), event.created_at)?;
908+
909+
// Remove events (up to the created_at of the deletion event)
910+
if coordinate.kind.is_replaceable() {
911+
self.remove_replaceable(read_txn, txn, coordinate, event.created_at)?;
912+
} else if coordinate.kind.is_addressable() {
913+
self.remove_addressable(read_txn, txn, coordinate, event.created_at)?;
914+
}
915+
}
916+
917+
Ok(false)
918+
}
919+
806920
pub(crate) fn ktc_iter<'a>(
807921
&'a self,
808922
txn: &'a RoTxn,

0 commit comments

Comments
 (0)