Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ The simulator strives to abstract from most of the details in network programmin
The distributed algorithm simulation is controlled by an instance of class `Simulator`:

```Java
Simulator s = new Simulator(numberOfNodes);
Simulator s = new Simulator(numberOfNodes, TimestampType.NONE, 0);
for (int id = 0; id<numberOfNodes; id++) {
Node n = new ApplicationNode(s, id);
s.attachNode(n);
}
s.runSimulation(duration);
```

By instantiating a Simulator object, a network of `numberOfNodes` nodes is created. For each node `id` between `[0, numberOfNodes)` the code for the given `id` must be attached to the simulator. This `ApplicationNode` is derived from the abstract class `Node` which provides all the required functionality for implementing the algorithm simulation (see below). Finally, the simulation can be executed for `duration` seconds.
By instantiating a Simulator object, a network of `numberOfNodes` nodes is created. For each node `id` between `[0, numberOfNodes)` the code for the given `id` must be attached to the simulator. This `ApplicationNode` is derived from the abstract class `Node` which provides all the required functionality for implementing the algorithm simulation (see below). Finally, the simulation can be executed for `duration` milliseconds.

Extending class `Node` enables the implementation of the intended distributed algorithm by implementing the method `run`:

Expand Down
28 changes: 18 additions & 10 deletions src/main/java/dev/oxoo2a/sim4da/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@

