Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/broker/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ pub type TokenTransferUpdate = Update<torii_proto::TokenTransfer>;
pub type EventUpdate = Update<torii_proto::EventWithMetadata>;
pub type TransactionUpdate = Update<torii_proto::Transaction>;
pub type AggregationUpdate = Update<torii_proto::AggregationEntry>;
pub type ActivityUpdate = Update<torii_proto::Activity>;
91 changes: 81 additions & 10 deletions crates/grpc/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use proto::world::{
};
use starknet::core::types::Felt;
use starknet::providers::Provider;
use subscriptions::activity::ActivityManager;
use subscriptions::aggregation::AggregationManager;
use subscriptions::contract::ContractManager;
use subscriptions::event::EventManager;
Expand Down Expand Up @@ -52,16 +53,17 @@ use torii_proto::proto::world::{
RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse,
RetrieveTokenContractsRequest, RetrieveTokenContractsResponse, RetrieveTokenTransfersRequest,
RetrieveTokenTransfersResponse, RetrieveTokensRequest, RetrieveTokensResponse,
RetrieveTransactionsRequest, RetrieveTransactionsResponse, SubscribeAggregationsRequest,
SubscribeAggregationsResponse, SubscribeContractsRequest, SubscribeContractsResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeTokenBalancesRequest, SubscribeTokenBalancesResponse,
SubscribeTokenTransfersRequest, SubscribeTokenTransfersResponse, SubscribeTokensRequest,
SubscribeTokensResponse, SubscribeTransactionsRequest, SubscribeTransactionsResponse,
UpdateAggregationsSubscriptionRequest, UpdateAggregationsSubscriptionResponse,
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
UpdateTokenSubscriptionRequest, UpdateTokenTransfersSubscriptionRequest, WorldMetadataRequest,
WorldMetadataResponse,
RetrieveTransactionsRequest, RetrieveTransactionsResponse, SubscribeActivitiesRequest,
SubscribeActivitiesResponse, SubscribeAggregationsRequest, SubscribeAggregationsResponse,
SubscribeContractsRequest, SubscribeContractsResponse, SubscribeEntitiesRequest,
SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse,
SubscribeTokenBalancesRequest, SubscribeTokenBalancesResponse, SubscribeTokenTransfersRequest,
SubscribeTokenTransfersResponse, SubscribeTokensRequest, SubscribeTokensResponse,
SubscribeTransactionsRequest, SubscribeTransactionsResponse,
UpdateActivitiesSubscriptionRequest, UpdateAggregationsSubscriptionRequest,
UpdateAggregationsSubscriptionResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, UpdateTokenSubscriptionRequest,
UpdateTokenTransfersSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
};
use torii_proto::proto::{self};
use torii_proto::Message;
Expand All @@ -86,6 +88,7 @@ pub struct DojoWorld<P: Provider + Sync> {
token_transfer_manager: Arc<TokenTransferManager>,
transaction_manager: Arc<TransactionManager>,
aggregation_manager: Arc<AggregationManager>,
activity_manager: Arc<ActivityManager>,
pool: SqlitePool,
_config: GrpcConfig,
}
Expand All @@ -108,6 +111,7 @@ impl<P: Provider + Sync> DojoWorld<P> {
let token_transfer_manager = Arc::new(TokenTransferManager::new(config.clone()));
let transaction_manager = Arc::new(TransactionManager::new(config.clone()));
let aggregation_manager = Arc::new(AggregationManager::new(config.clone()));
let activity_manager = Arc::new(ActivityManager::new(config.clone()));

// Spawn subscription services on the main runtime
// They use try_send and non-blocking operations to avoid starving other tasks
Expand Down Expand Up @@ -147,6 +151,10 @@ impl<P: Provider + Sync> DojoWorld<P> {
&aggregation_manager,
)));

tokio::spawn(subscriptions::activity::Service::new(Arc::clone(
&activity_manager,
)));

