package convex.node;

-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo application showing automatic lattice synchronization across multiple nodes.
 *
- * This demo:
- * - Creates 3 NodeServers with automatic propagation
- * - Performs multiple merge operations on node 1
- * - Verifies all nodes converge to the same lattice value
+ * <h2>What This Demo Shows</h2>
+ * This demonstrates how Convex lattices automatically synchronize across a decentralized network:
+ * <ul>
+ * <li>Multiple independent nodes can share the same lattice data structure</li>
+ * <li>Updates made on one node automatically propagate to all connected peers</li>
+ * <li>All nodes eventually converge to the same state (eventual consistency)</li>
+ * <li>No central coordinator is needed - the network is fully peer-to-peer</li>
+ * </ul>
+ *
+ * <h2>How It Works</h2>
+ * <ol>
+ * <li>Each NodeServer has a LatticePropagator that watches for local changes</li>
+ * <li>When a change is detected, it broadcasts a delta (only the new data) to peers</li>
+ * <li>Peers receive the delta and merge it into their local lattice copy</li>
+ * <li>Lattice merge is mathematically guaranteed to converge (CRDT properties)</li>
+ * <li>Missing data is automatically requested and filled in as needed</li>
+ * </ol>
+ *
+ * <h2>Key Concepts</h2>
+ * <ul>
+ * <li><b>Lattice</b>: A data structure with a merge operation that is commutative, associative, and idempotent (see the sketch below this list)</li>
+ * <li><b>Delta Broadcast</b>: Only new/changed data is sent, not the entire state</li>
+ * <li><b>Convergence</b>: All nodes reach the same final state regardless of message ordering</li>
+ * <li><b>Self-Healing</b>: Missing data is detected and automatically fetched from peers</li>
+ * </ul>
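+ *
+ * A minimal sketch of the three merge laws, using a hypothetical {@code merge} helper
+ * (not a specific Convex API) purely to illustrate why delivery order and duplicate
+ * deliveries cannot change the final state:
+ * <pre>{@code
+ * // a, b, c are lattice values; merge is a hypothetical join operation
+ * assert merge(a, b).equals(merge(b, a));                     // commutative: peer order is irrelevant
+ * assert merge(merge(a, b), c).equals(merge(a, merge(b, c))); // associative: grouping is irrelevant
+ * assert merge(a, a).equals(a);                               // idempotent: redelivered deltas are harmless
+ * }</pre>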
 */
public class LatticeDemo {

-    private static final int NUM_NODES = 3;
-    private static final int MERGES = 100;
-    private static final int MODS = 100;
-    private static final int BASE_PORT = 19700;
+    // Configuration: How many nodes and operations to run
+    private static final int NUM_NODES = 3;      // Number of independent nodes in the network
+    private static final int MERGES = 100;       // Number of merge batches to perform
+    private static final int MODS = 100;         // Number of modifications per merge batch
+    private static final int BASE_PORT = 19700;  // Starting port number for nodes

    public static void main(String[] args) throws Exception {
        System.out.println("=== Lattice Synchronization Demo ===");
        System.out.println("Creating " + NUM_NODES + " nodes with automatic propagation...\n ");

-        // Create nodes and stores
+        // Collections to hold our nodes and their storage
        List<NodeServer<?>> servers = new ArrayList<>();
        List<AStore> stores = new ArrayList<>();
+
+        // Use Lattice.ROOT - the standard Convex lattice with :data keyword for storage
        ALattice<?> lattice = Lattice.ROOT;

        try {
-            // Launch all servers
+            // STEP 1: Create and launch independent nodes
+            // Each node has its own memory store and listens on a different port
            for (int i = 0; i < NUM_NODES; i++) {
+                // Create an in-memory store for this node's data
                AStore store = new MemoryStore();
                stores.add(store);

+                // Create and launch a NodeServer
+                // - Takes the lattice definition (what data structure to use)
+                // - Takes a store (where to persist data)
+                // - Takes a port number (how other nodes can connect to it)
                NodeServer<?> server = new NodeServer<>(lattice, store, BASE_PORT + i);
-                server.launch();
+                server.launch(); // This automatically starts the LatticePropagator
                servers.add(server);

                System.out.println("Node " + (i + 1) + " started on port " + (BASE_PORT + i));
            }

            System.out.println("\n Establishing peer connections (full mesh)...");

-            // Establish full mesh peer connections
+            // STEP 2: Connect all nodes to each other (full mesh topology)
+            // Each node connects to every other node as a peer
+            // This creates a fully decentralized network with no single point of failure
            for (int i = 0; i < NUM_NODES; i++) {
                NodeServer<?> server = servers.get(i);
                for (int j = 0; j < NUM_NODES; j++) {
-                    if (i != j) {
+                    if (i != j) { // Don't connect to self
+                        // Create a connection to the remote node
                        InetSocketAddress peerAddress = new InetSocketAddress("localhost", BASE_PORT + j);
                        Convex peer = ConvexRemote.connect(peerAddress);
+
+                        // Add this peer to the node's peer list
+                        // Now this node can send broadcasts to this peer
                        server.addPeer(peer);
                    }
                }
@@ -73,32 +108,51 @@ public static void main(String[] args) throws Exception {
            System.out.println("\n === Starting Merge Operations ===");
            System.out.println("Performing " + MERGES + " merges with " + MODS + " modifications each on Node 1...\n ");

-            Random random = new Random(12345); // Fixed seed for reproducibility
+            // Use fixed random seed so the demo is reproducible
+            Random random = new Random(12345);
+
+            // The :data keyword is where we store our key-value pairs in the lattice
+            // Think of it like a path: lattice[:data][hash] = value
            Keyword dataKeyword = Keyword.intern("data");
+
+            // We'll make all our changes on node 1
+            // The propagator will automatically broadcast them to nodes 2 and 3
            NodeServer<?> node1 = servers.get(0);

            long startTime = System.currentTimeMillis();

-            // Perform merge operations
+            // STEP 3: Perform many modifications on Node 1
+            // This simulates real-world usage where nodes are constantly updating their state
            for (int merge = 0; merge < MERGES; merge++) {
-                // Get current lattice value from node 1
+                // Get the current data index from node 1's lattice
+                // The index is like a HashMap that maps Hash -> Value
                @SuppressWarnings("unchecked")
                Index<Hash, ACell> dataIndex = (Index<Hash, ACell>) node1.getCursor().get(dataKeyword);
                if (dataIndex == null) {
+                    // Start with empty index if nothing exists yet
                    @SuppressWarnings("unchecked")
                    Index<Hash, ACell> emptyIndex = (Index<Hash, ACell>) Index.EMPTY;
                    dataIndex = emptyIndex;
                }

-                // Make modifications
+                // Add many new key-value pairs to the index
+                // Each modification is a new entry: Hash(value) -> value
                for (int mod = 0; mod < MODS; mod++) {
                    long value = random.nextLong(1000000);
                    ACell cellValue = CVMLong.create(value);
                    Hash valueHash = Hash.get(cellValue);
+
+                    // assoc() creates a new index with the added entry
+                    // Convex data structures are immutable - we get a new version each time
                    dataIndex = dataIndex.assoc(valueHash, cellValue);
                }

-                // Update node 1 with modified value (triggers automatic broadcast)
+                // Update node 1's lattice with the new data
+                // This triggers the LatticePropagator to:
+                // 1. Detect the change
+                // 2. Compute what's new (delta)
+                // 3. Broadcast the delta to all connected peers
+                // All of this happens automatically in the background!
                node1.updateLocalPath(dataIndex, dataKeyword);

                if ((merge + 1) % 10 == 0) {
@@ -111,10 +165,16 @@ public static void main(String[] args) throws Exception {

            System.out.println("\n === Synchronizing All Nodes ===");

-            // Sync all nodes to ensure convergence
+            // STEP 4: Ensure all nodes have caught up
+            // The automatic broadcasts happen in the background, but we can
+            // explicitly sync to ensure everything is caught up right now
            long syncStart = System.currentTimeMillis();
            for (int i = 0; i < NUM_NODES; i++) {
                NodeServer<?> server = servers.get(i);
+
+                // sync() queries all peers for their current state and merges it locally
+                // This ensures this node has the latest data from everyone
+                // In production, this happens automatically - we're just being explicit here
                boolean syncResult = server.sync();
                System.out.println("Node " + (i + 1) + " sync: " + (syncResult ? "SUCCESS" : "FAILED"));
            }
@@ -123,15 +183,23 @@ public static void main(String[] args) throws Exception {

            System.out.println("\n === Verifying Convergence ===");

-            // Get final values from all nodes
+            // STEP 5: Check that all nodes have identical state
+            // This is the key property of lattices - eventual consistency
+            // No matter what order messages arrived, all nodes should have the same final state
+
+            // Get the final lattice value from each node
            List<ACell> finalValues = new ArrayList<>();
            for (int i = 0; i < NUM_NODES; i++) {
                ACell value = servers.get(i).getLocalValue();
                finalValues.add(value);
+
+                // Show the hash of each node's value
+                // If hashes match, the entire data structure is identical
                System.out.println("Node " + (i + 1) + " final value hash: " + Hash.get(value));
            }

-            // Verify all nodes have the same value
+            // Compare all nodes to node 1
+            // If all equal, convergence succeeded!
            boolean allEqual = true;
            ACell reference = finalValues.get(0);
            for (int i = 1; i < finalValues.size(); i++) {
@@ -144,7 +212,7 @@ public static void main(String[] args) throws Exception {
            if (allEqual) {
                System.out.println("\n ✓ SUCCESS: All nodes converged to the same lattice value!");

-                // Calculate statistics
+                // Show what we accomplished
                @SuppressWarnings("unchecked")
                Index<Hash, ACell> finalDataIndex = (Index<Hash, ACell>) node1.getCursor().get(dataKeyword);
                long entryCount = finalDataIndex != null ? finalDataIndex.count() : 0;
@@ -154,41 +222,41 @@ public static void main(String[] args) throws Exception {
                System.out.println(" Modifications per merge: " + MODS);
                System.out.println(" Total modifications: " + (MERGES * MODS));
                System.out.println(" Final lattice entries: " + entryCount);
+                System.out.println(" (Note: fewer than " + (MERGES * MODS) + " because duplicate random values map to the same key)");
                System.out.println(" Merge time: " + mergeTime + "ms");
                System.out.println(" Sync time: " + syncTime + "ms");
                System.out.println(" Total time: " + (mergeTime + syncTime) + "ms");

-                // Show propagator statistics
-                System.out.println("\n Propagator Statistics:");
+                // Show how efficient the propagation was
+                // The key insight: We made 10,000 changes but only needed a handful of broadcasts!
+                // This is because the propagator intelligently batches and only sends deltas
+                System.out.println("\n Propagator Statistics (how automatic sync worked):");
                for (int i = 0; i < NUM_NODES; i++) {
                    LatticePropagator<?> propagator = servers.get(i).getPropagator();
                    System.out.println(" Node " + (i + 1) + ": " +
                        propagator.getBroadcastCount() + " delta broadcasts, " +
                        propagator.getRootSyncCount() + " root syncs");
                }
+                System.out.println("\n Delta broadcasts = sending only new data to peers");
+                System.out.println(" Root syncs = sending just the top hash to check if peers need updates");
            } else {
                System.out.println("\n ✗ FAILURE: Nodes did not converge!");
                System.exit(1);
            }

        } finally {
-            // Cleanup
+            // CLEANUP: Always close resources properly
            System.out.println("\n Shutting down nodes...");
+
+            // Close all servers (stops network listeners and propagators)
            for (int i = 0; i < servers.size(); i++) {
-                try {
-                    servers.get(i).close();
-                    System.out.println("Node " + (i + 1) + " shutdown complete");
-                } catch (Exception e) {
-                    System.err.println("Error closing node " + (i + 1) + ": " + e.getMessage());
-                }
+                servers.get(i).close();
+                System.out.println("Node " + (i + 1) + " shutdown complete");
            }

+            // Close all stores (releases any resources held by storage)
            for (int i = 0; i < stores.size(); i++) {
-                try {
-                    stores.get(i).close();
-                } catch (Exception e) {
-                    System.err.println("Error closing store " + (i + 1) + ": " + e.getMessage());
-                }
+                stores.get(i).close();
            }
        }
