Skip to content

Commit 0231894

Browse files
authored
Merge pull request #137 from chrivers/chrivers/service-instancing
Rework `svc` ("service") crate, to support templated services. These are analogous to systemd template service, in which a service name can contain `@` to indicate it is a template. In this way, instead of manually registering `foo-1`, `foo-2`, etc, we can register `foo@`, and then start instances `1`, `2`, etc. This makes service management cleaner and simpler, and provides a way to recognize groups of related services.
2 parents 0f9baec + 162bdcb commit 0231894

File tree

18 files changed

+428
-143
lines changed

18 files changed

+428
-143
lines changed

crates/bifrost-api/src/service.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
use std::collections::BTreeMap;
22

33
use serde::{Deserialize, Serialize};
4-
use svc::traits::ServiceState;
54
use uuid::Uuid;
65

6+
use svc::serviceid::ServiceName;
7+
use svc::traits::ServiceState;
8+
79
use crate::Client;
810
use crate::error::BifrostResult;
911

1012
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
1113
pub struct Service {
1214
pub id: Uuid,
13-
pub name: String,
15+
pub name: ServiceName,
1416
pub state: ServiceState,
1517
}
1618

@@ -24,12 +26,12 @@ impl Client {
2426
self.get("service").await
2527
}
2628

27-
pub async fn service_stop(&self, id: Uuid) -> BifrostResult<()> {
29+
pub async fn service_stop(&self, id: Uuid) -> BifrostResult<Uuid> {
2830
self.put(&format!("service/{id}"), ServiceState::Stopped)
2931
.await
3032
}
3133

32-
pub async fn service_start(&self, id: Uuid) -> BifrostResult<()> {
34+
pub async fn service_start(&self, id: Uuid) -> BifrostResult<Uuid> {
3335
self.put(&format!("service/{id}"), ServiceState::Running)
3436
.await
3537
}

crates/svc/src/error.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::error::Error;
33
use thiserror::Error;
44

55
use crate::manager::{ServiceEvent, SvmRequest};
6-
use crate::serviceid::ServiceId;
6+
use crate::serviceid::{ServiceId, ServiceName};
77
use crate::traits::ServiceState;
88

99
#[derive(Error, Debug)]
@@ -43,13 +43,22 @@ pub enum SvcError {
4343
ServiceNotFound(ServiceId),
4444

4545
#[error("Service {0} already exists")]
46-
ServiceAlreadyExists(String),
46+
ServiceAlreadyExists(ServiceName),
4747

4848
#[error("All services stopped")]
4949
Shutdown,
5050

5151
#[error("Service has failed")]
5252
ServiceFailed,
53+
54+
#[error("Templated service generation failed")]
55+
ServiceGeneration(Box<dyn Error + Send>),
56+
}
57+
58+
impl SvcError {
59+
pub fn generation(err: impl Error + Send + 'static) -> Self {
60+
Self::ServiceGeneration(Box::new(err))
61+
}
5362
}
5463

5564
#[derive(Error, Debug)]

crates/svc/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ pub mod manager;
1010
pub mod rpc;
1111
#[cfg(feature = "manager")]
1212
pub mod runservice;
13+
#[cfg(feature = "manager")]
14+
pub mod template;

crates/svc/src/manager.rs

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@ use uuid::Uuid;
1515
use crate::error::{RunSvcError, SvcError, SvcResult};
1616
use crate::rpc::RpcRequest;
1717
use crate::runservice::StandardService;
18-
use crate::serviceid::{IntoServiceId, ServiceId};
18+
use crate::serviceid::{IntoServiceId, ServiceId, ServiceName};
19+
use crate::template::ServiceTemplate;
1920
use crate::traits::{Service, ServiceRunner, ServiceState};
2021

2122
#[derive(Debug)]
2223
pub struct ServiceInstance {
2324
tx: watch::Sender<ServiceState>,
24-
name: String,
25+
name: ServiceName,
2526
state: ServiceState,
2627
abort_handle: AbortHandle,
2728
}
@@ -60,13 +61,14 @@ impl ServiceEvent {
6061

6162
/// A request to a [`ServiceManager`]
6263
pub enum SvmRequest {
63-
Stop(RpcRequest<ServiceId, SvcResult<()>>),
64-
Start(RpcRequest<ServiceId, SvcResult<()>>),
64+
Stop(RpcRequest<ServiceId, SvcResult<Uuid>>),
65+
Start(RpcRequest<ServiceId, SvcResult<Uuid>>),
6566
Status(RpcRequest<ServiceId, SvcResult<ServiceState>>),
66-
List(RpcRequest<(), Vec<(Uuid, String)>>),
67+
List(RpcRequest<(), Vec<(Uuid, ServiceName)>>),
6768
Resolve(RpcRequest<ServiceId, SvcResult<Uuid>>),
68-
LookupName(RpcRequest<ServiceId, SvcResult<String>>),
69+
LookupName(RpcRequest<ServiceId, SvcResult<ServiceName>>),
6970
Register(RpcRequest<(String, ServiceFunc), SvcResult<Uuid>>),
71+
RegisterTemplate(RpcRequest<(String, Box<dyn ServiceTemplate>), SvcResult<()>>),
7072
Subscribe(RpcRequest<mpsc::UnboundedSender<ServiceEvent>, SvcResult<Uuid>>),
7173
Shutdown(RpcRequest<(), ()>),
7274
}
@@ -128,19 +130,29 @@ impl SvmClient {
128130
.await?
129131
}
130132

131-
pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult<()> {
133+
pub async fn register_template(
134+
&mut self,
135+
name: impl AsRef<str>,
136+
generator: impl ServiceTemplate + 'static,
137+
) -> SvcResult<()> {
138+
let name = name.as_ref().to_string();
139+
self.rpc(SvmRequest::RegisterTemplate, (name, Box::new(generator)))
140+
.await?
141+
}
142+
143+
pub async fn start(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
132144
self.rpc(SvmRequest::Start, id.service_id()).await?
133145
}
134146

135-
pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult<()> {
147+
pub async fn stop(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
136148
self.rpc(SvmRequest::Stop, id.service_id()).await?
137149
}
138150

139151
pub async fn resolve(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
140152
self.rpc(SvmRequest::Resolve, id.service_id()).await?
141153
}
142154

143-
pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult<String> {
155+
pub async fn lookup_name(&mut self, id: impl IntoServiceId) -> SvcResult<ServiceName> {
144156
self.rpc(SvmRequest::LookupName, id.service_id()).await?
145157
}
146158

@@ -197,7 +209,7 @@ impl SvmClient {
197209
self.rpc(SvmRequest::Status, id.service_id()).await?
198210
}
199211

200-
pub async fn list(&mut self) -> SvcResult<Vec<(Uuid, String)>> {
212+
pub async fn list(&mut self) -> SvcResult<Vec<(Uuid, ServiceName)>> {
201213
self.rpc(SvmRequest::List, ()).await
202214
}
203215

@@ -214,6 +226,10 @@ impl Debug for SvmRequest {
214226
Self::Status(arg0) => f.debug_tuple("Status").field(arg0).finish(),
215227
Self::List(arg0) => f.debug_tuple("List").field(arg0).finish(),
216228
Self::Register(_arg0) => f.debug_tuple("Register").field(&"<service>").finish(),
229+
Self::RegisterTemplate(_arg0) => f
230+
.debug_tuple("RegisterTemplate")
231+
.field(&"<service>")
232+
.finish(),
217233
Self::Resolve(arg0) => f.debug_tuple("Resolve").field(arg0).finish(),
218234
Self::LookupName(arg0) => f.debug_tuple("ResolveName").field(arg0).finish(),
219235
Self::Subscribe(_arg0) => f.debug_tuple("Subscribe").finish(),
@@ -229,8 +245,9 @@ pub struct ServiceManager {
229245
service_tx: mpsc::UnboundedSender<ServiceEvent>,
230246
subscribers: BTreeMap<Uuid, mpsc::UnboundedSender<ServiceEvent>>,
231247
svcs: BTreeMap<Uuid, ServiceInstance>,
232-
names: BTreeMap<String, Uuid>,
248+
names: BTreeMap<ServiceName, Uuid>,
233249
tasks: JoinSet<Result<(), RunSvcError>>,
250+
templates: BTreeMap<String, Box<dyn ServiceTemplate>>,
234251
shutdown: bool,
235252
}
236253

@@ -254,6 +271,7 @@ impl ServiceManager {
254271
svcs: BTreeMap::new(),
255272
names: BTreeMap::new(),
256273
tasks: JoinSet::new(),
274+
templates: BTreeMap::new(),
257275
shutdown: false,
258276
}
259277
}
@@ -284,8 +302,7 @@ impl ServiceManager {
284302
self.control_tx.clone()
285303
}
286304

287-
fn register(&mut self, name: &str, svc: ServiceFunc) -> SvcResult<Uuid> {
288-
let name = name.to_string();
305+
fn register(&mut self, name: ServiceName, svc: ServiceFunc) -> SvcResult<Uuid> {
289306
if self.names.contains_key(&name) {
290307
return Err(SvcError::ServiceAlreadyExists(name));
291308
}
@@ -297,7 +314,7 @@ impl ServiceManager {
297314

298315
let rec = ServiceInstance {
299316
tx,
300-
name: name.to_string(),
317+
name: name.clone(),
301318
state: ServiceState::Registered,
302319
abort_handle,
303320
};
@@ -317,7 +334,7 @@ impl ServiceManager {
317334
match &id {
318335
ServiceId::Name(name) => self
319336
.names
320-
.get(name.as_str())
337+
.get(name)
321338
.ok_or_else(|| SvcError::ServiceNotFound(id))
322339
.copied(),
323340
ServiceId::Id(uuid) => {
@@ -351,23 +368,49 @@ impl ServiceManager {
351368
Ok(&self.svcs[&id])
352369
}
353370

354-
fn start(&self, id: impl IntoServiceId) -> SvcResult<()> {
355-
self.get(&id).and_then(|svc| {
371+
fn start(&mut self, id: impl IntoServiceId) -> SvcResult<Uuid> {
372+
let id = id.service_id();
373+
374+
// if the service is known, attempt to start it
375+
if let Ok(svc) = self.get(&id) {
356376
log::debug!("Starting service: {id} {}", &svc.name);
357-
Ok(svc.tx.send(ServiceState::Running)?)
358-
})
377+
svc.tx.send(ServiceState::Running)?;
378+
return self.resolve(&id);
379+
}
380+
381+
// ..else, check if it's a named instance
382+
let ServiceId::Name(svc_name) = &id else {
383+
return Err(SvcError::ServiceNotFound(id));
384+
};
385+
386+
let Some(inst) = svc_name.instance() else {
387+
return Err(SvcError::ServiceNotFound(id));
388+
};
389+
390+
let Some(tmpl) = &self.templates.get(svc_name.name()) else {
391+
return Err(SvcError::ServiceNotFound(id));
392+
};
393+
394+
let inner = tmpl.generate(inst.to_string())?;
395+
let svc = StandardService::new(svc_name.name(), inner);
396+
397+
let uuid = self.register(svc_name.clone(), svc.boxed())?;
398+
399+
Ok(uuid)
359400
}
360401

361-
fn stop(&self, id: impl IntoServiceId) -> SvcResult<()> {
402+
fn stop(&self, id: impl IntoServiceId) -> SvcResult<Uuid> {
362403
let id = self.resolve(id)?;
363404

364405
if self.svcs[&id].state == ServiceState::Stopped {
365-
return Ok(());
406+
return Ok(id);
366407
}
367408

368409
log::debug!("Stopping service: {id} {}", self.svcs[&id].name);
369410
self.get(id)
370-
.and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?))
411+
.and_then(|svc| Ok(svc.tx.send(ServiceState::Stopped)?))?;
412+
413+
Ok(id)
371414
}
372415

373416
fn notify_subscribers(&mut self, event: ServiceEvent) {
@@ -412,12 +455,19 @@ impl ServiceManager {
412455
let mut res = vec![];
413456

414457
for (name, id) in &self.names {
415-
res.push((*id, name.to_string()));
458+
res.push((*id, name.clone()));
416459
}
417460
res
418461
}),
419462

420-
SvmRequest::Register(rpc) => rpc.respond(|(name, svc)| self.register(&name, svc)),
463+
SvmRequest::Register(rpc) => {
464+
rpc.respond(|(name, svc)| self.register(ServiceName::from(name), svc));
465+
}
466+
467+
SvmRequest::RegisterTemplate(rpc) => rpc.respond(|(name, tmpl)| {
468+
self.templates.insert(name, tmpl);
469+
Ok(())
470+
}),
421471

422472
SvmRequest::Resolve(rpc) => rpc.respond(|id| self.resolve(&id)),
423473

crates/svc/src/policy.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ use std::time::Duration;
44
#[cfg(feature = "manager")]
55
use tokio::time::sleep;
66

7+
#[derive(Debug, Clone, Copy)]
78
pub enum Retry {
89
No,
910
Limit(u32),
1011
Forever,
1112
}
1213

14+
#[derive(Debug, Clone, Copy)]
1315
pub struct Policy {
1416
pub retry: Retry,
1517
pub delay: Option<Duration>,

crates/svc/src/runservice.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use tokio::time::sleep;
55
use uuid::Uuid;
66

77
use crate::error::RunSvcError;
8-
use crate::manager::ServiceEvent;
8+
use crate::manager::{ServiceEvent, ServiceFunc};
99
use crate::policy::{Policy, Retry};
10-
use crate::traits::{Service, ServiceRunner, ServiceState};
10+
use crate::traits::{Service, ServiceRunner, ServiceState, StopResult};
1111

1212
#[allow(clippy::struct_field_names)]
1313
struct State {
@@ -101,6 +101,12 @@ impl<S: Service> StandardService<S> {
101101
}
102102
}
103103

104+
impl<S: Service + 'static> StandardService<S> {
105+
pub fn boxed(self) -> ServiceFunc {
106+
Box::new(|a, b, c| self.run(a, b, c))
107+
}
108+
}
109+
104110
#[allow(clippy::too_many_lines)]
105111
#[async_trait]
106112
impl<S: Service> ServiceRunner for StandardService<S> {
@@ -192,23 +198,27 @@ impl<S: Service> ServiceRunner for StandardService<S> {
192198
}
193199
},
194200
_ = rx.changed() => if *rx.borrow() == ServiceState::Stopped {
195-
if S::SIGNAL_STOP {
196-
log::trace!(target:target, "Service state change requested (graceful)");
197-
svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?;
198-
tokio::select! {
199-
res = svc.run() => {
200-
log::trace!(target:target, "Service finished running within timeout: {res:?}");
201-
},
202-
() = sleep(Duration::from_secs(1)) => {
203-
log::warn!("timeout");
201+
log::trace!(target:target, "Stopping service");
202+
let stop = svc.signal_stop().await.map_err(|e| RunSvcError::ServiceError(Box::new(e)))?;
203+
match stop {
204+
StopResult::Delivered => {
205+
log::trace!(target:target, "Service state change requested (graceful)");
206+
tokio::select! {
207+
res = svc.run() => {
208+
log::trace!(target:target, "Service finished running within timeout: {res:?}");
209+
},
210+
() = sleep(Duration::from_secs(1)) => {
211+
log::warn!("timeout");
212+
}
204213
}
205-
}
206-
state.set(ServiceState::Stopping)?;
207-
} else {
208-
log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow());
209-
if *rx.borrow_and_update() == ServiceState::Stopped {
210214
state.set(ServiceState::Stopping)?;
211215
}
216+
StopResult::NotSupported => {
217+
log::trace!(target:target, "Service state change requested: {:?} -> {:?}", state.get(), *rx.borrow());
218+
if *rx.borrow_and_update() == ServiceState::Stopped {
219+
state.set(ServiceState::Stopping)?;
220+
}
221+
}
212222
}
213223
}
214224
}

0 commit comments

Comments
 (0)