Commit 19de1cc

Fix notification forwarding to avoid duplicate messages and handle lag (#4848)
## Motivation

The notification forwarding system has two bugs:

1. **Duplicate messages to exporters**: When validators have multiple proxies, each proxy forwards notifications to all exporters. This means exporters receive duplicate notifications (N copies for N proxies), causing redundant processing and potential data inconsistencies.
2. **Receiver lag crashes forwarding**: The broadcast receiver for notifications can lag if the consumer is slower than the producer. When lag occurs, the code treats `RecvError::Lagged` as a fatal error and exits the forwarding loop, stopping all notification forwarding permanently even though the channel is still functional.

Both issues reduce system reliability and create operational problems that are difficult to diagnose.

## Proposal

**Fix duplicate forwarding:**

- Add `exporter_forwarded` flag to track whether exporters have been included
- Only the first proxy forwards to exporters, subsequent proxies forward to an empty exporter list
- Each proxy still forwards to its own validator endpoint

**Fix lag handling:**

- Replace `while let Ok()` pattern with explicit `match` on receiver result
- Handle `RecvError::Lagged(skipped_count)` by logging a warning and continuing
- Handle `RecvError::Closed` by logging and breaking (legitimate shutdown)
- Only actually lagged/skipped messages trigger the warning, not every message

This ensures exporters receive exactly one copy of each notification and forwarding continues even when temporary lag occurs.

## Test Plan

1. Deploy network with multiple proxies (2+) and exporters
2. Verify exporters receive exactly one copy of each notification (not N copies)
3. Generate high notification load to induce receiver lag
4. Verify warning logs appear for lagged messages
5. Confirm notification forwarding continues after lag events
6. Verify no duplicate processing in exporter/indexer

## Release Plan

- These changes should be backported to the latest `testnet` branch, then
- be released in a validator hotfix.
1 parent 5e39131 commit 19de1cc

1 file changed, +28 -3 lines changed

linera-rpc/src/grpc/server.rs

Lines changed: 28 additions & 3 deletions
@@ -20,7 +20,7 @@ use linera_core::{
     JoinSetExt as _, TaskHandle,
 };
 use linera_storage::Storage;
-use tokio::sync::oneshot;
+use tokio::sync::{broadcast::error::RecvError, oneshot};
 use tokio_util::sync::CancellationToken;
 use tonic::{transport::Channel, Request, Response, Status};
 use tower::{builder::ServiceBuilder, Layer, Service};
@@ -218,17 +218,24 @@ where
                 )
             });
 
+        let mut exporter_forwarded = false;
         for proxy in &internal_network.proxies {
             let receiver = notification_sender.subscribe();
             join_set.spawn_task({
                 info!(
                     nickname = state.nickname(),
                     "spawning notifications thread on {} for shard {}", host, shard_id
                 );
+                let exporter_addresses = if exporter_forwarded {
+                    vec![]
+                } else {
+                    exporter_forwarded = true;
+                    internal_network.exporter_addresses()
+                };
                 Self::forward_notifications(
                     state.nickname().to_string(),
                     proxy.internal_address(&internal_network.protocol),
-                    internal_network.exporter_addresses(),
+                    exporter_addresses,
                     receiver,
                 )
             });
@@ -305,7 +312,25 @@ where
             })
             .collect::<Vec<_>>();
 
-        while let Ok(notification) = receiver.recv().await {
+        loop {
+            let notification = match receiver.recv().await {
+                Ok(notification) => notification,
+                Err(RecvError::Lagged(skipped_count)) => {
+                    warn!(
+                        nickname,
+                        skipped_count, "notification receiver lagged, messages were skipped"
+                    );
+                    continue;
+                }
+                Err(RecvError::Closed) => {
+                    warn!(
+                        nickname,
+                        "notification channel closed, exiting forwarding loop"
+                    );
+                    break;
+                }
+            };
+
             let reason = &notification.reason;
             let notification: api::Notification = match notification.clone().try_into() {
                 Ok(notification) => notification,
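The behavioral difference between the old `while let Ok(..)` loop and the new `match` can be sketched without a runtime. This is an illustration under stated assumptions, not the commit's code: the local `RecvError` enum is a stand-in with the same two variants as `tokio::sync::broadcast::error::RecvError`, the iterator of results replaces successive `receiver.recv().await` calls, and `forward_old` / `forward_new` are hypothetical names with `u32` values standing in for notifications.

```rust
/// Stand-in for `tokio::sync::broadcast::error::RecvError` (assumption: same
/// two variants), so the sketch needs no async runtime.
#[derive(Debug, PartialEq)]
enum RecvError {
    Closed,
    Lagged(u64),
}

/// Old behavior: the loop ends at the first error of any kind, so a single
/// `Lagged` stops forwarding for good.
fn forward_old<I>(results: I) -> Vec<u32>
where
    I: IntoIterator<Item = Result<u32, RecvError>>,
{
    let mut results = results.into_iter();
    let mut forwarded = Vec::new();
    while let Some(Ok(notification)) = results.next() {
        forwarded.push(notification);
    }
    forwarded
}

/// New behavior: `Lagged` is recorded and skipped, only `Closed` ends the loop.
/// Returns the forwarded notifications and the total number reported skipped.
fn forward_new<I>(results: I) -> (Vec<u32>, u64)
where
    I: IntoIterator<Item = Result<u32, RecvError>>,
{
    let mut forwarded = Vec::new();
    let mut skipped_total = 0u64;
    for result in results {
        match result {
            Ok(notification) => forwarded.push(notification),
            // The real code logs a warning here before continuing.
            Err(RecvError::Lagged(skipped_count)) => skipped_total += skipped_count,
            // Legitimate shutdown: stop forwarding.
            Err(RecvError::Closed) => break,
        }
    }
    (forwarded, skipped_total)
}
```

Given the sequence `Ok(1)`, `Lagged(5)`, `Ok(7)`, `Ok(8)`, `Closed`, the old loop forwards only `1`, while the new loop forwards `1`, `7`, and `8` and reports five skipped messages before exiting on `Closed`.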
