Skip to content

Commit 2221a93

Browse files
authored
implement p2p service auto recover on failure (#487)
1 parent c96f2bc commit 2221a93

File tree

1 file changed

+104
-20
lines changed

1 file changed

+104
-20
lines changed

crates/worker/src/p2p/service.rs

Lines changed: 104 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ pub struct P2PContext {
3939
pub provider_wallet: Wallet,
4040
}
4141

42+
#[derive(Clone)]
4243
pub struct P2PService {
4344
endpoint: Endpoint,
45+
secret_key: SecretKey,
4446
node_id: String,
4547
listening_addrs: Vec<String>,
4648
cancellation_token: CancellationToken,
@@ -49,6 +51,11 @@ pub struct P2PService {
4951
wallet: Wallet,
5052
}
5153

54+
/// Reason the endpoint accept loop stopped.
///
/// Returned by `run_endpoint_loop` so the recovery loop can tell a requested
/// shutdown apart from an endpoint that stopped yielding connections.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EndpointLoopResult {
    /// The cancellation token fired; the service should stop for good.
    Shutdown,
    /// `accept()` yielded `None`; the endpoint has to be recreated.
    EndpointClosed,
}
58+
5259
impl P2PService {
5360
/// Create a new P2P service with a unique worker identity
5461
pub async fn new(
@@ -74,7 +81,7 @@ impl P2PService {
7481

7582
// Create the endpoint
7683
let endpoint = Endpoint::builder()
77-
.secret_key(secret_key)
84+
.secret_key(secret_key.clone())
7885
.alpns(vec![PRIME_P2P_PROTOCOL.to_vec()])
7986
.discovery_n0()
8087
.relay_mode(RelayMode::Default)
@@ -93,6 +100,7 @@ impl P2PService {
93100

94101
Ok(Self {
95102
endpoint,
103+
secret_key,
96104
node_id,
97105
listening_addrs,
98106
cancellation_token,
@@ -111,33 +119,109 @@ impl P2PService {
111119
pub fn listening_addresses(&self) -> &[String] {
112120
&self.listening_addrs
113121
}
114-
/// Start accepting incoming connections
122+
123+
/// Recreate the endpoint with the same identity
124+
async fn recreate_endpoint(&self) -> Result<Endpoint> {
125+
info!("Recreating P2P endpoint with node ID: {}", self.node_id);
126+
127+
let endpoint = Endpoint::builder()
128+
.secret_key(self.secret_key.clone())
129+
.alpns(vec![PRIME_P2P_PROTOCOL.to_vec()])
130+
.discovery_n0()
131+
.relay_mode(RelayMode::Default)
132+
.bind()
133+
.await?;
134+
135+
let node_addr = endpoint.node_addr().await?;
136+
let listening_addrs = node_addr
137+
.direct_addresses
138+
.iter()
139+
.map(|addr| addr.to_string())
140+
.collect::<Vec<_>>();
141+
142+
info!(
143+
"P2P endpoint recreated, listening on: {:?}",
144+
listening_addrs
145+
);
146+
Ok(endpoint)
147+
}
148+
/// Start accepting incoming connections with automatic recovery
115149
pub async fn start(&self) -> Result<()> {
116-
let endpoint = self.endpoint.clone();
150+
let service = Arc::new(self.clone());
117151
let cancellation_token = self.cancellation_token.clone();
118-
let context = self.context.clone();
119-
let allowed_addresses = self.allowed_addresses.clone();
120-
let wallet = self.wallet.clone();
152+
121153
tokio::spawn(async move {
122-
loop {
123-
tokio::select! {
124-
_ = cancellation_token.cancelled() => {
125-
info!("P2P service shutting down");
126-
break;
127-
}
128-
incoming = endpoint.accept() => {
129-
if let Some(incoming) = incoming {
130-
tokio::spawn(Self::handle_connection(incoming, context.clone(), allowed_addresses.clone(), wallet.clone()));
131-
} else {
132-
warn!("P2P endpoint closed");
133-
break;
154+
service.run_with_recovery(cancellation_token).await;
155+
});
156+
157+
Ok(())
158+
}
159+
160+
/// Run the P2P service with automatic endpoint recovery
161+
async fn run_with_recovery(&self, cancellation_token: CancellationToken) {
162+
let mut endpoint = self.endpoint.clone();
163+
let mut retry_delay = Duration::from_secs(1);
164+
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
165+
166+
loop {
167+
tokio::select! {
168+
_ = cancellation_token.cancelled() => {
169+
info!("P2P service shutting down");
170+
break;
171+
}
172+
result = self.run_endpoint_loop(&endpoint, &cancellation_token) => {
173+
match result {
174+
EndpointLoopResult::Shutdown => break,
175+
EndpointLoopResult::EndpointClosed => {
176+
warn!("P2P endpoint closed, attempting recovery in {:?}", retry_delay);
177+
178+
tokio::select! {
179+
_ = cancellation_token.cancelled() => break,
180+
_ = tokio::time::sleep(retry_delay) => {}
181+
}
182+
183+
match self.recreate_endpoint().await {
184+
Ok(new_endpoint) => {
185+
info!("P2P endpoint successfully recovered");
186+
endpoint = new_endpoint;
187+
retry_delay = Duration::from_secs(1);
188+
}
189+
Err(e) => {
190+
error!("Failed to recreate P2P endpoint: {}", e);
191+
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
192+
}
193+
}
134194
}
135195
}
136196
}
137197
}
138-
});
198+
}
199+
}
139200

140-
Ok(())
201+
/// Run the main endpoint acceptance loop
202+
async fn run_endpoint_loop(
203+
&self,
204+
endpoint: &Endpoint,
205+
cancellation_token: &CancellationToken,
206+
) -> EndpointLoopResult {
207+
let context = self.context.clone();
208+
let allowed_addresses = self.allowed_addresses.clone();
209+
let wallet = self.wallet.clone();
210+
211+
loop {
212+
tokio::select! {
213+
_ = cancellation_token.cancelled() => {
214+
return EndpointLoopResult::Shutdown;
215+
}
216+
incoming = endpoint.accept() => {
217+
if let Some(incoming) = incoming {
218+
tokio::spawn(Self::handle_connection(incoming, context.clone(), allowed_addresses.clone(), wallet.clone()));
219+
} else {
220+
return EndpointLoopResult::EndpointClosed;
221+
}
222+
}
223+
}
224+
}
141225
}
142226

143227
/// Handle an incoming connection

0 commit comments

Comments
 (0)