diff --git a/Cargo.lock b/Cargo.lock index f2da978..480c42a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3931,7 +3931,7 @@ dependencies = [ [[package]] name = "pythnet-watcher" -version = "1.1.0" +version = "1.1.1" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 383e144..4bc7201 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pythnet-watcher" -version = "1.1.0" +version = "1.1.1" edition = "2021" [dependencies] diff --git a/src/main.rs b/src/main.rs index b920415..49f633b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -156,69 +156,74 @@ async fn run_listener(input: RunListenerInput) -> anyhow::Result<()> { ) .await?; - tokio::select! { - update = stream.next() => { - let Some(update) = update else { - tracing::error!("Failed to receive update"); - tokio::spawn(async move { unsubscribe().await }); - bail!("Stream ended"); - }; - let started = Instant::now(); - let unreliable_data = decode_and_verify_update(&input.wormhole_pid, &input.accumulator_address, update); - let status = if unreliable_data.is_err() { - "error" - } else { - "success" - }; - let duration = started.elapsed(); - metrics::histogram!("decode_and_verify_observed_messages_duration").record( - duration.as_secs_f64(), - ); - metrics::counter!("decode_and_verify_observed_messages", &[("status", status)]).increment(1); - if let Ok(unreliable_data) = unreliable_data { - tokio::spawn({ - let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone()); - async move { - let started = Instant::now(); - let body = message_data_to_body(&unreliable_data); - let status = match Observation::try_new(body.clone(), signer.clone()).await { - Ok(observation) => { - join_all(api_clients.iter().map(|api_client| { - let observation = observation.clone(); - let api_client = api_client.clone(); - async move { - if let Err(e) = api_client.post_observation(observation).await { - tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); - } else { - tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); + let mut exit_receiver = EXIT.subscribe(); + // Check if the exit flag is already set, if so, we don't need to wait. + if *exit_receiver.borrow() { + tracing::info!("Received exit signal, stopping pythnet watcher"); + return Ok(()); + } + + loop { + tokio::select! { + update = stream.next() => { + let Some(update) = update else { + tracing::error!("Failed to receive update"); + tokio::spawn(async move { unsubscribe().await }); + bail!("Stream ended"); + }; + let started = Instant::now(); + let unreliable_data = decode_and_verify_update(&input.wormhole_pid, &input.accumulator_address, update); + let status = if unreliable_data.is_err() { + "error" + } else { + "success" + }; + let duration = started.elapsed(); + metrics::histogram!("decode_and_verify_observed_messages_duration").record( + duration.as_secs_f64(), + ); + metrics::counter!("decode_and_verify_observed_messages", &[("status", status)]).increment(1); + if let Ok(unreliable_data) = unreliable_data { + tokio::spawn({ + let (api_clients, signer) = (input.api_clients.clone(), input.signer.clone()); + async move { + let started = Instant::now(); + let body = message_data_to_body(&unreliable_data); + let status = match Observation::try_new(body.clone(), signer.clone()).await { + Ok(observation) => { + join_all(api_clients.iter().map(|api_client| { + let observation = observation.clone(); + let api_client = api_client.clone(); + async move { + if let Err(e) = api_client.post_observation(observation).await { + tracing::warn!(url = api_client.get_base_url().to_string(), error = ?e, "Failed to post observation"); + } else { + tracing::info!(url = api_client.get_base_url().to_string(), "Observation posted successfully"); + } } - } - })).await; - "success" - } - Err(e) => { - tracing::error!(error = ?e, "Failed to create observation"); - "error" - } - }; - let duration = started.elapsed(); - metrics::histogram!("create_and_post_observation_duration").record( - duration.as_secs_f64(), - ); - metrics::counter!("create_and_post_observation", &[("status", status)]).increment(1); - } - }); + })).await; + "success" + } + Err(e) => { + tracing::error!(error = ?e, "Failed to create observation"); + "error" + } + }; + let duration = started.elapsed(); + metrics::histogram!("create_and_post_observation_duration").record( + duration.as_secs_f64(), + ); + metrics::counter!("create_and_post_observation", &[("status", status)]).increment(1); + } + }); + } + } + _ = exit_receiver.changed() => { + tracing::info!("Received exit signal, stopping pythnet watcher"); + return Ok(()) } - } - _ = wait_for_exit() => { - tracing::info!("Received exit signal, stopping pythnet watcher"); - return Ok(()) } } - - tokio::spawn(async move { unsubscribe().await }); - - bail!("Stream ended") } async fn get_signer(run_options: config::RunOptions) -> anyhow::Result> { @@ -293,6 +298,10 @@ where break; } } + + if *EXIT.borrow() { + break; + } } }