diff --git a/voyager/modules/state/sui/src/main.rs b/voyager/modules/state/sui/src/main.rs index 7301a42ee9..0b93a010cd 100644 --- a/voyager/modules/state/sui/src/main.rs +++ b/voyager/modules/state/sui/src/main.rs @@ -61,7 +61,9 @@ pub struct Module { pub ibc_store: ObjectID, - pub ibc_contract: ObjectID, + pub initial_ibc_contract: ObjectID, + + pub latest_ibc_contract: ObjectID, pub ibc_store_initial_seq: SequenceNumber, } @@ -174,7 +176,7 @@ impl Module { .expect("there must be some events exist") .into_iter() .find_map(|e| { - if e.type_.address == self.ibc_contract.into() + if e.type_.address == self.initial_ibc_contract.into() && e.type_.module.as_str() == "events" && e.type_.name.as_str() == "PacketSend" { @@ -255,7 +257,8 @@ impl StateModule for Module { sui_client, rpc_url: config.rpc_url, ibc_store: config.ibc_store, - ibc_contract, + initial_ibc_contract: config.ibc_contract, + latest_ibc_contract: ibc_contract, ibc_store_initial_seq, }) } @@ -306,7 +309,7 @@ impl StateModuleServer for Module { ) -> RpcResult { let query = SuiQuery::new_with_store( &self.sui_client, - self.ibc_contract, + self.latest_ibc_contract, self.ibc_store, self.ibc_store_initial_seq, ) diff --git a/voyager/plugins/client-update/sui/src/main.rs b/voyager/plugins/client-update/sui/src/main.rs index f3a3a27e8b..f98c18cf53 100644 --- a/voyager/plugins/client-update/sui/src/main.rs +++ b/voyager/plugins/client-update/sui/src/main.rs @@ -12,7 +12,7 @@ use sui_sdk::{ base_types::ObjectID, committee::EpochId, full_checkpoint_content::CheckpointTransaction, }, }; -use tracing::instrument; +use tracing::{info, instrument}; use unionlabs::ibc::core::client::height::Height; use voyager_sdk::{ DefaultCmd, anyhow, @@ -177,26 +177,36 @@ impl Module { let mut headers = vec![]; let mut is_first = true; - for epoch in from..to { - let query = json!({ - "query": "query ($epoch_id: UInt53) { epoch(epochId: $epoch_id) { checkpoints(last: 1) { edges { node { sequenceNumber } } } } }", - "variables": { "epoch_id": epoch } - }); - - let resp = client - .try_clone() - .expect("no body, so this will work") - .body(query.to_string()) - .send() - .await - .unwrap() - .text() - .await - .unwrap(); + let epoch_ids: Vec = (from..to).collect(); + let query = json!({ + "query": "query ($epoch_ids: [UInt53]) { multiGetEpochs(keys: $epoch_ids) { checkpoints(last: 1) { edges { node { sequenceNumber } } } } }", + "variables": { "epoch_ids": epoch_ids} + }); + + let resp = client + .try_clone() + .expect("no body, so this will work") + .body(query.to_string()) + .send() + .await + .map_err(RpcError::retryable("error fetching epoch checkpoint"))? + .error_for_status() + .map_err(RpcError::retryable( + "error fetching epoch checkpoint: error status", + ))? + .text() + .await + .map_err(RpcError::retryable( + "error fetching epoch checkpoint: error reading text from body", + ))?; + + info!(%resp); - let v: serde_json::Value = serde_json::from_str(&resp).unwrap(); + let v: &serde_json::Value = + &serde_json::from_str::(&resp).unwrap()["data"]["multiGetEpochs"]; - let update_to = v["data"]["epoch"]["checkpoints"]["edges"][0]["node"]["sequenceNumber"] + for (i, _) in (from..to).enumerate() { + let update_to = v[i]["checkpoints"]["edges"][0]["node"]["sequenceNumber"] .as_u64() .unwrap(); diff --git a/voyager/plugins/transaction-batch/src/call.rs b/voyager/plugins/transaction-batch/src/call.rs index 664d3bfc37..c0718460c4 100644 --- a/voyager/plugins/transaction-batch/src/call.rs +++ b/voyager/plugins/transaction-batch/src/call.rs @@ -4,6 +4,7 @@ use std::cmp::Ordering; use enumorph::Enumorph; use ibc_classic_spec::IbcClassic; use ibc_union_spec::{IbcUnion, query::PacketsByBatchHash}; +use itertools::Itertools; use macros::model; use serde_json::json; use tracing::{debug, info, instrument, warn}; @@ -18,7 +19,7 @@ use voyager_sdk::{ primitives::{ChainId, QueryHeight}, rpc::{RpcError, RpcResult}, types::RawClientId, - vm::{Op, data, now, promise}, + vm::{Op, conc, data, now, promise}, }; use crate::{ @@ -55,6 +56,13 @@ where module: &Module, voyager_client: &VoyagerClient, ) -> RpcResult> { + #[derive(Debug)] + enum TargetHeights { + None, + Min(Height), + Exact(Vec), + } + let client_state_meta = voyager_client .client_state_meta::( module.chain_id.clone(), @@ -71,55 +79,57 @@ where .query_latest_height(client_state_meta.counterparty_chain_id.clone(), true) .await?; - let target_height = self + let target_heights = self .batches .iter() .flatten() .map(|e| e.provable_height) - .reduce(|acc, elem| match (elem, acc) { - (EventProvableHeight::Min(elem), EventProvableHeight::Min(acc)) => { - // the min target height of a batch of `Min` events is the highest min height - // given the batch [10, 11, 12] - // the min height that all events are provable at is 12 - EventProvableHeight::Min(elem.max(acc)) + .try_fold(TargetHeights::None, |acc, elem| match (elem, acc) { + (EventProvableHeight::Min(elem), TargetHeights::None) => { + Ok(TargetHeights::Min(elem)) } - (EventProvableHeight::Exactly(elem), EventProvableHeight::Exactly(acc)) => { - assert_eq!(elem, acc, "multiple exact heights in the batch"); - EventProvableHeight::Exactly(elem) + (EventProvableHeight::Exactly(elem), TargetHeights::None) => { + Ok(TargetHeights::Exact(vec![elem])) } - tuple => { - panic!("cannot mix exact and min provable heights currently (found {tuple:?})"); + + (EventProvableHeight::Min(elem), TargetHeights::Min(acc)) => { + Ok(TargetHeights::Min(elem.max(acc))) } - }) - .expect("batch has at least one event; qed;"); - - // at this point we assume that a valid update exists - we only ever enqueue this message behind the relevant WaitForHeight on the counterparty chain. to prevent explosions, we do a sanity check here. - { - let (EventProvableHeight::Min(target_height) - | EventProvableHeight::Exactly(target_height)) = target_height; - - if latest_height < target_height { - // we treat this as a missing state error, since this message assumes the state exists. - return Err(RpcError::missing_state(format!( - "the latest height of the counterparty chain ({counterparty_chain_id}) \ - is {latest_height} and the latest trusted height on the client tracking \ - it ({client_id}) on this chain ({self_chain_id}) is {trusted_height}. \ - in order to create an update for this client, we need to wait for the \ - counterparty chain to progress to the next consensus checkpoint greater \ - than the required target height {target_height}", - counterparty_chain_id = client_state_meta.counterparty_chain_id, - trusted_height = client_state_meta.counterparty_height, - client_id = self.client_id, - self_chain_id = module.chain_id, - )) - .with_data(json!({ - "current_timestamp": now(), - }))); - } - } - match target_height { - EventProvableHeight::Min(target_height) => { + (EventProvableHeight::Exactly(elem), TargetHeights::Exact(acc)) => Ok( + TargetHeights::Exact(acc.into_iter().chain([elem]).collect()), + ), + + (elem, acc) => Err(RpcError::fatal_from_message(format!( + "cannot mix exact and min update heights in a \ + single instance of this plugin: {elem:?}, {acc:?}" + ))), + })?; + + let mut ops = vec![]; + + // TODO: Check the same for exact heights? + match target_heights { + TargetHeights::Min(target_height) => { + // at this point we assume that a valid update exists - we only ever enqueue this message behind the relevant WaitForHeight on the counterparty chain. to prevent explosions, we do a sanity check here. + if latest_height < target_height { + // we treat this as a missing state error, since this message assumes the state exists. + return Err(RpcError::missing_state(format!( + "the latest height of the counterparty chain ({counterparty_chain_id}) \ + is {latest_height} and the latest trusted height on the client tracking \ + it ({client_id}) on this chain ({self_chain_id}) is {trusted_height}. \ + in order to create an update for this client, we need to wait for the \ + counterparty chain to progress to the next consensus checkpoint greater \ + than the required target height {target_height}", + counterparty_chain_id = client_state_meta.counterparty_chain_id, + trusted_height = client_state_meta.counterparty_height, + client_id = self.client_id, + self_chain_id = module.chain_id, + )) + .with_data(json!({ + "current_timestamp": now(), + }))); + } if client_state_meta.counterparty_height >= target_height { info!( "client {client_id} has already been updated to a height \ @@ -128,28 +138,28 @@ where client_id = self.client_id, ); - make_msgs( + ops.push(make_msgs( module, self.client_id, self.batches, None, client_state_meta.clone(), client_state_meta.counterparty_height, - ) + )?); } else { - Ok(promise( + ops.push(promise( [call(FetchUpdateHeaders { - client_type: client_info.client_type, + client_type: client_info.client_type.clone(), counterparty_chain_id: module.chain_id.clone(), - chain_id: client_state_meta.counterparty_chain_id, + chain_id: client_state_meta.counterparty_chain_id.clone(), client_id: RawClientId::new(self.client_id.clone()), update_from: client_state_meta.counterparty_height, update_to: if latest_height.height() < target_height.height() { warn!( "latest height {latest_height} is less than the target \ - height {target_height}, there may be something wrong \ - with the rpc for {} - client {} will be updated to the \ - target height instead of the latest height", + height {target_height}, there may be something wrong \ + with the rpc for {} - client {} will be updated to the \ + target height instead of the latest height", module.chain_id, self.client_id ); target_height @@ -162,61 +172,53 @@ where module.plugin_name(), ModuleCallback::from(MakeIbcMessagesFromUpdate:: { client_id: self.client_id.clone(), - batches: self.batches, + batches: self.batches.clone(), }), ), - )) + )); } } - EventProvableHeight::Exactly(target_height) => { - match client_state_meta.counterparty_height.cmp(&target_height) { - Ordering::Equal => { - info!( - "client {client_id} has already been updated to \ - the desired target height ({} == {target_height})", - client_state_meta.counterparty_height, - client_id = self.client_id, - ); - make_msgs( - module, - self.client_id, - self.batches, - None, - client_state_meta.clone(), - client_state_meta.counterparty_height, - ) - } - Ordering::Less => Ok(promise( - [call(FetchUpdateHeaders { - client_type: client_info.client_type, - counterparty_chain_id: module.chain_id.clone(), - chain_id: client_state_meta.counterparty_chain_id, - client_id: RawClientId::new(self.client_id.clone()), - update_from: client_state_meta.counterparty_height, - update_to: target_height, - })], - [], - PluginMessage::new( - module.plugin_name(), - ModuleCallback::from(MakeIbcMessagesFromUpdate:: { - client_id: self.client_id.clone(), - batches: self.batches, - }), - ), - )), - // update backwards - // currently this is only supported in sui, and as such has some baked-in assumptions about the semantics of when this branch is hit - Ordering::Greater => { - info!( - "updating client to an earlier height ({} -> {target_height})", - client_state_meta.counterparty_height - ); - - Ok(promise( + TargetHeights::None => todo!(), + #[allow(unstable_name_collisions)] + TargetHeights::Exact(target_heights) => { + info!( + "found exact heights: [{}]", + target_heights + .iter() + .map(|h| h.to_string()) + .intersperse(",".to_string()) + .collect::(), + ); + + for events in self.batches { + let EventProvableHeight::Exactly(target_height) = events[0].provable_height + else { + panic!("???") + }; + + match client_state_meta.counterparty_height.cmp(&target_height) { + Ordering::Equal => { + info!( + "client {client_id} has already been updated to \ + the desired target height ({} == {target_height})", + client_state_meta.counterparty_height, + client_id = self.client_id, + ); + + ops.push(make_msgs( + module, + self.client_id.clone(), + vec![events], + None, + client_state_meta.clone(), + client_state_meta.counterparty_height, + )?); + } + Ordering::Less => ops.push(promise( [call(FetchUpdateHeaders { - client_type: client_info.client_type, + client_type: client_info.client_type.clone(), counterparty_chain_id: module.chain_id.clone(), - chain_id: client_state_meta.counterparty_chain_id, + chain_id: client_state_meta.counterparty_chain_id.clone(), client_id: RawClientId::new(self.client_id.clone()), update_from: client_state_meta.counterparty_height, update_to: target_height, @@ -226,14 +228,43 @@ where module.plugin_name(), ModuleCallback::from(MakeIbcMessagesFromUpdate:: { client_id: self.client_id.clone(), - batches: self.batches, + batches: vec![events], }), ), - )) + )), + // update backwards + // currently this is only supported in sui, and as such has some baked-in assumptions about the semantics of when this branch is hit + Ordering::Greater => { + info!( + "updating client to an earlier height ({} -> {target_height})", + client_state_meta.counterparty_height + ); + + ops.push(promise( + [call(FetchUpdateHeaders { + client_type: client_info.client_type.clone(), + counterparty_chain_id: module.chain_id.clone(), + chain_id: client_state_meta.counterparty_chain_id.clone(), + client_id: RawClientId::new(self.client_id.clone()), + update_from: client_state_meta.counterparty_height, + update_to: target_height, + })], + [], + PluginMessage::new( + module.plugin_name(), + ModuleCallback::from(MakeIbcMessagesFromUpdate:: { + client_id: self.client_id.clone(), + batches: vec![events], + }), + ), + )); + } } } } } + + Ok(conc(ops)) } }