Skip to content

Commit 47cc504

Browse files
committed
refactor: dependency injection on grpc and BeeMsg handlers
* Accept a trait impl (`App*` / `AppAll`) to instead of conrete `Context` type on handlers * Implement a runtime `App` that satifies the above and is passed to the handlers at runtime * Add a `TestApp` implementation with rudimentary functionality for testing the handlers
1 parent 00ae0e1 commit 47cc504

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1017
-719
lines changed

mgmtd/src/app.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
//! Interfaces and implementations for in-app interaction between tasks or threads.
2+
3+
mod runtime;
4+
#[cfg(test)]
5+
pub(crate) mod test;
6+
7+
use crate::StaticInfo;
8+
use crate::license::LicensedFeature;
9+
use anyhow::Result;
10+
use protobuf::license::GetCertDataResult;
11+
pub(crate) use runtime::RuntimeApp;
12+
use rusqlite::{Connection, Transaction};
13+
use shared::bee_msg::Msg;
14+
use shared::bee_serde::{Deserializable, Serializable};
15+
use shared::types::{NodeId, NodeType, Uid};
16+
use std::fmt::Debug;
17+
use std::future::Future;
18+
use std::net::SocketAddr;
19+
use std::path::Path;
20+
use std::sync::Arc;
21+
22+
pub(crate) trait App: Debug + Clone + Send + 'static {
23+
/// Return a borrow to the applications static, immutable config and derived info
24+
fn static_info(&self) -> &StaticInfo;
25+
26+
// Database access
27+
28+
/// DB Read transaction
29+
fn db_read_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
30+
&self,
31+
op: T,
32+
) -> impl Future<Output = Result<R>> + Send;
33+
34+
/// DB write transaction
35+
fn db_write_tx<T: Send + 'static + FnOnce(&Transaction) -> Result<R>, R: Send + 'static>(
36+
&self,
37+
op: T,
38+
) -> impl Future<Output = Result<R>> + Send;
39+
40+
/// DB write transaction without fsync
41+
fn db_write_tx_no_sync<
42+
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
43+
R: Send + 'static,
44+
>(
45+
&self,
46+
op: T,
47+
) -> impl Future<Output = Result<R>> + Send;
48+
49+
/// Provides access to a DB connection handle, no transaction
50+
fn db_conn<T: Send + 'static + FnOnce(&mut Connection) -> Result<R>, R: Send + 'static>(
51+
&self,
52+
op: T,
53+
) -> impl Future<Output = Result<R>> + Send;
54+
55+
// BeeMsg communication
56+
57+
/// Send a [Msg] to a node via TCP and receive the response
58+
fn beemsg_request<M: Msg + Serializable, R: Msg + Deserializable>(
59+
&self,
60+
node_uid: Uid,
61+
msg: &M,
62+
) -> impl Future<Output = Result<R>> + Send;
63+
64+
/// Send a [Msg] to all nodes of a type via UDP
65+
fn beemsg_send_notifications<M: Msg + Serializable>(
66+
&self,
67+
node_types: &'static [NodeType],
68+
msg: &M,
69+
) -> impl Future<Output = ()> + Send;
70+
71+
/// Replace all stored BeeMsg network addresses of a node in the store
72+
fn beemsg_replace_node_addrs(&self, node_uid: Uid, new_addrs: impl Into<Arc<[SocketAddr]>>);
73+
74+
// Run state
75+
76+
/// Check if management is in pre shutdown state
77+
fn rs_pre_shutdown(&self) -> bool;
78+
/// Notify the runtime control that a particular client pulled states of a particular node type
79+
fn rs_notify_client_pulled_state(&self, node_type: NodeType, node_id: NodeId);
80+
81+
// Licensing control
82+
83+
/// Load and verify a license certificate
84+
fn lic_load_and_verify_cert(
85+
&self,
86+
cert_path: &Path,
87+
) -> impl Future<Output = Result<String>> + Send;
88+
89+
/// Get license certificate data
90+
fn lic_get_cert_data(&self) -> Result<GetCertDataResult>;
91+
92+
/// Get licensed number of machines
93+
fn lic_get_num_machines(&self) -> Result<u32>;
94+
95+
/// Verify a feature is licensed
96+
fn lic_verify_feature(&self, feature: LicensedFeature) -> Result<()>;
97+
}