Self {
storage,
messaging,
Expand All @@ -161,6 +169,7 @@ impl<P: Provider + Sync> DojoWorld<P> {
token_transfer_manager,
transaction_manager,
aggregation_manager,
activity_manager,
pool,
_config: config,
}
Expand Down Expand Up @@ -243,6 +252,8 @@ type SubscribeTransactionsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTransactionsResponse, Status>> + Send>>;
type SubscribeAggregationsResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeAggregationsResponse, Status>> + Send>>;
type SubscribeActivitiesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeActivitiesResponse, Status>> + Send>>;

#[tonic::async_trait]
impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for DojoWorld<P> {
Expand All @@ -255,6 +266,7 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
type SubscribeTokenTransfersStream = SubscribeTokenTransfersResponseStream;
type SubscribeTransactionsStream = SubscribeTransactionsResponseStream;
type SubscribeAggregationsStream = SubscribeAggregationsResponseStream;
type SubscribeActivitiesStream = SubscribeActivitiesResponseStream;

async fn world_metadata(
&self,
Expand Down Expand Up @@ -487,6 +499,65 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
}))
}

async fn subscribe_activities(
&self,
request: Request<SubscribeActivitiesRequest>,
) -> ServiceResult<Self::SubscribeActivitiesStream> {
let SubscribeActivitiesRequest {
world_addresses,
namespaces,
caller_addresses,
} = request.into_inner();

let filter = subscriptions::activity::ActivityFilter {
world_addresses: world_addresses
.into_iter()
.map(|addr| Felt::from_bytes_be_slice(&addr))
.collect(),
namespaces,
caller_addresses: caller_addresses
.into_iter()
.map(|addr| Felt::from_bytes_be_slice(&addr))
.collect(),
};

let rx = self.activity_manager.add_subscriber(filter).await;

Ok(Response::new(
Box::pin(ReceiverStream::new(rx)) as Self::SubscribeActivitiesStream
))
}

async fn update_activities_subscription(
&self,
request: Request<UpdateActivitiesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateActivitiesSubscriptionRequest {
subscription_id,
world_addresses,
namespaces,
caller_addresses,
} = request.into_inner();

let filter = subscriptions::activity::ActivityFilter {
world_addresses: world_addresses
.into_iter()
.map(|addr| Felt::from_bytes_be_slice(&addr))
.collect(),
namespaces,
caller_addresses: caller_addresses
.into_iter()
.map(|addr| Felt::from_bytes_be_slice(&addr))
.collect(),
};

self.activity_manager
.update_subscriber(subscription_id, filter)
.await;

Ok(Response::new(()))
}

async fn retrieve_contracts(
&self,
request: Request<RetrieveContractsRequest>,
Expand Down
197 changes: 197 additions & 0 deletions crates/grpc/server/src/subscriptions/activity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use dashmap::DashMap;
use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet_crypto::Felt;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use torii_broker::types::ActivityUpdate;
use torii_broker::MemoryBroker;
use torii_proto::Activity;
use tracing::{error, trace};

use crate::GrpcConfig;
use torii_proto::proto::world::SubscribeActivitiesResponse;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::activity";

#[derive(Debug, Default, Clone)]
pub struct ActivityFilter {
pub world_addresses: Vec<Felt>,
pub namespaces: Vec<String>,
pub caller_addresses: Vec<Felt>,
}

impl ActivityFilter {
pub fn matches(&self, activity: &Activity) -> bool {
// If no filters specified, match all
if self.world_addresses.is_empty()
&& self.namespaces.is_empty()
&& self.caller_addresses.is_empty()
{
return true;
}

// Check world_address filter
let world_match = self.world_addresses.is_empty()
|| self.world_addresses.contains(&activity.world_address);

// Check namespace filter
let namespace_match =
self.namespaces.is_empty() || self.namespaces.contains(&activity.namespace);

// Check caller_address filter
let caller_match = self.caller_addresses.is_empty()
|| self.caller_addresses.contains(&activity.caller_address);

world_match && namespace_match && caller_match
}
}

#[derive(Debug)]
pub struct ActivitySubscriber {
pub(crate) filter: ActivityFilter,
pub(crate) sender: Sender<Result<SubscribeActivitiesResponse, tonic::Status>>,
}

#[derive(Debug)]
pub struct ActivityManager {
subscribers: DashMap<u64, ActivitySubscriber>,
config: GrpcConfig,
}

impl ActivityManager {
pub fn new(config: GrpcConfig) -> Self {
Self {
subscribers: DashMap::new(),
config,
}
}

pub async fn add_subscriber(
&self,
filter: ActivityFilter,
) -> Receiver<Result<SubscribeActivitiesResponse, tonic::Status>> {
let subscription_id = rand::thread_rng().gen::<u64>();
let (sender, receiver) = channel(self.config.subscription_buffer_size);

// Send initial empty message to establish stream
let _ = sender
.send(Ok(SubscribeActivitiesResponse {
activity: None,
subscription_id,
}))
.await;

self.subscribers
.insert(subscription_id, ActivitySubscriber { filter, sender });

receiver
}

pub async fn update_subscriber(&self, id: u64, filter: ActivityFilter) {
if let Some(mut subscriber) = self.subscribers.get_mut(&id) {
subscriber.filter = filter;
}
}

pub(super) async fn remove_subscriber(&self, id: u64) {
self.subscribers.remove(&id);
}
}

#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
simple_broker: Pin<Box<dyn Stream<Item = Activity> + Send>>,
activity_sender: UnboundedSender<Activity>,
}

