-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun.rs
More file actions
405 lines (341 loc) · 13.6 KB
/
run.rs
File metadata and controls
405 lines (341 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tashi_collections::HashMap;
use crate::cli::LogFormat;
use crate::config;
use crate::config::addresses::Addresses;
use crate::config::permissions::PermissionsConfig;
use crate::config::users::{AuthConfig, UsersConfig};
use crate::mqtt::broker::{self, MqttBroker};
use crate::mqtt::{KeepAlive, TceState};
use crate::transaction::AddNodeTransaction;
use color_eyre::eyre;
use color_eyre::eyre::Context;
use tashi_consensus_engine::quic::QuicSocket;
use tashi_consensus_engine::{
Certificate, Platform, RootCertificates, SecretKey, UnknownConnectionAction,
};
use tokio::sync::mpsc;
// Command-line arguments for running the broker; consumed by `main()` and `main_async()`.
//
// Reviewer notes in this struct are plain `//` comments on purpose: `///` doc comments on
// `clap` items are surfaced to users as help text, so they are left exactly as written.
#[derive(clap::Args, Clone, Debug)]
pub struct RunArgs {
/// Set the format of log output.
#[clap(short, long, default_value = "full")]
pub log: LogFormat,
/// The TCP socket address to listen for MQTT (non-TLS) connections from clients.
#[clap(short = 'L', long, default_value = "0.0.0.0:1883")]
pub mqtt_addr: SocketAddr,
// `19793` is the ASCII characters `MQ` reinterpreted as a big-endian integer.
/// The UDP socket address to listen for cluster connections from other FoxMQ brokers.
#[clap(short = 'C', long, default_value = "0.0.0.0:19793")]
pub cluster_addr: SocketAddr,
/// Set the maximum Keep Alive interval for MQTT connections, in seconds.
///
/// A client may specify a nonzero interval smaller than this.
/// This also becomes the default Keep Alive interval if a client does not specify one.
///
/// Per the specification, the connection times out if the client does not send any
/// MQTT control packet in 1.5x the Keep Alive interval.
///
/// Set to 0 to allow the client to set any Keep Alive interval, including 0 (no timeout).
///
/// The maximum value allowed by the MQTT spec is 65,535, or 18 hours, 12 minutes, and 15 seconds.
// Converted with `KeepAlive::from_seconds()` in `main_async()` before being given to the broker.
#[clap(long, default_value = "3600")]
pub max_keep_alive: u16,
// Merged over the auth settings loaded from `users.toml` in `main()`.
#[command(flatten)]
pub auth_config: AuthConfig,
// Source of the broker's secret key; resolved by `SecretKeyOpt::read_key()`.
#[command(flatten)]
pub secret_key: SecretKeyOpt,
// Only consulted by `main()` when `tls_config.mqtts` is set.
#[command(flatten)]
pub tls_config: TlsConfig,
// Only forwarded to the broker by `main()` when `ws_config.websockets` is set.
#[command(flatten)]
pub ws_config: WsConfig,
// Passed to `create_tce_config()` when the address book could be read.
#[command(flatten)]
pub cluster_config: ClusterConfig,
/// The directory containing `address-book.toml` and (optionally) `users.toml`.
// NOTE(review): `main()` also reads `permissions.toml` from this directory.
#[clap(default_value = "foxmq.d/")]
pub config_dir: PathBuf,
}
// The two alternative ways of supplying the broker's secret key on the command line.
//
// The group below is declared with `multiple = false` and `required = false`: at most one of
// the two options is expected, and neither is mandatory. When neither is given,
// `read_key()` falls back to `SecretKey::generate()`.
//
// Reviewer notes are plain `//` comments because `///` text on `clap` items becomes help output.
#[derive(clap::Args, Debug, Clone)]
#[group(required = false, multiple = false)]
pub struct SecretKeyOpt {
/// Read the P-256 secret key used to identify this broker in the cluster from hex encoded DER.
///
/// If `--tls-key-file` is not provided and `--mqtts` is enabled,
/// this or `--secret-key-file` will be used by default.
// `env` also allows the value to come from an environment variable
// (presumably `SECRET_KEY` -- confirm against clap's naming rules).
#[clap(short = 'k', long, env)]
pub secret_key: Option<String>,
/// Read the PEM-encoded P-256 secret key used to identify this broker in the cluster from a file.
// Loaded with blocking I/O by `read_secret_key()`.
#[clap(short = 'f', long, env)]
pub secret_key_file: Option<PathBuf>,
}
// Options for the optional MQTT-over-TLS listener.
//
// `main()` only builds a `broker::TlsConfig` from these when `mqtts` is set; otherwise the
// remaining fields are not read by the code in this file.
//
// Reviewer notes are plain `//` comments because `///` text on `clap` items becomes help output.
#[derive(clap::Args, Debug, Clone)]
pub struct TlsConfig {
    /// Enable listening for MQTT-over-TLS connections on a separate socket (0.0.0.0:8883 by default).
    #[clap(long)]
    pub mqtts: bool,
    /// The TCP socket address to listen for MQTT-over-TLS (`mqtts`) connections from clients.
    #[clap(long, default_value = "0.0.0.0:8883")]
    pub mqtts_addr: SocketAddr,
    /// The domain name to report for Server Name Indication (SNI) in TLS.
    // Used as the subject name when `main()` self-signs a certificate.
    #[clap(long, default_value = "foxmq.local")]
    pub server_name: String,
    /// Override the secret key used for TLS handshakes.
    ///
    /// Note: this only applies to MQTT-over-TLS connections (`--mqtts`).
    ///
    /// Defaults to the main secret key (`--secret-key`/`--secret-key-file`).
    #[clap(long)]
    pub tls_key_file: Option<PathBuf>,
    /// Path to the X.509 certificate to use for TLS.
    ///
    /// Note: this only applies to MQTT-over-TLS connections (`--mqtts`).
    ///
    /// Defaults to a certificate self-signed with either `--tls-key-file`
    /// or the main secret key (`--secret-key`/`--secret-key-file`).
    #[clap(long)]
    pub tls_cert_file: Option<PathBuf>,
}
/// Websockets configuration
// Reviewer note: `main()` forwards a clone of this struct to the broker only when `websockets`
// is set, so `websockets_addr` is otherwise unused by the code visible in this file.
#[derive(clap::Args, Debug, Clone)]
pub struct WsConfig {
/// Enable listening for MQTT-over-Websockets connections on a separate socket (0.0.0.0:8080 by default).
#[clap(long)]
pub websockets: bool,
/// The TCP socket address to listen for MQTT-over-Websockets (`ws`) connections from clients.
#[clap(long, default_value = "0.0.0.0:8080")]
pub websockets_addr: SocketAddr,
}
// TLS-related options for broker-to-broker (cluster) connections.
//
// Consumed by `create_tce_config()`. The `requires(...)` attributes chain the options:
// `cluster_accept_peer_with_cert` needs `cluster_root_cert`, which needs `cluster_cert`.
//
// Reviewer notes are plain `//` comments because `///` text on `clap` items becomes help output.
#[derive(clap::Args, Debug, Clone)]
pub struct ClusterConfig {
    /// Path to a TLS certificate or certificate chain file
    /// to present to peers on new cluster connections.
    ///
    /// If not set, a self-signed TLS certificate is generated on startup.
    ///
    /// If set, the certificate _must_ match the secret key specified by `--secret-key`
    /// or `--secret-key-file`.
    #[clap(long, env)]
    pub cluster_cert: Option<PathBuf>,
    /// Path to a TLS root certificate file to use for cluster operations.
    ///
    /// If set, all peers must present a valid TLS certificate chain that
    /// ends with this certificate.
    #[clap(long, env, requires("cluster_cert"))]
    pub cluster_root_cert: Option<PathBuf>,
    /// If set, admit any peer to the cluster that connects with a valid TLS certificate.
    ///
    /// This allows peers to connect even if they aren't in the initial address book.
    ///
    /// A valid certificate is any certificate chain that ends with the certificate
    /// specified by `--cluster-root-cert`.
    #[clap(long, env, requires("cluster_root_cert"))]
    pub cluster_accept_peer_with_cert: bool,
}
/// Everything `create_tce_config()` prepares up front so that `main_async()` can start
/// the consensus engine.
struct TceConfig {
/// Engine configuration handed to `Platform::start()`.
config: tashi_consensus_engine::Config,
/// Root certificates read from `--cluster-root-cert`, or `None` if that option was not given.
roots: Option<Arc<RootCertificates>>,
/// Receiving half of the channel fed by the unknown-connection callback, which is
/// registered only when `--cluster-accept-peer-with-cert` is set.
add_nodes: mpsc::UnboundedReceiver<AddNodeTransaction>,
/// `true` when this broker's own public key is absent from the address book.
joining_running_session: bool,
}
impl SecretKeyOpt {
/// NOTE: uses blocking I/O internally if the secret key was specified as a file.
pub fn read_key(&self) -> crate::Result<SecretKey> {
if let Some(der) = &self.secret_key {
let der_bytes = hex::decode(der).wrap_err("error decoding hex-encoded secret key")?;
return SecretKey::from_der(&der_bytes)
.wrap_err("error decoding P-256 secret key from DER");
}
if let Some(path) = &self.secret_key_file {
return read_secret_key(path);
}
Ok(SecretKey::generate())
}
}
/// Synchronous entry point for running the broker.
///
/// Performs all blocking configuration and key loading first, then hands the results to
/// `main_async()`, which starts the Tokio runtime and runs the broker.
///
/// Steps, in order:
/// 1. Read `users.toml` and `permissions.toml` from `args.config_dir` and merge the
///    command-line auth overrides.
/// 2. Refuse to start if no user exists and anonymous login is not allowed.
/// 3. Resolve the broker's secret key.
/// 4. Try to read `address-book.toml`; on success build the TCE config, otherwise run
///    non-clustered (the reason is logged).
/// 5. If `--mqtts` is set, assemble the TLS listener configuration.
///
/// # Errors
///
/// Propagates any failure from the configuration readers, secret key / certificate loading,
/// TCE config construction, or `main_async()`. A failure to read the address book is *not*
/// an error; it selects non-clustered mode.
pub fn main(args: RunArgs) -> crate::Result<()> {
    let mut users = config::users::read(&args.config_dir.join("users.toml"))?;
    let acl = config::permissions::read(&args.config_dir.join("permissions.toml"))?;

    // Merge any auth overrides from the command-line.
    users.auth.merge(&args.auth_config);

    if users.by_username.is_empty() && !users.auth.allow_anonymous_login {
        let command = std::env::args()
            .next()
            .unwrap_or_else(|| "foxmq".to_string());

        eyre::bail!(
            "Broker will be impossible to use in current configuration; \
             no user logins are configured and anonymous login is disallowed by default. \
             Run `{command} user add` to create at least one user login or enable anonymous login. \
             Run `{command} help` for details.",
        );
    }

    let secret_key = args.secret_key.read_key()?;

    // File and stdio aren't truly async in Tokio so we might as well do that before we even start the runtime
    let tce_config = match config::addresses::read(&args.config_dir.join("address-book.toml")) {
        Ok(addresses) => {
            let tce_config =
                create_tce_config(secret_key.clone(), &addresses, &args.cluster_config)
                    .wrap_err("error initializing TCE config")?;

            Some(tce_config)
        }
        Err(err) => {
            // Report why clustering is off: a malformed address book would otherwise be
            // indistinguishable from a deliberately absent one.
            tracing::info!("Running in non-clustered mode: {err:#}");
            None
        }
    };

    let tls_config = args
        .tls_config
        .mqtts
        .then(|| {
            let tls_socket_addr = args.tls_config.mqtts_addr;

            // `--tls-key-file` overrides the broker's main secret key for TLS handshakes.
            let key = if let Some(secret_key_file) = &args.tls_config.tls_key_file {
                read_secret_key(secret_key_file)?
            } else {
                secret_key
            };

            // Use the supplied certificate chain, or self-sign one with the chosen key.
            let cert_chain = if let Some(cert_file) = &args.tls_config.tls_cert_file {
                let cert_pem = std::fs::read(cert_file)
                    .wrap_err_with(|| format!("error reading from {}", cert_file.display()))?;

                rustls_pemfile::certs(&mut &cert_pem[..])
                    .collect::<Result<Vec<_>, _>>()
                    .wrap_err_with(|| {
                        format!(
                            "error reading certificate chain from {}",
                            cert_file.display()
                        )
                    })?
            } else {
                vec![Certificate::generate_self_signed(
                    &key,
                    tls_socket_addr,
                    &args.tls_config.server_name,
                    None,
                )?
                .into_rustls()]
            };

            eyre::Ok(broker::TlsConfig {
                socket_addr: tls_socket_addr,
                cert_chain,
                key: key.to_rustls()?,
            })
        })
        // `Option<Result<_>>` -> `Result<Option<_>>` so a TLS setup failure aborts startup.
        .transpose()?;

    let ws_config = args.ws_config.websockets.then(|| args.ws_config.clone());

    main_async(args, users, acl, tce_config, tls_config, ws_config)
}
// `#[tokio::main]` doesn't have to be attached to the actual `main()`, and it can accept args
#[tokio::main]
async fn main_async(
args: RunArgs,
users: UsersConfig,
permissions_config: PermissionsConfig,
tce_config: Option<TceConfig>,
tls_config: Option<broker::TlsConfig>,
ws_config: Option<WsConfig>,
) -> crate::Result<()> {
let tce = match tce_config {
Some(tce_config) => {
let (platform, messages) = Platform::start(
tce_config.config,
QuicSocket::bind_udp(args.cluster_addr).await?,
tce_config.joining_running_session,
)?;
Some(TceState {
platform: Arc::new(platform),
messages,
roots: tce_config.roots,
add_nodes: tce_config.add_nodes,
})
}
None => None,
};
let tce_platform = tce.as_ref().map(|tce| tce.platform.clone());
let mut broker = MqttBroker::bind(
args.mqtt_addr,
tls_config,
ws_config,
users,
permissions_config,
tce,
KeepAlive::from_seconds(args.max_keep_alive),
)
.await?;
loop {
tokio::select! {
res = broker.run() => {
res?;
}
res = tokio::signal::ctrl_c() => {
res.wrap_err("error from ctrl_c() handler")?;
if let Some(platform) = tce_platform {
platform.shutdown().await;
}
break;
}
}
}
tracing::info!(
"Ctrl-C received; waiting for {} connections to close",
broker.connections()
);
broker.shutdown().await
}
/// Build the consensus-engine configuration for clustered operation.
///
/// * `secret_key` - this broker's identity; its public key is looked up in `addresses`.
/// * `addresses` - the address book whose entries become the initial node set.
/// * `config` - cluster TLS options (certificate chain, root certificate, open admission).
///
/// Returns the engine config together with the loaded root certificates (if any), the
/// receiver for add-node requests, and whether this broker is absent from the address book.
///
/// # Errors
///
/// Fails if the cluster certificate chain or the root certificate cannot be loaded.
fn create_tce_config(
    secret_key: SecretKey,
    addresses: &Addresses,
    config: &ClusterConfig,
) -> crate::Result<TceConfig> {
    let nodes = addresses
        .addresses
        .iter()
        .map(|entry| (entry.key.clone(), entry.addr))
        .collect::<HashMap<_, _>>();

    // The address book is only required to contain the existing nodes.
    let own_key_listed = nodes.contains_key(&secret_key.public_key());
    let joining_running_session = !own_key_listed;

    let mut tce_config = tashi_consensus_engine::Config::new(secret_key);

    tce_config
        .initial_nodes(nodes)
        .enable_hole_punching(false)
        // TODO: we can dispatch messages before they come to consensus
        // but we need to make sure we don't duplicate PUBLISHes.
        // .report_events_before_consensus(true)
        // Since a FoxMQ cluster is permissioned, don't kick failed nodes which may later recover.
        .fallen_behind_kick_seconds(None);

    if let Some(cert_path) = config.cluster_cert.as_ref() {
        let chain = Certificate::load_chain_from(cert_path)?;
        tce_config.tls_cert_chain(chain);
    }

    // Share one copy of the roots between the engine config and the returned struct.
    let roots = match config.cluster_root_cert.as_ref() {
        Some(root_cert_path) => {
            let loaded = Arc::new(RootCertificates::read_from(root_cert_path)?);
            tce_config.tls_roots(Arc::clone(&loaded));
            Some(loaded)
        }
        None => None,
    };

    let (add_nodes_tx, add_nodes_rx) = mpsc::unbounded_channel();

    if config.cluster_accept_peer_with_cert {
        tce_config.on_unknown_connection(move |addr, key, certs| {
            // Certificate chain has already been verified by TCE at this point.
            let request = AddNodeTransaction {
                socket_addr: addr.into(),
                key: key.clone(),
                certs: certs.iter().map(Into::into).collect(),
            };

            // A send failure is deliberately ignored.
            let _ = add_nodes_tx.send(request);

            Ok(UnknownConnectionAction::VoteToAddPeer)
        });
    }

    Ok(TceConfig {
        config: tce_config,
        roots,
        add_nodes: add_nodes_rx,
        joining_running_session,
    })
}
/// NOTE: uses blocking I/O internally.
fn read_secret_key(path: &Path) -> crate::Result<SecretKey> {
// There's no benefit to using `tokio::fs` because it just does the blocking work on a background thread.
let pem = std::fs::read(path).wrap_err_with(|| format!("error reading {}", path.display()))?;
SecretKey::from_pem(&pem)
.wrap_err_with(|| format!("error reading P-256 secret key from {}", path.display()))
}