Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions crates/grpc/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ use sqlx::SqlitePool;
use torii_proto::proto::world::world_server::WorldServer;
use torii_proto::proto::world::{
PublishMessageBatchRequest, PublishMessageBatchResponse, PublishMessageRequest,
PublishMessageResponse, RetrieveAggregationsRequest, RetrieveAggregationsResponse,
RetrieveContractsRequest, RetrieveContractsResponse, RetrieveControllersRequest,
RetrieveControllersResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest,
RetrieveTokenBalancesResponse, RetrieveTokenContractsRequest, RetrieveTokenContractsResponse,
RetrieveTokenTransfersRequest, RetrieveTokenTransfersResponse, RetrieveTokensRequest,
RetrieveTokensResponse, RetrieveTransactionsRequest, RetrieveTransactionsResponse,
SubscribeAggregationsRequest, SubscribeAggregationsResponse, SubscribeContractsRequest,
SubscribeContractsResponse, SubscribeEntitiesRequest, SubscribeEntityResponse,
SubscribeEventMessagesRequest, SubscribeEventsResponse, SubscribeTokenBalancesRequest,
SubscribeTokenBalancesResponse, SubscribeTokenTransfersRequest,
SubscribeTokenTransfersResponse, SubscribeTokensRequest, SubscribeTokensResponse,
SubscribeTransactionsRequest, SubscribeTransactionsResponse,
PublishMessageResponse, RetrieveActivitiesRequest, RetrieveActivitiesResponse,
RetrieveAggregationsRequest, RetrieveAggregationsResponse, RetrieveContractsRequest,
RetrieveContractsResponse, RetrieveControllersRequest, RetrieveControllersResponse,
RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, RetrieveTokenBalancesResponse,
RetrieveTokenContractsRequest, RetrieveTokenContractsResponse, RetrieveTokenTransfersRequest,
RetrieveTokenTransfersResponse, RetrieveTokensRequest, RetrieveTokensResponse,
RetrieveTransactionsRequest, RetrieveTransactionsResponse, SubscribeAggregationsRequest,
SubscribeAggregationsResponse, SubscribeContractsRequest, SubscribeContractsResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeTokenBalancesRequest, SubscribeTokenBalancesResponse,
SubscribeTokenTransfersRequest, SubscribeTokenTransfersResponse, SubscribeTokensRequest,
SubscribeTokensResponse, SubscribeTransactionsRequest, SubscribeTransactionsResponse,
UpdateAggregationsSubscriptionRequest, UpdateAggregationsSubscriptionResponse,
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
UpdateTokenSubscriptionRequest, UpdateTokenTransfersSubscriptionRequest, WorldMetadataRequest,
Expand Down Expand Up @@ -465,6 +465,28 @@ impl<P: Provider + Sync + Send + 'static> proto::world::world_server::World for
Ok(Response::new(UpdateAggregationsSubscriptionResponse {}))
}

async fn retrieve_activities(
&self,
request: Request<RetrieveActivitiesRequest>,
) -> Result<Response<RetrieveActivitiesResponse>, Status> {
let RetrieveActivitiesRequest { query } = request.into_inner();
let query = query
.ok_or_else(|| Status::invalid_argument("Missing query argument"))?
.try_into()
.map_err(|e: ProtoError| Status::invalid_argument(e.to_string()))?;

let activities = self
.storage
.activities(&query)
.await
.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(RetrieveActivitiesResponse {
activities: activities.items.into_iter().map(Into::into).collect(),
next_cursor: activities.next_cursor.unwrap_or_default(),
}))
}

