Skip to content

Commit cd7adf3

Browse files
authored
Additional schema changes per pull/25143 (#25438)
## Description Addressing post-merge feedback from pull/25143. - Additional epoch end fields for graphql - Optimizing object iteration - Fixing mislocated pipeline name constants/comments (Object types table has already been removed, we agreed to just load types from the object data table directly) ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] Indexing Framework:
1 parent 21c7520 commit cd7adf3

File tree

4 files changed

+258
-79
lines changed

4 files changed

+258
-79
lines changed

crates/sui-kvstore/src/handlers/epochs_end.rs

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@
44
use std::sync::Arc;
55

66
use anyhow::Context as _;
7+
use anyhow::bail;
78
use sui_indexer_alt_framework::pipeline::Processor;
9+
use sui_types::event::SystemEpochInfoEvent;
810
use sui_types::full_checkpoint_content::Checkpoint;
11+
use sui_types::transaction::TransactionDataAPI;
12+
use sui_types::transaction::TransactionKind;
913

1014
use crate::bigtable::proto::bigtable::v2::mutate_rows_request::Entry;
1115
use crate::handlers::BigTableProcessor;
1216
use crate::tables;
1317

1418
/// Pipeline that writes epoch end data to BigTable.
15-
/// This is written when a new epoch starts (for the previous epoch).
1619
pub struct EpochEndPipeline;
1720

1821
#[async_trait::async_trait]
@@ -21,26 +24,102 @@ impl Processor for EpochEndPipeline {
2124
type Value = Entry;
2225

2326
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
24-
let Some(epoch_info) = checkpoint.epoch_info()? else {
27+
let summary = &checkpoint.summary;
28+
29+
let Some(end_of_epoch) = summary.end_of_epoch_data.as_ref() else {
2530
return Ok(vec![]);
2631
};
2732

28-
if epoch_info.epoch == 0 {
29-
return Ok(vec![]);
30-
}
33+
let Some(transaction) = checkpoint.transactions.iter().find(|tx| {
34+
matches!(
35+
tx.transaction.kind(),
36+
TransactionKind::ChangeEpoch(_) | TransactionKind::EndOfEpochTransaction(_)
37+
)
38+
}) else {
39+
bail!(
40+
"Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData",
41+
summary.sequence_number,
42+
);
43+
};
3144

32-
let epoch_id = epoch_info.epoch - 1;
33-
let start_checkpoint = epoch_info
34-
.start_checkpoint
35-
.context("missing start_checkpoint for epoch end")?;
36-
let end_timestamp_ms = epoch_info
37-
.start_timestamp_ms
38-
.context("missing start_timestamp_ms for epoch end")?;
39-
let end_checkpoint = start_checkpoint - 1;
45+
let epoch_id = summary.epoch;
46+
let end_timestamp_ms = summary.timestamp_ms;
47+
let end_checkpoint = summary.sequence_number;
48+
let cp_hi = summary.sequence_number + 1;
49+
let tx_hi = summary.network_total_transactions;
50+
51+
let epoch_commitments = bcs::to_bytes(&end_of_epoch.epoch_commitments)
52+
.context("Failed to serialize EpochCommitment-s")?;
53+
54+
let (
55+
safe_mode,
56+
total_stake,
57+
storage_fund_balance,
58+
storage_fund_reinvestment,
59+
storage_charge,
60+
storage_rebate,
61+
stake_subsidy_amount,
62+
total_gas_fees,
63+
total_stake_rewards_distributed,
64+
leftover_storage_fund_inflow,
65+
) = if let Some(SystemEpochInfoEvent {
66+
total_stake,
67+
storage_fund_reinvestment,
68+
storage_charge,
69+
storage_rebate,
70+
storage_fund_balance,
71+
stake_subsidy_amount,
72+
total_gas_fees,
73+
total_stake_rewards_distributed,
74+
leftover_storage_fund_inflow,
75+
..
76+
}) = transaction
77+
.events
78+
.iter()
79+
.flat_map(|events| &events.data)
80+
.find_map(|event| {
81+
event
82+
.is_system_epoch_info_event()
83+
.then(|| bcs::from_bytes(&event.contents))
84+
})
85+
.transpose()
86+
.context("Failed to deserialize SystemEpochInfoEvent")?
87+
{
88+
(
89+
false,
90+
Some(total_stake),
91+
Some(storage_fund_balance),
92+
Some(storage_fund_reinvestment),
93+
Some(storage_charge),
94+
Some(storage_rebate),
95+
Some(stake_subsidy_amount),
96+
Some(total_gas_fees),
97+
Some(total_stake_rewards_distributed),
98+
Some(leftover_storage_fund_inflow),
99+
)
100+
} else {
101+
(true, None, None, None, None, None, None, None, None, None)
102+
};
40103

41104
let entry = tables::make_entry(
42105
tables::epochs::encode_key(epoch_id),
43-
tables::epochs::encode_end(end_timestamp_ms, end_checkpoint),
106+
tables::epochs::encode_end(
107+
end_timestamp_ms,
108+
end_checkpoint,
109+
cp_hi,
110+
tx_hi,
111+
safe_mode,
112+
total_stake,
113+
storage_fund_balance,
114+
storage_fund_reinvestment,
115+
storage_charge,
116+
storage_rebate,
117+
stake_subsidy_amount,
118+
total_gas_fees,
119+
total_stake_rewards_distributed,
120+
leftover_storage_fund_inflow,
121+
&epoch_commitments,
122+
),
44123
Some(end_timestamp_ms),
45124
);
46125

crates/sui-kvstore/src/handlers/objects.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,18 @@ impl Processor for ObjectsPipeline {
2222

2323
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
2424
let timestamp_ms = checkpoint.summary.timestamp_ms;
25-
let mut entries = Vec::with_capacity(checkpoint.object_set.len());
26-
27-
for object in checkpoint.object_set.iter() {
28-
let object_key = ObjectKey(object.id(), object.version());
29-
let entry = tables::make_entry(
30-
tables::objects::encode_key(&object_key),
31-
tables::objects::encode(object)?,
32-
Some(timestamp_ms),
33-
);
34-
entries.push(entry);
25+
let mut entries = vec![];
26+
27+
for txn in &checkpoint.transactions {
28+
for object in txn.output_objects(&checkpoint.object_set) {
29+
let object_key = ObjectKey(object.id(), object.version());
30+
let entry = tables::make_entry(
31+
tables::objects::encode_key(&object_key),
32+
tables::objects::encode(object)?,
33+
Some(timestamp_ms),
34+
);
35+
entries.push(entry);
36+
}
3537
}
3638

3739
Ok(entries)

crates/sui-kvstore/src/lib.rs

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,39 @@
11
// Copyright (c) Mysten Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
mod bigtable;
5+
pub mod config;
6+
mod handlers;
7+
pub mod tables;
8+
9+
use std::sync::OnceLock;
10+
11+
use anyhow::Result;
12+
use async_trait::async_trait;
13+
use prometheus::Registry;
14+
use serde::Deserialize;
15+
use serde::Serialize;
16+
use sui_indexer_alt_framework::Indexer;
17+
use sui_indexer_alt_framework::IndexerArgs;
18+
use sui_indexer_alt_framework::ingestion::ClientArgs;
19+
use sui_indexer_alt_framework::pipeline::CommitterConfig;
20+
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
21+
use sui_types::balance_change::BalanceChange;
22+
use sui_types::base_types::ObjectID;
23+
use sui_types::committee::EpochId;
24+
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
25+
use sui_types::digests::CheckpointDigest;
26+
use sui_types::digests::TransactionDigest;
27+
use sui_types::effects::TransactionEffects;
28+
use sui_types::effects::TransactionEvents;
29+
use sui_types::event::Event;
30+
use sui_types::messages_checkpoint::CheckpointContents;
31+
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
32+
use sui_types::messages_checkpoint::CheckpointSummary;
33+
use sui_types::object::Object;
34+
use sui_types::storage::ObjectKey;
35+
use sui_types::transaction::Transaction;
36+
437
pub use crate::bigtable::client::BigTableClient;
538
pub use crate::bigtable::store::BigTableConnection;
639
pub use crate::bigtable::store::BigTableStore;
@@ -16,6 +49,11 @@ pub use crate::handlers::ObjectsPipeline;
1649
pub use crate::handlers::PrevEpochUpdate;
1750
pub use crate::handlers::TransactionsPipeline;
1851
pub use crate::handlers::set_max_mutations;
52+
pub use config::CommitterLayer;
53+
pub use config::ConcurrentLayer;
54+
pub use config::IndexerConfig;
55+
pub use config::IngestionConfig;
56+
pub use config::PipelineLayer;
1957

2058
pub const CHECKPOINTS_PIPELINE: &str =
2159
<BigTableHandler<CheckpointsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
@@ -32,10 +70,8 @@ pub const EPOCH_END_PIPELINE: &str =
3270
pub const EPOCH_LEGACY_PIPELINE: &str =
3371
<EpochLegacyPipeline as sui_indexer_alt_framework::pipeline::Processor>::NAME;
3472

35-
/// All pipeline names registered by the indexer. Single source of truth used for:
36-
/// - Pipeline registration in `BigTableIndexer::new()`
37-
/// - Per-pipeline watermark queries in `get_watermark()`
38-
/// - Legacy watermark tracker expected count
73+
/// All pipeline names registered by the indexer. Used by `LegacyWatermarkTracker`
74+
/// to know when all pipelines have reported.
3975
pub const ALL_PIPELINE_NAMES: [&str; 7] = [
4076
CHECKPOINTS_PIPELINE,
4177
CHECKPOINTS_BY_DIGEST_PIPELINE,
@@ -46,44 +82,15 @@ pub const ALL_PIPELINE_NAMES: [&str; 7] = [
4682
EPOCH_LEGACY_PIPELINE,
4783
];
4884

49-
use std::sync::OnceLock;
50-
51-
use anyhow::Result;
52-
use async_trait::async_trait;
53-
use prometheus::Registry;
54-
use serde::Deserialize;
55-
use serde::Serialize;
56-
use sui_indexer_alt_framework::Indexer;
57-
use sui_indexer_alt_framework::IndexerArgs;
58-
use sui_indexer_alt_framework::ingestion::ClientArgs;
59-
use sui_indexer_alt_framework::pipeline::CommitterConfig;
60-
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
61-
use sui_types::balance_change::BalanceChange;
62-
use sui_types::base_types::ObjectID;
63-
use sui_types::committee::EpochId;
64-
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
65-
use sui_types::digests::CheckpointDigest;
66-
use sui_types::digests::TransactionDigest;
67-
use sui_types::effects::TransactionEffects;
68-
use sui_types::effects::TransactionEvents;
69-
use sui_types::event::Event;
70-
use sui_types::messages_checkpoint::CheckpointContents;
71-
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
72-
use sui_types::messages_checkpoint::CheckpointSummary;
73-
use sui_types::object::Object;
74-
use sui_types::storage::ObjectKey;
75-
use sui_types::transaction::Transaction;
76-
77-
mod bigtable;
78-
pub mod config;
79-
mod handlers;
80-
pub mod tables;
81-
82-
pub use config::CommitterLayer;
83-
pub use config::ConcurrentLayer;
84-
pub use config::IndexerConfig;
85-
pub use config::IngestionConfig;
86-
pub use config::PipelineLayer;
85+
/// Non-legacy pipeline names used for the default `get_watermark` implementation.
86+
const WATERMARK_PIPELINES: [&str; 6] = [
87+
CHECKPOINTS_PIPELINE,
88+
CHECKPOINTS_BY_DIGEST_PIPELINE,
89+
TRANSACTIONS_PIPELINE,
90+
OBJECTS_PIPELINE,
91+
EPOCH_START_PIPELINE,
92+
EPOCH_END_PIPELINE,
93+
];
8794

8895
static WRITE_LEGACY_DATA: OnceLock<bool> = OnceLock::new();
8996

@@ -140,6 +147,19 @@ pub struct EpochData {
140147
pub system_state: Option<sui_types::sui_system_state::SuiSystemState>,
141148
pub end_timestamp_ms: Option<u64>,
142149
pub end_checkpoint: Option<u64>,
150+
pub cp_hi: Option<u64>,
151+
pub tx_hi: Option<u64>,
152+
pub safe_mode: Option<bool>,
153+
pub total_stake: Option<u64>,
154+
pub storage_fund_balance: Option<u64>,
155+
pub storage_fund_reinvestment: Option<u64>,
156+
pub storage_charge: Option<u64>,
157+
pub storage_rebate: Option<u64>,
158+
pub stake_subsidy_amount: Option<u64>,
159+
pub total_gas_fees: Option<u64>,
160+
pub total_stake_rewards_distributed: Option<u64>,
161+
pub leftover_storage_fund_inflow: Option<u64>,
162+
pub epoch_commitments: Option<Vec<u8>>,
143163
}
144164

145165
/// Serializable watermark for per-pipeline tracking in BigTable.
@@ -152,16 +172,6 @@ pub struct Watermark {
152172
pub timestamp_ms_hi_inclusive: u64,
153173
}
154174

155-
/// Non-legacy pipeline names used for the default `get_watermark` implementation.
156-
const WATERMARK_PIPELINES: [&str; 6] = [
157-
CHECKPOINTS_PIPELINE,
158-
CHECKPOINTS_BY_DIGEST_PIPELINE,
159-
TRANSACTIONS_PIPELINE,
160-
OBJECTS_PIPELINE,
161-
EPOCH_START_PIPELINE,
162-
EPOCH_END_PIPELINE,
163-
];
164-
165175
#[async_trait]
166176
pub trait KeyValueStoreReader {
167177
async fn get_objects(&mut self, objects: &[ObjectKey]) -> Result<Vec<Object>>;

0 commit comments

Comments
 (0)