Skip to content

Commit 0e36dbd

Browse files
authored
feat(service): critical/auxiliary distinction in ServiceGroup (#25)
1 parent 27d294a commit 0e36dbd

File tree

3 files changed

+308
-22
lines changed

3 files changed

+308
-22
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ repository = "https://github.com/astraly-labs/pragma-common"
1111
license = "MIT"
1212

1313
[features]
14+
default = []
1415
serde = ["dep:serde"]
1516
borsh = ["dep:borsh"]
1617
proto = ["dep:prost"]

src/services.rs

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
/// <https://github.com/madara-alliance/madara/blob/main/crates/madara/primitives/utils/src/service.rs>
33
use std::{panic, time::Duration};
44

5-
use anyhow::Context;
5+
use anyhow::{anyhow, Context};
66
use futures::Future;
77
use tokio::task::JoinSet;
88
use tokio_util::sync::CancellationToken;
@@ -71,7 +71,7 @@ pub trait Service: 'static + Send + Sync {
7171
let runner = ServiceRunner::new(ctx, &mut join_set);
7272

7373
self.start(runner).await.context("Starting service")?;
74-
drive_joinset(join_set).await
74+
drive_critical_joinset(join_set).await
7575
}
7676
}
7777

@@ -109,54 +109,112 @@ impl<'a> ServiceRunner<'a> {
109109
/// A group of services that can be started together
110110
#[derive(Default)]
111111
pub struct ServiceGroup {
112-
services: Vec<Box<dyn Service>>,
113-
join_set: Option<JoinSet<anyhow::Result<()>>>,
112+
critical_services: Vec<Box<dyn Service>>,
113+
auxiliary_services: Vec<Box<dyn Service>>,
114+
critical_join_set: Option<JoinSet<anyhow::Result<()>>>,
115+
auxiliary_join_set: Option<JoinSet<anyhow::Result<()>>>,
114116
}
115117

116118
impl ServiceGroup {
117-
pub fn new(services: Vec<Box<dyn Service>>) -> Self {
119+
pub fn new(
120+
critical_services: Vec<Box<dyn Service>>,
121+
auxiliary_services: Vec<Box<dyn Service>>,
122+
) -> Self {
123+
let has_critical_services = !critical_services.is_empty();
124+
let has_auxiliary_services = !auxiliary_services.is_empty();
125+
118126
Self {
119-
services,
120-
join_set: Some(JoinSet::default()),
127+
critical_services,
128+
auxiliary_services,
129+
critical_join_set: if has_critical_services {
130+
Some(JoinSet::default())
131+
} else {
132+
None
133+
},
134+
auxiliary_join_set: if has_auxiliary_services {
135+
Some(JoinSet::default())
136+
} else {
137+
None
138+
},
121139
}
122140
}
123141

124-
pub fn push(&mut self, service: impl Service) {
125-
if self.join_set.is_none() {
126-
self.join_set = Some(JoinSet::default());
142+
pub fn push_critical(&mut self, service: impl Service) {
143+
if self.critical_join_set.is_none() {
144+
self.critical_join_set = Some(JoinSet::default());
127145
}
128-
self.services.push(Box::new(service));
146+
self.critical_services.push(Box::new(service));
147+
}
148+
149+
pub fn push_auxiliary(&mut self, service: impl Service) {
150+
if self.auxiliary_join_set.is_none() {
151+
self.auxiliary_join_set = Some(JoinSet::default());
152+
}
153+
self.auxiliary_services.push(Box::new(service));
154+
}
155+
156+
#[must_use]
157+
pub fn with_critical(mut self, service: impl Service) -> Self {
158+
self.push_critical(service);
159+
self
129160
}
130161

131162
#[must_use]
132-
pub fn with(mut self, service: impl Service) -> Self {
133-
self.push(service);
163+
pub fn with_auxiliary(mut self, service: impl Service) -> Self {
164+
self.push_auxiliary(service);
134165
self
135166
}
136167
}
137168

138169
#[async_trait::async_trait]
139170
impl Service for ServiceGroup {
140171
async fn start<'a>(&mut self, runner: ServiceRunner<'a>) -> anyhow::Result<()> {
141-
let mut own_join_set = self
142-
.join_set
172+
if self.critical_services.is_empty() {
173+
return Err(anyhow!("ServiceGroup started without any critical service"));
174+
}
175+
176+
let mut own_critical_join_set = self
177+
.critical_join_set
143178
.take()
144-
.expect("Service has already been started");
179+
.context("ServiceGroup has already been started")?;
145180

146-
for service in &mut self.services {
181+
for service in &mut self.critical_services {
147182
let ctx = runner.ctx.clone();
148183
service
149-
.start(ServiceRunner::new(ctx, &mut own_join_set))
184+
.start(ServiceRunner::new(ctx, &mut own_critical_join_set))
150185
.await
151-
.context("Starting service")?;
186+
.context("Starting critical service")?;
152187
}
153188

154-
runner.join_set.spawn(drive_joinset(own_join_set));
189+
if !self.auxiliary_services.is_empty() {
190+
let mut own_auxiliary_join_set = self
191+
.auxiliary_join_set
192+
.take()
193+
.context("ServiceGroup has already been started")?;
194+
195+
for service in &mut self.auxiliary_services {
196+
let ctx = runner.ctx.clone();
197+
// Ignore start result for auxiliary services
198+
let _ = service
199+
.start(ServiceRunner::new(ctx, &mut own_auxiliary_join_set))
200+
.await;
201+
}
202+
203+
runner.join_set.spawn(drive_critical_and_auxiliary_joinsets(
204+
own_critical_join_set,
205+
own_auxiliary_join_set,
206+
));
207+
} else {
208+
runner
209+
.join_set
210+
.spawn(drive_critical_joinset(own_critical_join_set));
211+
};
212+
155213
Ok(())
156214
}
157215
}
158216

159-
async fn drive_joinset(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
217+
async fn drive_critical_joinset(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
160218
while let Some(result) = join_set.join_next().await {
161219
match result {
162220
Ok(result) => result?,
@@ -166,5 +224,22 @@ async fn drive_joinset(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Res
166224
Err(_) => {}
167225
}
168226
}
227+
228+
Ok(())
229+
}
230+
231+
async fn drive_critical_and_auxiliary_joinsets(
232+
critical_join_set: JoinSet<anyhow::Result<()>>,
233+
mut auxiliary_join_set: JoinSet<anyhow::Result<()>>,
234+
) -> anyhow::Result<()> {
235+
let (res_critical, _ret_auxiliary) = futures::future::join(
236+
drive_critical_joinset(critical_join_set),
237+
// Ignore result for auxiliary services
238+
async { while let Some(_result) = auxiliary_join_set.join_next().await {} },
239+
)
240+
.await;
241+
242+
res_critical?;
243+
169244
Ok(())
170245
}

0 commit comments

Comments
 (0)