Skip to content
This repository was archived by the owner on Mar 20, 2023. It is now read-only.

Commit 278b8a9

Browse files
definitelynobodyrvolosatovs
authored andcommitted
feat: restrict which ports a workload is allowed to use
- Don't allow ports outside of a specific range. - Don't allow ports already in use by another running workload. - Prevent users listening on too many ports. - Fetch Enarx.toml from drawbridge to determine which ports it will use. Signed-off-by: Nicholas Farshidmehr <nicholas@profian.com>
1 parent ae678ea commit 278b8a9

File tree

8 files changed

+308
-25
lines changed

8 files changed

+308
-25
lines changed

Cargo.nix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,6 +1998,7 @@ in
19981998
features = builtins.concatLists [
19991999
[ "bytes" ]
20002000
[ "default" ]
2001+
[ "fs" ]
20012002
[ "io-std" ]
20022003
[ "io-util" ]
20032004
[ "libc" ]

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = "2021"
66
[dependencies]
77
axum = { version = "0.5.5", features = ["multipart", "headers"] }
88
askama = { version = "0.11.1", default-features = false }
9-
tokio = { version = "1.19.2", features = ["macros", "process", "rt-multi-thread", "io-util", "sync"] }
9+
tokio = { version = "1.19.2", features = ["macros", "process", "rt-multi-thread", "io-util", "fs", "sync"] }
1010
tracing = { version = "0.1.35", default-features = false, features = ["std", "release_max_level_info"] }
1111
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
1212
tower-http = { version = "0.3.0", features = ["trace"] }

src/data.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// SPDX-FileCopyrightText: 2022 Profian Inc. <opensource@profian.com>
2+
// SPDX-License-Identifier: AGPL-3.0-only
3+
4+
use crate::jobs::Job;
5+
6+
#[derive(Debug, Default)]
7+
pub struct Data {
8+
job: Option<Job>,
9+
}
10+
11+
impl Data {
12+
pub fn new(job: Option<Job>) -> Self {
13+
Self { job }
14+
}
15+
16+
pub fn job(&self) -> &Option<Job> {
17+
&self.job
18+
}
19+
20+
pub fn job_mut(&mut self) -> Option<&mut Job> {
21+
self.job.as_mut()
22+
}
23+
24+
pub async fn kill_job(&mut self) {
25+
if let Some(job) = &mut self.job {
26+
job.kill().await;
27+
}
28+
29+
self.job = None;
30+
}
31+
}

src/jobs.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
// SPDX-FileCopyrightText: 2022 Profian Inc. <opensource@profian.com>
2+
// SPDX-License-Identifier: AGPL-3.0-only
3+
4+
use crate::ports;
5+
16
use std::process::Stdio;
27
use std::str::FromStr;
38
use std::sync::atomic::AtomicUsize;
@@ -47,11 +52,16 @@ pub struct Job {
4752
slug: Option<String>,
4853
wasm: Option<NamedTempFile>,
4954
toml: Option<NamedTempFile>,
55+
reserved_ports: Vec<u16>,
5056
}
5157

5258
impl Drop for Job {
5359
fn drop(&mut self) {
5460
COUNT.fetch_sub(1, Ordering::SeqCst);
61+
62+
if !self.reserved_ports.is_empty() {
63+
error!("a job was not cleaned up correctly");
64+
}
5565
}
5666
}
5767

@@ -66,6 +76,7 @@ impl Job {
6676
slug: Option<String>,
6777
wasm: Option<NamedTempFile>,
6878
toml: Option<NamedTempFile>,
79+
reserved_ports: Vec<u16>,
6980
) -> Result<Self, Response> {
7081
let workload_type = WorkloadType::from_str(&workload_type).map_err(|e| {
7182
debug!("Failed to parse workload type: {e}");
@@ -122,6 +133,7 @@ impl Job {
122133
slug,
123134
wasm,
124135
toml,
136+
reserved_ports,
125137
})
126138
}
127139

@@ -131,4 +143,10 @@ impl Job {
131143
Standard::Error => self.exec.stderr.as_mut().unwrap().read(buffer).await,
132144
}
133145
}
146+
147+
pub async fn kill(&mut self) {
148+
let _ = self.exec.kill().await;
149+
ports::free(&self.reserved_ports).await;
150+
self.reserved_ports.clear();
151+
}
134152
}

src/main.rs

Lines changed: 125 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,22 @@
55
#![warn(clippy::all, rust_2018_idioms, unused_lifetimes)]
66

77
mod auth;
8+
mod data;
89
mod jobs;
10+
mod ports;
911
mod redirect;
1012
mod reference;
1113
mod secret;
1214
mod templates;
1315

16+
use crate::data::Data;
1417
use crate::reference::Ref;
1518
use crate::templates::{HtmlTemplate, IdxTemplate, JobTemplate};
1619

1720
use std::fs::read;
1821
use std::io::Write;
1922
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
23+
use std::ops::Range;
2024
use std::time::Duration;
2125

