diff --git a/Rust/Savina/src/lib/matrix.rs b/Rust/Savina/src/lib/matrix.rs index 51a87ada..88aca443 100644 --- a/Rust/Savina/src/lib/matrix.rs +++ b/Rust/Savina/src/lib/matrix.rs @@ -23,25 +23,25 @@ * * @author Johannes Hayeß */ +#![allow(dead_code)] use std::ops::Add; +use std::fmt; -#[derive(Default, Debug, Clone)] +#[derive(Debug, Clone)] pub struct Matrix { data: Vec, + size_x: usize, size_y: usize, } -#[derive(Default)] -pub struct TransposedMatrix { - data: Vec, - size_x: usize, -} +pub struct TransposedMatrix(Matrix); -impl Matrix { - pub fn new(size_x: usize, size_y: usize) -> Self { +impl Matrix { + pub fn new(size_x: usize, size_y: usize) -> Self where T: Default + Clone { Matrix:: { data: vec![T::default(); size_x * size_y], + size_x, size_y, } } @@ -53,39 +53,58 @@ impl Matrix { pub fn set(&mut self, x: usize, y: usize, value: T) { self.data[x * self.size_y + y] = value; } + + pub fn transpose(self) -> TransposedMatrix { + TransposedMatrix(self) + } } pub fn matrix_sum(matrices: &[Matrix]) -> Matrix -where - T: Default + Clone + Copy + Add, + where + T: Default + Clone + Copy + Add, { - let size_x = matrices[0].data.len() / matrices[0].size_y; + let size_x = matrices[0].size_x; let size_y = matrices[0].size_y; let mut result = Matrix::::new(size_x, size_y); for x in 0..size_x { for y in 0..size_y { - result.data[y * size_x + x] = matrices - .iter() - .fold(T::default(), |acc, m| acc + m.data[y * size_x + x]) + for m in matrices { + result.set(x, y, *result.get(x, y) + *m.get(x, y)) + } } } result } -impl TransposedMatrix { - pub fn new(size_x: usize, size_y: usize) -> Self { - TransposedMatrix:: { - data: vec![T::default(); size_x * size_y], - size_x, - } +impl TransposedMatrix { + pub fn new(size_x: usize, size_y: usize) -> Self where T: Default + Clone { + Self(Matrix::new(size_y, size_x)) } pub fn get(&self, x: usize, y: usize) -> &T { - &self.data[y * self.size_x + x] + self.0.get(y, x) } pub fn set(&mut self, x: usize, y: usize, value: T) { - self.data[y * self.size_x + x] = value; + self.0.set(y, x, value) + } +} + +impl fmt::Display for Matrix { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for i in 0..self.size_x { + for j in 0..self.size_y { + write!(f, "{} ", self.get(i, j))?; + } + write!(f, "\n")?; + } + Ok(()) + } +} + +impl fmt::Display for TransposedMatrix { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) } } diff --git a/Rust/Savina/src/parallelism/FilterBank.lf b/Rust/Savina/src/parallelism/FilterBank.lf index 1e1ed0ed..26cd9199 100644 --- a/Rust/Savina/src/parallelism/FilterBank.lf +++ b/Rust/Savina/src/parallelism/FilterBank.lf @@ -1,6 +1,6 @@ /** * Copyright (C) 2020 TU Dresden - * + * * This benchmark is particularly interesting for LF, as it has an interesting * structure and highlights a significant advantage compared to Akka. The * benchmark implements a "filter bank". Each bank consists of a pipeline of @@ -36,7 +36,7 @@ * not needed at all in the LF implementation and makes both the "TaggedForward" * and the "Integrate" actors superflous. The combine reactor simply has a * multiport import and thus simultaneously receives values from all N banks. - * + * * @author Christian Menard * @author Hannes Klein * @author Johannes Hayeß @@ -50,25 +50,25 @@ target Rust { import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; -reactor Producer(numChannels: usize(8), numSimulations: usize(34816), numColumns: usize(16384)) { - state num_simulations(numSimulations); - +reactor Producer(num_channels: usize(8), num_simulations: usize(34816), num_columns: usize(16384)) { + state num_simulations(num_simulations); + state numMessagesSent: usize(0); - + input start: unit; output next: unit; output finished: unit; - + logical action sendNext: unit; - + reaction(start) -> sendNext {= // reset local state self.numMessagesSent = 0; - + // start execution ctx.schedule(sendNext, Asap); =} - + reaction(sendNext) -> sendNext, next, finished {= if self.numMessagesSent < self.num_simulations { ctx.set(next, ()); @@ -81,140 +81,134 @@ reactor Producer(numChannels: usize(8), numSimulations: usize(34816), numColumns } reactor Source { - state maxValue: usize(1000); + state max_value: usize(1000); state current: usize(0); - + input next: unit; output value: f64; - input inFinished: unit; - output outFinished: unit; - + input in_finished: unit; + output out_finished: unit; + reaction (next) -> value {= - ctx.set(value, self.current); - self.current = (self.current + 1) % self.maxValue; + ctx.set(value, self.current as f64); + self.current = (self.current + 1) % self.max_value; =} - - reaction (inFinished) -> outFinished {= - ctx.set(outFinished, ()); + + reaction (in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state self.current = 0; =} } -reactor Bank(bank_index: usize(0), numColumns: usize(16384), numChannels: usize(8)) { - input inValue: double; - input inFinished: void; - output outValue: double; - output outFinished: void; - - input setF: Matrix; - input setH: Matrix; - +reactor Bank(bank_index: usize(0), num_columns: usize(16384), num_channels: usize(8), coefficients: super::filter_bank::MatrixPair({= panic!("") =})) { + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; + + input set_f: Arc>; + input set_h: Arc>; + preamble {= + use std::sync::Arc; use crate::matrix::Matrix; =} - - delay0 = new Delay(delayLength={=numColumns - 1=}); - fir0 = new FirFilter(bank_index=bank_index, peekLength=numColumns); - sample = new SampleFilter(sampleRate=numColumns); - delay1 = new Delay(delayLength={=numColumns - 1=}); - fir1 = new FirFilter(bank_index=bank_index, peekLength=numColumns); - - inFinished, delay0.outFinished, fir0.outFinished, sample.outFinished, delay1.outFinished, fir1.outFinished -> - delay0.inFinished, fir0.inFinished, sample.inFinished, delay1.inFinished, fir1.inFinished, outFinished; - - inValue, delay0.outValue, fir0.outValue, sample.outValue, delay1.outValue, fir1.outValue -> - delay0.inValue, fir0.inValue, sample.inValue, delay1.inValue, fir1.inValue, outValue; - - setH -> fir0.setCoefficients; - setF -> fir1.setCoefficients; + + delay0 = new Delay(delayLength={=num_columns - 1=}); + fir0 = new FirFilter(bank_index=bank_index, peekLength=num_columns, coefficients={=coefficients.0.clone()=}); + sample = new SampleFilter(sampleRate=num_columns); + delay1 = new Delay(delayLength={=num_columns - 1=}); + fir1 = new FirFilter(bank_index=bank_index, peekLength=num_columns, coefficients={=coefficients.1.clone()=}); + + in_finished, delay0.out_finished, fir0.out_finished, sample.out_finished, delay1.out_finished, fir1.out_finished -> + delay0.in_finished, fir0.in_finished, sample.in_finished, delay1.in_finished, fir1.in_finished, out_finished; + + in_value, delay0.out_value, fir0.out_value, sample.out_value, delay1.out_value, fir1.out_value -> + delay0.in_value, fir0.in_value, sample.in_value, delay1.in_value, fir1.in_value, out_value; } reactor Delay(delayLength: usize(16383)) { state delay_length(delayLength); - - state myState: Vec; - state placeHolder: usize(0); - - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; - + + state my_state: Vec; + state placeholder: usize(0); + + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; + reaction(startup) {= // one time init - self.myState = vec![0.0; self.delayLength]; - self.placeHolder = 0; + self.my_state = vec![0.0; self.delay_length]; + self.placeholder = 0; =} - - reaction(inValue) -> outValue {= - let result = ctx.get(inValue).unwrap(); - ctx.set(outValue, self.myState[self.placeHolder]); - myState[self.placeHolder] = result; - self.placeHolder = (placeHolder + 1) % self.delay_length; + + reaction(in_value) -> out_value {= + let result = ctx.get(in_value).unwrap(); + ctx.set(out_value, self.my_state[self.placeholder]); + self.my_state[self.placeholder] = result; + self.placeholder = (self.placeholder + 1) % self.delay_length; =} - - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); - + + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); + // reset local state - self.myState = vec![0.0; self.delay_length]; - self.placeHolder = 0; + self.my_state = vec![0.0; self.delay_length]; + self.placeholder = 0; =} } -reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { +reactor FirFilter(bank_index: usize(0), peekLength: usize(16384), coefficients: Arc>({= panic!("") =})) { state bank_index(bank_index); state peek_length(peekLength); - + state data: Vec; - state dataIndex: usize(0); - state dataFull: bool(false); - state coefficients: {=Rc>=}; - - input setCoefficients: Rc>; - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; - + state data_index: usize(0); + state data_full: bool(false); + state coefficients: Arc>(coefficients); + + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; + preamble {= + use std::sync::Arc; use crate::matrix::Matrix; =} - + reaction(startup) {= // reset local state - data = vec![0.0; self.peek_length]; - self.dataIndex = 0; - self.dataFull = false; - =} - - reaction(setCoefficients) {= - self.coefficients = ctx.get(setCoefficients).unwrap(); + self.data = vec![0.0; self.peek_length]; + self.data_index = 0; + self.data_full = false; =} - - reaction(inValue) -> outValue {= - data[self.dataIndex] = ctx.get(inValue).unwrap(); - self.dataIndex += 1; - - if self.dataIndex == self.peek_length { - self.dataFull = true; - self.dataIndex = 0; + + reaction(in_value) -> out_value {= + self.data[self.data_index] = ctx.get(in_value).unwrap(); + self.data_index += 1; + + if self.data_index == self.peek_length { + self.data_full = true; + self.data_index = 0; } - - if self.dataFull { - let sum = 0.0; + + if self.data_full { + let mut sum = 0.0; for (i, d) in self.data.iter().enumerate() { - sum += data * self.coefficients.get(bank_index, peek_length - i - 1); + sum += self.data[i] * self.coefficients.get(self.bank_index, self.peek_length - i - 1); } - ctx.set(outValue, sum); + ctx.set(out_value, sum); } =} - - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, 0); - + + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); + // reset local state self.data = vec![0.0; self.peek_length]; self.data_index = 0; @@ -224,159 +218,153 @@ reactor FirFilter(bank_index: usize(0), peekLength: usize(16384)) { reactor SampleFilter(sampleRate: usize(16384)) { state sample_rate(sampleRate); - - state samplesReceived: usize(0); - - input inValue: f64; - input inFinished: unit; - output outValue: f64; - output outFinished: unit; - - reaction(inValue) -> outValue {= - if self.samplesReceived == 0 { - ctx.set(outValue, ctx.get(inValue).unwrap()); + + state samples_received: usize(0); + + input in_value: f64; + input in_finished: unit; + output out_value: f64; + output out_finished: unit; + + reaction(in_value) -> out_value {= + if self.samples_received == 0 { + ctx.set(out_value, ctx.get(in_value).unwrap()); } else { - ctx.set(outValue, 0.0); + ctx.set(out_value, 0.0); } - self.samplesReceived = (self.samplesReceived + 1) % self.sample_rate; + self.samples_received = (self.samples_received + 1) % self.sample_rate; =} - - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); - + + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); + // reset local state - self.samplesReceived = 0; + self.samples_received = 0; =} } -reactor Combine(numChannels: usize(8)) { - state num_channels(numChannels); - - input[numChannels] inValues: f64; - input[numChannels] inFinished: unit; - output outValue: f64; - output outFinished: unit; - - state numFinished: usize(0); - - reaction(inValues) -> outValue {= - let sum = 0; +reactor Combine(num_channels: usize(8)) { + state num_channels(num_channels); + + input[num_channels] inValues: f64; + input[num_channels] in_finished: unit; + output out_value: f64; + output out_finished: unit; + + state num_finished: usize(0); + + reaction(inValues) -> out_value {= + let mut sum = 0.0; for x in inValues { sum += ctx.get(x).unwrap(); } - ctx.set(outValue, sum); + ctx.set(out_value, sum); =} - - reaction(inFinished) -> outFinished {= - for port in inFinished { - if ctx.is_present(port) { - self.numFinished += 1; - } - } - if self.numFinished == self.num_channels { - ctx.set(outFinished, ()); + + reaction(in_finished) -> out_finished {= + self.num_finished += in_finished.iterate_set().count(); + + if self.num_finished == self.num_channels { + ctx.set(out_finished, ()); // reset local state - self.numFinished = 0; + self.num_finished = 0; } =} } reactor Sink(printRate: usize(100)) { state print_rate(printRate); - + state count: usize(0); - - input inValue: f64; - input inFinished: unit; - output outFinished: unit; - - reaction(inValue) {= - let result = ctx.get(inValue).unwrap(); - - if self.count % self.print_Rate == 0 { + + input in_value: f64; + input in_finished: unit; + output out_finished: unit; + + reaction(in_value) {= + let result = ctx.get(in_value).unwrap(); + + if self.count % self.print_rate == 0 { info!("SinkActor: result = {}", result); } self.count += 1; =} - - reaction(inFinished) -> outFinished {= - ctx.set(outFinished, ()); + + reaction(in_finished) -> out_finished {= + ctx.set(out_finished, ()); // reset local state self.count = 0; =} } -main reactor (numIterations: usize(12), numSimulations: usize(34816), numColumns: usize(16384), numChannels: usize(8)) { - state num_iterations(numIterations); - state num_simulations(numSimulations); - state num_columns(numColumns); - state num_channels(numChannels); - +main reactor (num_iterations: usize(12), num_simulations: usize(34816), num_columns: usize(16384), num_channels: usize(8)) { + state num_iterations(num_iterations); + state num_simulations(num_simulations); + state num_columns(num_columns); + state num_channels(num_channels); + preamble {= use std::sync::Arc; - =} - - reaction(startup) -> banks.setF, banks.setH {= - // initialize the coefficients of all FIR filters - let mut mH = Matrix(numChannels, numColumns); - let mut mF = Matrix(numChannels, numColumns); - - for j in 0..self.num_channels { - for i in 0..self.num_columns { - let h = (1.0 * i * numColumns) + (1.0 * j * numChannels) + j + i + j + 1; - let f = (1.0 * i * j) + (1.0 * j * j) + j + i; - mH.set(j, i, h); - mF.set(j, i, f); - } - } + pub type MatrixPair = (Arc>, Arc>); - // convert to immutable pointers before sending. This ensures that all recipients can receive a pointer - // to the same matrix and no copying is needed - let mH_r = Arc::new(mH); - let mF_r = Arc::new(mF); - - for (h, f) in banks__setH.into_iter().zip(&banks__setF) { - ctx.set(h, Arc::clone(&mH_r)); - ctx.set(f, Arc::clone(&mF_r)); + fn create_coeffs(num_channels: usize, num_columns: usize) -> MatrixPair { + // initialize the coefficients of all FIR filters + let mut mH: Matrix = Matrix::new(num_channels, num_columns); + let mut mF: Matrix = Matrix::new(num_channels, num_columns); + + for j in 0..num_channels { + for i in 0..num_columns { + let h = ((i * num_columns) + (j * num_channels) + j + i + j + 1) as f64; + let f = ((i * j) + (j * j) + j + i) as f64; + mH.set(j, i, h); + mF.set(j, i, f); + } + } + + // convert to immutable pointers before sending. This ensures that all recipients can receive a pointer + // to the same matrix and no copying is needed + (Arc::new(mH), Arc::new(mF)) } - + =} + + reaction(startup) -> banks.set_f, banks.set_h {= print_benchmark_info("FilterBankBenchmark"); print_args!( - "numIterations", + "num_iterations", self.num_iterations, - "numSimulations", + "num_simulations", self.num_simulations, - "numColumns", + "num_columns", self.num_columns, - "numChannels", - self.num_channels, + "num_channels", + self.num_channels ); print_system_info(); =} - runner = new BenchmarkRunner(num_iterations=numIterations); - producer = new Producer(numSimulations=numSimulations, numChannels=numChannels, numColumns=numColumns); + runner = new BenchmarkRunner(num_iterations=num_iterations); + producer = new Producer(num_simulations=num_simulations, num_channels=num_channels, num_columns=num_columns); source = new Source(); - banks = new[numChannels] Bank(numColumns=numColumns, numChannels=numChannels); - combine = new Combine(numChannels=numChannels); + banks = new[num_channels] Bank(num_columns=num_columns, num_channels=num_channels, coefficients={= create_coeffs(num_channels, num_columns) =}); + combine = new Combine(num_channels=num_channels); sink = new Sink(printRate=100); - + runner.start -> producer.start; - + producer.next -> source.next; - producer.finished -> source.inFinished; - - (source.value)+ -> banks.inValue; - (source.outFinished)+ -> banks.inFinished; - - banks.outValue -> combine.inValues; - banks.outFinished -> combine.inFinished - - combine.outValue -> sink.inValue; - combine.outFinished -> sink.inFinished; - - sink.outFinished -> runner.finished; - + producer.finished -> source.in_finished; + + (source.value)+ -> banks.in_value; + (source.out_finished)+ -> banks.in_finished; + + banks.out_value -> combine.inValues; + banks.out_finished -> combine.in_finished + + combine.out_value -> sink.in_value; + combine.out_finished -> sink.in_finished; + + sink.out_finished -> runner.finished; + preamble {= use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; use crate::matrix::Matrix; diff --git a/Rust/Savina/src/parallelism/MatMul.lf b/Rust/Savina/src/parallelism/MatMul.lf index 0fd957c9..b64381de 100644 --- a/Rust/Savina/src/parallelism/MatMul.lf +++ b/Rust/Savina/src/parallelism/MatMul.lf @@ -44,8 +44,29 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { state num_workers(numWorkers); state data_length(dataLength); - state A: Arc>; - state B: Arc>; + preamble {= + #[derive(Clone)] + pub struct Data(pub Arc>, pub Arc>, pub Weak>>); + =} + + state A: Arc>({= { + let mut a = Matrix::::new(data_length, data_length); + for i in 0..data_length { + for j in 0..data_length { + a.set(i, j, i as f64); + } + } + Arc::new(a) + } =}); + state B: Arc>({= { + let mut a = TransposedMatrix::::new(data_length, data_length); + for i in 0..data_length { + for j in 0..data_length { + a.set(i, j, j as f64); + } + } + Arc::new(a) + } =}); state C: {= Vec>>> =} state workQueue: VecDeque; @@ -56,31 +77,10 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { input start: unit; output finished: unit; - output[numWorkers] data: {=(Arc>, Arc>, Weak>>)=}; + output[numWorkers] data: Data; output[numWorkers] doWork: WorkItem; input[numWorkers] moreWork: {=[WorkItem; 8]=}; - reaction(startup) {= - // Fill both input arrays with data - let (a, b) = { - let mut a = Matrix::::new(self.data_length, self.data_length); - let mut b = TransposedMatrix::::new(self.data_length, self.data_length); - - for i in 0..self.data_length { - for j in 0..self.data_length { - a.set(i, j, i as f64); - b.set(i, j, j as f64); - } - } - - (Arc::new(a), Arc::new(b)) - }; - - self.A = a; - self.B = b; - self.C = Vec::new(); - =} - reaction(start) -> data, next {= // reset the result matrix C for _ in 0..self.num_workers { @@ -89,7 +89,7 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { // send pointers to all 3 matrixes to the workers for (d, c) in data.into_iter().zip(&self.C) { - ctx.set(d, (Arc::clone(&self.A), Arc::clone(&self.B), Arc::downgrade(&c))); + ctx.set(d, Data(Arc::clone(&self.A), Arc::clone(&self.B), Arc::downgrade(&c))); } // produce the first work item, instructing the worker to multiply the complete matrix @@ -171,11 +171,9 @@ reactor Manager(numWorkers: usize(20), dataLength: usize(1024)) { reactor Worker(threshold: usize(16384)) { state threshold(threshold); - state A: Arc>; - state B: Arc>; - state C: Weak>>; + state data_ref: Option; - input data: {=(Arc>, Arc>, Weak>>)=}; + input data: super::manager::Data; input doWork: WorkItem; output moreWork: {=[WorkItem; 8]=}; @@ -186,11 +184,7 @@ reactor Worker(threshold: usize(16384)) { =} reaction (data) {= - ctx.use_ref_opt(data, |(a, b, c)| { - self.A = a.clone(); - self.B = b.clone(); - self.C = c.clone(); - }); + self.data_ref = ctx.use_ref_opt(data, Clone::clone); =} reaction(doWork) -> moreWork {= @@ -214,18 +208,19 @@ reactor Worker(threshold: usize(16384)) { work_queue[7] = WorkItem{srA: wi.srA + dim, scA: wi.scA + dim, srB: wi.srB + dim, scB: wi.scB + dim, srC: wi.srC + dim, scC: wi.scC + dim, numBlocks, dim}; ctx.set(moreWork, work_queue); - } else { + } else if let Some(super::manager::Data(a, b, c)) = &self.data_ref { + // otherwise we compute the result directly let end_r = wi.srC + wi.dim; let end_c = wi.scC + wi.dim; - let upgraded = self.C.upgrade().unwrap(); + let upgraded = c.upgrade().unwrap(); let mut c = upgraded.lock().unwrap(); for i in wi.srC..end_r { for j in wi.scC..end_c { for k in 0..wi.dim { - let mut v = self.A.get(i, wi.scA + k) * self.B.get(wi.srB + k, j); + let mut v = a.get(i, wi.scA + k) * b.get(wi.srB + k, j); v += c.get(i, j); c.set(i, j, v); } diff --git a/runner/conf/benchmark/savina_parallelism_apsp.yaml b/runner/conf/benchmark/savina_parallelism_apsp.yaml index bd21af92..e379d542 100644 --- a/runner/conf/benchmark/savina_parallelism_apsp.yaml +++ b/runner/conf/benchmark/savina_parallelism_apsp.yaml @@ -45,4 +45,4 @@ targets: gen_args: num_workers: ["-D", "numNodes="] block_size: ["-D", "blockSize="] - max_edge_weight: ["-D", "maxEdgeWeight="] \ No newline at end of file + max_edge_weight: ["-D", "maxEdgeWeight="] diff --git a/runner/conf/benchmark/savina_parallelism_filterbank.yaml b/runner/conf/benchmark/savina_parallelism_filterbank.yaml index 2a46ffc4..da976239 100644 --- a/runner/conf/benchmark/savina_parallelism_filterbank.yaml +++ b/runner/conf/benchmark/savina_parallelism_filterbank.yaml @@ -37,3 +37,13 @@ targets: columns: ["-D", "columns="] simulations: ["-D", "time_steps="] channels: ["-D", "channels="] + lf-rust: + copy_sources: + - "${bench_path}/Rust/Savina/src/lib" + - "${bench_path}/Rust/Savina/src/parallelism" + lf_file: "parallelism/FilterBank.lf" + binary: "filter_bank" + run_args: + columns: ["--main-num-columns", ""] + simulations: ["--main-num-simulations", ""] + channels: ["--main-num-channels", ""]