Skip to content

Commit 359fe81

Browse files
authored
feat: index info fallback (#1068)
* make gossip client index and info method to accept a full peer address * add fallback for get_info and get_block_index * fix clippy * add transaction fallback * add protocol version endpoint
1 parent 4826847 commit 359fe81

File tree

3 files changed

+158
-47
lines changed

3 files changed

+158
-47
lines changed

crates/p2p/src/chain_sync.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ async fn pull_highest_blocks(
10561056
HashMap::new();
10571057

10581058
for (miner_address, peer) in peers {
1059-
match gossip_client.get_info(peer.address.gossip).await {
1059+
match gossip_client.get_info(peer.address).await {
10601060
Ok(info) => {
10611061
let hash = info.block_hash;
10621062
let height = info.height;
@@ -1216,7 +1216,7 @@ async fn check_and_update_full_validation_switch_height(
12161216

12171217
for (_, peer) in trusted_peers.iter() {
12181218
debug!("Sync task: Trusted peer: {:?}", peer);
1219-
let node_info = match gossip_client.get_info(peer.address.gossip).await {
1219+
let node_info = match gossip_client.get_info(peer.address).await {
12201220
Ok(info) => info,
12211221
Err(err) => {
12221222
warn!(
@@ -1313,7 +1313,7 @@ async fn get_block_index(
13131313

13141314
match gossip_client
13151315
.get_block_index(
1316-
peer.address.gossip,
1316+
peer.address,
13171317
BlockIndexQuery {
13181318
height: start,
13191319
limit,
@@ -1370,7 +1370,7 @@ async fn is_local_index_is_behind_trusted_peers(
13701370

13711371
for (_, peer) in trusted_peers.iter() {
13721372
debug!("Sync task: Trusted peer: {:?}", peer);
1373-
let node_info = match gossip_client.get_info(peer.address.gossip).await {
1373+
let node_info = match gossip_client.get_info(peer.address).await {
13741374
Ok(info) => info,
13751375
Err(err) => {
13761376
warn!("Sync task: Failed to fetch node info from trusted peer {}: {}, trying another peer", peer.address.gossip, err);
@@ -1418,7 +1418,7 @@ async fn estimate_canonical_height(
14181418
debug!("Sync task: Trusted peer: {:?}", peer);
14191419
let gossip_client = gossip_client.clone();
14201420
async move {
1421-
match gossip_client.get_info(peer.address.gossip).await {
1421+
match gossip_client.get_info(peer.address).await {
14221422
Ok(info) => Some((info.block_index_height, info.cumulative_difficulty)),
14231423
Err(err) => {
14241424
warn!("Sync task: Failed to fetch node info from trusted peer {}: {}, trying another peer", peer.address.gossip, err);
@@ -1464,7 +1464,7 @@ async fn synced_peers_sorted_by_cumulative_diff(
14641464
let peers_and_diffs_futures = peers.into_iter().map(|(addr, peer)| {
14651465
let gossip_client = gossip_client.clone();
14661466
async move {
1467-
match gossip_client.get_info(peer.address.gossip).await {
1467+
match gossip_client.get_info(peer.address).await {
14681468
Ok(info) => {
14691469
if !info.is_syncing {
14701470
Ok((addr, peer, info.cumulative_difficulty))

crates/p2p/src/gossip_client.rs

Lines changed: 139 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::types::{GossipError, GossipResponse, GossipResult, RejectionReason};
66
use crate::GossipCache;
77
use core::time::Duration;
88
use futures::StreamExt as _;
9+
use irys_api_client::{ApiClient as _, IrysApiClient};
910
use irys_domain::{PeerList, ScoreDecreaseReason, ScoreIncreaseReason};
1011
use irys_types::{
1112
AcceptedResponse, BlockHash, BlockIndexItem, BlockIndexQuery, GossipCacheKey, GossipData,
@@ -45,6 +46,8 @@ pub struct GossipClient {
4546
}
4647

4748
impl GossipClient {
49+
pub const CURRENT_PROTOCOL_VERSION: u32 = 1;
50+
4851
#[must_use]
4952
pub fn new(timeout: Duration, mining_address: IrysAddress) -> Self {
5053
Self {
@@ -108,36 +111,75 @@ impl GossipClient {
108111
let res = self.send_data_internal(url, &requested_data).await;
109112
let response_time = start_time.elapsed();
110113
Self::handle_data_retrieval_score(peer_list, &res, &peer.0, response_time);
114+
115+
if !matches!(&res, Ok(GossipResponse::Accepted(Some(_)))) {
116+
if let GossipDataRequest::Transaction(tx_id) = requested_data {
117+
debug!(
118+
"Gossip pull_data failed for transaction {}, falling back to API client",
119+
tx_id
120+
);
121+
let api_client = IrysApiClient::new();
122+
if let Ok(tx_response) = api_client.get_transaction(peer.1.address.api, tx_id).await
123+
{
124+
let gossip_data = match tx_response {
125+
IrysTransactionResponse::Storage(tx) => GossipData::Transaction(tx),
126+
IrysTransactionResponse::Commitment(tx) => {
127+
GossipData::CommitmentTransaction(tx)
128+
}
129+
};
130+
return Ok(GossipResponse::Accepted(Some(gossip_data)));
131+
}
132+
}
133+
}
134+
111135
res
112136
}
113137

114-
pub async fn get_info(&self, peer: SocketAddr) -> Result<NodeInfo, GossipClientError> {
115-
let url = format!("http://{}/gossip/info", peer);
116-
let response = self
117-
.internal_client()
118-
.get(&url)
119-
.send()
120-
.await
121-
.map_err(|error| GossipClientError::GetRequest(peer.to_string(), error.to_string()))?;
138+
pub async fn get_info(&self, peer: PeerAddress) -> Result<NodeInfo, GossipClientError> {
139+
let url = format!("http://{}/gossip/info", peer.gossip);
140+
let response = self.internal_client().get(&url).send().await;
122141

123-
if !response.status().is_success() {
124-
return Err(GossipClientError::GetRequest(
125-
peer.to_string(),
126-
response.status().to_string(),
127-
));
142+
let gossip_result = match response {
143+
Ok(resp) => {
144+
if !resp.status().is_success() {
145+
Err(GossipClientError::GetRequest(
146+
peer.gossip.to_string(),
147+
resp.status().to_string(),
148+
))
149+
} else {
150+
match resp.json::<GossipResponse<NodeInfo>>().await {
151+
Ok(GossipResponse::Accepted(info)) => Ok(info),
152+
Ok(GossipResponse::Rejected(reason)) => Err(GossipClientError::GetRequest(
153+
peer.gossip.to_string(),
154+
format!("Request rejected: {:?}", reason),
155+
)),
156+
Err(e) => Err(GossipClientError::GetJsonResponsePayload(
157+
peer.gossip.to_string(),
158+
e.to_string(),
159+
)),
160+
}
161+
}
162+
}
163+
Err(e) => Err(GossipClientError::GetRequest(
164+
peer.gossip.to_string(),
165+
e.to_string(),
166+
)),
167+
};
168+
169+
if let Ok(info) = gossip_result {
170+
return Ok(info);
128171
}
129172

130-
let response: GossipResponse<NodeInfo> = response.json().await.map_err(|error| {
131-
GossipClientError::GetJsonResponsePayload(peer.to_string(), error.to_string())
132-
})?;
173+
debug!(
174+
"Gossip get_info failed for peer {}, falling back to API client",
175+
peer.gossip
176+
);
133177

134-
match response {
135-
GossipResponse::Accepted(info) => Ok(info),
136-
GossipResponse::Rejected(reason) => Err(GossipClientError::GetRequest(
137-
peer.to_string(),
138-
format!("Request rejected: {:?}", reason),
139-
)),
140-
}
178+
let api_client = IrysApiClient::new();
179+
api_client
180+
.node_info(peer.api)
181+
.await
182+
.map_err(|e| GossipClientError::GetRequest(peer.api.to_string(), e.to_string()))
141183
}
142184

143185
pub async fn get_peer_list(
@@ -228,37 +270,76 @@ impl GossipClient {
228270

229271
pub async fn get_block_index(
230272
&self,
231-
peer: SocketAddr,
273+
peer: PeerAddress,
232274
query: BlockIndexQuery,
233275
) -> Result<Vec<BlockIndexItem>, GossipClientError> {
234-
let url = format!("http://{}/gossip/block-index", peer);
276+
let url = format!("http://{}/gossip/block-index", peer.gossip);
277+
let response = self.internal_client().get(&url).query(&query).send().await;
278+
279+
let gossip_result = match response {
280+
Ok(resp) => {
281+
if !resp.status().is_success() {
282+
Err(GossipClientError::GetRequest(
283+
peer.gossip.to_string(),
284+
resp.status().to_string(),
285+
))
286+
} else {
287+
match resp.json::<GossipResponse<Vec<BlockIndexItem>>>().await {
288+
Ok(GossipResponse::Accepted(index)) => Ok(index),
289+
Ok(GossipResponse::Rejected(reason)) => Err(GossipClientError::GetRequest(
290+
peer.gossip.to_string(),
291+
format!("Request rejected: {:?}", reason),
292+
)),
293+
Err(e) => Err(GossipClientError::GetJsonResponsePayload(
294+
peer.gossip.to_string(),
295+
e.to_string(),
296+
)),
297+
}
298+
}
299+
}
300+
Err(e) => Err(GossipClientError::GetRequest(
301+
peer.gossip.to_string(),
302+
e.to_string(),
303+
)),
304+
};
305+
306+
if let Ok(index) = gossip_result {
307+
return Ok(index);
308+
}
309+
310+
debug!(
311+
"Gossip get_block_index failed for peer {}, falling back to API client",
312+
peer.gossip
313+
);
314+
315+
let api_client = IrysApiClient::new();
316+
api_client
317+
.get_block_index(peer.api, query)
318+
.await
319+
.map_err(|e| GossipClientError::GetRequest(peer.api.to_string(), e.to_string()))
320+
}
321+
322+
pub async fn get_protocol_version(&self, peer: PeerAddress) -> Result<u32, GossipClientError> {
323+
let url = format!("http://{}/gossip/protocol_version", peer.gossip);
235324
let response = self
236325
.internal_client()
237326
.get(&url)
238-
.query(&query)
239327
.send()
240328
.await
241-
.map_err(|error| GossipClientError::GetRequest(peer.to_string(), error.to_string()))?;
329+
.map_err(|error| {
330+
GossipClientError::GetRequest(peer.gossip.to_string(), error.to_string())
331+
})?;
242332

243333
if !response.status().is_success() {
244334
return Err(GossipClientError::GetRequest(
245-
peer.to_string(),
335+
peer.gossip.to_string(),
246336
response.status().to_string(),
247337
));
248338
}
249339

250-
let response: GossipResponse<Vec<BlockIndexItem>> =
251-
response.json().await.map_err(|error| {
252-
GossipClientError::GetJsonResponsePayload(peer.to_string(), error.to_string())
253-
})?;
254-
255-
match response {
256-
GossipResponse::Accepted(index) => Ok(index),
257-
GossipResponse::Rejected(reason) => Err(GossipClientError::GetRequest(
258-
peer.to_string(),
259-
format!("Request rejected: {:?}", reason),
260-
)),
261-
}
340+
response.json().await.map_err(|error| {
341+
GossipClientError::GetJsonResponsePayload(peer.gossip.to_string(), error.to_string())
342+
})
262343
}
263344

264345
pub async fn check_health(
@@ -1646,4 +1727,22 @@ mod tests {
16461727
assert!(final_score <= PeerScore::MAX);
16471728
}
16481729
}
1730+
1731+
mod protocol_version_tests {
1732+
use super::*;
1733+
1734+
#[tokio::test]
1735+
async fn test_get_protocol_version() {
1736+
let server = MockHttpServer::new_with_response(200, "1", "application/json");
1737+
let fixture = TestFixture::new();
1738+
let peer = create_peer_address("127.0.0.1", server.port());
1739+
1740+
let version = fixture
1741+
.client
1742+
.get_protocol_version(peer)
1743+
.await
1744+
.expect("to get version");
1745+
assert_eq!(version, 1);
1746+
}
1747+
}
16491748
}

crates/p2p/src/server.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,14 @@ where
561561
HttpResponse::Ok().json(GossipResponse::Accepted(requested_blocks))
562562
}
563563

564+
#[expect(
565+
clippy::unused_async,
566+
reason = "Actix-web handler signature requires handlers to be async"
567+
)]
568+
async fn handle_protocol_version() -> HttpResponse {
569+
HttpResponse::Ok().json(crate::gossip_client::GossipClient::CURRENT_PROTOCOL_VERSION)
570+
}
571+
564572
fn handle_invalid_data(
565573
peer_miner_address: &IrysAddress,
566574
error: &GossipError,
@@ -708,7 +716,11 @@ where
708716
.route("/info", web::get().to(Self::handle_info))
709717
.route("/peer-list", web::get().to(Self::handle_peer_list))
710718
.route("/version", web::post().to(Self::handle_version))
711-
.route("/block-index", web::get().to(Self::handle_block_index)),
719+
.route("/block-index", web::get().to(Self::handle_block_index))
720+
.route(
721+
"/protocol_version",
722+
web::get().to(Self::handle_protocol_version),
723+
),
712724
)
713725
})
714726
.shutdown_timeout(5)

0 commit comments

Comments
 (0)