diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index aa991fc..fae0804 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/java/dev/oxoo2a/sim4da/Node.java b/src/main/java/dev/oxoo2a/sim4da/Node.java index af1d1f7..063f4a3 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Node.java +++ b/src/main/java/dev/oxoo2a/sim4da/Node.java @@ -64,6 +64,6 @@ public void stop () { } protected final int myId; - private Node2Simulator simulator; + protected Node2Simulator simulator; private final Thread t_main; } diff --git a/src/main/java/dev/oxoo2a/sim4da/Simulator.java b/src/main/java/dev/oxoo2a/sim4da/Simulator.java index 4372846..a73ad7d 100644 --- a/src/main/java/dev/oxoo2a/sim4da/Simulator.java +++ b/src/main/java/dev/oxoo2a/sim4da/Simulator.java @@ -36,6 +36,7 @@ public void runSimulation ( int duration ) throws InstantiationException { // Check that all nodes are attached for ( Simulator2Node n : nodes.values() ) { if (n == null) throw new InstantiationException(); + System.out.println("set simulator"); n.setSimulator(this); } diff --git a/src/main/java/dev/oxoo2a/sim4da/TokenRingNode.java b/src/main/java/dev/oxoo2a/sim4da/TokenRingNode.java new file mode 100644 index 0000000..2755651 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/TokenRingNode.java @@ -0,0 +1,26 @@ +package dev.oxoo2a.sim4da; + +public class TokenRingNode extends Node{ + public TokenRingNode(int my_id) { + super(my_id); + } + + @Override + protected void main() { + Message m = new Message(); + if(myId == 0){ + m.add("counter", 0); + sendUnicast(1, m); + } + while (true){ + Network.Message m_raw = receive(); + if(m_raw == null) break; + m = Message.fromJson(m_raw.payload); + int counter = Integer.parseInt(m.query("counter")); + emit("%d: counter == %d", myId, counter); + counter++; + m.add("counter", counter); + sendUnicast((myId + 1) % numberOfNodes(),m); + } + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/clock/ClockType.java b/src/main/java/dev/oxoo2a/sim4da/clock/ClockType.java new file mode 100644 index 0000000..5e7fdc0 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/clock/ClockType.java @@ -0,0 +1,6 @@ +package dev.oxoo2a.sim4da.clock; + +public enum ClockType { + LAMPORT, + VECTOR +} \ No newline at end of file diff --git a/src/main/java/dev/oxoo2a/sim4da/clock/ClockUtil.java b/src/main/java/dev/oxoo2a/sim4da/clock/ClockUtil.java new file mode 100644 index 0000000..87b7d72 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/clock/ClockUtil.java @@ -0,0 +1,12 @@ +package dev.oxoo2a.sim4da.clock; + +public class ClockUtil { + + public static LogicClock create(int id, ClockType type){ + return switch (type) { + case LAMPORT -> new LampertClock(id); + case VECTOR -> new VectorClock(id); + }; + } + +} diff --git a/src/main/java/dev/oxoo2a/sim4da/clock/LampertClock.java b/src/main/java/dev/oxoo2a/sim4da/clock/LampertClock.java new file mode 100644 index 0000000..b326a50 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/clock/LampertClock.java @@ -0,0 +1,52 @@ +package dev.oxoo2a.sim4da.clock; + +public class LampertClock extends LogicClock{ + + public LampertClock(int nodeId){ + super(nodeId, ClockType.LAMPORT); + this.time = 0; + } + + @Override + public void synchronize(String timeStamp) { + super.synchronize(timeStamp); + + + if(this.tempTimestamps.keySet().size()>1){ + System.err.println("more than one timestamp was extracted from string, should not happen with lamportClock"); + System.out.println(timeStamp); + } + for (Integer senderId : tempTimestamps.keySet()){ + System.out.println("Node " + this.nodeId + " syncing " + this.getTime() + " with "+ tempTimestamps.get(senderId)); + + + this.time = Math.max(this.time, tempTimestamps.get(senderId)); + } + + } + + + + @Override + public void tick() { + this.time++; + } + + @Override + public ClockType getType() { + return type; + } + + @Override + protected void printTimeStamps(){ + System.out.println("TimeStampts of Node " + this.nodeId + " | Time " + this.time); + } + + public int getTime() { + return time; + } + + public int getNodeId() { + return this.getNodeId(); + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/clock/LogicClock.java b/src/main/java/dev/oxoo2a/sim4da/clock/LogicClock.java new file mode 100644 index 0000000..ba6251a --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/clock/LogicClock.java @@ -0,0 +1,78 @@ +package dev.oxoo2a.sim4da.clock; + + +import java.util.HashMap; +import java.util.StringTokenizer; + +/** + * LogicClock is instantiated for each TimedNode. A LogicClock saves its NodeID, timestamp, and temporary timestamps + * as received from other Nodes to synchronize. Further data and functionality is handled by Subclasses LamportClock/VectorClock. + */ + +public abstract class LogicClock { + + protected int nodeId; + protected int time; + public final ClockType type; + protected HashMap tempTimestamps = new HashMap<>(); + + public LogicClock(int nodeId, ClockType type){ + this.nodeId = nodeId; + this.type = type; + this.time =0; + } + + public ClockType getType(){ + return this.type; + } + + + public void tick(){ + this.time++; + } + + /** + * Parent Method to synchronize a Clock with a Time String as received in a Message (raw payload). + * synchronize() utilizes String Tokenization to extract all Timestamps from the Payload string. A Timestamp in the + * Payload always leads with the Characters '%T' + NodeID. + * The IDs and Times are saved in a Hashmap 'tempTimestamps' that is cleared with every new call of the method. + * tempTimestamps is used in the child class method to access all timestamps. When using Lamport time, + * the tempTimestamps Map always contains exactly one entry. + * Subclasses Override this Function by adding statements regarding the handling of the extracted Time information. + * @param payload the entire Payload string from the Message + */ + public void synchronize(String payload){ + //First Tokenizer to collect all entrys from the payload + StringTokenizer tokenizer = new StringTokenizer(payload, ","); + // clear temporary timestamp field. + tempTimestamps.clear(); + while (tokenizer.hasMoreTokens()){ + int senderId = -1; + String token = tokenizer.nextToken(); + String s = ""; + // if token is marked as containing a timestamp + if (token.contains("%T")) { + // Seconds Tokenizer splits entry into Node ID and associated timestamp + StringTokenizer subTokenizer = new StringTokenizer(token, ":"); + s = subTokenizer.nextToken(); + for (int i = 0; i < s.length(); i++) { + if (s.charAt(i) == 'T') { + senderId = Integer.parseInt(s.substring(i + 1, s.length() - 1)); + break; + } + + } + + s = subTokenizer.nextToken(); + int senderTime = Integer.parseInt(s.substring(1, s.length()-1)); + tempTimestamps.put(senderId, senderTime); + } + } + + } + protected void printTimeStamps(){} + public int getTime(){ + return this.time; + } + +} diff --git a/src/main/java/dev/oxoo2a/sim4da/clock/TimedNode.java b/src/main/java/dev/oxoo2a/sim4da/clock/TimedNode.java new file mode 100644 index 0000000..756fb4d --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/clock/TimedNode.java @@ -0,0 +1,73 @@ +package dev.oxoo2a.sim4da.clock; + +import dev.oxoo2a.sim4da.Message; +import dev.oxoo2a.sim4da.Network; +import dev.oxoo2a.sim4da.Node; + + +/** + * Basic Parent class for implementing Nodes with Logic Clocks. + * Time keeping is handled in send and receive methods using an Instance of LogicClock for each Node. + * Sending includes adding the nodes timestamp according to the used ClockType. + * Receiving includes parsing the time Information and updating a node's timestamp (synchronize(), see LogicClock.java) + */ +public class TimedNode extends Node { + LogicClock lc; + + public TimedNode(int my_id, ClockType type) { + super(my_id); + this.lc = ClockUtil.create(my_id, type); + } + + protected void sendUnicast ( int receiver_id, Message m ) { + + this.lc.tick(); + emit("%d ticked to %d",this.myId ,this.lc.getTime()); + + + // clear all entries from message containing the timestamp identifier '%T'. + m.getMap().entrySet().removeIf(entry -> entry.getKey().contains("%T")); + + if(this.lc instanceof LampertClock){ + m.add("%T"+ myId, lc.getTime()); + } + if(this.lc instanceof VectorClock){ + for (Integer id: ((VectorClock) lc).getTimeVector().keySet()){ + m.add("%T"+id, ((VectorClock) lc).getTimeVector().get(id)); + } + } + if(this.lc instanceof VectorClock){ + ((VectorClock) this.lc).printVectorLine(((VectorClock) this.lc).getTimeVector()); + } + System.out.println(); + + this.simulator.sendUnicast(myId,receiver_id, m.toJson()); + } + + protected Network.Message receive () { + + Network.Message m = simulator.receive(myId); + + lc.tick(); + emit("%d ticked to %d",this.myId ,this.lc.getTime()); + + // synchronisation of clocks + if(m != null){ + this.lc.synchronize(m.payload); + } + + + return m; + } + + + public LogicClock getLc() { + return lc; + } + + @Override + protected void main() {} + + + +} diff --git a/src/main/java/dev/oxoo2a/sim4da/clock/VectorClock.java b/src/main/java/dev/oxoo2a/sim4da/clock/VectorClock.java new file mode 100644 index 0000000..746c6c2 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/clock/VectorClock.java @@ -0,0 +1,85 @@ +package dev.oxoo2a.sim4da.clock; + +import dev.oxoo2a.sim4da.Simulator; + +import java.util.HashMap; + + +public class VectorClock extends LogicClock{ + private final HashMap timeVector; + + public VectorClock (int nodeId){ + super(nodeId, ClockType.VECTOR); + this.timeVector = new HashMap<>(); + this.timeVector.put(nodeId, 0); + } + + @Override + public void tick() { + this.timeVector.replace(this.nodeId, this.timeVector.get(this.nodeId) + 1); + } + + @Override + public void synchronize(String timeStamp) { + super.synchronize(timeStamp); + System.out.print("Node " +this.getNodeId() + " is synchronizing " ); + this.printVectorLine(this.getTimeVector()); + System.out.print("(own) and "); + this.printVectorLine(this.tempTimestamps); + + for(Integer id: this.tempTimestamps.keySet()){ + if(!this.timeVector.containsKey(id)){ + this.timeVector.put(id, this.tempTimestamps.get(id)); + }else{ + this.timeVector.put(id, Math.max(this.timeVector.get(id), this.tempTimestamps.get(id))); + } + } + + System.out.println(); + + this.printTimeStamps(); + this.time = this.timeVector.get(getNodeId()); + } + + @Override + public ClockType getType() { + return this.type; + } + @Override + public int getTime() { + return this.timeVector.get(nodeId); + } + public int getTime(int id){ + return this.timeVector.get(id); + } + + public HashMap getTimeVector() { + return this.timeVector; + } + + @Override + protected void printTimeStamps(){ + System.out.print("CurrentVector of Node " + this.nodeId + " | Time = "); + printVectorLine(this.timeVector); + System.out.println(); + } + + + protected void printVectorLine(HashMap vector){ + System.out.print("["); + int counter = 0; + int max = vector.size(); + for (Integer k : vector.keySet()){ + System.out.print(k + ":" + vector.get(k)); + if(!(vector.size()== ++counter)){ + System.out.print(","); + } + } + System.out.print("] "); + } + + + public int getNodeId() { + return this.nodeId; + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/example/Main.java b/src/main/java/dev/oxoo2a/sim4da/example/Main.java new file mode 100644 index 0000000..1854d41 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/example/Main.java @@ -0,0 +1,25 @@ +package dev.oxoo2a.sim4da.example; + +import dev.oxoo2a.sim4da.Node; +import dev.oxoo2a.sim4da.Simulator; +import dev.oxoo2a.sim4da.clock.ClockType; + +public class Main { + public static void main(String[] args) { + int n_nodes = 5; + Simulator s = Simulator.createDefaultSimulator(n_nodes); + + for (int id=0; id Constructor expects ClockType + Node n = new TimedTokenRingNode(id, ClockType.VECTOR); + s.attachNode(id,n); + } + + try{ + s.runSimulation(1); + }catch (InstantiationException e){ + System.err.println("Instantiation failed. Time to investigate."); + } + + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/example/TimedTokenRingNode.java b/src/main/java/dev/oxoo2a/sim4da/example/TimedTokenRingNode.java new file mode 100644 index 0000000..1b1eb8d --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/example/TimedTokenRingNode.java @@ -0,0 +1,52 @@ +package dev.oxoo2a.sim4da.example; + +import dev.oxoo2a.sim4da.Message; +import dev.oxoo2a.sim4da.Network; +import dev.oxoo2a.sim4da.clock.ClockType; +import dev.oxoo2a.sim4da.clock.TimedNode; + +/** + * Example. Class extends TimedNode. Aside from adding the very first timestamp and specifying the ClockType in the + * constructor, this class is identical to the TokenRingNode example; + */ +public class TimedTokenRingNode extends TimedNode { + public TimedTokenRingNode(int my_id, ClockType type) { + super(my_id, type); + } + + @Override + public void main(){ + Message m = new Message(); + + if(myId == 0){ + m.add("counter", 0); + sendUnicast(1, m); + m.add("%T"+myId, 0); + System.out.println("sending Message"); + } + while (true){ + /* + long startTime = System.currentTimeMillis(); + long delayTime = 100; // 2000 milliseconds = 2 seconds + while (System.currentTimeMillis() - startTime < delayTime) { + // This loop will keep executing until the specified delay time has elapsed + } + */ + Network.Message m_raw = receive(); + + if(m_raw == null){ + System.out.println("Message contents empty. Node dies."); + break; + } + + m = Message.fromJson(m_raw.payload); + int counter = Integer.parseInt(m.query("counter")); + emit("%d: counter == %d", myId, counter); + counter++; + m.add("counter", counter); + sendUnicast((myId + 1) % numberOfNodes(),m); + } + emit("Node %d ended with internal timestamp %d", this.myId, this.getLc().getTime()); + + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/termination/ControlVectorCoordinator.java b/src/main/java/dev/oxoo2a/sim4da/termination/ControlVectorCoordinator.java new file mode 100644 index 0000000..10ca59f --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/termination/ControlVectorCoordinator.java @@ -0,0 +1,87 @@ +package dev.oxoo2a.sim4da.termination; + +import dev.oxoo2a.sim4da.Message; +import dev.oxoo2a.sim4da.Network; +import dev.oxoo2a.sim4da.Node; + +import java.util.StringTokenizer; + +/** + * Control Vector Coodinator: + * Schickt (wenn der boolean 'prompt' wahr ist), eine Nachricht mit einem Nullvektor an den ersten Basisaktor und + * wartet dann auf eine Antwort. Jeder Basisaktor schickt diese Vektornachricht an den nächsten Basisaktor, bis der + * letzte die Nachricht wieder zurück an diesen Coordinator schickt. Erst dann wird geprüft, ob der Vektor, der + * zurückgekommen ist, ein Nullvektor ist. Ansonsten wird 'prompt' wieder auf wahr gesetzt und eine neue Nachricht + * wird losgeschickt und macht die Runde. VectorStrings in den Nachrichten entsprechen der Form + * "id1:value1;id2:value2;id3:value3;..." und können so einfach mit einem Tokenizer ausgelesen werden. + * + * Anmerkung: Mir ist im Nachhinein aufgefallen, dass ich das Controlvektor Verfahren nicht genau wie in der Vorlesung + * beschrieben umgesetzt hab. In der VL behält der Controlvector durchlaufübergreifend seine Einträge und die lokalen + * Vektoren der Basisaktoren werden auf 0 gesetzt, und somit immer vom Controlvector "eingesammelt". In meiner Implementierung + * behalten die Basisaktoren ihre lokalen Vektoren über den ganzen Simulationsverlauf, der Controlvector beginnt allerdings + * in jedem neuen Durchlauf immer als 0-Vektor. Das Verfahren ist im Ergebnis also Äquivalent. + */ +public class ControlVectorCoordinator extends Node { + + boolean prompt = true; + boolean finished = false; + + public ControlVectorCoordinator(int my_id) { + super(my_id); + } + + @Override + protected void main() { + while (true) { + Message m; + //send round message only after the message arrived back at coordinator (and in the beginning) + if(prompt){ + prompt=false; + m=new Message(); + String vectorString=""; + // At the start of the control vector round trip, initialize all entries to 0 + for (int i = 0; i < TerminationMain.n_nodes-1; i++) { + String subString = i+":"+0+";"; + vectorString = vectorString+subString; + } + vectorString = vectorString + (TerminationMain.n_nodes-1)+ ":" + 0; + m.add("type" , "control_vector"); + m.add("vector", vectorString); + sendUnicast(0,m); + } + + //wait for response + Network.Message m_raw = receive(); + if(m_raw!=null){ + /** + * Hier kommt nur eine Nachricht an, wenn sie zuvor bei allen Basisnodes gewesen ist. In jeder Basisnode wird + * der Vektor mit dem lokalen Vektor verrechnet und weitergeschickt. Wenn hier wieder der Nullvektor zurückkommt + * ist das System terminiert. + */ + + m = Message.fromJson(m_raw.payload); + String vectorString = m.query("vector"); + StringTokenizer tokenizer = new StringTokenizer(vectorString, ";"); + finished = true; + //check if vector came back as zero + while(tokenizer.hasMoreTokens()){ + String entry = tokenizer.nextToken(); + StringTokenizer subTokenizer = new StringTokenizer(entry,":"); + int id = Integer.parseInt(subTokenizer.nextToken()); + int value = Integer.parseInt(subTokenizer.nextToken()); + if(value != 0){ + finished = false; + break; + } + } + if(finished){ + System.out.println("CONTROL VECTOR SAYS SYSTEM TERMINATED"); + System.exit(0); + }else{ + // restart trailing control vector + prompt = true; + } + } + } + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/termination/DoubleCountingCoordinator.java b/src/main/java/dev/oxoo2a/sim4da/termination/DoubleCountingCoordinator.java new file mode 100644 index 0000000..86beac2 --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/termination/DoubleCountingCoordinator.java @@ -0,0 +1,104 @@ +package dev.oxoo2a.sim4da.termination; + +import dev.oxoo2a.sim4da.Message; +import dev.oxoo2a.sim4da.Network; +import dev.oxoo2a.sim4da.Node; + +/** + * Double Counting Coordinator: + * Speichert Felder mit der Summe empfangener/gesendeter Nachrichten für beide Durchläufe. Zusätzlich zwei booleans 'prompt' + * um zu kontrollieren, wann ein Doppelzählverfahren angestoßen wird, und seconds prompt um zu unterscheiden um + * welchen Durchlauf es sich handelt. int promptsReceived zählt mit, wie viele Nachrichten von den Basisaktoren + * zurückkommen und vergleicht dies mit der Anzahl von Basisaktoren um zu prüfen, wann ein Durchlauf abgeschlossen ist. + * + */ +public class DoubleCountingCoordinator extends Node { + + int messagesSent1 = 0; + int messagesReceived1=0; + int messagesSent2=0; + int messagesReceived2=0; + + int promptsReceived; + boolean secondPrompt=false; + boolean prompt= true; + + public DoubleCountingCoordinator(int my_id) { + super(my_id); + } + + @Override + protected void main() { + Message m; + + /** + * Schickt eine Aufforderung an alle Basisklassen. Hier ist kein broadcast() verwendet, damit die Nachricht + * nicht auch an den ControlVectorCoordinator geht. Diese ließe sich aber auch einfach dort abfangen. + */ + while(true){ + if(prompt){ + for (int i = 0; i < TerminationMain.n_nodes; i++) { + m = new Message(); + m.add("type", "double_counting"); + sendUnicast(i,m); + } + prompt = false; + } + + Network.Message m_raw = receive(); + if(m_raw == null){ + System.out.println("Coordinator got null message"); + /** + * Routine für das Empfangen von Nachrichten. Werte auslesen und in die entsprechenden Felder speichern, + * je nachdem ob dies der erste oder zweite Zähldurchlauf ist. + */ + }else{ + promptsReceived++; + m = Message.fromJson(m_raw.payload); + String sent = m.query("sent"); + String received = m.query("received"); + + if(!secondPrompt){ + messagesSent1 += Integer.parseInt(sent); + messagesReceived1 += Integer.parseInt(received); + }else{ + messagesSent2 += Integer.parseInt(sent); + messagesReceived2 += Integer.parseInt(received); + } + /** + * Falls alle Basisaktoren Nachrichten zurückgeschickt haben: Prüfe ob die Werte übereinstimmen + * -> Entweder terminieren oder boolean 'prompt' auf true setzten, damit das nächste Doppelzählverfahren startet. + */ + if(promptsReceived == TerminationMain.n_nodes){ + promptsReceived = 0; + //nur was machen falls dies der zweite durchlauf war. Ansonsten boolean 'secondPrompt' setzen um + // signalisieren, dass der zweite Durchlauf jetzt stattfindet. + if(secondPrompt){ + System.out.println("SECOND ROUND IS OVER: VALUES \n" + + messagesSent1 + "\n" + + messagesReceived1 + "\n" + + messagesSent2 + "\n" + + messagesReceived2); + if (messagesSent1 == messagesSent2 + && messagesReceived1 == messagesReceived2 + && messagesSent1 == messagesReceived1){ + System.out.println("DOUBLE COUNTING SAYS SYSTEM TERMINATED"); + //System.exit(0); + break; + } + + messagesSent1=0; + messagesReceived1=0; + messagesSent2=0; + messagesReceived2=0; + + secondPrompt = false; + }else{ + secondPrompt = true; + } + prompt=true; + } + } + } + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/termination/ProbabilisticNode.java b/src/main/java/dev/oxoo2a/sim4da/termination/ProbabilisticNode.java new file mode 100644 index 0000000..f30503c --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/termination/ProbabilisticNode.java @@ -0,0 +1,152 @@ +package dev.oxoo2a.sim4da.termination; + +import dev.oxoo2a.sim4da.Message; +import dev.oxoo2a.sim4da.Network; +import dev.oxoo2a.sim4da.Node; + +import java.util.*; + +/** + * Basisaktor: Da diese Nodes sowhol mit beiden Coordinatoren interagieren speichern sie jeweils sowohl die Anzahl + * gesendeter/empfangener Nachrichten als auch ihren lokalen Vektor V. + */ +public class ProbabilisticNode extends Node { + private boolean active; + private int messagesSent; + private int messagesReceived; + private int[] V; // Kann ein int-array sein, solange das Indexing stimmt. Deshalb Basisnodes vor Coordinators initialisieren. + + public ProbabilisticNode(int my_id) { + super(my_id); + active = true; + V = new int[TerminationMain.n_nodes]; + } + + @Override + protected void main() { + Message m = new Message(); + int receiver; + + /** + * Für den Start der zufallsbeeinflussten Nachrichten schicken zunächst alle Aktoren eine Basisnachricht an immer + * einen zufälligen anderen Basisaktor. Bei jedem Sende/Empfangsereignis von Basisnachrichten werden die entsprechenden + * Felder aktualisiert. + */ + m.add("type", "base"); + m.add("counter", 1); + receiver = generateRandomNumber(TerminationMain.n_nodes, myId); + sendUnicast(receiver, m); + V[receiver]++; + messagesSent++; + + while (true){ + /** + * Die Schleife wartet nur auf Nachrichten. Entsprechend des "type" einer Nachricht werden unterschiedliche Dinge gemacht. + * Node wird nicht wieder aktiv und es werden keine Counter verändert. + */ + Network.Message m_raw = receive(); + if(m_raw == null) { + System.out.println("breaking"); + } + m = Message.fromJson(m_raw.payload); + String type = m.query("type"); + + /** + * Double Counting Nachrichten: Sende eigene Counter für sent/received an Coordinator. + * Node wird nicht wieder aktiv und es werden keine Counter verändert. + */ + if(Objects.equals(type, "double_counting")) { + m = new Message(); + m.add("sent", messagesSent); + m.add("received", messagesReceived); + sendUnicast(TerminationMain.double_count_coordinator_id, m); + + /** + * Control Vector Nachrichten: + * parseVector(): Lese den Vector aus dem Message content, addiere zu lokalem Vector und erzeuge einen neuen Vectorstring + * für das Ergebnis. Dieser String wird an die nächste Basisnode gesendet, oder zurück an den Coordinator, + * falls die betrachtete Instanz die letzte Node (id == TerminationMain.n_nodes-1) ist. Node wird wieder aktiv. + * + */ + }else if(Objects.equals(type, "control_vector")){ + String vector = m.query("vector"); + // Parse Vector String, add with local vector V and create new Vector String + String newVectorString = parseVector(vector); + + m = new Message(); + m.add("type", "control_vector"); + m.add("vector", newVectorString); + //Send message in round trip as in TokenRingNode.java, except the last Node sends back to coordinator + //This architecture only works if all base nodes are initialized before coordinators. + receiver = myId== TerminationMain.n_nodes-1 ? TerminationMain.control_vector_coordinator_id : myId+1; + sendUnicast(receiver,m); + /** + * Falls diese Condition greift handelt es sich um eine Basisnachricht. Die Counter werden aktualisiert, + * die Node wird aktiv und es wird mit der festgelegten Wahrscheinlichkeit eine neue Nachricht an eine zufällig ausgewählte + * Basisnode geschickt. + */ + }else{ + int counter = Integer.parseInt(m.query("counter")); + if(Objects.equals(type, "base")){ + //decrease own entries + V[myId]--; + messagesReceived++; + active = true; + } + counter++; + m.add("counter", counter); + //Receiver zufällig bestimmen + receiver = generateRandomNumber(TerminationMain.n_nodes, myId); + if(active){ + Random rand = new Random(); + if(rand.nextDouble()< TerminationMain.probability){ + //increase entries + messagesSent++; + V[receiver]++; + sendUnicast(receiver,m); + }else{ + System.out.println(myId + " missed probability"); + } + active = false; + } + } + } + } + + /** + * Erzeuge eine Zufallszahl zwischen 0 und range, ohne exclude. Somit senden Nodes ihre Basisnachrichten an andere Basisnodes, + * aber nicht nochmal an sich selbst. + */ + public static int generateRandomNumber(int range, int exclude) { + Random rand = new Random(); + int randomNum; + do { + randomNum = rand.nextInt(range); + } while (randomNum == exclude); + return randomNum; + } + + /** + * + * @param vectorString Vector der Form "id1:val1;id2:val2;..:" + * @return neuer Vector (Summe des übergebenen und des lokalen vektors) als String in der entsprechenden Form. + */ + private String parseVector(String vectorString){ + StringTokenizer tokenizer = new StringTokenizer(vectorString, ";"); + int[] newV = new int[TerminationMain.n_nodes]; + while(tokenizer.hasMoreTokens()){ + String vectorField = tokenizer.nextToken(); + StringTokenizer subTokenizer = new StringTokenizer(vectorField, ":"); + int id = Integer.parseInt(subTokenizer.nextToken()); + int val = Integer.parseInt(subTokenizer.nextToken()); + + newV[id] = V[id] + val; + } + String newVectorString =""; + for (int i = 0; i < newV.length-1; i++) { + newVectorString = newVectorString + i +":" + newV[i] + ";"; + } + newVectorString = newVectorString + (newV.length-1) +":" + newV[newV.length-1]; + return newVectorString; + } +} diff --git a/src/main/java/dev/oxoo2a/sim4da/termination/TerminationMain.java b/src/main/java/dev/oxoo2a/sim4da/termination/TerminationMain.java new file mode 100644 index 0000000..3c4296d --- /dev/null +++ b/src/main/java/dev/oxoo2a/sim4da/termination/TerminationMain.java @@ -0,0 +1,46 @@ +package dev.oxoo2a.sim4da.termination; + +import dev.oxoo2a.sim4da.Node; +import dev.oxoo2a.sim4da.Simulator; + +public class TerminationMain { + /** + * Die statischen Variablen werden benutzt um aus allen Nodes auf die relevanten größen zuzugreifen. In einem echten + * verteilen System ginge das so natürlich nicht, aber man könnte diese Informationen zB Initial an alles Aktoren schicken + * und als Aktor mit dem Beginn der eigentlichen Funktionalität auf Erhalt dieser Initialisierungsnachricht warten. + * Damit die IDs stimmen ist es in meiner Implementierung wichtig, dass die Basisaktoren zuerst initialisiert werden. + * + * Die Simulation läuft für 100 sekunden, sobald die Terminierung festegestellt wurde wird das Programm aber mit + * System.exit(0) beendet. Dieser Aufruf ist momentan in der ControlVectorCoordinator Klasse, da diese bisher immer + * ein kleines bisschen langsamer ist. + * Beide Coordinator melden die Terminierung in der Konsole, sodass die korrekte Terminierung von beiden geprüft werden kann. + * Der Print ist "[...] SAYS SYSTEM TERMINATED" + */ + static int n_nodes = 150; + static double probability =0.99; + static int double_count_coordinator_id = n_nodes; + static int control_vector_coordinator_id = double_count_coordinator_id+1; + + public static void main(String[] args) { + Node n; + Simulator s = Simulator.createDefaultSimulator(n_nodes+2); + + for (int id=0; id