impl Service {
pub fn new(subs_manager: Arc<ActivityManager>) -> Self {
let (activity_sender, activity_receiver) = unbounded_channel();
let service = Self {
simple_broker: if subs_manager.config.optimistic {
Box::pin(MemoryBroker::<ActivityUpdate>::subscribe_optimistic())
} else {
Box::pin(MemoryBroker::<ActivityUpdate>::subscribe())
},
activity_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, activity_receiver));

service
}

async fn publish_updates(
subs: Arc<ActivityManager>,
mut activity_receiver: UnboundedReceiver<Activity>,
) {
while let Some(update) = activity_receiver.recv().await {
Self::process_activity_update(&subs, &update).await;
}
}

async fn process_activity_update(subs: &Arc<ActivityManager>, activity: &Activity) {
let mut closed_stream = Vec::new();

for sub in subs.subscribers.iter() {
let idx = sub.key();
let sub = sub.value();

// Check if the subscriber is interested in this activity
if !sub.filter.matches(activity) {
continue;
}

let resp = SubscribeActivitiesResponse {
activity: Some(activity.clone().into()),
subscription_id: *idx,
};

// Use try_send to avoid blocking on slow subscribers
match sub.sender.try_send(Ok(resp)) {
Ok(_) => {
// Message sent successfully
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
// Channel is full, subscriber is too slow - disconnect them
trace!(target = LOG_TARGET, subscription_id = %idx, "Disconnecting slow subscriber - channel full");
closed_stream.push(*idx);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
// Channel is closed, subscriber has disconnected
closed_stream.push(*idx);
}
}
}

for id in closed_stream {
trace!(target = LOG_TARGET, id = %id, "Closing activity stream.");
subs.remove_subscriber(id).await
}
}
}

impl Future for Service {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(activity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.activity_sender.send(activity) {
error!(target = LOG_TARGET, error = %e, "Sending activity update to processor.");
}
}

Poll::Pending
}
}
2 changes: 1 addition & 1 deletion crates/grpc/server/src/subscriptions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct AggregationSubscriber {
pub(crate) sender: Sender<Result<SubscribeAggregationsResponse, tonic::Status>>,
}

#[derive(Debug, Default)]
#[derive(Debug)]
pub struct AggregationManager {
subscribers: DashMap<u64, AggregationSubscriber>,
config: GrpcConfig,
Expand Down
1 change: 1 addition & 0 deletions crates/grpc/server/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use torii_proto::{
Clause, ComparisonOperator, KeysClause, LogicalOperator, MemberValue, PatternMatching,
};

pub mod activity;
pub mod aggregation;
pub mod contract;
pub mod entity;
Expand Down
Loading
Loading