mgmtd/src/app/runtime.rs

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
use super::*;
2+
use crate::ClientPulledStateNotification;
3+
use crate::bee_msg::dispatch_request;
4+
use crate::license::LicenseVerifier;
5+
use anyhow::Result;
6+
use protobuf::license::GetCertDataResult;
7+
use rusqlite::{Connection, Transaction};
8+
use shared::conn::msg_dispatch::{DispatchRequest, Request};
9+
use shared::conn::outgoing::Pool;
10+
use shared::run_state::WeakRunStateHandle;
11+
use sqlite::Connections;
12+
use std::fmt::Debug;
13+
use std::ops::Deref;
14+
use tokio::sync::mpsc;
15+
16+
/// A collection of Handles used for interacting and accessing the different components of the app.
17+
///
18+
/// This is the actual runtime object that can be shared between tasks. Interfaces should, however,
19+
/// accept any implementation of the AppContext trait instead.
20+
#[derive(Clone, Debug)]
21+
pub(crate) struct RuntimeApp(Arc<InnerAppHandles>);
22+
23+
/// Stores the actual handles.
24+
#[derive(Debug)]
25+
pub(crate) struct InnerAppHandles {
26+
pub conn: Pool,
27+
pub db: Connections,
28+
pub license: LicenseVerifier,
29+
pub info: &'static StaticInfo,
30+
pub run_state: WeakRunStateHandle,
31+
shutdown_client_id: mpsc::Sender<ClientPulledStateNotification>,
32+
}
33+
34+
impl RuntimeApp {
35+
/// Creates a new AppHandles object.
36+
///
37+
/// Takes all the stored handles.
38+
pub(crate) fn new(
39+
conn: Pool,
40+
db: Connections,
41+
license: LicenseVerifier,
42+
info: &'static StaticInfo,
43+
run_state: WeakRunStateHandle,
44+
shutdown_client_id: mpsc::Sender<ClientPulledStateNotification>,
45+
) -> Self {
46+
Self(Arc::new(InnerAppHandles {
47+
conn,
48+
db,
49+
license,
50+
info,
51+
run_state,
52+
shutdown_client_id,
53+
}))
54+
}
55+
}
56+
57+
/// Derefs to InnerAppHandle which stores all the handles.
58+
///
59+
/// Allows transparent access.
60+
impl Deref for RuntimeApp {
61+
type Target = InnerAppHandles;
62+
63+
fn deref(&self) -> &Self::Target {
64+
&self.0
65+
}
66+
}
67+
68+
/// Adds BeeMsg dispatching functionality to AppHandles
69+
impl DispatchRequest for RuntimeApp {
70+
async fn dispatch_request(&self, req: impl Request) -> Result<()> {
71+
dispatch_request(self, req).await
72+
}
73+
}
74+
75+
impl App for RuntimeApp {
76+
fn static_info(&self) -> &StaticInfo {
77+
self.info
78+
}
79+
80+
async fn db_read_tx<
81+
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
82+
R: Send + 'static,
83+
>(
84+
&self,
85+
op: T,
86+
) -> Result<R> {
87+
Connections::read_tx(&self.db, op).await
88+
}
89+
90+
async fn db_write_tx<
91+
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
92+
R: Send + 'static,
93+
>(
94+
&self,
95+
op: T,
96+
) -> Result<R> {
97+
Connections::write_tx(&self.db, op).await
98+
}
99+
100+
async fn db_write_tx_no_sync<
101+
T: Send + 'static + FnOnce(&Transaction) -> Result<R>,
102+
R: Send + 'static,
103+
>(
104+
&self,
105+
op: T,
106+
) -> Result<R> {
107+
Connections::write_tx_no_sync(&self.db, op).await
108+
}
109+
110+
async fn db_conn<
111+
T: Send + 'static + FnOnce(&mut Connection) -> Result<R>,
112+
R: Send + 'static,
113+
>(
114+
&self,
115+
op: T,
116+
) -> Result<R> {
117+
Connections::conn(&self.db, op).await
118+
}
119+
120+
async fn beemsg_request<M: Msg + Serializable, R: Msg + Deserializable>(
121+
&self,
122+
node_uid: Uid,
123+
msg: &M,
124+
) -> Result<R> {
125+
Pool::request(&self.conn, node_uid, msg).await
126+
}
127+
128+
async fn beemsg_send_notifications<M: Msg + Serializable>(
129+
&self,
130+
node_types: &'static [NodeType],
131+
msg: &M,
132+
) {
133+
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");
134+
135+
for t in node_types {
136+
if let Err(err) = async {
137+
let nodes = self
138+
.db_read_tx(move |tx| crate::db::node::get_with_type(tx, *t))
139+
.await?;
140+
141+
self.conn
142+
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
143+
.await?;
144+
145+
Ok(()) as Result<_>
146+
}
147+
.await
148+
{
149+
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
150+
}
151+
}
152+
}
153+
154+
fn beemsg_replace_node_addrs(&self, node_uid: Uid, new_addrs: impl Into<Arc<[SocketAddr]>>) {
155+
Pool::replace_node_addrs(&self.conn, node_uid, new_addrs)
156+
}
157+
158+
fn rs_pre_shutdown(&self) -> bool {
159+
WeakRunStateHandle::pre_shutdown(&self.run_state)
160+
}
161+
162+
fn rs_notify_client_pulled_state(&self, node_type: NodeType, node_id: NodeId) {
163+
if self.run_state.pre_shutdown() {
164+
let tx = self.shutdown_client_id.clone();
165+
166+
// We don't want to block the task calling this and are not interested by the results
167+
tokio::spawn(async move {
168+
let _ = tx.send((node_type, node_id)).await;
169+
});
170+
}
171+
}
172+
173+
async fn lic_load_and_verify_cert(&self, cert_path: &Path) -> Result<String> {
174+
LicenseVerifier::load_and_verify_cert(&self.license, cert_path).await
175+
}
176+
177+
fn lic_get_cert_data(&self) -> Result<GetCertDataResult> {
178+
LicenseVerifier::get_cert_data(&self.license)
179+
}
180+
181+
fn lic_get_num_machines(&self) -> Result<u32> {
182+
LicenseVerifier::get_num_machines(&self.license)
183+
}
184+
185+
fn lic_verify_feature(&self, feature: LicensedFeature) -> Result<()> {
186+
self.license.verify_feature(feature)
187+
}
188+
}

0 commit comments

Comments
 (0)