Skip to content

Commit f3f83b4

Browse files
committed
use clause in subscriptions and proto
1 parent af4d617 commit f3f83b4

File tree

8 files changed

+6023
-3826
lines changed

8 files changed

+6023
-3826
lines changed

Cargo.lock

Lines changed: 5742 additions & 3680 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/grpc/server/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,10 +1526,10 @@ impl proto::world::world_server::World for DojoWorld {
15261526
&self,
15271527
request: Request<SubscribeEntitiesRequest>,
15281528
) -> ServiceResult<Self::SubscribeEntitiesStream> {
1529-
let SubscribeEntitiesRequest { clauses } = request.into_inner();
1529+
let SubscribeEntitiesRequest { clause } = request.into_inner();
15301530
let rx = self
15311531
.entity_manager
1532-
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
1532+
.add_subscriber(clause.into())
15331533
.await
15341534
.map_err(|e| Status::internal(e.to_string()))?;
15351535

@@ -1544,12 +1544,12 @@ impl proto::world::world_server::World for DojoWorld {
15441544
) -> ServiceResult<()> {
15451545
let UpdateEntitiesSubscriptionRequest {
15461546
subscription_id,
1547-
clauses,
1547+
clause,
15481548
} = request.into_inner();
15491549
self.entity_manager
15501550
.update_subscriber(
15511551
subscription_id,
1552-
clauses.into_iter().map(|keys| keys.into()).collect(),
1552+
clause.into(),
15531553
)
15541554
.await;
15551555

@@ -1626,10 +1626,10 @@ impl proto::world::world_server::World for DojoWorld {
16261626
&self,
16271627
request: Request<SubscribeEventMessagesRequest>,
16281628
) -> ServiceResult<Self::SubscribeEntitiesStream> {
1629-
let SubscribeEventMessagesRequest { clauses } = request.into_inner();
1629+
let SubscribeEventMessagesRequest { clause } = request.into_inner();
16301630
let rx = self
16311631
.event_message_manager
1632-
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
1632+
.add_subscriber(clause.into())
16331633
.await
16341634
.map_err(|e| Status::internal(e.to_string()))?;
16351635

@@ -1644,12 +1644,12 @@ impl proto::world::world_server::World for DojoWorld {
16441644
) -> ServiceResult<()> {
16451645
let UpdateEventMessagesSubscriptionRequest {
16461646
subscription_id,
1647-
clauses,
1647+
clause,
16481648
} = request.into_inner();
16491649
self.event_message_manager
16501650
.update_subscriber(
16511651
subscription_id,
1652-
clauses.into_iter().map(|keys| keys.into()).collect(),
1652+
clause.into(),
16531653
)
16541654
.await;
16551655

crates/grpc/server/src/subscriptions/entity.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use torii_sqlite::simple_broker::SimpleBroker;
1919
use torii_sqlite::types::OptimisticEntity;
2020
use tracing::{error, trace};
2121

22-
use super::match_entity_keys;
22+
use super::{match_entity};
2323
use torii_proto::proto::world::SubscribeEntityResponse;
24-
use torii_proto::EntityKeysClause;
24+
use torii_proto::Clause;
2525

2626
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";
2727

2828
#[derive(Debug)]
2929
pub struct EntitiesSubscriber {
30-
/// Entity ids that the subscriber is interested in
31-
pub(crate) clauses: Vec<EntityKeysClause>,
30+
/// The clause that the subscriber is interested in
31+
pub(crate) clause: Clause,
3232
/// The channel to send the response back to the subscriber.
3333
pub(crate) sender: Sender<Result<SubscribeEntityResponse, tonic::Status>>,
3434
}
@@ -40,7 +40,7 @@ pub struct EntityManager {
4040
impl EntityManager {
4141
pub async fn add_subscriber(
4242
&self,
43-
clauses: Vec<EntityKeysClause>,
43+
clause: Clause,
4444
) -> Result<Receiver<Result<SubscribeEntityResponse, tonic::Status>>, Error> {
4545
let subscription_id = rand::thread_rng().gen::<u64>();
4646
let (sender, receiver) = channel(1);
@@ -58,12 +58,12 @@ impl EntityManager {
5858
self.subscribers
5959
.write()
6060
.await
61-
.insert(subscription_id, EntitiesSubscriber { clauses, sender });
61+
.insert(subscription_id, EntitiesSubscriber { clause, sender });
6262

6363
Ok(receiver)
6464
}
6565

66-
pub async fn update_subscriber(&self, id: u64, clauses: Vec<EntityKeysClause>) {
66+
pub async fn update_subscriber(&self, id: u64, clause: Clause) {
6767
let sender = {
6868
let subscribers = self.subscribers.read().await;
6969
if let Some(subscriber) = subscribers.get(&id) {
@@ -76,7 +76,7 @@ impl EntityManager {
7676
self.subscribers
7777
.write()
7878
.await
79-
.insert(id, EntitiesSubscriber { clauses, sender });
79+
.insert(id, EntitiesSubscriber { clause, sender });
8080
}
8181

8282
pub(super) async fn remove_subscriber(&self, id: u64) {
@@ -144,7 +144,7 @@ impl Service {
144144

145145
// If we have a clause of keys, then check that the key pattern of the entity
146146
// matches the key pattern of the subscriber.
147-
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
147+
if !match_entity(hashed, &keys, &entity.updated_model, &sub.clause) {
148148
continue;
149149
}
150150

crates/grpc/server/src/subscriptions/event.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use tokio::sync::mpsc::{
1313
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
1414
};
1515
use tokio::sync::RwLock;
16+
use torii_proto::KeysClause;
1617
use torii_sqlite::constants::SQL_FELT_DELIMITER;
1718
use torii_sqlite::error::{Error, ParseError};
1819
use torii_sqlite::simple_broker::SimpleBroker;
@@ -22,14 +23,13 @@ use tracing::{error, trace};
2223
use super::match_keys;
2324
use torii_proto::proto::types::Event as ProtoEvent;
2425
use torii_proto::proto::world::SubscribeEventsResponse;
25-
use torii_proto::EntityKeysClause;
2626

2727
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event";
2828

2929
#[derive(Debug)]
3030
pub struct EventSubscriber {
3131
/// Event keys that the subscriber is interested in
32-
keys: Vec<EntityKeysClause>,
32+
keys: Vec<KeysClause>,
3333
/// The channel to send the response back to the subscriber.
3434
sender: Sender<Result<SubscribeEventsResponse, tonic::Status>>,
3535
}
@@ -42,7 +42,7 @@ pub struct EventManager {
4242
impl EventManager {
4343
pub async fn add_subscriber(
4444
&self,
45-
keys: Vec<EntityKeysClause>,
45+
keys: Vec<KeysClause>,
4646
) -> Result<Receiver<Result<SubscribeEventsResponse, tonic::Status>>, Error> {
4747
let id = rand::thread_rng().gen::<usize>();
4848
let (sender, receiver) = channel(1);

crates/grpc/server/src/subscriptions/event_message.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,23 @@ use tokio::sync::mpsc::{
1414
};
1515
use tokio::sync::RwLock;
1616
use torii_proto::proto::types::Entity;
17-
use torii_proto::EntityKeysClause;
17+
use torii_proto::Clause;
1818
use torii_sqlite::constants::SQL_FELT_DELIMITER;
1919
use torii_sqlite::error::{Error, ParseError};
2020
use torii_sqlite::simple_broker::SimpleBroker;
2121
use torii_sqlite::types::OptimisticEventMessage;
2222
use tracing::{error, trace};
2323

24-
use super::match_entity_keys;
2524
use torii_proto::proto::world::SubscribeEntityResponse;
2625

26+
use super::match_entity;
27+
2728
pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";
2829

2930
#[derive(Debug)]
3031
pub struct EventMessageSubscriber {
31-
/// Entity ids that the subscriber is interested in
32-
pub(crate) clauses: Vec<EntityKeysClause>,
32+
/// The clause that the subscriber is interested in
33+
pub(crate) clause: Clause,
3334
/// The channel to send the response back to the subscriber.
3435
pub(crate) sender: Sender<Result<SubscribeEntityResponse, tonic::Status>>,
3536
}
@@ -42,7 +43,7 @@ pub struct EventMessageManager {
4243
impl EventMessageManager {
4344
pub async fn add_subscriber(
4445
&self,
45-
clauses: Vec<EntityKeysClause>,
46+
clause: Clause,
4647
) -> Result<Receiver<Result<SubscribeEntityResponse, tonic::Status>>, Error> {
4748
let subscription_id = rand::thread_rng().gen::<u64>();
4849
let (sender, receiver) = channel(1);
@@ -60,12 +61,12 @@ impl EventMessageManager {
6061
self.subscribers
6162
.write()
6263
.await
63-
.insert(subscription_id, EventMessageSubscriber { clauses, sender });
64+
.insert(subscription_id, EventMessageSubscriber { clause, sender });
6465

6566
Ok(receiver)
6667
}
6768

68-
pub async fn update_subscriber(&self, id: u64, clauses: Vec<EntityKeysClause>) {
69+
pub async fn update_subscriber(&self, id: u64, clause: Clause) {
6970
let sender = {
7071
let subscribers = self.subscribers.read().await;
7172
if let Some(subscriber) = subscribers.get(&id) {
@@ -78,7 +79,7 @@ impl EventMessageManager {
7879
self.subscribers
7980
.write()
8081
.await
81-
.insert(id, EventMessageSubscriber { clauses, sender });
82+
.insert(id, EventMessageSubscriber { clause, sender });
8283
}
8384

8485
pub(super) async fn remove_subscriber(&self, id: u64) {
@@ -138,7 +139,7 @@ impl Service {
138139

139140
// If we have a clause of keys, then check that the key pattern of the entity
140141
// matches the key pattern of the subscriber.
141-
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
142+
if !match_entity(hashed, &keys, &entity.updated_model, &sub.clause) {
142143
continue;
143144
}
144145

0 commit comments

Comments
 (0)