Skip to content

Commit 3ae9b23

Browse files
authored
feat(torii-grpc): member clause on subscriptions (#20)
* feat(torii-grpc): member clause on subscriptions * start updating clause matching * use clause in subscriptions and proto * reset lockfile * subscription all clauses * fix: match entity * client * optional clause sub * update client * clippy
1 parent b539283 commit 3ae9b23

File tree

13 files changed

+526
-275
lines changed

13 files changed

+526
-275
lines changed

crates/client/src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use dojo_world::contracts::model::ModelError;
22
use starknet::core::types::Felt;
33
use starknet::core::utils::{CairoShortStringToFeltError, ParseCairoShortStringError};
4-
use torii_proto::schema::SchemaError;
4+
use torii_proto::error::ProtoError;
55

66
#[derive(Debug, thiserror::Error)]
77
pub enum Error {
@@ -22,7 +22,7 @@ pub enum Error {
2222
#[error("Unsupported query")]
2323
UnsupportedQuery,
2424
#[error(transparent)]
25-
Schema(#[from] SchemaError),
25+
Proto(#[from] ProtoError),
2626
}
2727

2828
#[derive(Debug, thiserror::Error)]

crates/client/src/lib.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use torii_proto::proto::world::{
1919
};
2020
use torii_proto::schema::Entity;
2121
use torii_proto::{
22-
Controller, EntityKeysClause, Event, EventQuery, Page, Query, Token, TokenBalance,
22+
Clause, Controller, Event, EventQuery, KeysClause, Page, Query, Token, TokenBalance,
2323
};
2424

2525
use crate::error::Error;
@@ -212,53 +212,53 @@ impl Client {
212212
/// A direct stream to grpc subscribe entities
213213
pub async fn on_entity_updated(
214214
&self,
215-
clauses: Vec<EntityKeysClause>,
215+
clause: Option<Clause>,
216216
) -> Result<EntityUpdateStreaming, Error> {
217217
let mut grpc_client = self.inner.write().await;
218-
let stream = grpc_client.subscribe_entities(clauses).await?;
218+
let stream = grpc_client.subscribe_entities(clause).await?;
219219
Ok(stream)
220220
}
221221

222222
/// Update the entities subscription
223223
pub async fn update_entity_subscription(
224224
&self,
225225
subscription_id: u64,
226-
clauses: Vec<EntityKeysClause>,
226+
clause: Option<Clause>,
227227
) -> Result<(), Error> {
228228
let mut grpc_client = self.inner.write().await;
229229
grpc_client
230-
.update_entities_subscription(subscription_id, clauses)
230+
.update_entities_subscription(subscription_id, clause)
231231
.await?;
232232
Ok(())
233233
}
234234

235235
/// A direct stream to grpc subscribe event messages
236236
pub async fn on_event_message_updated(
237237
&self,
238-
clauses: Vec<EntityKeysClause>,
238+
clause: Option<Clause>,
239239
) -> Result<EntityUpdateStreaming, Error> {
240240
let mut grpc_client = self.inner.write().await;
241-
let stream = grpc_client.subscribe_event_messages(clauses).await?;
241+
let stream = grpc_client.subscribe_event_messages(clause).await?;
242242
Ok(stream)
243243
}
244244

245245
/// Update the event messages subscription
246246
pub async fn update_event_message_subscription(
247247
&self,
248248
subscription_id: u64,
249-
clauses: Vec<EntityKeysClause>,
249+
clause: Option<Clause>,
250250
) -> Result<(), Error> {
251251
let mut grpc_client = self.inner.write().await;
252252
grpc_client
253-
.update_event_messages_subscription(subscription_id, clauses)
253+
.update_event_messages_subscription(subscription_id, clause)
254254
.await?;
255255
Ok(())
256256
}
257257

258258
/// A direct stream to grpc subscribe starknet events
259259
pub async fn on_starknet_event(
260260
&self,
261-
keys: Vec<EntityKeysClause>,
261+
keys: Vec<KeysClause>,
262262
) -> Result<EventUpdateStreaming, Error> {
263263
let mut grpc_client = self.inner.write().await;
264264
let stream = grpc_client.subscribe_events(keys).await?;

crates/grpc/client/src/lib.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tonic::codec::CompressionEncoding;
1616
#[cfg(not(target_arch = "wasm32"))]
1717
use tonic::transport::Endpoint;
1818

19+
use torii_proto::error::ProtoError;
1920
use torii_proto::proto::world::{
2021
world_client, RetrieveControllersRequest, RetrieveControllersResponse, RetrieveEntitiesRequest,
2122
RetrieveEntitiesResponse, RetrieveEventMessagesRequest, RetrieveEventsRequest,
@@ -28,8 +29,10 @@ use torii_proto::proto::world::{
2829
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
2930
UpdateTokenSubscriptionRequest, WorldMetadataRequest,
3031
};
31-
use torii_proto::schema::{Entity, SchemaError};
32-
use torii_proto::{EntityKeysClause, Event, EventQuery, IndexerUpdate, Query, Token, TokenBalance};
32+
use torii_proto::schema::Entity;
33+
use torii_proto::{
34+
Clause, Event, EventQuery, IndexerUpdate, KeysClause, Query, Token, TokenBalance,
35+
};
3336

3437
pub use torii_proto as types;
3538

@@ -48,7 +51,7 @@ pub enum Error {
4851
#[error(transparent)]
4952
Transport(tonic::transport::Error),
5053
#[error(transparent)]
51-
Schema(#[from] SchemaError),
54+
Proto(#[from] ProtoError),
5255
}
5356

5457
#[derive(Debug)]
@@ -98,11 +101,11 @@ impl WorldClient {
98101
.and_then(|res| {
99102
res.into_inner()
100103
.metadata
101-
.ok_or(Error::Schema(SchemaError::MissingExpectedData(
104+
.ok_or(Error::Proto(ProtoError::MissingExpectedData(
102105
"metadata".to_string(),
103106
)))
104107
})
105-
.and_then(|metadata| metadata.try_into().map_err(Error::ParseStr))
108+
.and_then(|metadata| metadata.try_into().map_err(Error::Proto))
106109
}
107110

108111
pub async fn retrieve_controllers(
@@ -304,12 +307,13 @@ impl WorldClient {
304307
/// Subscribe to entities updates of a World.
305308
pub async fn subscribe_entities(
306309
&mut self,
307-
clauses: Vec<EntityKeysClause>,
310+
clause: Option<Clause>,
308311
) -> Result<EntityUpdateStreaming, Error> {
309-
let clauses = clauses.into_iter().map(|c| c.into()).collect();
310312
let stream = self
311313
.inner
312-
.subscribe_entities(SubscribeEntitiesRequest { clauses })
314+
.subscribe_entities(SubscribeEntitiesRequest {
315+
clause: clause.map(|c| c.into()),
316+
})
313317
.await
314318
.map_err(Error::Grpc)
315319
.map(|res| res.into_inner())?;
@@ -337,14 +341,12 @@ impl WorldClient {
337341
pub async fn update_entities_subscription(
338342
&mut self,
339343
subscription_id: u64,
340-
clauses: Vec<EntityKeysClause>,
344+
clause: Option<Clause>,
341345
) -> Result<(), Error> {
342-
let clauses = clauses.into_iter().map(|c| c.into()).collect();
343-
344346
self.inner
345347
.update_entities_subscription(UpdateEntitiesSubscriptionRequest {
346348
subscription_id,
347-
clauses,
349+
clause: clause.map(|c| c.into()),
348350
})
349351
.await
350352
.map_err(Error::Grpc)
@@ -354,12 +356,13 @@ impl WorldClient {
354356
/// Subscribe to event messages of a World.
355357
pub async fn subscribe_event_messages(
356358
&mut self,
357-
clauses: Vec<EntityKeysClause>,
359+
clause: Option<Clause>,
358360
) -> Result<EntityUpdateStreaming, Error> {
359-
let clauses = clauses.into_iter().map(|c| c.into()).collect();
360361
let stream = self
361362
.inner
362-
.subscribe_event_messages(SubscribeEventMessagesRequest { clauses })
363+
.subscribe_event_messages(SubscribeEventMessagesRequest {
364+
clause: clause.map(|c| c.into()),
365+
})
363366
.await
364367
.map_err(Error::Grpc)
365368
.map(|res| res.into_inner())?;
@@ -387,13 +390,12 @@ impl WorldClient {
387390
pub async fn update_event_messages_subscription(
388391
&mut self,
389392
subscription_id: u64,
390-
clauses: Vec<EntityKeysClause>,
393+
clause: Option<Clause>,
391394
) -> Result<(), Error> {
392-
let clauses = clauses.into_iter().map(|c| c.into()).collect();
393395
self.inner
394396
.update_event_messages_subscription(UpdateEventMessagesSubscriptionRequest {
395397
subscription_id,
396-
clauses,
398+
clause: clause.map(|c| c.into()),
397399
})
398400
.await
399401
.map_err(Error::Grpc)
@@ -403,7 +405,7 @@ impl WorldClient {
403405
/// Subscribe to the events of a World.
404406
pub async fn subscribe_events(
405407
&mut self,
406-
keys: Vec<EntityKeysClause>,
408+
keys: Vec<KeysClause>,
407409
) -> Result<EventUpdateStreaming, Error> {
408410
let keys = keys.into_iter().map(|c| c.into()).collect();
409411

crates/grpc/server/src/lib.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use tonic::codec::CompressionEncoding;
4040
use tonic::transport::Server;
4141
use tonic::{Request, Response, Status};
4242
use tonic_web::GrpcWebLayer;
43+
use torii_proto::error::ProtoError;
4344
use torii_sqlite::cache::ModelCache;
4445
use torii_sqlite::error::{ParseError, QueryError};
4546
use torii_sqlite::model::{fetch_entities, map_row_to_ty};
@@ -1525,10 +1526,15 @@ impl proto::world::world_server::World for DojoWorld {
15251526
&self,
15261527
request: Request<SubscribeEntitiesRequest>,
15271528
) -> ServiceResult<Self::SubscribeEntitiesStream> {
1528-
let SubscribeEntitiesRequest { clauses } = request.into_inner();
1529+
let SubscribeEntitiesRequest { clause } = request.into_inner();
1530+
let clause = clause
1531+
.map(|c| c.try_into())
1532+
.transpose()
1533+
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
1534+
15291535
let rx = self
15301536
.entity_manager
1531-
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
1537+
.add_subscriber(clause)
15321538
.await
15331539
.map_err(|e| Status::internal(e.to_string()))?;
15341540

@@ -1543,13 +1549,14 @@ impl proto::world::world_server::World for DojoWorld {
15431549
) -> ServiceResult<()> {
15441550
let UpdateEntitiesSubscriptionRequest {
15451551
subscription_id,
1546-
clauses,
1552+
clause,
15471553
} = request.into_inner();
1554+
let clause = clause
1555+
.map(|c| c.try_into())
1556+
.transpose()
1557+
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
15481558
self.entity_manager
1549-
.update_subscriber(
1550-
subscription_id,
1551-
clauses.into_iter().map(|keys| keys.into()).collect(),
1552-
)
1559+
.update_subscriber(subscription_id, clause)
15531560
.await;
15541561

15551562
Ok(Response::new(()))
@@ -1625,10 +1632,14 @@ impl proto::world::world_server::World for DojoWorld {
16251632
&self,
16261633
request: Request<SubscribeEventMessagesRequest>,
16271634
) -> ServiceResult<Self::SubscribeEntitiesStream> {
1628-
let SubscribeEventMessagesRequest { clauses } = request.into_inner();
1635+
let SubscribeEventMessagesRequest { clause } = request.into_inner();
1636+
let clause = clause
1637+
.map(|c| c.try_into())
1638+
.transpose()
1639+
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
16291640
let rx = self
16301641
.event_message_manager
1631-
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
1642+
.add_subscriber(clause)
16321643
.await
16331644
.map_err(|e| Status::internal(e.to_string()))?;
16341645

@@ -1643,13 +1654,14 @@ impl proto::world::world_server::World for DojoWorld {
16431654
) -> ServiceResult<()> {
16441655
let UpdateEventMessagesSubscriptionRequest {
16451656
subscription_id,
1646-
clauses,
1657+
clause,
16471658
} = request.into_inner();
1659+
let clause = clause
1660+
.map(|c| c.try_into())
1661+
.transpose()
1662+
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
16481663
self.event_message_manager
1649-
.update_subscriber(
1650-
subscription_id,
1651-
clauses.into_iter().map(|keys| keys.into()).collect(),
1652-
)
1664+
.update_subscriber(subscription_id, clause)
16531665
.await;
16541666

16551667
Ok(Response::new(()))

crates/grpc/server/src/subscriptions/entity.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use torii_sqlite::simple_broker::SimpleBroker;
1919
use torii_sqlite::types::OptimisticEntity;
2020
use tracing::{error, trace};
2121

22-
use super::match_entity_keys;
22+
use super::match_entity;
2323
use torii_proto::proto::world::SubscribeEntityResponse;
24-
use torii_proto::EntityKeysClause;
24+
use torii_proto::Clause;
2525

2626
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";
2727

2828
#[derive(Debug)]
2929
pub struct EntitiesSubscriber {
30-
/// Entity ids that the subscriber is interested in
31-
pub(crate) clauses: Vec<EntityKeysClause>,
30+
/// The clause that the subscriber is interested in
31+
pub(crate) clause: Option<Clause>,
3232
/// The channel to send the response back to the subscriber.
3333
pub(crate) sender: Sender<Result<SubscribeEntityResponse, tonic::Status>>,
3434
}
@@ -40,7 +40,7 @@ pub struct EntityManager {
4040
impl EntityManager {
4141
pub async fn add_subscriber(
4242
&self,
43-
clauses: Vec<EntityKeysClause>,
43+
clause: Option<Clause>,
4444
) -> Result<Receiver<Result<SubscribeEntityResponse, tonic::Status>>, Error> {
4545
let subscription_id = rand::thread_rng().gen::<u64>();
4646
let (sender, receiver) = channel(1);
@@ -58,12 +58,12 @@ impl EntityManager {
5858
self.subscribers
5959
.write()
6060
.await
61-
.insert(subscription_id, EntitiesSubscriber { clauses, sender });
61+
.insert(subscription_id, EntitiesSubscriber { clause, sender });
6262

6363
Ok(receiver)
6464
}
6565

66-
pub async fn update_subscriber(&self, id: u64, clauses: Vec<EntityKeysClause>) {
66+
pub async fn update_subscriber(&self, id: u64, clause: Option<Clause>) {
6767
let sender = {
6868
let subscribers = self.subscribers.read().await;
6969
if let Some(subscriber) = subscribers.get(&id) {
@@ -76,7 +76,7 @@ impl EntityManager {
7676
self.subscribers
7777
.write()
7878
.await
79-
.insert(id, EntitiesSubscriber { clauses, sender });
79+
.insert(id, EntitiesSubscriber { clause, sender });
8080
}
8181

8282
pub(super) async fn remove_subscriber(&self, id: u64) {
@@ -144,8 +144,10 @@ impl Service {
144144

145145
// If we have a clause of keys, then check that the key pattern of the entity
146146
// matches the key pattern of the subscriber.
147-
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
148-
continue;
147+
if let Some(clause) = &sub.clause {
148+
if !match_entity(hashed, &keys, &entity.updated_model, clause) {
149+
continue;
150+
}
149151
}
150152

151153
if entity.deleted {

crates/grpc/server/src/subscriptions/event.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use tokio::sync::mpsc::{
1313
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
1414
};
1515
use tokio::sync::RwLock;
16+
use torii_proto::KeysClause;
1617
use torii_sqlite::constants::SQL_FELT_DELIMITER;
1718
use torii_sqlite::error::{Error, ParseError};
1819
use torii_sqlite::simple_broker::SimpleBroker;
@@ -22,14 +23,13 @@ use tracing::{error, trace};
2223
use super::match_keys;
2324
use torii_proto::proto::types::Event as ProtoEvent;
2425
use torii_proto::proto::world::SubscribeEventsResponse;
25-
use torii_proto::EntityKeysClause;
2626

2727
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event";
2828

2929
#[derive(Debug)]
3030
pub struct EventSubscriber {
3131
/// Event keys that the subscriber is interested in
32-
keys: Vec<EntityKeysClause>,
32+
keys: Vec<KeysClause>,
3333
/// The channel to send the response back to the subscriber.
3434
sender: Sender<Result<SubscribeEventsResponse, tonic::Status>>,
3535
}
@@ -42,7 +42,7 @@ pub struct EventManager {
4242
impl EventManager {
4343
pub async fn add_subscriber(
4444
&self,
45-
keys: Vec<EntityKeysClause>,
45+
keys: Vec<KeysClause>,
4646
) -> Result<Receiver<Result<SubscribeEventsResponse, tonic::Status>>, Error> {
4747
let id = rand::thread_rng().gen::<usize>();
4848
let (sender, receiver) = channel(1);

0 commit comments

Comments
 (0)