Skip to content

Commit 07bd9ea

Browse files
authored
fix: historical entities with mutli world support (#359)
* fix: historical entities with mutli world support * fmt
1 parent d9e6ba9 commit 07bd9ea

File tree

3 files changed

+36
-27
lines changed

3 files changed

+36
-27
lines changed

crates/sqlite/sqlite/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ pub enum ParseError {
5858
InvalidTyEntity,
5959
#[error("Invalid FixedSizeArray format: {0}")]
6060
InvalidFixedSizeArray(String),
61+
#[error("Invalid world-scoped ID format: {0}")]
62+
InvalidWorldScopedId(String),
6163
}
6264

6365
#[derive(Debug, thiserror::Error)]

crates/sqlite/sqlite/src/executor/mod.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -585,31 +585,39 @@ impl<P: Provider + Sync + Send + Clone + 'static> Executor<'_, P> {
585585
if entity.is_historical {
586586
entity_counter += 1;
587587

588+
let (world_address, entity_id) = entity
589+
.entity_id
590+
.split_once(':')
591+
.expect("Invalid world-scoped ID format");
588592
let data = serde_json::to_string(&entity.ty.to_json_value()?)
589593
.map_err(|e| ExecutorQueryError::Parse(ParseError::FromJsonStr(e)))?;
590594
if let Some(keys) = entity.keys_str {
591595
sqlx::query(
592596
"INSERT INTO entities_historical (id, keys, event_id, data, model_id, \
593-
executed_at) VALUES (?, ?, ?, ?, ?, ?) RETURNING *",
597+
executed_at, world_address, entity_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING *",
594598
)
595599
.bind(entity.entity_id.clone())
596600
.bind(keys)
597601
.bind(entity.event_id.clone())
598602
.bind(data)
599603
.bind(entity.model_id.clone())
600604
.bind(entity.block_timestamp.clone())
605+
.bind(world_address)
606+
.bind(entity_id)
601607
.fetch_one(&mut **tx)
602608
.await?;
603609
} else {
604610
sqlx::query(
605611
"INSERT INTO entities_historical (id, event_id, data, model_id, \
606-
executed_at) VALUES (?, ?, ?, ?, ?) RETURNING *",
612+
executed_at, world_address, entity_id) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING *",
607613
)
608614
.bind(entity.entity_id.clone())
609615
.bind(entity.event_id.clone())
610616
.bind(data)
611617
.bind(entity.model_id.clone())
612618
.bind(entity.block_timestamp.clone())
619+
.bind(world_address)
620+
.bind(entity_id)
613621
.fetch_one(&mut **tx)
614622
.await?;
615623
}
@@ -749,18 +757,24 @@ impl<P: Provider + Sync + Send + Clone + 'static> Executor<'_, P> {
749757
if em_query.is_historical {
750758
event_counter += 1;
751759

760+
let (world_address, entity_id) = em_query
761+
.entity_id
762+
.split_once(':')
763+
.expect("Invalid world-scoped ID format");
752764
let data = serde_json::to_string(&em_query.ty.to_json_value()?)
753765
.map_err(|e| ExecutorQueryError::Parse(ParseError::FromJsonStr(e)))?;
754766
sqlx::query(
755767
"INSERT INTO event_messages_historical (id, keys, event_id, data, \
756-
model_id, executed_at) VALUES (?, ?, ?, ?, ?, ?) RETURNING *",
768+
model_id, executed_at, world_address, entity_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING *",
757769
)
758770
.bind(em_query.entity_id.clone())
759771
.bind(em_query.keys_str.clone())
760772
.bind(em_query.event_id.clone())
761773
.bind(data)
762774
.bind(em_query.model_id.clone())
763775
.bind(em_query.block_timestamp.clone())
776+
.bind(world_address)
777+
.bind(entity_id)
764778
.fetch_one(&mut **tx)
765779
.await?;
766780
}

crates/sqlite/sqlite/src/model.rs

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,12 @@ impl Sql {
687687
.map(|_| {
688688
format!(
689689
"{}.model_id = {}.world_address || ':' || ?",
690-
model_relation_table, table
690+
if historical {
691+
table
692+
} else {
693+
model_relation_table
694+
},
695+
table
691696
)
692697
})
693698
.collect();
@@ -704,15 +709,8 @@ impl Sql {
704709
};
705710

706711
let page = if historical {
707-
self.fetch_historical_entities(
708-
table,
709-
model_relation_table,
710-
&where_clause,
711-
bind_values,
712-
pagination,
713-
&schemas,
714-
)
715-
.await?
712+
self.fetch_historical_entities(table, &where_clause, bind_values, pagination, &schemas)
713+
.await?
716714
} else {
717715
let page = self
718716
.fetch_entities(
@@ -746,7 +744,6 @@ impl Sql {
746744
async fn fetch_historical_entities(
747745
&self,
748746
table: &str,
749-
model_relation_table: &str,
750747
where_clause: &str,
751748
bind_values: Vec<String>,
752749
pagination: Pagination,
@@ -756,20 +753,16 @@ impl Sql {
756753

757754
let mut query_builder = QueryBuilder::new(table)
758755
.select(&[
759-
format!("{}.world_address", table),
760-
format!("{}.id", table),
761-
format!("{}.entity_id", table),
762-
format!("{}.data", table),
763-
format!("{}.model_id", table),
764-
format!("{}.event_id", table),
765-
format!("{}.created_at", table),
766-
format!("{}.updated_at", table),
767-
format!("{}.executed_at", table),
768-
format!("group_concat({model_relation_table}.model_id) as model_ids"),
756+
format!("{table}.world_address"),
757+
format!("{table}.id"),
758+
format!("{table}.entity_id"),
759+
format!("{table}.data"),
760+
format!("{table}.model_id"),
761+
format!("{table}.event_id"),
762+
format!("{table}.created_at"),
763+
format!("{table}.updated_at"),
764+
format!("{table}.executed_at"),
769765
])
770-
.join(&format!(
771-
"JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id",
772-
))
773766
.group_by(&format!("{table}.event_id"));
774767

775768
// Add user where clause if provided (applies to already-filtered set)

0 commit comments

Comments
 (0)