Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use dojo_world::contracts::model::ModelError;
use starknet::core::types::Felt;
use starknet::core::utils::{CairoShortStringToFeltError, ParseCairoShortStringError};
use torii_proto::schema::SchemaError;
use torii_proto::error::ProtoError;

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -22,7 +22,7 @@ pub enum Error {
#[error("Unsupported query")]
UnsupportedQuery,
#[error(transparent)]
Schema(#[from] SchemaError),
Proto(#[from] ProtoError),
}

#[derive(Debug, thiserror::Error)]
Expand Down
20 changes: 10 additions & 10 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use torii_proto::proto::world::{
};
use torii_proto::schema::Entity;
use torii_proto::{
Controller, EntityKeysClause, Event, EventQuery, Page, Query, Token, TokenBalance,
Clause, Controller, Event, EventQuery, KeysClause, Page, Query, Token, TokenBalance,
};

use crate::error::Error;
Expand Down Expand Up @@ -212,53 +212,53 @@ impl Client {
/// A direct stream to grpc subscribe entities
pub async fn on_entity_updated(
&self,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_entities(clauses).await?;
let stream = grpc_client.subscribe_entities(clause).await?;
Ok(stream)
}

/// Update the entities subscription
pub async fn update_entity_subscription(
&self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<(), Error> {
let mut grpc_client = self.inner.write().await;
grpc_client
.update_entities_subscription(subscription_id, clauses)
.update_entities_subscription(subscription_id, clause)
.await?;
Ok(())
}

/// A direct stream to grpc subscribe event messages
pub async fn on_event_message_updated(
&self,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<EntityUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_event_messages(clauses).await?;
let stream = grpc_client.subscribe_event_messages(clause).await?;
Ok(stream)
}

/// Update the event messages subscription
pub async fn update_event_message_subscription(
&self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<(), Error> {
let mut grpc_client = self.inner.write().await;
grpc_client
.update_event_messages_subscription(subscription_id, clauses)
.update_event_messages_subscription(subscription_id, clause)
.await?;
Ok(())
}

/// A direct stream to grpc subscribe starknet events
pub async fn on_starknet_event(
&self,
keys: Vec<EntityKeysClause>,
keys: Vec<KeysClause>,
) -> Result<EventUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_events(keys).await?;
Expand Down
40 changes: 21 additions & 19 deletions crates/grpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tonic::codec::CompressionEncoding;
#[cfg(not(target_arch = "wasm32"))]
use tonic::transport::Endpoint;

use torii_proto::error::ProtoError;
use torii_proto::proto::world::{
world_client, RetrieveControllersRequest, RetrieveControllersResponse, RetrieveEntitiesRequest,
RetrieveEntitiesResponse, RetrieveEventMessagesRequest, RetrieveEventsRequest,
Expand All @@ -28,8 +29,10 @@ use torii_proto::proto::world::{
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
UpdateTokenSubscriptionRequest, WorldMetadataRequest,
};
use torii_proto::schema::{Entity, SchemaError};
use torii_proto::{EntityKeysClause, Event, EventQuery, IndexerUpdate, Query, Token, TokenBalance};
use torii_proto::schema::Entity;
use torii_proto::{
Clause, Event, EventQuery, IndexerUpdate, KeysClause, Query, Token, TokenBalance,
};

pub use torii_proto as types;

Expand All @@ -48,7 +51,7 @@ pub enum Error {
#[error(transparent)]
Transport(tonic::transport::Error),
#[error(transparent)]
Schema(#[from] SchemaError),
Proto(#[from] ProtoError),
}

#[derive(Debug)]
Expand Down Expand Up @@ -98,11 +101,11 @@ impl WorldClient {
.and_then(|res| {
res.into_inner()
.metadata
.ok_or(Error::Schema(SchemaError::MissingExpectedData(
.ok_or(Error::Proto(ProtoError::MissingExpectedData(
"metadata".to_string(),
)))
})
.and_then(|metadata| metadata.try_into().map_err(Error::ParseStr))
.and_then(|metadata| metadata.try_into().map_err(Error::Proto))
}

pub async fn retrieve_controllers(
Expand Down Expand Up @@ -304,12 +307,13 @@ impl WorldClient {
/// Subscribe to entities updates of a World.
pub async fn subscribe_entities(
&mut self,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<EntityUpdateStreaming, Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
.subscribe_entities(SubscribeEntitiesRequest { clauses })
.subscribe_entities(SubscribeEntitiesRequest {
clause: clause.map(|c| c.into()),
})
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Expand Down Expand Up @@ -337,14 +341,12 @@ impl WorldClient {
pub async fn update_entities_subscription(
&mut self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<(), Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();

self.inner
.update_entities_subscription(UpdateEntitiesSubscriptionRequest {
subscription_id,
clauses,
clause: clause.map(|c| c.into()),
})
.await
.map_err(Error::Grpc)
Expand All @@ -354,12 +356,13 @@ impl WorldClient {
/// Subscribe to event messages of a World.
pub async fn subscribe_event_messages(
&mut self,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<EntityUpdateStreaming, Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
let stream = self
.inner
.subscribe_event_messages(SubscribeEventMessagesRequest { clauses })
.subscribe_event_messages(SubscribeEventMessagesRequest {
clause: clause.map(|c| c.into()),
})
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Expand Down Expand Up @@ -387,13 +390,12 @@ impl WorldClient {
pub async fn update_event_messages_subscription(
&mut self,
subscription_id: u64,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<(), Error> {
let clauses = clauses.into_iter().map(|c| c.into()).collect();
self.inner
.update_event_messages_subscription(UpdateEventMessagesSubscriptionRequest {
subscription_id,
clauses,
clause: clause.map(|c| c.into()),
})
.await
.map_err(Error::Grpc)
Expand All @@ -403,7 +405,7 @@ impl WorldClient {
/// Subscribe to the events of a World.
pub async fn subscribe_events(
&mut self,
keys: Vec<EntityKeysClause>,
keys: Vec<KeysClause>,
) -> Result<EventUpdateStreaming, Error> {
let keys = keys.into_iter().map(|c| c.into()).collect();

Expand Down
40 changes: 26 additions & 14 deletions crates/grpc/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use tonic_web::GrpcWebLayer;
use torii_proto::error::ProtoError;
use torii_sqlite::cache::ModelCache;
use torii_sqlite::error::{ParseError, QueryError};
use torii_sqlite::model::{fetch_entities, map_row_to_ty};
Expand Down Expand Up @@ -1525,10 +1526,15 @@ impl proto::world::world_server::World for DojoWorld {
&self,
request: Request<SubscribeEntitiesRequest>,
) -> ServiceResult<Self::SubscribeEntitiesStream> {
let SubscribeEntitiesRequest { clauses } = request.into_inner();
let SubscribeEntitiesRequest { clause } = request.into_inner();
let clause = clause
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;

let rx = self
.entity_manager
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
.add_subscriber(clause)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Expand All @@ -1543,13 +1549,14 @@ impl proto::world::world_server::World for DojoWorld {
) -> ServiceResult<()> {
let UpdateEntitiesSubscriptionRequest {
subscription_id,
clauses,
clause,
} = request.into_inner();
let clause = clause
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
self.entity_manager
.update_subscriber(
subscription_id,
clauses.into_iter().map(|keys| keys.into()).collect(),
)
.update_subscriber(subscription_id, clause)
.await;

Ok(Response::new(()))
Expand Down Expand Up @@ -1625,10 +1632,14 @@ impl proto::world::world_server::World for DojoWorld {
&self,
request: Request<SubscribeEventMessagesRequest>,
) -> ServiceResult<Self::SubscribeEntitiesStream> {
let SubscribeEventMessagesRequest { clauses } = request.into_inner();
let SubscribeEventMessagesRequest { clause } = request.into_inner();
let clause = clause
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
let rx = self
.event_message_manager
.add_subscriber(clauses.into_iter().map(|keys| keys.into()).collect())
.add_subscriber(clause)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Expand All @@ -1643,13 +1654,14 @@ impl proto::world::world_server::World for DojoWorld {
) -> ServiceResult<()> {
let UpdateEventMessagesSubscriptionRequest {
subscription_id,
clauses,
clause,
} = request.into_inner();
let clause = clause
.map(|c| c.try_into())
.transpose()
.map_err(|e: ProtoError| Status::internal(e.to_string()))?;
self.event_message_manager
.update_subscriber(
subscription_id,
clauses.into_iter().map(|keys| keys.into()).collect(),
)
.update_subscriber(subscription_id, clause)
.await;

Ok(Response::new(()))
Expand Down
22 changes: 12 additions & 10 deletions crates/grpc/server/src/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::OptimisticEntity;
use tracing::{error, trace};

use super::match_entity_keys;
use super::match_entity;
use torii_proto::proto::world::SubscribeEntityResponse;
use torii_proto::EntityKeysClause;
use torii_proto::Clause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";

#[derive(Debug)]
pub struct EntitiesSubscriber {
/// Entity ids that the subscriber is interested in
pub(crate) clauses: Vec<EntityKeysClause>,
/// The clause that the subscriber is interested in
pub(crate) clause: Option<Clause>,
/// The channel to send the response back to the subscriber.
pub(crate) sender: Sender<Result<SubscribeEntityResponse, tonic::Status>>,
}
Expand All @@ -40,7 +40,7 @@ pub struct EntityManager {
impl EntityManager {
pub async fn add_subscriber(
&self,
clauses: Vec<EntityKeysClause>,
clause: Option<Clause>,
) -> Result<Receiver<Result<SubscribeEntityResponse, tonic::Status>>, Error> {
let subscription_id = rand::thread_rng().gen::<u64>();
let (sender, receiver) = channel(1);
Expand All @@ -58,12 +58,12 @@ impl EntityManager {
self.subscribers
.write()
.await
.insert(subscription_id, EntitiesSubscriber { clauses, sender });
.insert(subscription_id, EntitiesSubscriber { clause, sender });

Ok(receiver)
}

pub async fn update_subscriber(&self, id: u64, clauses: Vec<EntityKeysClause>) {
pub async fn update_subscriber(&self, id: u64, clause: Option<Clause>) {
let sender = {
let subscribers = self.subscribers.read().await;
if let Some(subscriber) = subscribers.get(&id) {
Expand All @@ -76,7 +76,7 @@ impl EntityManager {
self.subscribers
.write()
.await
.insert(id, EntitiesSubscriber { clauses, sender });
.insert(id, EntitiesSubscriber { clause, sender });
}

pub(super) async fn remove_subscriber(&self, id: u64) {
Expand Down Expand Up @@ -144,8 +144,10 @@ impl Service {

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
continue;
if let Some(clause) = &sub.clause {
if !match_entity(hashed, &keys, &entity.updated_model, clause) {
continue;
}
}

if entity.deleted {
Expand Down
6 changes: 3 additions & 3 deletions crates/grpc/server/src/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_proto::KeysClause;
use torii_sqlite::constants::SQL_FELT_DELIMITER;
use torii_sqlite::error::{Error, ParseError};
use torii_sqlite::simple_broker::SimpleBroker;
Expand All @@ -22,14 +23,13 @@ use tracing::{error, trace};
use super::match_keys;
use torii_proto::proto::types::Event as ProtoEvent;
use torii_proto::proto::world::SubscribeEventsResponse;
use torii_proto::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event";

#[derive(Debug)]
pub struct EventSubscriber {
/// Event keys that the subscriber is interested in
keys: Vec<EntityKeysClause>,
keys: Vec<KeysClause>,
/// The channel to send the response back to the subscriber.
sender: Sender<Result<SubscribeEventsResponse, tonic::Status>>,
}
Expand All @@ -42,7 +42,7 @@ pub struct EventManager {
impl EventManager {
pub async fn add_subscriber(
&self,
keys: Vec<EntityKeysClause>,
keys: Vec<KeysClause>,
) -> Result<Receiver<Result<SubscribeEventsResponse, tonic::Status>>, Error> {
let id = rand::thread_rng().gen::<usize>();
let (sender, receiver) = channel(1);
Expand Down
Loading