2226
use axum::extract::{Multipart, Query};
@@ -32,9 +36,10 @@ use lazy_static::lazy_static;
3236
use once_cell::sync::Lazy;
3337
use reqwest::{Client, ClientBuilder};
3438
use serde::Deserialize;
39+
use tokio::fs::read_to_string;
3540
use tokio::time::{sleep, timeout};
3641
use tower_http::trace::TraceLayer;
37-
use tracing::{debug, error, info};
42+
use tracing::{debug, error, info, warn};
3843
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
3944

4045
static HTTP: Lazy<Client> = Lazy::new(|| {
@@ -53,11 +58,6 @@ lazy_static! {
5358
.collect::<Vec<_>>();
5459
}
5560

56-
#[derive(Debug, Default)]
57-
struct Data {
58-
job: Option<jobs::Job>,
59-
}
60-
6161
/// Demo workload executor.
6262
///
6363
/// Any command-line options listed here may be specified by one or
@@ -97,6 +97,18 @@ struct Args {
9797
#[clap(long, default_value_t = 15 * 60)]
9898
timeout_starred: u64,
9999

100+
/// The lowest listen port allowed in an Enarx.toml.
101+
#[clap(long, default_value_t = 0)]
102+
port_min: u16,
103+
104+
/// The highest listen port allowed in an Enarx.toml.
105+
#[clap(long, default_value_t = 0)]
106+
port_max: u16,
107+
108+
/// The maximum number of listen ports a workload is allowed to have (0 to disable).
109+
#[clap(long, default_value_t = 0)]
110+
listen_max: u16,
111+
100112
/// Command to execute, normally path to `enarx` binary.
101113
/// This command will be executed as: `<cmd> run --wasmcfgfile <path-to-config> <path-to-wasm>`
102114
#[clap(long, default_value = "enarx")]
@@ -135,6 +147,16 @@ impl Args {
135147
let other = Other {
136148
addr: self.addr,
137149
jobs: self.jobs,
150+
port_range: match (self.port_min, self.port_max) {
151+
(0, 0) => None,
152+
(min, 0) => Some(min..u16::MAX),
153+
(min, max) => Some(min..max),
154+
},
155+
listen_max: if self.listen_max == 0 {
156+
None
157+
} else {
158+
Some(self.listen_max)
159+
},
138160
cmd: self.command,
139161
};
140162

@@ -185,6 +207,8 @@ impl Limits {
185207
struct Other {
186208
addr: SocketAddr,
187209
jobs: usize,
210+
port_range: Option<Range<u16>>,
211+
listen_max: Option<u16>,
188212
cmd: String,
189213
}
190214

@@ -245,7 +269,17 @@ async fn main() -> anyhow::Result<()> {
245269
.route(
246270
"/",
247271
get(move |user| root_get(user, limits))
248-
.post(move |user, mp| root_post(user, mp, other.cmd, limits, other.jobs))
272+
.post(move |user, mp| {
273+
root_post(
274+
user,
275+
mp,
276+
other.cmd,
277+
limits,
278+
other.port_range,
279+
other.listen_max,
280+
other.jobs,
281+
)
282+
})
249283
.delete(root_delete),
250284
);
251285

@@ -261,7 +295,7 @@ async fn root_get(user: Option<Ref<auth::User<Data>>>, limits: Limits) -> impl I
261295
let (user, star) = match user {
262296
None => (false, false),
263297
Some(user) => {
264-
if user.read().await.data.job.is_some() {
298+
if user.read().await.data.job().is_some() {
265299
return HtmlTemplate(JobTemplate).into_response();
266300
}
267301

@@ -288,13 +322,15 @@ async fn root_post(
288322
mut multipart: Multipart,
289323
command: String,
290324
limits: Limits,
325+
port_range: Option<Range<u16>>,
326+
listen_max: Option<u16>,
291327
jobs: usize,
292328
) -> impl IntoResponse {
293329
let star = user.read().await.is_starred("enarx/enarx").await;
294330
let ttl = limits.time_to_live(star);
295331
let size = limits.size(star);
296332

297-
if user.read().await.data.job.is_some() {
333+
if user.read().await.data.job().is_some() {
298334
return Err(Redirect::to("/").into_response());
299335
}
300336

@@ -413,21 +449,81 @@ async fn root_post(
413449

414450
let workload_type = workload_type.ok_or_else(|| StatusCode::BAD_REQUEST.into_response())?;
415451

452+
let enarx_config_string = match &toml {
453+
Some(toml) => read_to_string(toml).await.map_err(|e| {
454+
debug!("failed to read enarx config file: {e}");
455+
StatusCode::INTERNAL_SERVER_ERROR.into_response()
456+
})?,
457+
None => {
458+
let slug = slug
459+
.as_ref()
460+
.ok_or_else(|| StatusCode::BAD_REQUEST.into_response())?;
461+
let (repo, tag) = slug
462+
.split_once(':')
463+
.ok_or_else(|| StatusCode::BAD_REQUEST.into_response())?;
464+
get_enarx_config_from_drawbridge(repo, tag)
465+
.await
466+
.map_err(|e| {
467+
debug!("failed to get toml from drawbridge with tag: {}: {e}", slug);
468+
StatusCode::BAD_REQUEST.into_response()
469+
})?
470+
.text()
471+
.await
472+
.map_err(|e| {
473+
debug!(
474+
"failed to get toml body from drawbridge response: {}: {e}",
475+
slug
476+
);
477+
StatusCode::BAD_REQUEST.into_response()
478+
})?
479+
}
480+
};
481+
482+
let ports = ports::get_listen_ports(&enarx_config_string).map_err(|e| {
483+
debug!("failed to get ports from enarx config: {e}");
484+
StatusCode::BAD_REQUEST.into_response()
485+
})?;
486+
487+
if let Some(listen_max) = listen_max {
488+
// Check if the user is trying to listen on too many ports.
489+
if ports.len() > listen_max as usize {
490+
return Err(redirect::too_many_listeners(listen_max).into_response());
491+
}
492+
}
493+
494+
if let Some(port_range) = port_range {
495+
// Check if the port is outside of the range of allowed ports
496+
let illegal_ports = ports
497+
.iter()
498+
.filter(|port| !port_range.contains(port))
499+
.cloned()
500+
.collect::<Vec<_>>();
501+
502+
if !illegal_ports.is_empty() {
503+
return Err(redirect::illegal_ports(&illegal_ports, port_range).into_response());
504+
}
505+
}
506+
507+
// Check if a port is already in use by another running workload
508+
ports::try_reserve(&ports)
509+
.await
510+
.map_err(|port_conflicts| redirect::port_conflicts(&port_conflicts).into_response())?;
511+
416512
// Create the new job and get an identifier.
417513
let uuid = {
418514
let mut lock = user.write().await;
419515

420-
if lock.data.job.is_some() {
516+
if lock.data.job().is_some() {
421517
return Err(Redirect::to("/").into_response());
422518
}
423519

424520
if jobs::Job::count() >= jobs {
425521
return Err(redirect::too_many_workloads().into_response());
426522
}
427523

428-
let job = jobs::Job::new(command, workload_type, slug, wasm, toml)?;
524+
let job = jobs::Job::new(command, workload_type, slug, wasm, toml, ports)?;
429525
let uuid = job.uuid;
430-
lock.data = Data { job: Some(job) };
526+
lock.data = Data::new(Some(job));
431527
uuid
432528
};
433529

@@ -439,8 +535,8 @@ async fn root_post(
439535
if let Some(user) = weak.upgrade() {
440536
debug!("timeout for: {}", uuid);
441537
let mut lock = user.write().await;
442-
if lock.data.job.as_ref().map(|j| j.uuid) == Some(uuid) {
443-
lock.data.job = None;
538+
if lock.data.job().as_ref().map(|j| j.uuid) == Some(uuid) {
539+
lock.data.kill_job().await;
444540
}
445541
}
446542
});
@@ -459,17 +555,23 @@ struct EnarxTomlFallbackParams {
459555
tag: String,
460556
}
461557

558+
async fn get_enarx_config_from_drawbridge(
559+
repo: &str,
560+
tag: &str,
561+
) -> Result<reqwest::Response, reqwest::Error> {
562+
HTTP.get(&format!(
563+
"https://store.profian.com/api/v0.2.0/{repo}/_tag/{tag}/tree/Enarx.toml"
564+
))
565+
.send()
566+
.await
567+
}
568+
462569
async fn enarx_toml_fallback(
463570
_user: Ref<auth::User<Data>>,
464571
Query(params): Query<EnarxTomlFallbackParams>,
465572
) -> Result<String, (StatusCode, String)> {
466573
let EnarxTomlFallbackParams { repo, tag } = params;
467-
let response = HTTP
468-
.get(&format!(
469-
"https://store.profian.com/api/v0.2.0/{repo}/_tag/{tag}/tree/Enarx.toml"
470-
))
471-
.send()
472-
.await;
574+
let response = get_enarx_config_from_drawbridge(&repo, &tag).await;
473575
let response = response.map_err(|e| {
474576
(
475577
StatusCode::INTERNAL_SERVER_ERROR,
@@ -497,9 +599,9 @@ async fn enarx_toml_fallback(
497599
async fn root_delete(user: Ref<auth::User<Data>>) -> StatusCode {
498600
let mut lock = user.write().await;
499601

500-
if let Some(uuid) = lock.data.job.as_ref().map(|j| j.uuid) {
602+
if let Some(uuid) = lock.data.job().as_ref().map(|j| j.uuid) {
501603
debug!("killing: {}", uuid);
502-
lock.data.job = None;
604+
lock.data.kill_job().await;
503605
}
504606

505607
StatusCode::OK
@@ -508,7 +610,7 @@ async fn root_delete(user: Ref<auth::User<Data>>) -> StatusCode {
508610
async fn reader(user: Ref<auth::User<Data>>, kind: jobs::Standard) -> Result<Vec<u8>, StatusCode> {
509611
let mut buf = [0; 4096];
510612

511-
match user.write().await.data.job.as_mut() {
613+
match user.write().await.data.job_mut() {
512614
None => Err(StatusCode::NOT_FOUND),
513615
Some(job) => {
514616
let future = job.read(kind, &mut buf);

0 commit comments

Comments
 (0)