async fn retrieve_contracts(
&self,
request: Request<RetrieveContractsRequest>,
Expand Down
36 changes: 36 additions & 0 deletions crates/proto/proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,42 @@ message TransactionQuery {
Pagination pagination = 2;
}

// Activity tracking for user sessions
message Activity {
// Unique identifier: world_address:namespace:caller_address:session_start_timestamp
string id = 1;
// World contract address
bytes world_address = 2;
// Namespace
string namespace = 3;
// Caller address
bytes caller_address = 4;
// Session start time
uint64 session_start = 5;
// Session end time
uint64 session_end = 6;
// Total action count in session
uint32 action_count = 7;
// Map of action names to call counts
map<string, uint32> actions = 8;
// Last update timestamp
uint64 updated_at = 9;
}

message ActivityQuery {
// Filter by world addresses
repeated bytes world_addresses = 1;
// Filter by namespaces
repeated string namespaces = 2;
// Filter by caller addresses
repeated bytes caller_addresses = 3;
// Filter by time range (unix timestamps)
optional uint64 from_time = 4;
optional uint64 to_time = 5;
// Pagination
Pagination pagination = 6;
}

enum ContractType {
WORLD = 0;
ERC20 = 1;
Expand Down
14 changes: 14 additions & 0 deletions crates/proto/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ service World {
// Update an aggregations subscription
rpc UpdateAggregationsSubscription (UpdateAggregationsSubscriptionRequest) returns (UpdateAggregationsSubscriptionResponse);

// Retrieve activities (user session tracking)
rpc RetrieveActivities (RetrieveActivitiesRequest) returns (RetrieveActivitiesResponse);

// Publish a torii offchain message
rpc PublishMessage (PublishMessageRequest) returns (PublishMessageResponse);

Expand Down Expand Up @@ -393,4 +396,15 @@ message UpdateAggregationsSubscriptionRequest {

// A response for updating an aggregations subscription
message UpdateAggregationsSubscriptionResponse {
}

// A request to retrieve activities
message RetrieveActivitiesRequest {
types.ActivityQuery query = 1;
}

// A response containing activities
message RetrieveActivitiesResponse {
string next_cursor = 1;
repeated types.Activity activities = 2;
}
86 changes: 86 additions & 0 deletions crates/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,92 @@ pub struct TransactionQuery {
pub pagination: Pagination,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct Activity {
pub id: String,
pub world_address: Felt,
pub namespace: String,
pub caller_address: Felt,
pub session_start: DateTime<Utc>,
pub session_end: DateTime<Utc>,
pub action_count: u32,
pub actions: HashMap<String, u32>, // Map of action name -> count
pub updated_at: DateTime<Utc>,
}

impl From<Activity> for proto::types::Activity {
fn from(value: Activity) -> Self {
Self {
id: value.id,
world_address: value.world_address.to_bytes_be().to_vec(),
namespace: value.namespace,
caller_address: value.caller_address.to_bytes_be().to_vec(),
session_start: value.session_start.timestamp() as u64,
session_end: value.session_end.timestamp() as u64,
action_count: value.action_count,
actions: value.actions,
updated_at: value.updated_at.timestamp() as u64,
}
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ActivityQuery {
pub world_addresses: Vec<Felt>,
pub namespaces: Vec<String>,
pub caller_addresses: Vec<Felt>,
pub from_time: Option<DateTime<Utc>>,
pub to_time: Option<DateTime<Utc>>,
pub pagination: Pagination,
}

impl From<ActivityQuery> for proto::types::ActivityQuery {
fn from(value: ActivityQuery) -> Self {
Self {
world_addresses: value
.world_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
namespaces: value.namespaces,
caller_addresses: value
.caller_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
from_time: value.from_time.map(|t| t.timestamp() as u64),
to_time: value.to_time.map(|t| t.timestamp() as u64),
pagination: Some(value.pagination.into()),
}
}
}

impl TryFrom<proto::types::ActivityQuery> for ActivityQuery {
type Error = ProtoError;
fn try_from(value: proto::types::ActivityQuery) -> Result<Self, Self::Error> {
Ok(Self {
world_addresses: value
.world_addresses
.into_iter()
.map(|a| Felt::from_bytes_be_slice(&a))
.collect(),
namespaces: value.namespaces,
caller_addresses: value
.caller_addresses
.into_iter()
.map(|a| Felt::from_bytes_be_slice(&a))
.collect(),
from_time: value
.from_time
.map(|t| DateTime::from_timestamp(t as i64, 0).unwrap()),
to_time: value
.to_time
.map(|t| DateTime::from_timestamp(t as i64, 0).unwrap()),
pagination: value.pagination.map(|p| p.into()).unwrap_or_default(),
})
}
}

impl From<TransactionQuery> for proto::types::TransactionQuery {
fn from(value: TransactionQuery) -> Self {
Self {
Expand Down
84 changes: 79 additions & 5 deletions crates/sqlite/sqlite/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use starknet::core::types::U256;
use starknet_crypto::{poseidon_hash_many, Felt};
use torii_math::I256;
use torii_proto::{
schema::Entity, AggregationEntry, AggregationQuery, BalanceId, CallType, Clause,
CompositeClause, Contract, ContractCursor, ContractQuery, Controller, ControllerQuery, Event,
EventQuery, LogicalOperator, Model, OrderBy, OrderDirection, Page, Query, Token, TokenBalance,
TokenBalanceQuery, TokenContract, TokenContractQuery, TokenId, TokenQuery, TokenTransfer,
TokenTransferQuery, Transaction, TransactionCall, TransactionQuery,
schema::Entity, Activity, ActivityQuery, AggregationEntry, AggregationQuery, BalanceId,
CallType, Clause, CompositeClause, Contract, ContractCursor, ContractQuery, Controller,
ControllerQuery, Event, EventQuery, LogicalOperator, Model, OrderBy, OrderDirection, Page,
Query, Token, TokenBalance, TokenBalanceQuery, TokenContract, TokenContractQuery, TokenId,
TokenQuery, TokenTransfer, TokenTransferQuery, Transaction, TransactionCall, TransactionQuery,
};
use torii_sqlite_types::{HookEvent, Model as SQLModel};
use torii_storage::{ReadOnlyStorage, Storage, StorageError};
Expand Down Expand Up @@ -912,6 +912,80 @@ impl ReadOnlyStorage for Sql {
next_cursor: page.next_cursor,
})
}

/// Returns activities for the storage.
async fn activities(&self, query: &ActivityQuery) -> Result<Page<Activity>, StorageError> {
let executor = PaginationExecutor::new(self.pool.clone());
let mut query_builder = QueryBuilder::new("activities").select(&[
"id".to_string(),
"world_address".to_string(),
"namespace".to_string(),
"caller_address".to_string(),
"session_start".to_string(),
"session_end".to_string(),
"action_count".to_string(),
"actions".to_string(),
"updated_at".to_string(),
]);

if !query.world_addresses.is_empty() {
let placeholders = vec!["?"; query.world_addresses.len()].join(", ");
query_builder =
query_builder.where_clause(&format!("world_address IN ({})", placeholders));
for addr in &query.world_addresses {
query_builder = query_builder.bind_value(felt_to_sql_string(addr));
}
}

if !query.namespaces.is_empty() {
let placeholders = vec!["?"; query.namespaces.len()].join(", ");
query_builder = query_builder.where_clause(&format!("namespace IN ({})", placeholders));
for namespace in &query.namespaces {
query_builder = query_builder.bind_value(namespace.clone());
}
}

if !query.caller_addresses.is_empty() {
let placeholders = vec!["?"; query.caller_addresses.len()].join(", ");
query_builder =
query_builder.where_clause(&format!("caller_address IN ({})", placeholders));
for addr in &query.caller_addresses {
query_builder = query_builder.bind_value(felt_to_sql_string(addr));
}
}

if let Some(from_time) = &query.from_time {
query_builder = query_builder.where_clause("session_end >= ?");
query_builder = query_builder.bind_value(from_time.to_rfc3339());
}

if let Some(to_time) = &query.to_time {
query_builder = query_builder.where_clause("session_end <= ?");
query_builder = query_builder.bind_value(to_time.to_rfc3339());
}

let page = executor
.execute_paginated_query(
query_builder,
&query.pagination,
&OrderBy {
field: "session_end".to_string(),
direction: OrderDirection::Desc,
},
)
.await?;
let items: Vec<Activity> = page
.items
.into_iter()
.map(|row| {
Result::<Activity, Error>::Ok(torii_sqlite_types::Activity::from_row(&row)?.into())
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Page {
items,
next_cursor: page.next_cursor,
})
}
}

#[async_trait]
Expand Down
33 changes: 33 additions & 0 deletions crates/sqlite/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,39 @@ pub struct Event {
pub created_at: DateTime<Utc>,
}

#[derive(FromRow, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Activity {
pub id: String,
pub world_address: String,
pub namespace: String,
pub caller_address: String,
pub session_start: DateTime<Utc>,
pub session_end: DateTime<Utc>,
pub action_count: i32,
pub actions: String, // JSON string
pub updated_at: DateTime<Utc>,
}

impl From<Activity> for torii_proto::Activity {
fn from(value: Activity) -> Self {
use std::collections::HashMap;
let actions: HashMap<String, u32> =
serde_json::from_str(&value.actions).unwrap_or_default();
Self {
id: value.id,
world_address: Felt::from_hex(&value.world_address).unwrap(),
namespace: value.namespace,
caller_address: Felt::from_hex(&value.caller_address).unwrap(),
session_start: value.session_start,
session_end: value.session_end,
action_count: value.action_count as u32,
actions,
updated_at: value.updated_at,
}
}
}

impl From<Event> for torii_proto::EventWithMetadata {
fn from(value: Event) -> Self {
Self {
Expand Down
11 changes: 7 additions & 4 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use torii_math::I256;
use torii_proto::schema::Entity;

use torii_proto::{
AggregationEntry, AggregationQuery, BalanceId, Contract, ContractCursor, ContractQuery,
Controller, ControllerQuery, Event, EventQuery, Model, Page, Query, Token, TokenBalance,
TokenBalanceQuery, TokenContract, TokenContractQuery, TokenId, TokenQuery, TokenTransfer,
TokenTransferQuery, Transaction, TransactionCall, TransactionQuery,
Activity, ActivityQuery, AggregationEntry, AggregationQuery, BalanceId, Contract,
ContractCursor, ContractQuery, Controller, ControllerQuery, Event, EventQuery, Model, Page,
Query, Token, TokenBalance, TokenBalanceQuery, TokenContract, TokenContractQuery, TokenId,
TokenQuery, TokenTransfer, TokenTransferQuery, Transaction, TransactionCall, TransactionQuery,
};

pub mod utils;
Expand Down Expand Up @@ -93,6 +93,9 @@ pub trait ReadOnlyStorage: Send + Sync + Debug {
&self,
query: &AggregationQuery,
) -> Result<Page<AggregationEntry>, StorageError>;

/// Returns activities for the storage.
async fn activities(&self, query: &ActivityQuery) -> Result<Page<Activity>, StorageError>;
}

#[async_trait]
Expand Down
Loading