diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index aa991fc..fae0804 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/pom.xml-unused b/pom.xml-unused deleted file mode 100644 index 233ad36..0000000 --- a/pom.xml-unused +++ /dev/null @@ -1,106 +0,0 @@ - - - 4.0.0 - - dev.oxoo2a - sim4da - 1.0-SNAPSHOT - sim4da - jar - - - 17 - 17 - - 2.9.0 - 5.8.2 - - UTF-8 - - - - - com.google.code.gson - gson - ${gson.version} - - - org.junit.jupiter - junit-jupiter-api - ${junit.version} - compile - - - org.junit.jupiter - junit-jupiter-engine - ${junit.version} - test - - - - - . - - - org.apache.maven.plugins - maven-compiler-plugin - 3.10.1 - - - - org.apache.maven.plugins - maven-surefire-plugin - 3.0.0-M6 - - - - org.apache.maven.plugins - maven-jar-plugin - 3.2.2 - - - - dev.oxoo2a - true - - - Peter Sturm - - - - - - - org.apache.maven.plugins - maven-assembly-plugin - 3.3.0 - - - jar-with-dependencies - - - - dev.oxoo2a - true - - - Peter Sturm - - - - - - assemble-all - package - - single - - - - - - - - \ No newline at end of file diff --git a/src/main/java/dev/oxoo2a/sim4da/Clock.java b/src/main/java/dev/oxoo2a/sim4da/Clock.java new file mode 100644 index 0000000..6d0276c --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/Clock.java @@ -0,0 +1,13 @@ +package dev.oxoo2a.sim4da; + +public interface Clock { + int getTimeStamp(); + void increment(); + public String printTimeStamp(); + public void update(int senderTime, Object ... args); + String getTimeVector(); +} + + + + diff --git a/src/main/java/dev/oxoo2a/sim4da/ControlMessage.java b/src/main/java/dev/oxoo2a/sim4da/ControlMessage.java new file mode 100644 index 0000000..4c392f6 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/ControlMessage.java @@ -0,0 +1,47 @@ +package dev.oxoo2a.sim4da; + +import java.util.ArrayList; +import java.util.List; + +public class ControlMessage { + + private int round; + protected int received; + protected int sent; + protected boolean isActive; + protected int id; + + protected ControlMessageType type; + + public ControlMessage(int round, ControlMessageType type, int id) + { + this.round = round; + this.type = type; + this.id = id; + } + + public ControlMessage(ControlMessageType type, int id, boolean status, int r, int s, int round) + { + this.round = round; + this.type = type; + this.id = id; + this.isActive = status; + this.received = r; + this.sent = s; + } + + public boolean isActive() { + return isActive; + } + + public int getId() { + return id; + } + + public int getRound() { return round;} + + public int getReceived() { return received;} + + public int getSent() { return sent;} + +} diff --git a/src/main/java/dev/oxoo2a/sim4da/ControlMessageQueue.java b/src/main/java/dev/oxoo2a/sim4da/ControlMessageQueue.java new file mode 100644 index 0000000..aae1b3c --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/ControlMessageQueue.java @@ -0,0 +1,51 @@ +package dev.oxoo2a.sim4da; + + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Semaphore; + + +public class ControlMessageQueue +{ + + public ControlMessageQueue() + { + queue = new ArrayList(); + + } + public synchronized void put ( ControlMessage r ) + { + synchronized (queue) { + queue.add(r); + } + } + public ControlMessage get(int id) { + while(true) + { + try { + synchronized (queue) { + ControlMessage result = null; + for (ControlMessage cm : queue) { + if (cm.getId() == id) { + result = cm; + break; + } + } + if (result != null) { + queue.remove(result); + return result; + } else { + return null; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + } + + protected static ArrayList queue; +} diff --git a/src/main/java/dev/oxoo2a/sim4da/ControlMessageType.java b/src/main/java/dev/oxoo2a/sim4da/ControlMessageType.java new file mode 100644 index 0000000..809e56d --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/ControlMessageType.java @@ -0,0 +1,3 @@ +package dev.oxoo2a.sim4da; + +public enum ControlMessageType {REQUEST, RESPONSE} diff --git a/src/main/java/dev/oxoo2a/sim4da/DoubleCountTerminator.java b/src/main/java/dev/oxoo2a/sim4da/DoubleCountTerminator.java new file mode 100644 index 0000000..c0c2b03 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/DoubleCountTerminator.java @@ -0,0 +1,141 @@ +package dev.oxoo2a.sim4da; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Semaphore; + +import static dev.oxoo2a.sim4da.ControlMessageType.REQUEST; +import static java.lang.Thread.sleep; + +public class DoubleCountTerminator{ + + public static int broadcast_run = 0; + private final int n_Nodes; + public Thread thread; + private int numberSent; + static int counter; + static int numberReceived; + private final Semaphore await_message; + protected ArrayList received_array; + private Node2Simulator simulator; + static boolean stop; + + public DoubleCountTerminator(int numberOfNodes) + { + n_Nodes = numberOfNodes; + numberSent = 0; + received_array = new ArrayList(); + await_message = new Semaphore(0); + thread = new Thread(this::main); + thread.setName("Observer"); + + } + public void setSimulator(Node2Simulator s ) { + this.simulator = s; + } + + public void broadcastControlMessage() { + broadcast_run++; + for(int i=0;i arr) { + int round1_received = 0; + int round2_received = 0; + int round1_sent = 0; + int round2_sent = 0; + boolean[] status = new boolean[arr.size()]; + + for(int i=0; i(); + } protected Message ( HashMap content ) { this.content = content; + } public Message add ( String key, String value ) { content.put(key,value); @@ -48,6 +52,9 @@ private static synchronized String serialize ( Map content ) { return serializer.toJson(content); // Not sure about thread safety of Gson } + + private final HashMap content; private static final Gson serializer = new Gson(); + } diff --git a/src/main/java/dev/oxoo2a/sim4da/Network.java b/src/main/java/dev/oxoo2a/sim4da/Network.java index dd21c31..cb035ca 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Network.java +++ b/src/main/java/dev/oxoo2a/sim4da/Network.java @@ -1,14 +1,18 @@ package dev.oxoo2a.sim4da; +import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.Semaphore; public class Network { - public Network ( int n_nodes, Tracer tracer ) { + public Network (int n_nodes, Tracer tracer ) { this.n_nodes = n_nodes; this.tracer = tracer; + cmq_dc = new ControlMessageQueue(); + controlVector = initializeControlVector(n_nodes); mqueues = new MessageQueue[n_nodes]; for (int i=0; i= n_nodes)) { System.err.printf("Network::unicast: unknown receiver id %d\n",receiver_id); @@ -94,7 +102,7 @@ public void unicast ( int sender_id, int receiver_id, String message ) { System.err.printf("Network::unicast: unknown sender id %d\n",sender_id); return; } - tracer.emit("Unicast:%d->%d",sender_id,receiver_id); + tracer.emit("Unicast:%d->%d","main", sender_id,receiver_id); Message raw = new Message(sender_id,receiver_id,MessageType.UNICAST,message); mqueues[receiver_id].put(raw); } @@ -104,7 +112,7 @@ public void broadcast ( int sender_id, String message ) { System.err.printf("Network::unicast: unknown sender id %d\n",sender_id); return; } - tracer.emit("Broadcast:%d->0..%d",sender_id,n_nodes-1); + tracer.emit("Broadcast:%d->0..%d","main",sender_id,n_nodes-1); Message raw = new Message(sender_id,-1,MessageType.BROADCAST,message); for ( int l=0; l initializeControlVector(int n_nodes) { + Random r = new Random(); + int r_node = r.nextInt(numberOfNodes()); + int[] vector = new int[n_nodes]; + Arrays.fill(vector, 0); + HashMap n = new HashMap(); + n.put(r_node, vector); + return n; + } + private final int n_nodes; private final Tracer tracer; private final MessageQueue[] mqueues; - + protected ControlMessageQueue cmq_dc; private static final Random rgen = new Random(); - //private static Logger logger = Logger.getRootLogger(); + protected HashMap controlVector; + } diff --git a/src/main/java/dev/oxoo2a/sim4da/Node.java b/src/main/java/dev/oxoo2a/sim4da/Node.java index af1d1f7..8411f41 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Node.java +++ b/src/main/java/dev/oxoo2a/sim4da/Node.java @@ -1,16 +1,41 @@ package dev.oxoo2a.sim4da; +import java.lang.IllegalArgumentException; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Random; public abstract class Node implements Simulator2Node { - public Node ( int my_id ) { + public Node (int my_id ) { this.myId = my_id; t_main = new Thread(this::main); + t_main.setName("Node" + Integer.toString(my_id)); } @Override public void setSimulator(Node2Simulator s ) { this.simulator = s; } + @Override + public void createClockByClass(String type, int n_nodes, int index) throws IllegalArgumentException { + + switch (type) { + case "lamport": + this.clock = new Lamport(); + break; + case "vector": + this.clock = new Vector(n_nodes, index); + break; + default: + throw new IllegalArgumentException("The class name must be either vector or lamport!"); + } + + + } + synchronized void sendControlMessage(ControlMessage controlMessage) { + simulator.passControlMessage(controlMessage); + } @Override public void start () { @@ -49,8 +74,12 @@ protected Network.Message receive () { return simulator.receive(myId); } - protected void emit ( String format, Object ... args ) { - simulator.emit(format,args); + protected ControlMessage receiveControlMessage(int id) { + return simulator.receiveControlMessage(id); + }; + + protected void emit (String format, String logType, Object ... args ) { + simulator.emit(format,logType,args); } // Module implements basic node functionality protected abstract void main (); @@ -62,8 +91,19 @@ public void stop () { } catch (InterruptedException ignored) {}; } + protected void sendControlVectorToNetwork(int randomRecipient, int[] controlVector) + { + simulator.sendControlVectorToNetwork(randomRecipient, controlVector); + } + + protected synchronized HashMap receiveControlVector(int Id) + { + return simulator.returnControlVector(Id); + } protected final int myId; - private Node2Simulator simulator; + protected int[] vector; + protected Node2Simulator simulator; private final Thread t_main; + protected Clock clock; } diff --git a/src/main/java/dev/oxoo2a/sim4da/Node2Simulator.java b/src/main/java/dev/oxoo2a/sim4da/Node2Simulator.java index 3e8b1fb..a19fe0e 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Node2Simulator.java +++ b/src/main/java/dev/oxoo2a/sim4da/Node2Simulator.java @@ -1,5 +1,7 @@ package dev.oxoo2a.sim4da; +import java.util.HashMap; + public interface Node2Simulator { int numberOfNodes(); boolean stillSimulating (); @@ -7,6 +9,15 @@ public interface Node2Simulator { void sendUnicast ( int sender_id, int receiver_id, Message m ); void sendBroadcast ( int sender_id, String m ); void sendBroadcast ( int sender_id, Message m ); + void passControlMessage(ControlMessage cm); + ControlMessage receiveControlMessage (int id); Network.Message receive ( int my_id ); - void emit ( String format, Object ... args ); + void emit ( String format, String logType, Object ... args ); + void sendControlMessage(ControlMessage controlMessage); + void updateStatus(); + boolean checkIfFinilised(); + + HashMap returnControlVector(int id); + + void sendControlVectorToNetwork(int randomRecipient, int[] controlVector); } diff --git a/src/main/java/dev/oxoo2a/sim4da/SimulationNode.java b/src/main/java/dev/oxoo2a/sim4da/SimulationNode.java new file mode 100644 index 0000000..efa7fde --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/SimulationNode.java @@ -0,0 +1,151 @@ +package dev.oxoo2a.sim4da; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Random; + +import static java.lang.Math.max; + +public class SimulationNode extends Node{ + + public SimulationNode(int my_id, int numberOfNodes) { + super(my_id); + probability = 0.7; + isActive = true; + controlVector = null; + vector = new int[numberOfNodes]; + Arrays.fill(vector, 0); + int messages_received = 0; + int messages_sent = 0; + } + @Override + protected void main() { + long time = System.currentTimeMillis(); + int loops = 0; + Random r = new Random(); + Random p = new Random(); + + while (stillSimulating()) + { + sendAMessageToARandomNode(r, p, time); + loops++; + emit("Node %d, Loop %d","main",myId,loops); + Network.Message m_raw = receive(); + HashMap controlVect = receiveControlVector(myId); + if(controlVect!=null && controlVect.size()!=0) + { + controlVector = controlVect.get(myId); + if(!isActive) + { + sendControlVectorToNetwork(r); + } + } + + if (m_raw == null ) + { + + } + else + { + decrementVector(); + messages_received++; + Message m_json = Message.fromJson(m_raw.payload); + int c = Integer.parseInt(m_json.query("Candidate")); + clock.update(Integer.parseInt(m_json.query("Time")), Integer.parseInt(m_json.query("Sender")), m_json.query("Vector")); + emit("Node %d -> Receiver %d, ClockTime on %d %s","clock", c, myId, myId, this.clock.printTimeStamp()); + isActive = true; + sendAMessageToARandomNode(r, p, time); + } + ControlMessage received_control_message = receiveControlMessage(myId); + if (received_control_message!=null) + { + ControlMessage controlMessage = new ControlMessage(ControlMessageType.RESPONSE, myId, isActive, messages_received, messages_sent, received_control_message.getRound()); + sendControlMessage(controlMessage); + emit("Status of %d %s","clock", myId, isActive); + } + } + } + private void sendAMessageToARandomNode(Random r, Random p, long time) { + + if (p.nextDouble() < this.probability && isActive) { + int randomRecipient = getRandomRecipient(r); + incrementVector(randomRecipient); + clock.increment(); + Message init_message = new Message() + .add("Sender", myId) + .add("Candidate", r.nextInt(numberOfNodes())) + .add("Time", clock.getTimeStamp()); + init_message = this.clock.getTimeVector() != null ? init_message.add("Vector", clock.getTimeVector()) : init_message; + sendUnicast(randomRecipient, init_message); + emit("Node %d, ClockTime %s", "clock", myId, this.clock.printTimeStamp()); + probability = max(0, probability - probability * 0.1*(System.currentTimeMillis()-time)/1000); + messages_sent++; + isActive = false; + if(controlVector!= null) + { + sendControlVectorToNetwork(r); + } + } + } + + private void sendControlVectorToNetwork(Random r) { + int randomRecipient; + randomRecipient = getRandomRecipient(r); + readVector(); + emit("Node %d, ControlVector %s", "clock", myId, Arrays.toString(controlVector)); + sendControlVectorToNetwork(randomRecipient, controlVector); + controlVector = null; + } + + private int getRandomRecipientOfControlVector(Random p) + { + try { + ArrayList nonZeroIndices = new ArrayList<>(); + for (int i = 0; i < vector.length; i++) { + if (vector[i] != 0) { + nonZeroIndices.add(i); + } + } + Random random = new Random(); + return nonZeroIndices.get(random.nextInt(nonZeroIndices.size())); + + } + catch (Exception e) + { + return getRandomRecipient(p); + } + } + + + private int getRandomRecipient(Random r) { + int randomRecipient; + do { + randomRecipient = r.nextInt(numberOfNodes()); + } while (randomRecipient == myId); + clock.increment(); + return randomRecipient; + } + private void decrementVector() + { + vector[myId] = vector[myId] - 1; + } + private void readVector() + { + for (int i = 0; i(n_nodes); for (int n_id = 0; n_id < n_nodes; ++n_id) @@ -32,29 +42,46 @@ public void attachNode (int id, Simulator2Node node ) { nodes.replace(id,node); } - public void runSimulation ( int duration ) throws InstantiationException { + public void runSimulation (int duration, String type) throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, InterruptedException { // Check that all nodes are attached - for ( Simulator2Node n : nodes.values() ) { + /*for (Simulator2Node n : nodes.values()) { + if (n == null) throw new InstantiationException(); + n.setSimulator(this); + n.createClockByClass(type, nodes.size(), n.ge); + }*/ + for (Map.Entry elem: nodes.entrySet()) + { + Simulator2Node n = elem.getValue(); if (n == null) throw new InstantiationException(); n.setSimulator(this); + n.createClockByClass(type, nodes.size(), elem.getKey()); } - tracer.emit("Simulator::runSimulation with %d nodes for %d seconds",n_nodes,duration); + tracer.emit("main","Simulator::runSimulation with %d nodes for %d seconds",n_nodes,duration); is_simulating = true; nodes.values().forEach(Simulator2Node::start); + doubleCountTerminator.start(); // Wait for the required duration try { Thread.sleep(duration * 1000L); } catch (InterruptedException ignored) {} is_simulating = false; - + doubleCountTerminator.stop(); + doubleCountTerminator.end(); + controlVectorTerminator.stop(); + controlVectorTerminator.end(); // Stop network - release nodes waiting in receive ... network.stop(); - // Tell all nodes to stop and wait for the threads to terminate nodes.values().forEach(Simulator2Node::stop); - tracer.emit("Simulator::runSimulation finished"); + + tracer.emit("main","Simulator::runSimulation finished"); + } + + public void updateStatus() + { + is_simulating = false; } @Override @@ -82,19 +109,74 @@ public void sendBroadcast ( int sender_id, Message m ) { network.broadcast(sender_id,m.toJson()); } + @Override + public synchronized void passControlMessage(ControlMessage controlMessage) { + emit("sent a control message to the observer from %d", "doublecount", controlMessage.getId()); + doubleCountTerminator.update(controlMessage); + } + + @Override + public ControlMessage receiveControlMessage(int id) { + return network.getControlMessage(id); + } + @Override public Network.Message receive ( int receiver_id ) { return network.receive(receiver_id); } + @Override + public void sendControlMessage(ControlMessage controlMessage) { + emit("sent a control message to the node %d", "clock", controlMessage.getId()); + network.addToControlQueue(controlMessage); + } + @Override + public boolean checkIfFinilised() { + + synchronized (network.controlVector){ + if(network.controlVector == null) + { + return false; + } + else{ + for (int[] value : network.controlVector.values()) + { + return (Arrays.stream(value).allMatch(element -> element == 0)); + } + } + return false; + } + + } + @Override + public void sendControlVectorToNetwork(int randomRecipient, int[] controlVector) + { + network.controlVector.put(randomRecipient, controlVector); + } @Override - public void emit ( String format, Object ... args ) { - tracer.emit(format,args); + public HashMap returnControlVector(int Id) { + // if(network.controlVector.isEmpty()) return null; + + if(network.controlVector.containsKey(Id)) + { + HashMap controlVector = new HashMap<>(network.controlVector); + network.controlVector.clear(); + return controlVector; + } + + return null; } + @Override + public void emit (String format, String logType, Object ... args) { + tracer.emit(format,logType, args); + } + private DoubleCountTerminator doubleCountTerminator; private final int n_nodes; private final Tracer tracer; private final Network network; private final HashMap nodes; private boolean is_simulating = false; + + private VectorControlTerminator controlVectorTerminator; } diff --git a/src/main/java/dev/oxoo2a/sim4da/Simulator2Node.java b/src/main/java/dev/oxoo2a/sim4da/Simulator2Node.java index 9faff36..28f2da6 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Simulator2Node.java +++ b/src/main/java/dev/oxoo2a/sim4da/Simulator2Node.java @@ -1,8 +1,12 @@ package dev.oxoo2a.sim4da; +import java.lang.reflect.InvocationTargetException; + public interface Simulator2Node { void setSimulator(Node2Simulator s ); + void createClockByClass(String type, int n_nodes, int index) throws IllegalArgumentException; + void start (); void stop (); } diff --git a/src/main/java/dev/oxoo2a/sim4da/Tracer.java b/src/main/java/dev/oxoo2a/sim4da/Tracer.java index bdcd035..54aade2 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Tracer.java +++ b/src/main/java/dev/oxoo2a/sim4da/Tracer.java @@ -7,7 +7,7 @@ public class Tracer { - public Tracer ( String name, boolean ordered, boolean enableTracing, boolean useLog4j2, PrintStream alternativeDestination ) { + public Tracer (String name, boolean ordered, boolean enableTracing, boolean useLog4j2, PrintStream alternativeDestination) { this.name = name; this.ordered = ordered; this.silent = !enableTracing; @@ -15,12 +15,20 @@ public Tracer ( String name, boolean ordered, boolean enableTracing, boolean use this.alternativeDestination = alternativeDestination; log4j2Logger = LogManager.getFormatterLogger(name); + log4j2Clock = LogManager.getFormatterLogger("clock"); } - public void emit ( String format, Object ... args ) { + public void emit (String format, String logType, Object ... args) { if (silent) return; if (useLog4j2) { - log4j2Logger.trace(format,args); + if(logType == "main") + { + log4j2Logger.trace(format,args); + } + else { + log4j2Clock.trace(format, args); + } + } if (alternativeDestination != null) { alternativeDestination.printf(format,args); @@ -34,4 +42,5 @@ public void emit ( String format, Object ... args ) { private final boolean useLog4j2; private final PrintStream alternativeDestination; private final Logger log4j2Logger; + private final Logger log4j2Clock; } diff --git a/src/main/java/dev/oxoo2a/sim4da/Vector.java b/src/main/java/dev/oxoo2a/sim4da/Vector.java new file mode 100644 index 0000000..6d89134 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/Vector.java @@ -0,0 +1,92 @@ +package dev.oxoo2a.sim4da; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; + +public class Vector implements Clock{ + + public String getTimeVector() { + return serializeVector(timeVector); + } + + public Vector(int nodes, int index) + { + HashMap timeVector = new HashMap<>(); + + for (int i = 0; i < nodes; i++) { + timeVector.put(i, 0); + } + this.timeVector = timeVector; + this.index = index; + } + @Override + public int getTimeStamp() { + return timeVector.get(index); + } + + @Override + public void increment() + { + int currentValue = getTimeStamp(); + timeVector.put(index, currentValue + 1); + } + + @Override + public String printTimeStamp() { + + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : timeVector.entrySet()) { + sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(" "); + } + return sb.toString(); + } + + + public void update(int senderTime, Object... args) + { + + int senderIndex = (int) args[0]; + String vector = (String) args[1]; + if(senderIndex == index) + { + timeVector.compute(index, (k,v) -> (v==0) ? 1: v + 1); + } + else + { + HashMap received = deserialize(vector); + compareVectors(received); + increment(); + + } + + } + + private static synchronized String serializeVector ( HashMap vector ) { + return serializer.toJson(vector); + } + + public static HashMap deserialize(String vector) + { + Type contentType = new TypeToken>() {}.getType(); + HashMap deserializedHashMap = serializer.fromJson(vector, contentType); + + return deserializedHashMap; + } + + public synchronized void compareVectors(HashMap received) + { + + for (Integer key : timeVector.keySet()) { + if (received.containsKey(key) && received.get(key) > timeVector.get(key)) { + timeVector.put(key, received.get(key)); + } + } + } + protected final int index; + protected HashMap timeVector; + private static final Gson serializer = new Gson(); +} diff --git a/src/main/java/dev/oxoo2a/sim4da/VectorControlTerminator.java b/src/main/java/dev/oxoo2a/sim4da/VectorControlTerminator.java new file mode 100644 index 0000000..86c3e07 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/VectorControlTerminator.java @@ -0,0 +1,66 @@ +package dev.oxoo2a.sim4da; + +import java.util.ArrayList; +import java.util.concurrent.Semaphore; + +import static java.lang.Thread.sleep; + +public class VectorControlTerminator +{ + + public VectorControlTerminator() + { + thread = new Thread(this::main); + thread.setName("VectorControlTerminator"); + } + + public void setSimulator(Node2Simulator s ) { + this.simulator = s; + } + + private void main() + { + try { + sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + while(true) + { + if (stop) break; + + if(checkIfFinilised()) + { + simulator.updateStatus(); + simulator.emit("TERMINATING BASED ON VECTOR CONTROL", "clock"); + System.exit(0); + break; + + } + + } + } + + private boolean checkIfFinilised() + { + return simulator.checkIfFinilised(); + } + + public void start () { + + thread.start(); + } + public void end() throws InterruptedException { + thread.join(); + } + + public void stop () { + + stop = true; + + } + static boolean stop; + public Thread thread; + private Node2Simulator simulator; + private static int runs = 0; +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index bc83ce1..9e57cec 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -5,14 +5,28 @@ - - + + + + + + + + + + + + + + + + diff --git a/src/test/java/dev/oxoo2a/sim4da/BroadcastNode.java b/src/test/java/dev/oxoo2a/sim4da/BroadcastNode.java index 0f73569..b27a6b4 100644 --- a/src/test/java/dev/oxoo2a/sim4da/BroadcastNode.java +++ b/src/test/java/dev/oxoo2a/sim4da/BroadcastNode.java @@ -14,11 +14,18 @@ public void main () { int broadcasts_sent = 0; int loops = 0; // Create a message with a random candidate to send the next broadcast - Message m_broadcast = new Message().add("Sender",myId).add("Candidate",r.nextInt(numberOfNodes())); + clock.increment(); + Message m_broadcast = new Message() + .add("Sender",myId) + .add("Candidate",r.nextInt(numberOfNodes())) + .add("Time", clock.getTimeStamp()); + m_broadcast = this.clock.getTimeVector() != null ? m_broadcast.add("Vector", clock.getTimeVector()) : m_broadcast; sendBroadcast(m_broadcast); + emit("Node %d, ClockTime %s","clock",myId, this.clock.printTimeStamp()); + broadcasts_sent++; while (stillSimulating()) { loops++; - emit("Node %d, Loop %d",myId,loops); + emit("Node %d, Loop %d","main",myId,loops); Network.Message m_raw = receive(); if (m_raw == null) break; // Null == Node2Simulator time ends while waiting for a message broadcasts_received++; @@ -27,15 +34,21 @@ public void main () { // JSON encoded messages must be deserialized into a Message object Message m_json = Message.fromJson(m_raw.payload); int c = Integer.parseInt(m_json.query("Candidate")); + clock.update(Integer.parseInt(m_json.query("Time")), Integer.parseInt(m_json.query("Sender")), m_json.query("Vector")); + emit("Node %d -> Receiver %d, ClockTime on %d %s ","clock", c, myId, myId, this.clock.printTimeStamp()); // Who's the next candidate for sending a broadcast message. There's also a small probability, that we // send a broadcast message anyway :-) if ((c == myId) || (r.nextInt(100) < 5)) { // The next sender for a broadcast message is selected randomly - m_broadcast.add("Candidate",r.nextInt(numberOfNodes())); + clock.increment(); + m_broadcast.add("Candidate",r.nextInt(numberOfNodes())) + .add("Time", clock.getTimeStamp()); + m_broadcast = this.clock.getTimeVector() != null ? m_broadcast.add("Vector", clock.getTimeVector()) : m_broadcast; sendBroadcast(m_broadcast); + emit("Node %d, ClockTime %s","clock",myId, this.clock.printTimeStamp()); broadcasts_sent++; } } - emit("%d: %d broadcasts received and %d broadcasts sent",myId,broadcasts_received,broadcasts_sent); + emit("%d: %d broadcasts received and %d broadcasts sent","main",myId,broadcasts_received,broadcasts_sent); } } diff --git a/src/test/java/dev/oxoo2a/sim4da/SimulatorTest.java b/src/test/java/dev/oxoo2a/sim4da/SimulatorTest.java index 7476fdc..5808584 100644 --- a/src/test/java/dev/oxoo2a/sim4da/SimulatorTest.java +++ b/src/test/java/dev/oxoo2a/sim4da/SimulatorTest.java @@ -3,12 +3,13 @@ import static org.junit.jupiter.api.Assertions.*; import java.beans.Transient; +import java.lang.reflect.InvocationTargetException; import org.junit.jupiter.api.Test; public class SimulatorTest { private final int n_nodes = 3; - private final int duration = 2; + private final int duration = 10; @Test public void simpleSimulation() { @@ -18,11 +19,12 @@ public void simpleSimulation() { s.attachNode(id,n); } try { - s.runSimulation(duration); + s.runSimulation(duration, "lamport"); } - catch (InstantiationException ignored) { + catch (Exception ignored) { fail("Not all nodes instantiated"); } + } @Test @@ -30,6 +32,6 @@ public void someNodesNotInstantiated () { Simulator s = Simulator.createDefaultSimulator(n_nodes); s.attachNode(0,new BroadcastNode(0)); s.attachNode(1,new BroadcastNode(1)); - assertThrows(InstantiationException.class,() -> {s.runSimulation(duration);}); + assertThrows(InstantiationException.class,() -> {s.runSimulation(duration, "lamport");}); } } diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml index 820d988..9e57cec 100644 --- a/src/test/resources/log4j2-test.xml +++ b/src/test/resources/log4j2-test.xml @@ -8,11 +8,25 @@ + + + + + + + + + + + + + +