Skip to content

Commit 5edcce4

Browse files
committed
refactor(hermes): clean exit
1 parent a7d133d commit 5edcce4

File tree

6 files changed

+127
-68
lines changed

6 files changed

+127
-68
lines changed

hermes/src/api.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use {
1111
Router,
1212
},
1313
serde_qs::axum::QsQueryConfig,
14-
std::sync::Arc,
14+
std::sync::{
15+
atomic::Ordering,
16+
Arc,
17+
},
1518
tokio::{
1619
signal,
1720
sync::mpsc::Receiver,
@@ -95,34 +98,37 @@ pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()
9598
.with_state(state.clone())
9699
// Permissive CORS layer to allow all origins
97100
.layer(CorsLayer::permissive())
98-
// non-strict mode permits escaped [] in URL parameters.
99-
// 5 is the allowed depth (also the default value for this parameter).
101+
// Non-strict mode permits escaped [] in URL parameters. 5 is the allowed depth (also the
102+
// default value for this parameter).
100103
.layer(Extension(QsQueryConfig::new(5, false)));
101104

102-
103105
// Call dispatch updates to websocket every 1 second
104106
// FIXME use a channel to get updates from the store
105107
tokio::spawn(async move {
106-
loop {
107-
// Panics if the update channel is closed, which should never happen.
108-
// If it happens we have no way to recover, so we just panic.
109-
update_rx
110-
.recv()
111-
.await
112-
.expect("state update channel is closed");
108+
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
109+
// Causes a full application shutdown if an error occurs, we can't recover from this so
110+
// we just quit.
111+
if update_rx.recv().await.is_none() {
112+
log::error!("Failed to receive update from store.");
113+
crate::SHOULD_EXIT.store(true, Ordering::Release);
114+
break;
115+
}
113116

114117
notify_updates(state.ws.clone()).await;
115118
}
119+
120+
log::info!("Shutting down websocket updates...")
116121
});
117122

118123
// Binds the axum's server to the configured address and port. This is a blocking call and will
119124
// not return until the server is shutdown.
120125
axum::Server::try_bind(&opts.api_addr)?
121126
.serve(app.into_make_service())
122127
.with_graceful_shutdown(async {
123-
signal::ctrl_c()
124-
.await
125-
.expect("Ctrl-c signal handler failed.");
128+
// Ignore Ctrl+C errors, either way we need to shut down. The main Ctrl+C handler
129+
// should also have triggered so we will let that one print the shutdown warning.
130+
let _ = signal::ctrl_c().await;
131+
crate::SHOULD_EXIT.store(true, Ordering::Release);
126132
})
127133
.await?;
128134

hermes/src/main.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
use {
66
crate::store::Store,
77
anyhow::Result,
8+
futures::future::join_all,
9+
std::sync::atomic::AtomicBool,
810
structopt::StructOpt,
11+
tokio::spawn,
912
};
1013

1114
mod api;
@@ -15,6 +18,14 @@ mod macros;
1518
mod network;
1619
mod store;
1720

21+
// A static exit flag to indicate to running threads that we're shutting down. This is used to
22+
// gracefully shutdown the application.
23+
//
24+
// NOTE: A more idiomatic approach would be to use a tokio::sync::broadcast channel, and to send a
25+
// shutdown signal to all running tasks. However, this is a bit more complicated to implement and
26+
// we don't rely on global state for anything else.
27+
pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
28+
1829
/// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin.
1930
async fn init() -> Result<()> {
2031
log::info!("Initializing Hermes...");
@@ -29,35 +40,46 @@ async fn init() -> Result<()> {
2940
let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
3041

3142
// Initialize a cache store with a 1000 element circular buffer.
32-
let store = Store::new(update_tx, 1000);
43+
let store = Store::new(update_tx.clone(), 1000);
44+
45+
// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We
46+
// also send off any notifications needed to close off any waiting tasks.
47+
spawn(async move {
48+
tokio::signal::ctrl_c().await.unwrap();
49+
log::info!("Shutdown signal received, waiting for tasks...");
50+
SHOULD_EXIT.store(true, std::sync::atomic::Ordering::Release);
51+
let _ = update_tx.send(()).await;
52+
});
3353

34-
network::p2p::spawn(opts.clone(), store.clone()).await?;
35-
network::pythnet::spawn(opts.clone(), store.clone()).await?;
36-
api::run(opts.clone(), store.clone(), update_rx).await?;
54+
// Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown
55+
// signal has been observed).
56+
let tasks = join_all([
57+
Box::pin(spawn(network::p2p::spawn(opts.clone(), store.clone()))),
58+
Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))),
59+
Box::pin(spawn(api::run(opts.clone(), store.clone(), update_rx))),
60+
])
61+
.await;
62+
63+
for task in tasks {
64+
task??;
65+
}
3766
}
3867
}
3968

