Skip to content

Commit 98c4968

Browse files
tostocker, tom-kuchler, Tobias Stocker
authored
Arbitrary field ordering in invocation requests, single-shot composition invocations, more github test cases. (#86)
* Allow overwriting of loaded functions
* Change timestamp points
* Allow arbitrary field ordering in invocation requests
* Add single-shot composition invocations
* Change record export to JSON format
* Add two more GitHub tests to check the timestamp and log_function_stdio features
* Remove unneeded comments
---------
Co-authored-by: Tom Kuchler <tom.j.kuchler@gmail.com>
Co-authored-by: Tobias Stocker <tstocker@r630-09.ethz.ch>
1 parent 8477fe9 commit 98c4968

File tree

11 files changed

+473
-181
lines changed

11 files changed

+473
-181
lines changed

.github/workflows/cargo-tests.yaml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,21 @@ jobs:
1818
compute_driver: [ mmu ]
1919
system_driver: [ reqwest_io ]
2020
platform: [ubuntu-24.04, ubuntu-24.04-arm]
21+
additional_features: [""]
2122

23+
# the following additional features should be platform agnostic, thus we only test them on one specification
24+
include:
25+
- platform: ubuntu-24.04
26+
compute_driver: mmu
27+
system_driver: reqwest_io
28+
additional_features: "timestamp"
29+
30+
- platform: ubuntu-24.04
31+
compute_driver: mmu
32+
system_driver: reqwest_io
33+
additional_features: "log_function_stdio"
34+
35+
name: ${{matrix.platform}} - ${{matrix.compute_driver}}|${{matrix.system_driver}} - ${{ matrix.additional_features || 'core' }}
2236
runs-on: ${{matrix.platform}}
2337

2438
steps:
@@ -27,7 +41,7 @@ jobs:
2741
# set up feature specific prerequisits
2842
- name: mmu worker build
2943
if: ${{ matrix.compute_driver == 'mmu' }}
30-
run: cargo build -F ${{matrix.compute_driver}} --bin mmu_worker --target $(arch)-unknown-linux-gnu
44+
run: cargo build -F ${{matrix.compute_driver}},${{matrix.additional_features}} --bin mmu_worker --target $(arch)-unknown-linux-gnu
3145
# run the actual tests
3246
- name: run tests
33-
run: SINGLE_CORE_MODE=true cargo test -F ${{matrix.compute_driver}},${{matrix.system_driver}}
47+
run: SINGLE_CORE_MODE=true cargo test -F ${{matrix.compute_driver}},${{matrix.system_driver}},${{matrix.additional_features}}

dandelion_commons/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ pub enum DispatcherError {
154154
CompositionCombine,
155155
/// dispatcher found mistake when trying to find waiting functions
156156
DependencyError,
157+
/// dispatcher got invalid composition
158+
InvalidComposition,
157159
}
158160

159161
#[derive(Debug, Clone, Copy, PartialEq)]

dandelion_commons/src/records.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,25 @@ use std::time::Instant;
55
/// Maximum usize to expect when converting a record point to a usize
66
/// By setting the last element to this explicitly, the compiler will throw an error,
77
/// if there are more than this, because it enumerates from 0 and won't allow a number to be assigned twice.
8-
const LAST_RECORD_POINT: usize = 11;
8+
const LAST_RECORD_POINT: usize = 9;
99

1010
#[repr(usize)]
1111
#[derive(Clone, Copy, Debug, PartialEq)]
1212
pub enum RecordPoint {
13-
/// Queue to load the function code + ctx
14-
PrepareEnvQueue,
15-
/// Enqueue parsing operation (async)
16-
ParsingQueue,
13+
/// Frontend has finished deserializing
14+
DeserializationEnd,
15+
/// When the request first enters the dispatcher
16+
EnterDispatcher,
17+
/// Queue to get the function executed on the engine (async)
18+
ExecutionQueue,
1719
/// Start parsing (sync)
1820
ParsingStart,
1921
/// Finished Parsing (sync)
2022
ParsingEnd,
21-
/// Dequeue from parsing (async)
22-
ParsingDequeue,
2323
/// Start loading code + alloc ctx (sync)
2424
LoadStart,
2525
/// Start data transfer to the ctx (sync)
2626
TransferStart,
27-
/// Queue to get an engine for execution
28-
GetEngineQueue,
29-
/// Queue to get the function executed on the engine (async)
30-
ExecutionQueue,
3127
/// Start execution of the function on the engine (sync)
3228
EngineStart,
3329
/// End execution of the function on the engine (sync)
@@ -77,23 +73,27 @@ impl FunctionTimestamp {
7773
#[cfg(feature = "timestamp")]
7874
impl fmt::Display for FunctionTimestamp {
7975
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80-
write!(f, "function_id {}, time_points: ", self.function_id)?;
76+
write!(
77+
f,
78+
"{{\"function_id\": {}, \"time_points\": [",
79+
self.function_id
80+
)?;
8181
// write own time points
8282
for index in 0..LAST_RECORD_POINT {
8383
let duration = unsafe { *self.time_points[index].get() };
8484
write!(f, "{},", duration.as_micros())?;
8585
}
8686
let duration = unsafe { *self.time_points[LAST_RECORD_POINT].get() };
87-
write!(f, "{}, children: {{", duration.as_micros())?;
87+
write!(f, "{}], \"children\": [", duration.as_micros())?;
8888
let child_guard = self.children.lock().unwrap();
8989
let num_children = child_guard.len();
9090
if num_children > 0 {
9191
for index in 0..num_children - 1 {
92-
write!(f, "[{}],", child_guard[index])?;
92+
write!(f, "{},", child_guard[index])?;
9393
}
94-
write!(f, "[{}]", child_guard[num_children - 1])?;
94+
write!(f, "{}", child_guard[num_children - 1])?;
9595
}
96-
write!(f, "}}")?;
96+
write!(f, "]}}")?;
9797
Ok(())
9898
}
9999
}

dispatcher/src/dispatcher.rs

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
use core::pin::Pin;
77
use dandelion_commons::{
88
records::{RecordPoint, Recorder},
9-
DandelionResult, FunctionId,
9+
DandelionError, DandelionResult, DispatcherError, FunctionId,
1010
};
1111
use futures::{
1212
future::{join_all, ready, Either},
@@ -102,14 +102,13 @@ impl Dispatcher {
102102

103103
pub async fn queue_function_by_name(
104104
&self,
105-
function_name: String,
105+
function_id: Arc<String>,
106106
inputs: Vec<DispatcherInput>,
107107
caching: bool,
108-
start_time: std::time::Instant,
108+
mut recorder: Recorder,
109109
) -> DandelionResult<(Vec<Option<CompositionSet>>, Recorder)> {
110-
debug!("Queuing function {}", function_name);
111-
let function_id = Arc::new(function_name);
112-
let recorder = Recorder::new(function_id.clone(), start_time);
110+
debug!("Queuing function {}", function_id);
111+
recorder.record(RecordPoint::EnterDispatcher);
113112

114113
let mut input_vec = Vec::with_capacity(inputs.len());
115114
input_vec.resize(inputs.len(), None);
@@ -130,6 +129,53 @@ impl Dispatcher {
130129
return Ok((results, recorder));
131130
}
132131

132+
pub async fn queue_unregistered_composition(
133+
&self,
134+
composition_desc: String,
135+
inputs: Vec<DispatcherInput>,
136+
caching: bool,
137+
recorder: Recorder,
138+
) -> DandelionResult<(Vec<Option<CompositionSet>>, Recorder)> {
139+
debug!("Parsing single use composition");
140+
let composition_meta_pairs = self
141+
.function_registry
142+
.parse_compositions(&composition_desc.as_str())?;
143+
if composition_meta_pairs.len() != 1 {
144+
debug!(
145+
"Expected exactly one composition got {}",
146+
composition_meta_pairs.len()
147+
);
148+
return Err(DandelionError::Dispatcher(
149+
DispatcherError::InvalidComposition,
150+
));
151+
}
152+
153+
debug!(
154+
"Queuing single use composition {}",
155+
composition_meta_pairs[0].0
156+
);
157+
let mut input_vec = Vec::with_capacity(inputs.len());
158+
input_vec.resize(inputs.len(), None);
159+
for (index, input) in inputs.into_iter().enumerate() {
160+
match input {
161+
DispatcherInput::None => (),
162+
DispatcherInput::Set(set) => {
163+
input_vec[index] = Some(set);
164+
}
165+
}
166+
}
167+
let results = self
168+
.queue_composition(
169+
composition_meta_pairs[0].1.clone(),
170+
input_vec,
171+
caching,
172+
recorder.get_sub_recorder(),
173+
)
174+
.await?;
175+
176+
return Ok((results, recorder));
177+
}
178+
133179
/// Queue a composition for execution.
134180
/// Returns a Vec of sets with the index corresponding to the set index in the composition definition
135181
///
@@ -432,7 +478,7 @@ impl Dispatcher {
432478
) -> Pin<
433479
Box<dyn Future<Output = DandelionResult<Vec<Option<CompositionSet>>>> + 'dispatcher + Send>,
434480
> {
435-
trace!("queueing function with id: {}", function_id);
481+
debug!("Queueing function with id: {}", function_id);
436482
Box::pin(async move {
437483
// find an engine capable of running the function
438484
// TODO: think about more distinctions, that allow pushing chains of functions which can be executed by single engine,
@@ -447,7 +493,6 @@ impl Dispatcher {
447493

448494
let metadata = func_info.metadata;
449495
// run on engine
450-
recorder.record(RecordPoint::GetEngineQueue);
451496
trace!(
452497
"Running function {} with input sets {:?} and output sets {:?} and alternatives: {:?}",
453498
function_id,
@@ -473,12 +518,12 @@ impl Dispatcher {
473518
recorder.record(RecordPoint::FutureReturn);
474519

475520
#[cfg(feature = "log_function_stdio")]
476-
for opt in result.content.iter() {
521+
for opt in context.content.iter() {
477522
if opt.as_ref().is_some_and(|s| s.ident == "stdio") {
478523
for itm in opt.as_ref().unwrap().buffers.iter() {
479524
if itm.ident == "stderr" && itm.data.size > 0 {
480525
let mut stderr_output: Vec<u8> = vec![0; itm.data.size];
481-
result.context.read(itm.data.offset, &mut stderr_output)?;
526+
context.context.read(itm.data.offset, &mut stderr_output)?;
482527
warn!(
483528
"Function result contains stderr output:\n{}",
484529
std::str::from_utf8(stderr_output.as_slice())
@@ -487,7 +532,7 @@ impl Dispatcher {
487532
}
488533
if itm.ident == "stdout" && itm.data.size > 0 {
489534
let mut stdout_output: Vec<u8> = vec![0; itm.data.size];
490-
result.context.read(itm.data.offset, &mut stdout_output)?;
535+
context.context.read(itm.data.offset, &mut stdout_output)?;
491536
debug!(
492537
"Function output:\n{}",
493538
std::str::from_utf8(stdout_output.as_slice())

dispatcher/src/function_registry.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,19 @@ impl FunctionRegistry {
550550
Ok(())
551551
}
552552

553+
/// Parses the compositions without inserting it into the registry.
554+
pub fn parse_compositions(
555+
&self,
556+
composition_desc: &str,
557+
) -> DandelionResult<Vec<(FunctionId, Composition, Metadata)>> {
558+
// TODO: might want to return the parsing issue back to the user in a better way
559+
let module = dparser::parse(composition_desc).map_err(|parse_error| {
560+
print_errors(composition_desc, parse_error);
561+
DandelionError::Composition(CompositionError::ParsingError)
562+
})?;
563+
self.composition_from_module(module)
564+
}
565+
553566
/// Checks if a function identifier is registered in the function registry.
554567
pub fn exists_id(&self, function_id: &FunctionId) -> bool {
555568
let lock_guard = self

dispatcher/src/remote.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use std::thread::{self, JoinHandle};
2+
3+
use dandelion_commons::DandelionResult;
4+
use log::error;
5+
use machine_interface::machine_config::EngineType;
6+
use multinode::client::{create_client, RemoteClient};
7+
use tokio::runtime::Builder;
8+
9+
use crate::queue::{get_engine_flag, WorkQueue};
10+
11+
/// Delay between connection retry attempts passed to `create_client`.
/// NOTE(review): units presumed to be milliseconds from the magnitude (1000)
/// and typical client APIs — confirm against `multinode::client::create_client`.
// `const` instead of `static`: this is a plain compile-time value that is only
// ever read by value, so it does not need a fixed memory address.
const RETRY_TIMEOUT: u64 = 1000;
12+
13+
/// Lifecycle state of a `RemoteNode`'s background polling loop.
/// NOTE(review): Rust convention is UpperCamelCase variants (`Inactive`, ...);
/// left as-is here because the variants are referenced elsewhere in this file.
enum Status {
    /// Initial state: no polling thread has been started yet (set in `new`).
    INACTIVE,
    /// NOTE(review): never assigned anywhere in this file — presumably meant
    /// to be set when polling starts; confirm intended use.
    ACTIVE,
    /// Stop has been requested (set by `stop_polling`).
    STOPPED,
    /// NOTE(review): never assigned anywhere in this file — confirm intended use.
    FINISHED,
}
19+
20+
/// Handle to a remote Dandelion node: owns the RPC client, the shared work
/// queue that is polled for jobs to forward, and the background polling thread.
pub struct RemoteNode {
    // Client connection to the remote peer, built by `create_client`.
    client: RemoteClient,
    // Work queue polled for jobs to dispatch to the remote node.
    work_queue: WorkQueue,
    // Engine types the remote offers, with a count per type.
    // NOTE(review): the meaning of the `u32` (capacity? core count?) is not
    // visible in this file — confirm against the multinode protocol.
    engines: Vec<(EngineType, u32)>,
    // Current lifecycle state; see `Status`.
    status: Status,
    // Join handle for the polling thread; `None` until `start_polling` runs.
    polling_thread: Option<JoinHandle<()>>,
}
27+
28+
impl RemoteNode {
    /// Builds a `RemoteNode` by connecting `local_host:local_port` to
    /// `remote_host:remote_port`, retrying with `RETRY_TIMEOUT` between
    /// attempts (retry behavior lives inside `create_client`).
    /// The node starts in `Status::INACTIVE` with no polling thread; polling
    /// begins lazily on the first `add_remote` call.
    pub async fn new(
        local_host: String,
        local_port: u16,
        remote_host: String,
        remote_port: u16,
        work_queue: WorkQueue,
        engines: Vec<(EngineType, u32)>,
    ) -> RemoteNode {
        let client = create_client(
            local_host,
            local_port,
            remote_host,
            remote_port,
            RETRY_TIMEOUT,
        )
        .await;
        RemoteNode {
            client,
            work_queue,
            engines,
            status: Status::INACTIVE,
            polling_thread: None,
        }
    }

    /// Spawns an OS thread that runs a single-threaded tokio runtime and
    /// loops forever pulling work from a clone of the work queue.
    ///
    /// NOTE(review): the loop never observes `self.status` (the thread cannot
    /// see it — only the cloned queue is moved in) and has no other exit
    /// condition, so this thread can never terminate; `stop_polling`'s
    /// `join()` will therefore block forever. A shared stop flag (e.g.
    /// `Arc<AtomicBool>`) captured by the closure is needed to make shutdown
    /// work — that requires a struct change, so it is only flagged here.
    /// NOTE(review): `(work, dept)` is bound but unused (is `dept` a typo for
    /// `debt`?), and `status` is never set to `ACTIVE` here — both look like
    /// work-in-progress.
    fn start_polling(&mut self) {
        let queue = self.work_queue.clone();
        self.polling_thread = Some(thread::spawn(move || {
            // Dedicated current-thread runtime so the blocking poll loop does
            // not occupy a worker of any shared runtime.
            let rt = Builder::new_current_thread()
                .enable_io()
                .build()
                .expect("Failed to create polling thread!");
            rt.block_on(async {
                loop {
                    // Argument 0: presumably an engine/queue selector for
                    // `get_work` — confirm against `WorkQueue::get_work`.
                    let (work, dept) = queue.get_work(0);
                }
            })
        }));
    }

    /// Requests the polling thread to stop and waits for it to exit, logging
    /// (but otherwise swallowing) a panic from the joined thread.
    ///
    /// NOTE(review): setting `Status::STOPPED` has no effect on the thread —
    /// see `start_polling` — so the `join()` below will hang until the
    /// shutdown signalling is implemented.
    async fn stop_polling(&mut self) {
        self.status = Status::STOPPED;
        if let Some(handle) = self.polling_thread.take() {
            let res = handle.join();
            if let Err(err) = res {
                // A join error means the polling thread panicked.
                error!("Polling thread exited with error: {:?}", err);
            }
        }
    }

    /// Registers a remote peer and lazily starts the polling thread on the
    /// first registration.
    ///
    /// NOTE(review): stub — all three parameters are currently unused (the
    /// TODO below is the missing body); always returns `Ok(())`.
    pub fn add_remote(
        &mut self,
        remote_host: String,
        remote_port: u16,
        engines: Vec<(EngineType, u32)>,
    ) -> DandelionResult<()> {
        // TODO: add node to list of remotes

        if self.polling_thread.is_none() {
            self.start_polling();
        }

        Ok(())
    }

    /// Unregisters a remote peer.
    ///
    /// NOTE(review): stub — parameters are unused and the function always
    /// returns `Ok(())`; the TODOs below describe the intended behavior.
    pub fn remove_remote(&mut self, remote_host: String, remote_port: u16) -> DandelionResult<()> {
        // TODO: remove remote from list of remotes

        // TODO: wait for all jobs to complete
        // TODO: if list becomes empty stop the polling thread

        Ok(())
    }
}

machine_interface/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ backend_debug = []
1717
reqwest_io = ["std", "dep:reqwest", "dep:http", "dep:bytes", "dep:memcache"]
1818
bytes_context = ["std", "dep:bytes"]
1919
timestamp = ["dandelion_commons/timestamp"]
20+
vendored-ssl = ["dep:openssl"]
2021
test_export = []
2122

2223
[build-dependencies]
@@ -58,7 +59,7 @@ memcache = { version = "0.18.0", optional = true }
5859
bytes = { version = "1.6", optional = true}
5960
# To make sure we can compile everywhere.
6061
# This sets it for the other dependencies we have that use it.
61-
openssl = { version = "0.10", features = ["vendored"] }
62+
openssl = { version = "0.10", features = ["vendored"], optional = true }
6263

6364
# disable benchmarks in library, to not run all unit tests on every benchmark
6465
# also needs to be disabled for criterion flags to work that are not available for tests

0 commit comments

Comments
 (0)