public abstract class Node implements Runnable {

protected final int id;
private final Random random = new Random();
private final MessageQueue messageQueue = new MessageQueue();

protected final int id;
private final Simulator simulator;
private final Thread thread = new Thread(this);
private final Random random = new Random();
private final Thread thread;

private LogicalTimestamp localTimestamp;

public Node(Simulator simulator, int id) {
this.simulator = simulator;
this.id = id;
thread = new Thread(this, "Node-"+id);
localTimestamp = simulator.getInitialTimestamp(id);
}

public void start() {
void start() {
thread.start();
}

public void stop() {
void stop() {
thread.interrupt(); // Stop waiting in receive
try {
thread.join();
Expand All @@ -40,7 +42,7 @@ protected boolean isStillSimulating() {
return simulator.isStillSimulating();
}

public LogicalTimestamp getLocalTimestamp() {
protected LogicalTimestamp getLocalTimestamp() {
return localTimestamp;
}

Expand Down Expand Up @@ -75,8 +77,10 @@ protected Message receive() {
incrementLocalTimestamp(); // receive-event
if (localTimestamp!=null && m.getTimestamp()!=null)
localTimestamp = localTimestamp.getAdjusted(m.getTimestamp()); // forward local clock if necessary
String messageTypeString = m.getType()==MessageType.BROADCAST ? "Broadcast" : "Unicast";
simulator.emitToTracer("Receive %s:%d<-%d", messageTypeString, m.getReceiverId(), m.getSenderId());
if (simulator.isTraceMessages()) {
String messageTypeString = m.getType()==MessageType.BROADCAST ? "Broadcast" : "Unicast";
simulator.emitToTracer("Receive %s:%d<-%d", messageTypeString, m.getReceiverId(), m.getSenderId());
}
}
return m;
}
Expand Down Expand Up @@ -104,7 +108,11 @@ private void put(Message message) {
}
private Message await() {
try {
return queue.take().message;
Message m = queue.take().message;
int maxLatency = simulator.getMaxMessageLatency();
if (maxLatency>0)
Thread.sleep(random.nextInt(maxLatency)); // simulate latencies by sleeping for a random duration
return m;
} catch (InterruptedException ignored) {}
return null; // Simulation time ended before a message was received
}
Expand All @@ -113,7 +121,7 @@ private class MessageWithPriority implements Comparable<MessageWithPriority> {
private final int priority;
private MessageWithPriority(Message message) {
this.message = message;
this.priority = random.nextInt(100); //limit range to avoid int overflows when comparing
this.priority = random.nextInt(100); // limit range to avoid int overflows when comparing
}
@Override
public int compareTo(MessageWithPriority other) {
Expand Down
44 changes: 26 additions & 18 deletions src/main/java/dev/oxoo2a/sim4da/Simulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ public class Simulator {
private final Node[] nodes;
private final Tracer tracer;
private final TimestampType timestampType;
private final int maxMessageLatency;
private final boolean traceMessages;

// This is only changed to false once in the main thread, but still needs to be volatile
// to ensure that all Node threads can actually see that change.
private volatile boolean stillSimulating = true;

public Simulator(int numberOfNodes, TimestampType timestampType, String name,
boolean orderedTracing, boolean useLog4j2, PrintStream alternativeTracingDestination) {
public Simulator(int numberOfNodes, TimestampType timestampType, int maxMessageLatency) { // without tracing
this(numberOfNodes, timestampType, maxMessageLatency, null, false, null, false);
}

public Simulator(int numberOfNodes, TimestampType timestampType, int maxMessageLatency, String tracerName,
boolean useLog4j2, PrintStream alternativeTracingDestination, boolean traceMessages) {
nodes = new Node[numberOfNodes];
if (useLog4j2 || alternativeTracingDestination!=null)
tracer = new Tracer(name, orderedTracing, useLog4j2, alternativeTracingDestination);
tracer = new Tracer(tracerName, false, useLog4j2, alternativeTracingDestination);
else tracer = null; //no tracing
this.timestampType = timestampType;
}

public static Simulator createDefaultSimulator(int numberOfNodes) {
return new Simulator(numberOfNodes, TimestampType.EXTENDED_LAMPORT, "sim4da", true, true, System.out);
}

public static Simulator createSimulatorUsingLog4j2(int numberOfNodes) {
return new Simulator(numberOfNodes, TimestampType.EXTENDED_LAMPORT, "sim4da", true, true, null);
this.maxMessageLatency = maxMessageLatency;
this.traceMessages = traceMessages;
}

public void attachNode(Node node) throws IllegalArgumentException {
Expand All @@ -47,12 +47,12 @@ public void runSimulation(int duration) throws InstantiationException {
for (Node node : nodes) {
if (node==null) throw new InstantiationException();
}
emitToTracer("Simulator::runSimulation with %d nodes for %d seconds", nodes.length, duration);
emitToTracer("Simulator::runSimulation with %d nodes for %d ms", nodes.length, duration);
for (Node node : nodes) {
node.start();
}
try {
Thread.sleep(duration*1000L); // Wait for the required duration
Thread.sleep(duration); // Wait for the required duration
} catch (InterruptedException ignored) {}
stillSimulating=false;
for (Node node : nodes) { // Tell all nodes to stop and wait for the threads to terminate
Expand All @@ -65,30 +65,38 @@ public int getNumberOfNodes() {
return nodes.length;
}

public int getMaxMessageLatency() {
return maxMessageLatency;
}

public boolean isTraceMessages() {
return traceMessages;
}

public boolean isStillSimulating() {
return stillSimulating;
}

public void sendUnicast(int senderId, int receiverId, LogicalTimestamp timestamp, String payload) {
if (receiverId<0 || receiverId>=nodes.length) {
System.err.printf("Simulator::sendUnicast: unknown receiverId %d\n", receiverId);
emitToTracer("Simulator::sendUnicast: unknown receiverId %d", receiverId);
return;
}
if (senderId<0 || senderId>=nodes.length) {
System.err.printf("Simulator::sendUnicast: unknown senderId %d\n", senderId);
emitToTracer("Simulator::sendUnicast: unknown senderId %d", senderId);
return;
}
emitToTracer("Unicast:%d->%d", senderId, receiverId);
if (traceMessages) emitToTracer("Unicast:%d->%d", senderId, receiverId);
Message raw = new Message(senderId, receiverId, MessageType.UNICAST, timestamp, payload);
nodes[receiverId].putInMessageQueue(raw);
}

public void sendBroadcast(int senderId, LogicalTimestamp timestamp, String payload) {
if (senderId<0 || senderId>=nodes.length) {
System.err.printf("Simulator::sendBroadcast: unknown senderId %d\n", senderId);
emitToTracer("Simulator::sendBroadcast: unknown senderId %d", senderId);
return;
}
emitToTracer("Broadcast:%d->0..%d", senderId, nodes.length-1);
if (traceMessages) emitToTracer("Broadcast:%d->0..%d", senderId, nodes.length-1);
Message raw = new Message(senderId, -1, MessageType.BROADCAST, timestamp, payload);
for (int i = 0; i<nodes.length; i++) {
if (i==senderId) continue; //don't send broadcast back to sender
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/dev/oxoo2a/sim4da/Tracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

public class Tracer {

@SuppressWarnings({"unused", "FieldCanBeLocal"})
private final String name;
@SuppressWarnings({"unused", "FieldCanBeLocal"})
private final boolean orderedTracing; //currently unused
private final boolean useLog4j2;
Expand All @@ -17,18 +15,21 @@ public class Tracer {

public Tracer(String name, boolean orderedTracing, boolean useLog4j2,
PrintStream alternativeTracingDestination) {
this.name = name;
this.orderedTracing = orderedTracing;
this.useLog4j2 = useLog4j2;
this.alternativeTracingDestination = alternativeTracingDestination;
log4j2Logger = LogManager.getFormatterLogger(name);
if (useLog4j2) log4j2Logger = LogManager.getFormatterLogger(name);
else log4j2Logger = null; //must initialize final fields
}

public void emit(String format, Object... args) {
if (useLog4j2) log4j2Logger.trace(format, args);
if (alternativeTracingDestination!=null) {
alternativeTracingDestination.printf(format, args);
alternativeTracingDestination.println();
synchronized (alternativeTracingDestination) { //to ensure that the line termination always comes
//immediately after the printed string and that no other thread can write data in between
alternativeTracingDestination.printf(format, args);
alternativeTracingDestination.println();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
public class BroadcastMessageCopiesTest {

private static final int NUMBER_OF_NODES = 5;
private static final int DURATION = 2;
private static final int DURATION = 2000;

@Test
public void areMessagesCopied () {
Simulator s = new Simulator(NUMBER_OF_NODES, TimestampType.EXTENDED_LAMPORT, "amc", true, true, System.out);
Simulator s = new Simulator(NUMBER_OF_NODES, TimestampType.EXTENDED_LAMPORT, 0, "amc", true, System.out, true);
for (int id = 0; id<NUMBER_OF_NODES; id++) {
Node n = new TestNode(s, id);
s.attachNode(n);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/dev/oxoo2a/sim4da/LeLannTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
public class LeLannTest {

private static final int NUMBER_OF_NODES = 3;
private static final int DURATION = 2;
private static final int DURATION = 2000;

@Test
public void runLeLannSimulation() {
Simulator s = new Simulator(NUMBER_OF_NODES, TimestampType.VECTOR, "LeLann", true, true, System.out);
Simulator s = new Simulator(NUMBER_OF_NODES, TimestampType.VECTOR, 100, "LeLann", true, System.out, true);
for (int id = 0; id<NUMBER_OF_NODES; id++) {
Node n = new LeLannNode(s, id);
s.attachNode(n);
Expand Down
7 changes: 4 additions & 3 deletions src/test/java/dev/oxoo2a/sim4da/SimulatorTest.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package dev.oxoo2a.sim4da;

import dev.oxoo2a.sim4da.Simulator.TimestampType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SimulatorTest {

private static final int NUMBER_OF_NODES = 3;
private static final int DURATION = 2;
private static final int DURATION = 2000;

@Test
public void simpleSimulation() {
Assertions.assertEquals(NUMBER_OF_NODES, 3);
Simulator s = Simulator.createDefaultSimulator(NUMBER_OF_NODES);
Simulator s = new Simulator(NUMBER_OF_NODES, TimestampType.NONE, 0);
for (int id = 0; id<NUMBER_OF_NODES; id++) {
Node n = new BroadcastNode(s, id);
s.attachNode(n);
Expand All @@ -25,7 +26,7 @@ public void simpleSimulation() {

@Test
public void someNodesNotInstantiated () {
Simulator s = Simulator.createDefaultSimulator(NUMBER_OF_NODES);
Simulator s = new Simulator(NUMBER_OF_NODES, TimestampType.NONE, 0);
s.attachNode(new BroadcastNode(s, 0));
s.attachNode(new BroadcastNode(s, 1));
Assertions.assertThrows(InstantiationException.class,() -> s.runSimulation(DURATION));
Expand Down