-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathentity.rs
More file actions
213 lines (183 loc) · 6.77 KB
/
entity.rs
File metadata and controls
213 lines (183 loc) · 6.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_sqlite::constants::SQL_FELT_DELIMITER;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::OptimisticEntity;
use tracing::{error, trace};
use super::match_entity;
use torii_proto::proto::world::SubscribeEntityResponse;
use torii_proto::Clause;
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";
#[derive(Debug)]
pub struct EntitiesSubscriber {
/// The clause that the subscriber is interested in
pub(crate) clause: Option<Clause>,
/// The channel to send the response back to the subscriber.
pub(crate) sender: Sender<Result<SubscribeEntityResponse, tonic::Status>>,
}
#[derive(Debug, Default)]
pub struct EntityManager {
subscribers: RwLock<HashMap<u64, EntitiesSubscriber>>,
}
impl EntityManager {
pub async fn add_subscriber(
&self,
clause: Option<Clause>,
) -> Result<Receiver<Result<SubscribeEntityResponse, tonic::Status>>, Error> {
let subscription_id = rand::thread_rng().gen::<u64>();
let (sender, receiver) = channel(1);
// NOTE: unlock issue with firefox/safari
// initially send empty stream message to return from
// initial subscribe call
let _ = sender
.send(Ok(SubscribeEntityResponse {
entity: None,
subscription_id,
}))
.await;
self.subscribers
.write()
.await
.insert(subscription_id, EntitiesSubscriber { clause, sender });
Ok(receiver)
}
pub async fn update_subscriber(&self, id: u64, clause: Option<Clause>) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
subscriber.sender.clone()
} else {
return; // Subscriber not found, exit early
}
};
self.subscribers
.write()
.await
.insert(id, EntitiesSubscriber { clause, sender });
}
pub(super) async fn remove_subscriber(&self, id: u64) {
self.subscribers.write().await.remove(&id);
}
}
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
entity_sender: UnboundedSender<OptimisticEntity>,
}
impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
let (entity_sender, entity_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
entity_sender,
};
tokio::spawn(Self::publish_updates(subs_manager, entity_receiver));
service
}
async fn publish_updates(
subs: Arc<EntityManager>,
mut entity_receiver: UnboundedReceiver<OptimisticEntity>,
) {
while let Some(entity) = entity_receiver.recv().await {
if let Err(e) = Self::process_entity_update(&subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Processing entity update.");
}
}
}
async fn process_entity_update(
subs: &Arc<EntityManager>,
entity: &OptimisticEntity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let hashed = Felt::from_str(&entity.id).map_err(ParseError::FromStr)?;
// keys is empty when an entity is updated with StoreUpdateRecord or Member but the entity
// has never been set before. In that case, we dont know the keys
let keys = entity
.keys
.trim_end_matches(SQL_FELT_DELIMITER)
.split(SQL_FELT_DELIMITER)
.filter_map(|key| {
if key.is_empty() {
None
} else {
Some(Felt::from_str(key))
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(ParseError::FromStr)?;
for (idx, sub) in subs.subscribers.read().await.iter() {
// Check if the subscriber is interested in this entity
// If we have a clause of hashed keys, then check that the id of the entity
// is in the list of hashed keys.
// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if let Some(clause) = &sub.clause {
if !match_entity(hashed, &keys, &entity.updated_model, clause) {
continue;
}
}
if entity.deleted {
let resp = SubscribeEntityResponse {
entity: Some(torii_proto::proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![],
}),
subscription_id: *idx,
};
if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
continue;
}
// This should NEVER be None
let model = entity
.updated_model
.as_ref()
.unwrap()
.as_struct()
.unwrap()
.clone();
let resp = SubscribeEntityResponse {
entity: Some(torii_proto::proto::types::Entity {
hashed_keys: hashed.to_bytes_be().to_vec(),
models: vec![model.into()],
}),
subscription_id: *idx,
};
if sub.sender.send(Ok(resp)).await.is_err() {
closed_stream.push(*idx);
}
}
for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing entity stream.");
subs.remove_subscriber(id).await
}
Ok(())
}
}
impl Future for Service {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}
}
Poll::Pending
}
}