Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ repository = "https://github.com/astraly-labs/pragma-common"
license = "MIT"

[features]
default = []
serde = ["dep:serde"]
borsh = ["dep:borsh"]
proto = ["dep:prost"]
Expand Down
117 changes: 96 additions & 21 deletions src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/// <https://github.com/madara-alliance/madara/blob/main/crates/madara/primitives/utils/src/service.rs>
use std::{panic, time::Duration};

use anyhow::Context;
use anyhow::{anyhow, Context};
use futures::Future;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -71,7 +71,7 @@ pub trait Service: 'static + Send + Sync {
let runner = ServiceRunner::new(ctx, &mut join_set);

self.start(runner).await.context("Starting service")?;
drive_joinset(join_set).await
drive_critical_joinset(join_set).await
}
}

Expand Down Expand Up @@ -109,54 +109,112 @@ impl<'a> ServiceRunner<'a> {
/// A group of services that can be started together
#[derive(Default)]
pub struct ServiceGroup {
services: Vec<Box<dyn Service>>,
join_set: Option<JoinSet<anyhow::Result<()>>>,
critical_services: Vec<Box<dyn Service>>,
auxiliary_services: Vec<Box<dyn Service>>,
critical_join_set: Option<JoinSet<anyhow::Result<()>>>,
auxiliary_join_set: Option<JoinSet<anyhow::Result<()>>>,
}

impl ServiceGroup {
pub fn new(services: Vec<Box<dyn Service>>) -> Self {
pub fn new(
critical_services: Vec<Box<dyn Service>>,
auxiliary_services: Vec<Box<dyn Service>>,
) -> Self {
let has_critical_services = !critical_services.is_empty();
let has_auxiliary_services = !auxiliary_services.is_empty();

Self {
services,
join_set: Some(JoinSet::default()),
critical_services,
auxiliary_services,
critical_join_set: if has_critical_services {
Some(JoinSet::default())
} else {
None
},
auxiliary_join_set: if has_auxiliary_services {
Some(JoinSet::default())
} else {
None
},
}
}

pub fn push(&mut self, service: impl Service) {
if self.join_set.is_none() {
self.join_set = Some(JoinSet::default());
pub fn push_critical(&mut self, service: impl Service) {
if self.critical_join_set.is_none() {
self.critical_join_set = Some(JoinSet::default());
}
self.services.push(Box::new(service));
self.critical_services.push(Box::new(service));
}

pub fn push_auxiliary(&mut self, service: impl Service) {
if self.auxiliary_join_set.is_none() {
self.auxiliary_join_set = Some(JoinSet::default());
}
self.auxiliary_services.push(Box::new(service));
}

#[must_use]
pub fn with_critical(mut self, service: impl Service) -> Self {
self.push_critical(service);
self
}

#[must_use]
pub fn with(mut self, service: impl Service) -> Self {
self.push(service);
pub fn with_auxiliary(mut self, service: impl Service) -> Self {
self.push_auxiliary(service);
self
}
}

#[async_trait::async_trait]
impl Service for ServiceGroup {
async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> {
let mut own_join_set = self
.join_set
if self.critical_services.is_empty() {
return Err(anyhow!("ServiceGroup started without any critical service"));
}

let mut own_critical_join_set = self
.critical_join_set
.take()
.expect("Service has already been started");
.context("ServiceGroup has already been started")?;

for service in &mut self.services {
for service in &mut self.critical_services {
let ctx = runner.ctx.clone();
service
.start(ServiceRunner::new(ctx, &mut own_join_set))
.start(ServiceRunner::new(ctx, &mut own_critical_join_set))
.await
.context("Starting service")?;
.context("Starting critical service")?;
}

runner.join_set.spawn(drive_joinset(own_join_set));
if !self.auxiliary_services.is_empty() {
let mut own_auxiliary_join_set = self
.auxiliary_join_set
.take()
.context("ServiceGroup has already been started")?;

for service in &mut self.auxiliary_services {
let ctx = runner.ctx.clone();
// Ignore start result for auxiliary services
let _ = service
.start(ServiceRunner::new(ctx, &mut own_auxiliary_join_set))
.await;
}

runner.join_set.spawn(drive_critical_and_auxiliary_joinsets(
own_critical_join_set,
own_auxiliary_join_set,
));
} else {
runner
.join_set
.spawn(drive_critical_joinset(own_critical_join_set));
};

Ok(())
}
}

async fn drive_joinset(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
async fn drive_critical_joinset(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
while let Some(result) = join_set.join_next().await {
match result {
Ok(result) => result?,
Expand All @@ -166,5 +224,22 @@ async fn drive_joinset(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Res
Err(_) => {}
}
}

Ok(())
}

async fn drive_critical_and_auxiliary_joinsets(
critical_join_set: JoinSet<anyhow::Result<()>>,
mut auxiliary_join_set: JoinSet<anyhow::Result<()>>,
) -> anyhow::Result<()> {
let (res_critical, _ret_auxiliary) = futures::future::join(
drive_critical_joinset(critical_join_set),
// Ignore result for auxiliary services
async { while let Some(_result) = auxiliary_join_set.join_next().await {} },
)
.await;

res_critical?;

Ok(())
}
Loading
Loading