4069
Ok(())
4170
}
4271

4372
#[tokio::main]
44-
async fn main() -> Result<!> {
73+
async fn main() -> Result<()> {
4574
env_logger::init();
4675

47-
tokio::spawn(async move {
48-
// Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
49-
// should be set to 1 for this otherwise it will only print the top-level error.
50-
if let Err(result) = init().await {
51-
eprintln!("{}", result.backtrace());
52-
for cause in result.chain() {
53-
eprintln!("{cause}");
54-
}
55-
std::process::exit(1);
56-
}
57-
});
76+
// Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
77+
// should be set to 1 for this otherwise it will only print the top-level error.
78+
if let Err(result) = init().await {
79+
eprintln!("{}", result.backtrace());
80+
result.chain().for_each(|cause| eprintln!("{cause}"));
81+
std::process::exit(1);
82+
}
5883

59-
// TODO: Setup a Ctrl-C handler that waits. We use process::exit(0) for now but we should have
60-
// a graceful shutdown with an AtomicBool or similar before production.
61-
tokio::signal::ctrl_c().await?;
62-
std::process::exit(0);
84+
Ok(())
6385
}

hermes/src/network/p2p.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use {
2525
CString,
2626
},
2727
sync::{
28+
atomic::Ordering,
2829
mpsc::{
2930
Receiver,
3031
Sender,
@@ -76,13 +77,15 @@ extern "C" fn proxy(o: ObservationC) {
7677
let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned();
7778
// The chances of the mutex getting poisoned are very low and if it happens
7879
// there is no way for us to recover from it.
79-
if let Err(e) = OBSERVATIONS
80+
if OBSERVATIONS
8081
.0
8182
.lock()
82-
.expect("Cannot acquire p2p channel lock")
83-
.send(vaa)
83+
.map_err(|_| ())
84+
.and_then(|tx| tx.send(vaa).map_err(|_| ()))
85+
.is_err()
8486
{
85-
log::error!("Failed to send observation: {}", e);
87+
log::error!("Failed to lock p2p observation channel or to send observation.");
88+
crate::SHOULD_EXIT.store(true, Ordering::Release);
8689
}
8790
}
8891

@@ -129,40 +132,47 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
129132
log::info!("Starting P2P server on {:?}", opts.wh_listen_addrs);
130133

131134
std::thread::spawn(|| {
132-
bootstrap(
135+
if bootstrap(
133136
opts.wh_network_id,
134137
opts.wh_bootstrap_addrs,
135138
opts.wh_listen_addrs,
136139
)
137-
.unwrap()
140+
.is_err()
141+
{
142+
log::error!("Failed to bootstrap P2P server.");
143+
crate::SHOULD_EXIT.store(true, Ordering::Release);
144+
}
138145
});
139146

140147
tokio::spawn(async move {
141148
// Listen in the background for new VAA's from the p2p layer
142149
// and update the state accordingly.
143-
loop {
150+
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
144151
let vaa_bytes = tokio::task::spawn_blocking(|| {
145152
let observation = OBSERVATIONS.1.lock();
146153
let observation = match observation {
147154
Ok(observation) => observation,
148155
Err(e) => {
149156
// This should never happen, but if it does, we want to panic and crash
150157
// as it is not recoverable.
151-
panic!("Failed to lock p2p observation channel: {e}");
158+
log::error!("Failed to lock p2p observation channel: {e}");
159+
crate::SHOULD_EXIT.store(true, Ordering::Release);
160+
return Err(anyhow::anyhow!("Failed to lock p2p observation channel"));
152161
}
153162
};
154163

155164
match observation.recv() {
156-
Ok(vaa_bytes) => vaa_bytes,
165+
Ok(vaa_bytes) => Ok(vaa_bytes),
157166
Err(e) => {
158-
// This should never happen, but if it does, we want to panic and crash
159-
// as it is not recoverable.
160-
panic!("Failed to receive p2p observation: {e}");
167+
// This should never happen, but if it does, we want to shutdown the
168+
// application as it is unrecoverable.
169+
log::error!("Failed to receive p2p observation: {e}");
170+
crate::SHOULD_EXIT.store(true, Ordering::Release);
171+
Err(anyhow::anyhow!("Failed to receive p2p observation."))
161172
}
162173
}
163174
})
164-
.await
165-
.unwrap();
175+
.await??;
166176

167177
let store = store.clone();
168178
tokio::spawn(async move {
@@ -171,6 +181,9 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
171181
}
172182
});
173183
}
184+
185+
log::info!("Shutting down P2P server...");
186+
Ok::<(), anyhow::Error>(())
174187
});
175188

176189
Ok(())

hermes/src/network/pythnet.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ use {
4747
system_program,
4848
},
4949
std::{
50-
sync::Arc,
50+
sync::{
51+
atomic::Ordering,
52+
Arc,
53+
},
5154
time::Duration,
5255
},
5356
tokio::time::Instant,
@@ -125,7 +128,7 @@ async fn fetch_bridge_data(
125128
}
126129
}
127130

128-
pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
131+
pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
129132
let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;
130133

131134
let config = RpcProgramAccountsConfig {
@@ -147,7 +150,7 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
147150
.program_subscribe(&system_program::id(), Some(config))
148151
.await?;
149152

150-
loop {
153+
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
151154
match notif.next().await {
152155
Some(update) => {
153156
let account: Account = match update.value.account.decode() {
@@ -198,6 +201,8 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
198201
}
199202
}
200203
}
204+
205+
Ok(())
201206
}
202207

203208
/// Fetch existing GuardianSet accounts from Wormhole.
@@ -264,33 +269,42 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
264269
)
265270
.await?;
266271

267-
{
272+
let task_listener = {
268273
let store = store.clone();
269274
let pythnet_ws_endpoint = opts.pythnet_ws_endpoint.clone();
270275
tokio::spawn(async move {
271-
loop {
276+
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
272277
let current_time = Instant::now();
273278

274279
if let Err(ref e) = run(store.clone(), pythnet_ws_endpoint.clone()).await {
275280
log::error!("Error in Pythnet network listener: {:?}", e);
276-
}
277-
278-
if current_time.elapsed() < Duration::from_secs(30) {
279-
log::error!(
280-
"Pythnet network listener is restarting too quickly. Sleeping for 1s"
281-
);
282-
tokio::time::sleep(Duration::from_secs(1)).await;
281+
if current_time.elapsed() < Duration::from_secs(30) {
282+
log::error!(
283+
"Pythnet network listener restarting too quickly. Sleeping for 1s"
284+
);
285+
tokio::time::sleep(Duration::from_secs(1)).await;
286+
}
283287
}
284288
}
285-
});
286-
}
287289

288-
{
290+
log::info!("Shutting down Pythnet listener...");
291+
})
292+
};
293+
294+
let task_guadian_watcher = {
289295
let store = store.clone();
290296
let pythnet_http_endpoint = opts.pythnet_http_endpoint.clone();
291297
tokio::spawn(async move {
292-
loop {
293-
tokio::time::sleep(Duration::from_secs(60)).await;
298+
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
299+
// Poll for new guardian sets every 60 seconds. We use a short wait time so we can
300+
// properly exit if a quit signal was received. This isn't a perfect solution, but
301+
// it's good enough for now.
302+
for _ in 0..60 {
303+
if crate::SHOULD_EXIT.load(Ordering::Acquire) {
304+
break;
305+
}
306+
tokio::time::sleep(Duration::from_secs(1)).await;
307+
}
294308

295309
match fetch_existing_guardian_sets(
296310
store.clone(),
@@ -305,8 +319,11 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
305319
}
306320
}
307321
}
308-
});
309-
}
310322

323+
log::info!("Shutting down Pythnet guardian set poller...");
324+
})
325+
};
326+
327+
let _ = tokio::join!(task_listener, task_guadian_watcher);
311328
Ok(())
312329
}

hermes/src/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ mod test {
341341
/// Generate list of updates for the given list of messages at a given slot with given sequence
342342
///
343343
/// Sequence in Vaas is used to filter duplicate messages (as by wormhole design there is only
344-
/// one message per sequence)
344+
/// one message per sequence).
345345
pub fn generate_update(messages: Vec<Message>, slot: Slot, sequence: u64) -> Vec<Update> {
346346
let mut updates = Vec::new();
347347

hermes/src/store/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ mod test {
296296
}
297297
}
298298

299+
#[cfg(test)]
299300
pub async fn create_and_store_dummy_price_feed_message_state(
300301
storage: &Storage,
301302
feed_id: FeedId,

0 commit comments

Comments
 (0)