Skip to content

Commit 1baebbb

Browse files
authored
[light-client] index accumulator version on events (#24304)
## Description Index accumulator version on events. When building the MMR for authenticated event verification we need to group events by commit rather than checkpoint as there may be multiple commits / settlements per checkpoint. ## Test plan Added a simtest to create multiple commits per checkpoint. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] Indexing Framework:
1 parent 7393626 commit 1baebbb

File tree

9 files changed

+297
-51
lines changed

9 files changed

+297
-51
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sui-core/src/rpc_index.rs

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919
use std::sync::Mutex;
2020
use std::time::Duration;
2121
use std::time::Instant;
22+
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
2223
use sui_types::base_types::MoveObjectType;
2324
use sui_types::base_types::ObjectID;
2425
use sui_types::base_types::SequenceNumber;
@@ -40,6 +41,7 @@ use sui_types::storage::EpochInfo;
4041
use sui_types::storage::TransactionInfo;
4142
use sui_types::storage::error::Error as StorageError;
4243
use sui_types::sui_system_state::SuiSystemStateTrait;
44+
use sui_types::transaction::{TransactionDataAPI, TransactionKind};
4345
use sysinfo::{MemoryRefreshKind, RefreshKind, System};
4446
use tracing::{debug, info, warn};
4547
use typed_store::DBMapUtils;
@@ -387,6 +389,8 @@ struct IndexStoreTables {
387389
pub struct EventIndexKey {
388390
pub stream_id: SuiAddress,
389391
pub checkpoint_seq: u64,
392+
/// The accumulator version that this event is settled into
393+
pub accumulator_version: u64,
390394
pub transaction_idx: u32,
391395
pub event_index: u32,
392396
}
@@ -686,11 +690,38 @@ impl IndexStoreTables {
686690
Ok(batch)
687691
}
688692

693+
fn extract_accumulator_version(
694+
&self,
695+
tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
696+
) -> Option<u64> {
697+
let TransactionKind::ProgrammableSystemTransaction(pt) =
698+
tx.transaction.transaction_data().kind()
699+
else {
700+
return None;
701+
};
702+
703+
if pt.shared_input_objects().any(|obj| {
704+
obj.id == SUI_ACCUMULATOR_ROOT_OBJECT_ID
705+
&& obj.mutability == sui_types::transaction::SharedObjectMutability::Mutable
706+
}) {
707+
return tx.output_objects.iter().find_map(|obj| {
708+
if obj.id() == SUI_ACCUMULATOR_ROOT_OBJECT_ID {
709+
Some(obj.version().value())
710+
} else {
711+
None
712+
}
713+
});
714+
}
715+
716+
None
717+
}
718+
689719
fn index_transaction_events(
690720
&self,
691721
tx: &sui_types::full_checkpoint_content::CheckpointTransaction,
692722
checkpoint_seq: u64,
693723
tx_idx: u32,
724+
accumulator_version: Option<u64>,
694725
batch: &mut typed_store::rocks::DBBatch,
695726
) -> Result<(), StorageError> {
696727
let acc_events = tx.effects.accumulator_events();
@@ -700,18 +731,29 @@ impl IndexStoreTables {
700731

701732
let mut entries: Vec<(EventIndexKey, ())> = Vec::new();
702733
for acc in acc_events {
703-
if let Some(stream_id) =
704-
sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
705-
&& let AccumulatorValue::EventDigest(event_digests) = &acc.write.value
706-
{
707-
for (idx, _d) in event_digests {
708-
let key = EventIndexKey {
709-
stream_id,
734+
if let AccumulatorValue::EventDigest(event_digests) = &acc.write.value {
735+
let Some(accumulator_version) = accumulator_version else {
736+
mysten_common::debug_fatal!(
737+
"Found events at checkpoint {} tx {} before any accumulator settlement",
710738
checkpoint_seq,
711-
transaction_idx: tx_idx,
712-
event_index: *idx as u32,
713-
};
714-
entries.push((key, ()));
739+
tx_idx
740+
);
741+
continue;
742+
};
743+
744+
if let Some(stream_id) =
745+
sui_types::accumulator_root::stream_id_from_accumulator_event(&acc)
746+
{
747+
for (idx, _d) in event_digests {
748+
let key = EventIndexKey {
749+
stream_id,
750+
checkpoint_seq,
751+
accumulator_version,
752+
transaction_idx: tx_idx,
753+
event_index: *idx as u32,
754+
};
755+
entries.push((key, ()));
756+
}
715757
}
716758
}
717759
}
@@ -787,8 +829,10 @@ impl IndexStoreTables {
787829
index_events: bool,
788830
) -> Result<(), StorageError> {
789831
let cp = checkpoint.checkpoint_summary.sequence_number;
832+
let mut current_accumulator_version: Option<u64> = None;
790833

791-
for (tx_idx, tx) in checkpoint.transactions.iter().enumerate() {
834+
// iterate in reverse order, process accumulator settlements first
835+
for (tx_idx, tx) in checkpoint.transactions.iter().enumerate().rev() {
792836
let info = TransactionInfo::new(
793837
tx.transaction.transaction_data(),
794838
&tx.effects,
@@ -801,7 +845,17 @@ impl IndexStoreTables {
801845
batch.insert_batch(&self.transactions, [(digest, info)])?;
802846

803847
if index_events {
804-
self.index_transaction_events(tx, cp, tx_idx as u32, batch)?;
848+
if let Some(version) = self.extract_accumulator_version(tx) {
849+
current_accumulator_version = Some(version);
850+
}
851+
852+
self.index_transaction_events(
853+
tx,
854+
cp,
855+
tx_idx as u32,
856+
current_accumulator_version,
857+
batch,
858+
)?;
805859
}
806860
}
807861

@@ -937,6 +991,7 @@ impl IndexStoreTables {
937991
&self,
938992
stream_id: SuiAddress,
939993
start_checkpoint: u64,
994+
start_accumulator_version: u64,
940995
start_transaction_idx: u32,
941996
start_event_idx: u32,
942997
end_checkpoint: u64,
@@ -946,15 +1001,18 @@ impl IndexStoreTables {
9461001
let lower = EventIndexKey {
9471002
stream_id,
9481003
checkpoint_seq: start_checkpoint,
1004+
accumulator_version: start_accumulator_version,
9491005
transaction_idx: start_transaction_idx,
9501006
event_index: start_event_idx,
9511007
};
9521008
let upper = EventIndexKey {
9531009
stream_id,
9541010
checkpoint_seq: end_checkpoint,
1011+
accumulator_version: u64::MAX,
9551012
transaction_idx: u32::MAX,
9561013
event_index: u32::MAX,
9571014
};
1015+
9581016
Ok(self
9591017
.events_by_stream
9601018
.safe_iter_with_bounds(Some(lower), Some(upper))
@@ -1510,6 +1568,7 @@ impl RpcIndexStore {
15101568
&self,
15111569
stream_id: SuiAddress,
15121570
start_checkpoint: u64,
1571+
start_accumulator_version: u64,
15131572
start_transaction_idx: u32,
15141573
start_event_idx: u32,
15151574
end_checkpoint: u64,
@@ -1519,6 +1578,7 @@ impl RpcIndexStore {
15191578
self.tables.event_iter(
15201579
stream_id,
15211580
start_checkpoint,
1581+
start_accumulator_version,
15221582
start_transaction_idx,
15231583
start_event_idx,
15241584
end_checkpoint,
@@ -1816,6 +1876,7 @@ mod tests {
18161876
.map(|&checkpoint_seq| EventIndexKey {
18171877
stream_id,
18181878
checkpoint_seq,
1879+
accumulator_version: 0,
18191880
transaction_idx: 0,
18201881
event_index: 0,
18211882
})
@@ -1843,12 +1904,14 @@ mod tests {
18431904
let start_key = EventIndexKey {
18441905
stream_id: SuiAddress::ZERO,
18451906
checkpoint_seq: 0,
1907+
accumulator_version: 0,
18461908
transaction_idx: 0,
18471909
event_index: 0,
18481910
};
18491911
let end_key = EventIndexKey {
18501912
stream_id: SuiAddress::random_for_testing_only(),
18511913
checkpoint_seq: u64::MAX,
1914+
accumulator_version: u64::MAX,
18521915
transaction_idx: u32::MAX,
18531916
event_index: u32::MAX,
18541917
};
@@ -1880,6 +1943,7 @@ mod tests {
18801943
let old_key = EventIndexKey {
18811944
stream_id: SuiAddress::random_for_testing_only(),
18821945
checkpoint_seq: 50,
1946+
accumulator_version: 0,
18831947
transaction_idx: 0,
18841948
event_index: 0,
18851949
};
@@ -1892,6 +1956,7 @@ mod tests {
18921956
let new_key = EventIndexKey {
18931957
stream_id: SuiAddress::random_for_testing_only(),
18941958
checkpoint_seq: 150,
1959+
accumulator_version: 0,
18951960
transaction_idx: 0,
18961961
event_index: 0,
18971962
};
@@ -1904,6 +1969,7 @@ mod tests {
19041969
let boundary_key = EventIndexKey {
19051970
stream_id: SuiAddress::random_for_testing_only(),
19061971
checkpoint_seq: 100,
1972+
accumulator_version: 0,
19071973
transaction_idx: 0,
19081974
event_index: 0,
19091975
};

crates/sui-core/src/storage.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ impl<I> Iterator for BatchedEventIterator<'_, I>
543543
where
544544
I: Iterator<Item = Result<crate::rpc_index::EventIndexKey, TypedStoreError>>,
545545
{
546-
type Item = Result<(u64, u32, u32, sui_types::event::Event), TypedStoreError>;
546+
type Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>;
547547

548548
fn next(&mut self) -> Option<Self::Item> {
549549
let key = match self.key_iter.next()? {
@@ -577,6 +577,7 @@ where
577577

578578
Some(Ok((
579579
key.checkpoint_seq,
580+
key.accumulator_version,
580581
key.transaction_idx,
581582
key.event_index,
582583
event,
@@ -728,20 +729,23 @@ impl RpcIndexes for RestReadStore {
728729
&self,
729730
stream_id: SuiAddress,
730731
start_checkpoint: u64,
732+
start_accumulator_version: Option<u64>,
731733
start_transaction_idx: Option<u32>,
732734
start_event_idx: Option<u32>,
733735
end_checkpoint: u64,
734736
limit: u32,
735737
) -> sui_types::storage::error::Result<
736738
Box<
737-
dyn Iterator<Item = Result<(u64, u32, u32, sui_types::event::Event), TypedStoreError>>
738-
+ '_,
739+
dyn Iterator<
740+
Item = Result<(u64, u64, u32, u32, sui_types::event::Event), TypedStoreError>,
741+
> + '_,
739742
>,
740743
> {
741744
let index = self.index()?;
742745
let key_iter = index.event_iter(
743746
stream_id,
744747
start_checkpoint,
748+
start_accumulator_version.unwrap_or(0),
745749
start_transaction_idx.unwrap_or(0),
746750
start_event_idx.unwrap_or(0),
747751
end_checkpoint,

crates/sui-e2e-tests/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ futures.workspace = true
1717
prometheus.workspace = true
1818
indexmap.workspace = true
1919
insta.workspace = true
20+
itertools.workspace = true
2021
jsonrpsee.workspace = true
2122
test-cluster.workspace = true
2223
rand.workspace = true

0 commit comments

Comments
 (0)