diff --git a/Rust/Savina/src/parallelism/Apsp.lf b/Rust/Savina/src/parallelism/Apsp.lf new file mode 100644 index 00000000..c71099ef --- /dev/null +++ b/Rust/Savina/src/parallelism/Apsp.lf @@ -0,0 +1,280 @@ +/** + * Copyright (C) 2020 TU Dresden + * + * This benchmark implements a parallel all pairs shortest path algorithm. In + * order to split the workload, the large input matrix of size graph_size x + * graph_size is split into smaller blocks of size block_size x block_size. Each of + * the worker reactors (ApspFloydWarshallBlock) processes one of these blocks. + * The worker reactors are organized in the same matrix pattern, replication the + * structure of the blocks within the large input matrix. Each of the workers + * operates on its local block data, and sends results to all other workers in + * the same column or in the same row. The data from the neighbors is then used + * to compute the next intermediate result and to update the local state + * accordingly. + * + * @author Christian Menard + * @author Hannes Klein + * @author Johannes Hayeß + */ + +target Rust { + build-type : Release, + cargo-features: [ "cli" ], + rust-include: [ "../lib/matrix.rs", "../lib/pseudo_random.rs"], +}; + +import BenchmarkRunner from "../lib/BenchmarkRunner.lf"; + +reactor ApspFloydWarshallBlock( + bank_index: usize(0), + row_index: usize(0), + graph_size: usize(300), + block_size: usize(50), + dimension: usize(6) +) { + + state bank_index(bank_index); + state row_index(row_index); + state graph_size(graph_size); + state block_size(block_size); + state dimension(dimension); + + state num_neighbors: usize({=2 * (dimension - 1)=}); + state row_offset: usize({=row_index * block_size=}); // row offset of the block of this reactor + state col_offset: usize({=bank_index * block_size=}); // column offset of the block of this reactor + + state k: usize(0); // iteration counter + state reportedFinish: bool(false); + + input start: Matrix; + + input[dimension] frow_row: Matrix; + input[dimension] frow_col: Matrix; + + output toNeighbors: Matrix; + output finished: unit; + + logical action notify_neighbors: Matrix; + + preamble {= + use crate::matrix::Matrix; + + fn get_element_at( + row: usize, + col: usize, + row_ports: &Multiport>, + col_ports: &Multiport>, + ctx: &ReactionCtx, + block_size: usize, + row_index: usize, + bank_index: usize, + ) -> u64 { + let dest_row = row / block_size; + let dest_col = col / block_size; + let local_row = row % block_size; + let local_col = col % block_size; + + if dest_row == row_index { + *ctx.get_ref(&row_ports[dest_col]) + .unwrap() + .get(local_row, local_col) + } else if dest_col == bank_index { + *ctx.get_ref(&col_ports[dest_row]) + .unwrap() + .get(local_row, local_col) + } else { + panic!("Error: unexpected target location ({},{})", dest_col, dest_row); + } + + } + =} + + // @label block_start + reaction(start) -> notify_neighbors {= + // reset local state + self.k = 0; + self.reportedFinish = false; + + // start execution + let matrix = ctx.get_ref(start).unwrap().clone(); + ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap); + =} + + reaction(notify_neighbors) -> toNeighbors {= + //notify all neighbors + ctx.set(toNeighbors, ctx.use_ref_opt(notify_neighbors, Clone::clone).unwrap()); + =} + + reaction(frow_row, frow_col) -> notify_neighbors, finished {= + // do nothing if complete + if self.k == self.graph_size { + return; + } + + // perform computation + let mut matrix: Matrix = Matrix::new(self.block_size, self.block_size); + let bs = self.block_size; + let ri = self.row_index; + let bi = self.bank_index; + + for i in 0..self.block_size { + for j in 0..self.block_size { + let gi = self.row_offset + i; + let gj = self.col_offset + j; + + let result = get_element_at(gi, self.k, frow_row, frow_col, &ctx, bs, ri, bi) + + get_element_at(self.k, gj, frow_row, frow_col, &ctx, bs, ri, bi); + matrix.set(i, j, result.min(get_element_at(gi, gj, frow_row, frow_col, &ctx, bs, ri, bi))); + } + } + + // increment iteration count + self.k += 1; + + if self.k == self.graph_size { + if self.bank_index == 0 && self.row_index == 0 { + debug!("{}", matrix); + } + ctx.set(finished, ()); + } + + // send the result to all neighbors in the next iteration + ctx.schedule_with_v(notify_neighbors, Some(matrix), Asap); + =} +} + +reactor ApspRow( + bank_index: usize(0), + block_size: usize(50), + num_nodes: usize(300), + dimension: usize(6), + dimension_sq: usize(36) +) { + preamble {= + use crate::matrix::Matrix; + =} + + input start: Matrix; + output[dimension] finished: unit; + + input[dimension_sq] frow_col: Matrix; + output[dimension] to_col: Matrix; + + blocks = new[dimension] ApspFloydWarshallBlock( + row_index=bank_index, + block_size=block_size, + graph_size=num_nodes, + dimension=dimension + ); + + // connect all blocks within the row + (blocks.toNeighbors)+ -> blocks.frow_row; + + // block output to all column neighbours + blocks.toNeighbors -> to_col; + // block input from all column neighbours + frow_col -> interleaved(blocks.frow_col); + + // broadcast the incoming matrix to all blocks + (start)+ -> blocks.start; + // collect and forward finished signals from all blocks + blocks.finished -> finished; +} + +reactor ApspMatrix( + block_size: usize(50), + num_nodes: usize(300), + dimension: usize(6), + dimension_sq: usize(36) +) { + preamble {= + use crate::matrix::Matrix; + =} + input start: Matrix; + output[dimension_sq] finished: unit; + + rows = new[dimension] ApspRow(block_size=block_size, num_nodes=num_nodes, dimension=dimension, dimension_sq=dimension_sq); + + // broadcast the incoming matrix to all rows + (start)+ -> rows.start; + // collect and forward finished signals from all blocks + rows.finished -> finished; + + (rows.to_col)+ -> rows.frow_col; +} + +main reactor ( + num_iterations: usize(12), + max_edge_weight: usize(100), + block_size: usize(50), + num_nodes: usize(300) +) { + state num_iterations(num_iterations); + state max_edge_weight(max_edge_weight); + state block_size(block_size); + state num_nodes(num_nodes); + + state num_blocks_finished: usize(0); + + runner = new BenchmarkRunner(num_iterations=num_iterations); + matrix = new ApspMatrix( + block_size=block_size, + num_nodes=num_nodes, + dimension={=num_nodes / block_size=}, + dimension_sq={=(num_nodes / block_size)*(num_nodes / block_size)=} + ); + + reaction(startup) {= + print_benchmark_info("ApspBenchmark"); + print_args!( + "num_iterations", + self.num_iterations, + "max_edge_weight", + self.max_edge_weight, + "num_nodes", + self.num_nodes, + "block_size", + self.block_size + ); + print_system_info(); + =} + + // @label dostart + reaction(runner.start) -> matrix.start {= + // reset local state + self.num_blocks_finished = 0; + let graph_data = generate_graph(self.num_nodes, self.max_edge_weight); + // start execution + ctx.set(matrix__start, graph_data); + =} + + reaction (matrix.finished) -> runner.finished {= + self.num_blocks_finished += matrix__finished.iterate_set().count(); + let dimension = self.num_nodes / self.block_size; + if self.num_blocks_finished == dimension * dimension { + ctx.set(runner__finished, ()); + } + =} + + preamble {= + use crate::matrix::Matrix; + use crate::{print_args,reactors::benchmark_runner::{print_system_info, print_benchmark_info}}; + use crate::pseudo_random::PseudoRandomGenerator; + use std::os::raw::c_long; + + fn generate_graph(n: usize, w: usize) -> Matrix { + let mut random = PseudoRandomGenerator::from(n as c_long); + let mut local_data: Matrix = Matrix::new(n, n); + + for i in 0..n { + for j in (i+1)..n { + let r = u64::from(random.next_in_range(0..w as c_long)) + 1; + local_data.set(i, j, r); + local_data.set(j, i, r); + } + } + + local_data + } + =} +} diff --git a/runner/conf/benchmark/savina_parallelism_apsp.yaml b/runner/conf/benchmark/savina_parallelism_apsp.yaml index e379d542..6b3a383d 100644 --- a/runner/conf/benchmark/savina_parallelism_apsp.yaml +++ b/runner/conf/benchmark/savina_parallelism_apsp.yaml @@ -46,3 +46,13 @@ targets: num_workers: ["-D", "numNodes="] block_size: ["-D", "blockSize="] max_edge_weight: ["-D", "maxEdgeWeight="] + lf-rust: + copy_sources: + - "${bench_path}/Rust/Savina/src/lib" + - "${bench_path}/Rust/Savina/src/parallelism" + lf_file: "parallelism/Apsp.lf" + binary: "apsp" + run_args: + block_size: ["--main-block-size", ""] + max_edge_weight: ["--main-max-edge-weight", ""] + num_workers: ["--main-num-nodes", ""]