Skip to content

Commit f5b5e1f

Browse files
authored
feat: official multi world support (#351)
* feat(storage): multi world scoping for entities * formatting * formatting storeage * update tests * fix set nttiies * rm bad mig * fix up all storeage queries * big refactor for caching Y& processors mujlti world * update grpc & all services to multi world support * fix: storing * finished multi world support for queries * debug log rmeove * further optimize queries * uodate historifal query * multi world fixes * offchain messages world address * fmt * world ids test * ids gql * fix graphql subscription * fix last gql subscriptions * finnalty fix all subs * add world addresses to grpc subs * fmt * clippy fix
1 parent 50482bd commit f5b5e1f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+929
-509
lines changed

crates/cache/src/lib.rs

Lines changed: 78 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::sync::Arc;
33

44
use async_trait::async_trait;
55
use dashmap::DashMap;
6-
use dojo_types::naming;
76
use starknet::core::types::contract::AbiEntry;
87
use starknet::core::types::{
98
BlockId, BlockTag, ContractClass, EntryPointsByType, LegacyContractAbiEntry, StarknetError,
@@ -24,11 +23,17 @@ pub type CacheError = Box<dyn std::error::Error + Send + Sync>;
2423

2524
#[async_trait]
2625
pub trait ReadOnlyCache: Send + Sync + std::fmt::Debug {
27-
/// Get models by selectors. If selectors is empty, returns all models.
28-
async fn models(&self, selectors: &[Felt]) -> Result<Vec<Model>, CacheError>;
26+
/// Get models by selectors from specified worlds.
27+
/// If world_addresses is empty, returns models from all worlds.
28+
/// If selectors is empty, returns all models from the specified worlds.
29+
async fn models(
30+
&self,
31+
world_addresses: &[Felt],
32+
selectors: &[Felt],
33+
) -> Result<Vec<Model>, CacheError>;
2934

30-
/// Get a specific model by selector.
31-
async fn model(&self, selector: Felt) -> Result<Model, CacheError>;
35+
/// Get a specific model by selector for the given world.
36+
async fn model(&self, world_address: Felt, selector: Felt) -> Result<Model, CacheError>;
3237

3338
/// Check if a token is registered.
3439
async fn is_token_registered(&self, token_id: &TokenId) -> bool;
@@ -45,8 +50,8 @@ pub trait ReadOnlyCache: Send + Sync + std::fmt::Debug {
4550

4651
#[async_trait]
4752
pub trait Cache: ReadOnlyCache + Send + Sync + std::fmt::Debug {
48-
/// Register a model in the cache.
49-
async fn register_model(&self, selector: Felt, model: Model);
53+
/// Register a model in the cache, scoped to a world.
54+
async fn register_model(&self, world_address: Felt, selector: Felt, model: Model);
5055

5156
/// Clear all models from the cache.
5257
async fn clear_models(&self);
@@ -78,16 +83,20 @@ impl InMemoryCache {
7883

7984
#[async_trait]
8085
impl ReadOnlyCache for InMemoryCache {
81-
async fn models(&self, selectors: &[Felt]) -> Result<Vec<Model>, CacheError> {
86+
async fn models(
87+
&self,
88+
world_addresses: &[Felt],
89+
selectors: &[Felt],
90+
) -> Result<Vec<Model>, CacheError> {
8291
self.model_cache
83-
.models(selectors)
92+
.models(world_addresses, selectors)
8493
.await
8594
.map_err(|e| Box::new(e) as CacheError)
8695
}
8796

88-
async fn model(&self, selector: Felt) -> Result<Model, CacheError> {
97+
async fn model(&self, world_address: Felt, selector: Felt) -> Result<Model, CacheError> {
8998
self.model_cache
90-
.model(selector)
99+
.model(world_address, selector)
91100
.await
92101
.map_err(|e| Box::new(e) as CacheError)
93102
}
@@ -119,8 +128,8 @@ impl ReadOnlyCache for InMemoryCache {
119128

120129
#[async_trait]
121130
impl Cache for InMemoryCache {
122-
async fn register_model(&self, selector: Felt, model: Model) {
123-
self.model_cache.set(selector, model).await
131+
async fn register_model(&self, world_address: Felt, selector: Felt, model: Model) {
132+
self.model_cache.set(world_address, selector, model).await
124133
}
125134

126135
async fn clear_models(&self) {
@@ -147,48 +156,86 @@ impl Cache for InMemoryCache {
147156

148157
#[derive(Debug)]
149158
pub struct ModelCache {
150-
model_cache: RwLock<HashMap<Felt, Model>>,
159+
// Outer key: world_address, Inner key: model_selector
160+
model_cache: RwLock<HashMap<Felt, HashMap<Felt, Model>>>,
151161
}
152162

153163
impl ModelCache {
154164
pub async fn new(storage: Arc<dyn ReadOnlyStorage>) -> Result<Self, Error> {
155-
let models = storage.models(&[]).await?;
165+
let models = storage.models(&[], &[]).await?;
156166

157-
let mut model_cache = HashMap::new();
167+
let mut model_cache: HashMap<Felt, HashMap<Felt, Model>> = HashMap::new();
158168
for model in models {
159-
let selector = naming::compute_selector_from_names(&model.namespace, &model.name);
160-
model_cache.insert(selector, model);
169+
model_cache
170+
.entry(model.world_address)
171+
.or_default()
172+
.insert(model.selector, model);
161173
}
162174

163175
Ok(Self {
164176
model_cache: RwLock::new(model_cache),
165177
})
166178
}
167179

168-
pub async fn models(&self, selectors: &[Felt]) -> Result<Vec<Model>, Error> {
169-
if selectors.is_empty() {
170-
return Ok(self.model_cache.read().await.values().cloned().collect());
171-
}
172-
173-
let mut schemas = Vec::with_capacity(selectors.len());
174-
for selector in selectors {
175-
schemas.push(self.model(*selector).await?);
180+
pub async fn models(
181+
&self,
182+
world_addresses: &[Felt],
183+
selectors: &[Felt],
184+
) -> Result<Vec<Model>, Error> {
185+
let cache = self.model_cache.read().await;
186+
let mut result = Vec::new();
187+
188+
if world_addresses.is_empty() {
189+
// Return from all worlds
190+
for world_models in cache.values() {
191+
if selectors.is_empty() {
192+
result.extend(world_models.values().cloned());
193+
} else {
194+
for selector in selectors {
195+
if let Some(model) = world_models.get(selector) {
196+
result.push(model.clone());
197+
}
198+
}
199+
}
200+
}
201+
} else {
202+
// Return from specific worlds
203+
for world in world_addresses {
204+
let Some(world_models) = cache.get(world) else {
205+
continue;
206+
};
207+
208+
if selectors.is_empty() {
209+
result.extend(world_models.values().cloned());
210+
} else {
211+
for selector in selectors {
212+
if let Some(model) = world_models.get(selector) {
213+
result.push(model.clone());
214+
}
215+
}
216+
}
217+
}
176218
}
177219

178-
Ok(schemas)
220+
Ok(result)
179221
}
180222

181-
pub async fn model(&self, selector: Felt) -> Result<Model, Error> {
223+
pub async fn model(&self, world_address: Felt, selector: Felt) -> Result<Model, Error> {
182224
let cache = self.model_cache.read().await;
225+
183226
cache
184-
.get(&selector)
227+
.get(&world_address)
228+
.and_then(|world_models| world_models.get(&selector))
185229
.cloned()
186230
.ok_or_else(|| Error::ModelNotFound(selector))
187231
}
188232

189-
pub async fn set(&self, selector: Felt, model: Model) {
233+
pub async fn set(&self, world_address: Felt, selector: Felt, model: Model) {
190234
let mut cache = self.model_cache.write().await;
191-
cache.insert(selector, model);
235+
cache
236+
.entry(world_address)
237+
.or_insert_with(HashMap::new)
238+
.insert(selector, model);
192239
}
193240

194241
pub async fn clear(&self) {

crates/client/src/lib.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ pub struct Client {
3232

3333
impl Client {
3434
/// Returns a initialized [Client] with default max message size (4MB).
35-
pub async fn new(torii_url: String, world: Felt) -> Result<Self, Error> {
36-
let grpc_client = WorldClient::new(torii_url, world).await?;
35+
pub async fn new(torii_url: String) -> Result<Self, Error> {
36+
let grpc_client = WorldClient::new(torii_url).await?;
3737

3838
Ok(Self { inner: grpc_client })
3939
}
@@ -42,39 +42,40 @@ impl Client {
4242
///
4343
/// # Arguments
4444
/// * `torii_url` - The URL of the Torii server
45-
/// * `world` - The world address
4645
/// * `max_message_size` - Maximum size in bytes for gRPC messages (both incoming and outgoing)
4746
pub async fn new_with_config(
4847
torii_url: String,
49-
world: Felt,
5048
max_message_size: usize,
5149
) -> Result<Self, Error> {
52-
let grpc_client = WorldClient::new_with_config(torii_url, world, max_message_size).await?;
50+
let grpc_client = WorldClient::new_with_config(torii_url, max_message_size).await?;
5351

5452
Ok(Self { inner: grpc_client })
5553
}
5654

5755
/// Publishes an offchain message to the world.
5856
/// Returns the entity id of the offchain message.
59-
pub async fn publish_message(&self, message: Message) -> Result<Felt, Error> {
57+
pub async fn publish_message(&self, message: Message) -> Result<String, Error> {
6058
let mut grpc_client = self.inner.clone();
6159
let entity_id = grpc_client.publish_message(message).await?;
6260
Ok(entity_id)
6361
}
6462

6563
/// Publishes a set of offchain messages to the world.
6664
/// Returns the entity ids of the offchain messages.
67-
pub async fn publish_message_batch(&self, messages: Vec<Message>) -> Result<Vec<Felt>, Error> {
65+
pub async fn publish_message_batch(
66+
&self,
67+
messages: Vec<Message>,
68+
) -> Result<Vec<String>, Error> {
6869
let mut grpc_client = self.inner.clone();
6970
let entity_ids = grpc_client.publish_message_batch(messages).await?;
7071
Ok(entity_ids)
7172
}
7273

7374
/// Returns a read lock on the World metadata that the client is connected to.
74-
pub async fn metadata(&self) -> Result<World, Error> {
75+
pub async fn worlds(&self, world_addresses: Vec<Felt>) -> Result<Vec<World>, Error> {
7576
let mut grpc_client = self.inner.clone();
76-
let world = grpc_client.metadata().await?;
77-
Ok(world)
77+
let worlds = grpc_client.worlds(world_addresses).await?;
78+
Ok(worlds)
7879
}
7980

8081
/// Retrieves controllers matching contract addresses.
@@ -402,9 +403,12 @@ impl Client {
402403
pub async fn on_entity_updated(
403404
&self,
404405
clause: Option<Clause>,
406+
world_addresses: Vec<Felt>,
405407
) -> Result<EntityUpdateStreaming, Error> {
406408
let mut grpc_client = self.inner.clone();
407-
let stream = grpc_client.subscribe_entities(clause).await?;
409+
let stream = grpc_client
410+
.subscribe_entities(clause, world_addresses)
411+
.await?;
408412
Ok(stream)
409413
}
410414

@@ -413,10 +417,11 @@ impl Client {
413417
&self,
414418
subscription_id: u64,
415419
clause: Option<Clause>,
420+
world_addresses: Vec<Felt>,
416421
) -> Result<(), Error> {
417422
let mut grpc_client = self.inner.clone();
418423
grpc_client
419-
.update_entities_subscription(subscription_id, clause)
424+
.update_entities_subscription(subscription_id, clause, world_addresses)
420425
.await?;
421426
Ok(())
422427
}
@@ -425,9 +430,12 @@ impl Client {
425430
pub async fn on_event_message_updated(
426431
&self,
427432
clause: Option<Clause>,
433+
world_addresses: Vec<Felt>,
428434
) -> Result<EntityUpdateStreaming, Error> {
429435
let mut grpc_client = self.inner.clone();
430-
let stream = grpc_client.subscribe_event_messages(clause).await?;
436+
let stream = grpc_client
437+
.subscribe_event_messages(clause, world_addresses)
438+
.await?;
431439
Ok(stream)
432440
}
433441

@@ -436,10 +444,11 @@ impl Client {
436444
&self,
437445
subscription_id: u64,
438446
clause: Option<Clause>,
447+
world_addresses: Vec<Felt>,
439448
) -> Result<(), Error> {
440449
let mut grpc_client = self.inner.clone();
441450
grpc_client
442-
.update_event_messages_subscription(subscription_id, clause)
451+
.update_event_messages_subscription(subscription_id, clause, world_addresses)
443452
.await?;
444453
Ok(())
445454
}

crates/graphql/src/mapping.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,10 @@ pub static TOKEN_TYPE_MAPPING: LazyLock<TypeMapping> = LazyLock::new(|| {
461461

462462
pub static PUBLISH_MESSAGE_INPUT_MAPPING: LazyLock<TypeMapping> = LazyLock::new(|| {
463463
IndexMap::from([
464+
(
465+
Name::new("worldAddress"),
466+
TypeData::Simple(TypeRef::named_nn(TypeRef::STRING)),
467+
),
464468
(
465469
Name::new("signature"),
466470
TypeData::Simple(TypeRef::named_nn_list(TypeRef::STRING)),
@@ -474,7 +478,7 @@ pub static PUBLISH_MESSAGE_INPUT_MAPPING: LazyLock<TypeMapping> = LazyLock::new(
474478

475479
pub static PUBLISH_MESSAGE_RESPONSE_MAPPING: LazyLock<TypeMapping> = LazyLock::new(|| {
476480
IndexMap::from([(
477-
Name::new("entityId"),
481+
Name::new("id"),
478482
TypeData::Simple(TypeRef::named_nn(TypeRef::STRING)),
479483
)])
480484
});

crates/graphql/src/object/entity.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use torii_broker::types::EntityUpdate;
1111
use torii_broker::MemoryBroker;
1212
use torii_sqlite::types::Entity;
1313
use torii_sqlite::utils::felt_to_sql_string;
14+
use torii_storage::utils::format_world_scoped_id;
1415

1516
use super::inputs::keys_input::keys_argument;
1617
use super::{BasicObject, ResolvableObject, TypeMapping, ValueMapping};
@@ -83,7 +84,14 @@ impl ResolvableObject for EntityObject {
8384
let id = id.clone();
8485
async move {
8586
let entity_id = felt_to_sql_string(&entity.entity.hashed_keys);
86-
if id.is_none() || id == Some(entity_id.clone()) {
87+
let scoped_entity_id = format_world_scoped_id(
88+
&entity.entity.world_address,
89+
&entity.entity.hashed_keys,
90+
);
91+
if id.is_none()
92+
|| id == Some(entity_id.clone())
93+
|| id == Some(scoped_entity_id.clone())
94+
{
8795
let mut conn = match pool.acquire().await {
8896
Ok(conn) => conn,
8997
Err(_) => return None,
@@ -92,7 +100,7 @@ impl ResolvableObject for EntityObject {
92100
let entity = match sqlx::query_as::<_, Entity>(
93101
"SELECT * FROM entities WHERE id = ?",
94102
)
95-
.bind(&entity_id)
103+
.bind(&scoped_entity_id)
96104
.fetch_one(&mut *conn)
97105
.await
98106
{

crates/graphql/src/object/model.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use torii_broker::types::ModelUpdate;
99
use torii_broker::MemoryBroker;
1010
use torii_sqlite::types::Model;
1111
use torii_sqlite::utils::felt_to_sql_string;
12+
use torii_storage::utils::format_world_scoped_id;
1213

1314
use super::{resolve_many, BasicObject, ResolvableObject, TypeMapping, ValueMapping};
1415
use crate::constants::{
@@ -108,7 +109,14 @@ impl ResolvableObject for ModelObject {
108109
let id = id.clone();
109110
async move {
110111
let model_id = felt_to_sql_string(&model.selector);
111-
if id.is_none() || id == Some(model_id.clone()) {
112+
let scoped_model_id = format_world_scoped_id(
113+
&model.world_address,
114+
&model.selector,
115+
);
116+
if id.is_none()
117+
|| id == Some(model_id.clone())
118+
|| id == Some(scoped_model_id.clone())
119+
{
112120
let mut conn = match pool.acquire().await {
113121
Ok(conn) => conn,
114122
Err(_) => return None,
@@ -117,7 +125,7 @@ impl ResolvableObject for ModelObject {
117125
let model = match sqlx::query_as::<_, Model>(
118126
"SELECT * FROM models WHERE id = ?",
119127
)
120-
.bind(&model_id)
128+
.bind(&scoped_model_id)
121129
.fetch_one(&mut *conn)
122130
.await
123131
{

0 commit comments

Comments
 (0)