5 | 5 | //! Nostr Database Indexes
6 | 6 |
7 | 7 | use std::cmp::Ordering;
8 | | -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
9 | | -//use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
| 8 | +use std::collections::{BTreeSet, HashMap, HashSet};
10 | 9 | use std::sync::Arc;
11 | 10 |
12 | 11 | use nostr::event::id;
13 | 12 | use nostr::nips::nip01::Coordinate;
14 | 13 | use nostr::secp256k1::XOnlyPublicKey;
15 | | -use nostr::{
16 | | -    Alphabet, Event, EventId, Filter, GenericTagValue, Kind, TagIndexValues, TagIndexes, Timestamp,
17 | | -};
| 14 | +use nostr::{Alphabet, Event, EventId, Filter, GenericTagValue, Kind, Timestamp};
18 | 15 | use rayon::prelude::*;
19 | 16 | use thiserror::Error;
20 | 17 | use tokio::sync::RwLock;
21 | 18 |
22 | 19 | use crate::raw::RawEvent;
| 20 | +use crate::tag_indexes::{TagIndexValues, TagIndexes};
23 | 21 |
24 | 22 | /// Public Key Prefix Size
25 | 23 | const PUBLIC_KEY_PREFIX_SIZE: usize = 8;
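Note: with this change, `TagIndexes`/`TagIndexValues` come from the local `crate::tag_indexes` module instead of the `nostr` crate, and the now-unused `BTreeMap` import (plus the commented-out atomics import) is dropped. As a rough mental model of what such a tag index holds — a hypothetical simplified sketch, not the crate's actual types:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical, simplified stand-in for `crate::tag_indexes::TagIndexes`:
// single-letter tag name -> set of indexed tag values.
type TagIndexes = HashMap<char, HashSet<String>>;

// Build the index from raw tags (each tag is a vec like ["e", "<id>", ...]).
fn build_tag_indexes<I>(tags: I) -> TagIndexes
where
    I: IntoIterator<Item = Vec<String>>,
{
    let mut indexes = TagIndexes::new();
    for tag in tags {
        if let (Some(name), Some(value)) = (tag.first(), tag.get(1)) {
            // Only single-letter tag names are queryable via NIP-01 filters.
            if let Some(c) = name.chars().next() {
                if name.len() == 1 {
                    indexes.entry(c).or_default().insert(value.clone());
                }
            }
        }
    }
    indexes
}

fn main() {
    let tags = vec![
        vec!["e".to_string(), "abc".to_string()],
        vec!["p".to_string(), "def".to_string()],
    ];
    let indexes = build_tag_indexes(tags);
    assert!(indexes[&'e'].contains("abc"));
}
```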
@@ -81,32 +79,11 @@ impl From<&Event> for EventIndex {
81 | 79 |             event_id: e.id,
82 | 80 |             pubkey: PublicKeyPrefix::from(e.pubkey),
83 | 81 |             kind: e.kind,
84 | | -            tags: e.build_tags_index(),
| 82 | +            tags: TagIndexes::from(e.tags.iter().map(|t| t.as_vec())),
85 | 83 |         }
86 | 84 |     }
87 | 85 | }
88 | 86 |
89 | | -impl EventIndex {
90 | | -    fn filter_tags_match(&self, filter: &FilterIndex) -> bool {
91 | | -        if filter.generic_tags.is_empty() {
92 | | -            return true;
93 | | -        }
94 | | -
95 | | -        if self.tags.is_empty() {
96 | | -            return false;
97 | | -        }
98 | | -
99 | | -        filter.generic_tags.iter().all(|(tagname, set)| {
100 | | -            self.tags.get(tagname).map_or(false, |valset| {
101 | | -                TagIndexValues::iter(set)
102 | | -                    .filter(|t| valset.contains(t))
103 | | -                    .count()
104 | | -                    > 0
105 | | -            })
106 | | -        })
107 | | -    }
108 | | -}
109 | | -
110 | 87 | /// Public Key prefix
111 | 88 | #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
112 | 89 | struct PublicKeyPrefix([u8; PUBLIC_KEY_PREFIX_SIZE]);
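`PublicKeyPrefix` stores only the first `PUBLIC_KEY_PREFIX_SIZE` (8) bytes of each 32-byte public key, shrinking each stored key 4x at the cost of rare prefix collisions. A condensed, self-contained sketch of the conversion named in the next hunk header (`impl From<[u8; 32]> for PublicKeyPrefix`):

```rust
const PUBLIC_KEY_PREFIX_SIZE: usize = 8;

#[derive(Debug, PartialEq, Eq, Hash)]
struct PublicKeyPrefix([u8; PUBLIC_KEY_PREFIX_SIZE]);

impl From<[u8; 32]> for PublicKeyPrefix {
    fn from(pk: [u8; 32]) -> Self {
        // Keep only the leading bytes of the full key.
        let mut prefix = [0u8; PUBLIC_KEY_PREFIX_SIZE];
        prefix.copy_from_slice(&pk[..PUBLIC_KEY_PREFIX_SIZE]);
        Self(prefix)
    }
}

fn main() {
    let pk = [0xAB; 32];
    assert_eq!(PublicKeyPrefix::from(pk), PublicKeyPrefix([0xAB; 8]));
}
```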
@@ -134,12 +111,12 @@ impl From<[u8; 32]> for PublicKeyPrefix {
134 | 111 |
135 | 112 | #[derive(Default)]
136 | 113 | struct FilterIndex {
137 | | -    ids: BTreeSet<EventId>,
138 | | -    authors: BTreeSet<PublicKeyPrefix>,
139 | | -    kinds: BTreeSet<Kind>,
| 114 | +    ids: HashSet<EventId>,
| 115 | +    authors: HashSet<PublicKeyPrefix>,
| 116 | +    kinds: HashSet<Kind>,
140 | 117 |     since: Option<Timestamp>,
141 | 118 |     until: Option<Timestamp>,
142 | | -    generic_tags: BTreeMap<Alphabet, BTreeSet<GenericTagValue>>,
| 119 | +    generic_tags: HashMap<Alphabet, BTreeSet<GenericTagValue>>,
143 | 120 | }
144 | 121 |
145 | 122 | impl FilterIndex {
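The switch from `BTreeSet`/`BTreeMap` to `HashSet`/`HashMap` fits the access pattern: these fields are only ever used for membership tests, never ordered iteration, so average O(1) `contains` beats O(log n). A minimal sketch of the empty-means-unconstrained membership idiom used throughout, with `u64` standing in for `EventId`:

```rust
use std::collections::HashSet;

// The index-side filter only ever asks "is this id in the set?",
// so an unordered HashSet is sufficient.
fn ids_match(ids: &HashSet<u64>, event_id: u64) -> bool {
    // An empty set means "no constraint on ids".
    ids.is_empty() || ids.contains(&event_id)
}

fn main() {
    let ids: HashSet<u64> = [1, 2, 3].into_iter().collect();
    assert!(ids_match(&ids, 2));
    assert!(!ids_match(&ids, 9));
    assert!(ids_match(&HashSet::new(), 9)); // unconstrained
}
```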
@@ -167,21 +144,60 @@ impl FilterIndex {
167 | 144 |             .insert(identifier);
168 | 145 |         self
169 | 146 |     }
| 147 | +
| 148 | +    fn ids_match(&self, event: &EventIndex) -> bool {
| 149 | +        self.ids.is_empty() || self.ids.contains(&event.event_id)
| 150 | +    }
| 151 | +
| 152 | +    fn authors_match(&self, event: &EventIndex) -> bool {
| 153 | +        self.authors.is_empty() || self.authors.contains(&event.pubkey)
| 154 | +    }
| 155 | +
| 156 | +    fn tag_match(&self, event: &EventIndex) -> bool {
| 157 | +        if self.generic_tags.is_empty() {
| 158 | +            return true;
| 159 | +        }
| 160 | +        if event.tags.is_empty() {
| 161 | +            return false;
| 162 | +        }
| 163 | +
| 164 | +        self.generic_tags.iter().all(|(tagname, set)| {
| 165 | +            event.tags.get(tagname).map_or(false, |valset| {
| 166 | +                TagIndexValues::iter(set.iter())
| 167 | +                    .filter(|t| valset.contains(t))
| 168 | +                    .count()
| 169 | +                    > 0
| 170 | +            })
| 171 | +        })
| 172 | +    }
| 173 | +
| 174 | +    fn kind_match(&self, kind: &Kind) -> bool {
| 175 | +        self.kinds.is_empty() || self.kinds.contains(kind)
| 176 | +    }
| 177 | +
| 178 | +    pub fn match_event(&self, event: &EventIndex) -> bool {
| 179 | +        self.ids_match(event)
| 180 | +            && self.since.map_or(true, |t| event.created_at >= t)
| 181 | +            && self.until.map_or(true, |t| event.created_at <= t)
| 182 | +            && self.kind_match(&event.kind)
| 183 | +            && self.authors_match(event)
| 184 | +            && self.tag_match(event)
| 185 | +    }
170 | 186 | }
171 | 187 |
172 | 188 | impl From<Filter> for FilterIndex {
173 | 189 |     fn from(value: Filter) -> Self {
174 | 190 |         Self {
175 | | -            ids: value.ids,
| 191 | +            ids: value.ids.into_iter().collect(),
176 | 192 |             authors: value
177 | 193 |                 .authors
178 | 194 |                 .into_iter()
179 | 195 |                 .map(PublicKeyPrefix::from)
180 | 196 |                 .collect(),
181 | | -            kinds: value.kinds,
| 197 | +            kinds: value.kinds.into_iter().collect(),
182 | 198 |             since: value.since,
183 | 199 |             until: value.until,
184 | | -            generic_tags: value.generic_tags,
| 200 | +            generic_tags: value.generic_tags.into_iter().collect(),
185 | 201 |         }
186 | 202 |     }
187 | 203 | }
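The new `match_event` chains the per-field predicates with short-circuiting `&&`, and each optional bound uses `map_or(true, ...)` so an absent constraint always passes. A tiny self-contained illustration of that idiom, with `u64` timestamps standing in for `nostr::Timestamp`:

```rust
// Sketch of the optional-bound check inside match_event, with plain types.
struct TimeWindow {
    since: Option<u64>,
    until: Option<u64>,
}

impl TimeWindow {
    fn contains(&self, created_at: u64) -> bool {
        // `map_or(true, ...)` treats an absent bound as "always passes".
        self.since.map_or(true, |t| created_at >= t)
            && self.until.map_or(true, |t| created_at <= t)
    }
}

fn main() {
    let w = TimeWindow { since: Some(10), until: None };
    assert!(w.contains(15));
    assert!(!w.contains(5));
}
```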
@@ -468,61 +484,11 @@ impl DatabaseIndexes {
468 | 484 |         T: Into<FilterIndex>,
469 | 485 |     {
470 | 486 |         let filter: FilterIndex = filter.into();
471 | | -        index.par_iter().filter(move |m| {
472 | | -            !deleted_ids.contains(&m.event_id)
473 | | -                && filter.until.map_or(true, |t| m.created_at <= t)
474 | | -                && filter.since.map_or(true, |t| m.created_at >= t)
475 | | -                && (filter.ids.is_empty() || filter.ids.contains(&m.event_id))
476 | | -                && (filter.authors.is_empty() || filter.authors.contains(&m.pubkey))
477 | | -                && (filter.kinds.is_empty() || filter.kinds.contains(&m.kind))
478 | | -                && m.filter_tags_match(&filter)
| 487 | +        index.par_iter().filter(move |event| {
| 488 | +            !deleted_ids.contains(&event.event_id) && filter.match_event(event)
479 | 489 |         })
480 | 490 |     }
481 | 491 |
482 | | -    /* fn internal_multi_parallel_query<'a, I, T>(
483 | | -        &self,
484 | | -        index: &'a BTreeSet<EventIndex>,
485 | | -        deleted: &'a HashSet<EventId>,
486 | | -        filters: I,
487 | | -    ) -> impl ParallelIterator<Item = &'a EventIndex>
488 | | -    where
489 | | -        I: IntoIterator<Item = T>,
490 | | -        T: Into<FilterIndex>,
491 | | -    {
492 | | -        let filters: Vec<FilterIndex> = filters.into_iter().map(|f| f.into()).collect();
493 | | -        let limits: Vec<Option<usize>> = filters.iter().map(|f| f.limit).collect();
494 | | -        let counter: Vec<AtomicUsize> = filters.iter().map(|_| AtomicUsize::new(0)).collect();
495 | | -        index
496 | | -            .par_iter()
497 | | -            .filter(move |i| !deleted.contains(&i.event_id))
498 | | -            .filter(move |i| {
499 | | -                filters.par_iter().enumerate().any(|(index, filter)| {
500 | | -                    if let Some(Some(limit)) = limits.get(index) {
501 | | -                        if let Some(counter) = counter.get(index) {
502 | | -                            if counter.load(AtomicOrdering::SeqCst) >= *limit {
503 | | -                                return false;
504 | | -                            }
505 | | -                        }
506 | | -                    }
507 | | -
508 | | -                    let status: bool = filter.until.map_or(true, |t| i.created_at <= t)
509 | | -                        && filter.since.map_or(true, |t| i.created_at >= t)
510 | | -                        && (filter.ids.is_empty() || filter.ids.contains(&i.event_id))
511 | | -                        && (filter.authors.is_empty() || filter.authors.contains(&i.pubkey))
512 | | -                        && (filter.kinds.is_empty() || filter.kinds.contains(&i.kind))
513 | | -                        && i.filter_tags_match(&filter);
514 | | -
515 | | -                    if status {
516 | | -                        if let Some(counter) = counter.get(index) {
517 | | -                            counter.fetch_add(1, AtomicOrdering::SeqCst);
518 | | -                        }
519 | | -                    }
520 | | -
521 | | -                    status
522 | | -                })
523 | | -            })
524 | | -    } */
525 | | -
526 | 492 |     /// Query
527 | 493 |     #[tracing::instrument(skip_all, level = "trace")]
528 | 494 |     pub async fn query<I>(&self, filters: I) -> Vec<EventId>
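The rewritten query delegates all per-filter logic to `FilterIndex::match_event`, leaving the rayon scan itself trivial: skip deleted ids, then apply the predicate. A self-contained sketch of the same parallel-filter shape, with `u64` ids standing in for `EventIndex` (assumes the `rayon` crate):

```rust
use rayon::prelude::*;
use std::collections::HashSet;

// Stand-in for the query above: scan the index in parallel, skipping
// deleted ids and applying the filter predicate to the rest.
fn parallel_query(
    index: &[u64],
    deleted: &HashSet<u64>,
    matches: impl Fn(&u64) -> bool + Sync,
) -> Vec<u64> {
    index
        .par_iter()
        .filter(|id| !deleted.contains(*id) && matches(*id))
        .copied()
        .collect()
}

fn main() {
    let index = vec![1, 2, 3, 4];
    let deleted: HashSet<u64> = [2].into_iter().collect();
    let hits = parallel_query(&index, &deleted, |id| *id > 1);
    assert_eq!(hits, vec![3, 4]); // rayon's collect preserves input order
}
```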