@@ -92,11 +92,6 @@ public class LatticePropagator<V extends ACell> implements Closeable {
9292 */
9393 private volatile boolean running = false ;
9494
95- /**
96- * Flag indicating if a broadcast is currently in progress
97- */
98- private final AtomicBoolean broadcasting = new AtomicBoolean (false );
99-
10095 /**
10196 * Queue for receiving broadcast trigger notifications.
10297 * Uses LatestUpdateQueue which only stores the most recent trigger,
@@ -167,7 +162,7 @@ public synchronized void start() {
167162 propagationThread .setDaemon (true );
168163 propagationThread .start ();
169164
170- log .info ("LatticePropagator started for NodeServer on port {}" , nodeServer .getPort ());
165+ log .debug ("LatticePropagator started for NodeServer on port {}" , nodeServer .getPort ());
171166 }
172167
173168 /**
@@ -213,7 +208,7 @@ private void propagationLoop() {
213208 }
214209 } finally {
215210 Stores .setCurrent (savedStore );
216- log .info ("LatticePropagator stopped" );
211+ log .debug ("LatticePropagator stopped" );
217212 }
218213 }
219214
@@ -251,47 +246,35 @@ private void sendDeltaIfReady(V value, long currentTime) {
251246 return ;
252247 }
253248
254- // Set broadcasting flag
255- if (!broadcasting .compareAndSet (false , true )) {
256- // Already broadcasting, skip
257- log .trace ("Already broadcasting, skipping" );
258- return ;
259- }
260-
261- try {
262- // Send delta broadcasts in a loop until no more changes
263- do {
264- try {
265- broadcast (value );
266- lastAnnouncedValue = value ;
267- lastBroadcastTime = currentTime ;
268- broadcastCount ++;
269- log .debug ("Delta broadcast sent (count: {})" , broadcastCount );
270-
271- // After broadcast, check if value changed again
272- value = nodeServer .getLocalValue ();
273- currentTime = Utils .getCurrentTimestamp ();
274-
275- // Poll queue to consume any trigger (LatestUpdateQueue only stores 1 item)
276- triggerQueue .poll ();
277-
278- // If value changed, continue broadcasting
279- if (!hasValueChanged (value )) {
280- break ; // No more changes, exit loop
281- }
249+ // Send delta broadcasts in a loop until no more changes
250+ do {
251+ try {
252+ broadcast (value );
253+ lastAnnouncedValue = value ;
254+ lastBroadcastTime = currentTime ;
255+ broadcastCount ++;
256+ log .debug ("Delta broadcast sent (count: {})" , broadcastCount );
257+
258+ // After broadcast, check if value changed again
259+ value = nodeServer .getLocalValue ();
260+ currentTime = Utils .getCurrentTimestamp ();
261+
262+ // Poll queue to consume any trigger (LatestUpdateQueue only stores 1 item)
263+ triggerQueue .poll ();
264+
265+ // If value changed, continue broadcasting
266+ if (!hasValueChanged (value )) {
267+ break ; // No more changes, exit loop
268+ }
282269
283- log .debug ("Value changed during broadcast, sending another delta" );
270+ log .debug ("Value changed during broadcast, sending another delta" );
284271
285- } catch (IOException e ) {
286- log .warn ("Error during lattice broadcast" , e );
287- break ;
288- }
289- } while (running && !Thread .currentThread ().isInterrupted ());
272+ } catch (IOException e ) {
273+ log .warn ("Error during lattice broadcast" , e );
274+ break ;
275+ }
276+ } while (running && !Thread .currentThread ().isInterrupted ());
290277
291- } finally {
292- // Clear broadcasting flag
293- broadcasting .set (false );
294- }
295278 }
296279
297280 /**
@@ -491,15 +474,6 @@ public boolean isRunning() {
491474 return running ;
492475 }
493476
494- /**
495- * Checks if a broadcast is currently in progress.
496- *
497- * @return true if broadcasting, false otherwise
498- */
499- public boolean isBroadcasting () {
500- return broadcasting .get ();
501- }
502-
503477 /**
504478 * Gets the number of broadcasts sent by this propagator.
505479 *
@@ -575,7 +549,7 @@ public void close() {
575549 }
576550 }
577551
578- log .info ("LatticePropagator stopped (sent {} delta broadcasts, {} root syncs)" ,
552+ log .debug ("LatticePropagator stopped (sent {} delta broadcasts, {} root syncs)" ,
579553 broadcastCount , rootSyncCount );
580554 }
581555}
0 commit comments