Skip to content

Commit 4fd88cd

Browse files
michaelgptclaude
andcommitted
feat(network): implement real Gossipsub broadcasting (Phase 2 Task 2.1)
Replaces stub broadcast handlers with real gossipsub publishing via SwarmCommand channel. Handlers updated: - BroadcastBlock: Uses SwarmCommand::PublishGossip with priority-based topics - Priority true: "alys/blocks/priority" - Priority false: "alys/blocks" - BroadcastTransaction: Publishes to "alys/transactions" topic - BroadcastAuxPow: Publishes to "alys/auxpow" topic (with validation) Implementation pattern: - Non-blocking command send with try_send - Immediate metrics recording and success response - Spawned async task for handling publish confirmation - Proper error handling and logging throughout Technical details: - Auto-subscription in swarm task if not already subscribed to topic - Active subscription tracking with timestamps - Correlation IDs for AuxPoW broadcast tracing - Validates AuxPoW data format before broadcasting Integration: - Phase 1 test still passes - All handlers use consistent pattern with SwarmCommand channel - Ready for Phase 2 Task 2.2 (request-response codec) Co-Authored-By: Claude <[email protected]>
1 parent 2107bd6 commit 4fd88cd

File tree

1 file changed

+246
-9
lines changed

1 file changed

+246
-9
lines changed

app/src/actors_v2/network/network_actor.rs

Lines changed: 246 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -830,15 +830,179 @@ impl Handler<NetworkMessage> for NetworkActor {
830830
}
831831

832832
NetworkMessage::BroadcastBlock { block_data, priority } => {
833-
// TODO: Phase 2 Task 2.1 - Implement via SwarmCommand channel
834-
tracing::warn!("BroadcastBlock not yet implemented with SwarmCommand - Phase 2 Task 2.1");
835-
Ok(NetworkResponse::Broadcasted { message_id: "stub".to_string() })
833+
// Phase 2 Task 2.1: Real gossipsub broadcasting via SwarmCommand channel
834+
835+
// Validate network is running
836+
if !self.is_running {
837+
tracing::error!("Network not running, cannot broadcast block");
838+
return Err(NetworkError::NotStarted);
839+
}
840+
841+
// Get command channel
842+
let cmd_tx = match self.swarm_cmd_tx.as_ref() {
843+
Some(tx) => tx.clone(),
844+
None => {
845+
tracing::error!("Swarm command channel not available");
846+
return Err(NetworkError::Internal("Command channel not available".to_string()));
847+
}
848+
};
849+
850+
let topic = if priority {
851+
"alys/blocks/priority".to_string()
852+
} else {
853+
"alys/blocks".to_string()
854+
};
855+
856+
let data_len = block_data.len();
857+
858+
tracing::debug!(
859+
topic = %topic,
860+
size = data_len,
861+
priority = priority,
862+
"Broadcasting block via gossipsub"
863+
);
864+
865+
// Create oneshot channel for response
866+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
867+
868+
// Send publish command (non-blocking)
869+
let cmd = SwarmCommand::PublishGossip {
870+
topic: topic.clone(),
871+
data: block_data,
872+
response_tx,
873+
};
874+
875+
match cmd_tx.try_send(cmd) {
876+
Ok(_) => {
877+
// Update metrics immediately
878+
self.metrics.record_message_sent(data_len);
879+
self.metrics.record_gossip_published();
880+
self.active_subscriptions.insert(topic.clone(), Instant::now());
881+
882+
// Spawn task to handle async response
883+
tokio::spawn(async move {
884+
match response_rx.await {
885+
Ok(Ok(message_id)) => {
886+
tracing::info!(
887+
message_id = %message_id,
888+
topic = %topic,
889+
"Block broadcast successful"
890+
);
891+
}
892+
Ok(Err(e)) => {
893+
tracing::error!(
894+
topic = %topic,
895+
error = %e,
896+
"Block broadcast failed"
897+
);
898+
}
899+
Err(_) => {
900+
tracing::error!(
901+
topic = %topic,
902+
"Block broadcast response channel closed"
903+
);
904+
}
905+
}
906+
});
907+
908+
// Return immediately with pending status
909+
Ok(NetworkResponse::Broadcasted {
910+
message_id: format!("broadcast-{}", uuid::Uuid::new_v4())
911+
})
912+
}
913+
Err(e) => {
914+
tracing::error!(
915+
error = ?e,
916+
"Failed to send broadcast command"
917+
);
918+
Err(NetworkError::Internal(format!("Failed to send command: {}", e)))
919+
}
920+
}
836921
}
837922

838923
NetworkMessage::BroadcastTransaction { tx_data } => {
839-
// TODO: Phase 2 Task 2.1 - Implement via SwarmCommand channel
840-
tracing::warn!("BroadcastTransaction not yet implemented with SwarmCommand - Phase 2 Task 2.1");
841-
Ok(NetworkResponse::Broadcasted { message_id: "stub".to_string() })
924+
// Phase 2 Task 2.1: Real gossipsub broadcasting via SwarmCommand channel
925+
926+
// Validate network is running
927+
if !self.is_running {
928+
tracing::error!("Network not running, cannot broadcast transaction");
929+
return Err(NetworkError::NotStarted);
930+
}
931+
932+
// Get command channel
933+
let cmd_tx = match self.swarm_cmd_tx.as_ref() {
934+
Some(tx) => tx.clone(),
935+
None => {
936+
tracing::error!("Swarm command channel not available");
937+
return Err(NetworkError::Internal("Command channel not available".to_string()));
938+
}
939+
};
940+
941+
let topic = "alys/transactions".to_string();
942+
let data_len = tx_data.len();
943+
944+
tracing::debug!(
945+
topic = %topic,
946+
size = data_len,
947+
"Broadcasting transaction via gossipsub"
948+
);
949+
950+
// Create oneshot channel for response
951+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
952+
953+
// Send publish command (non-blocking)
954+
let cmd = SwarmCommand::PublishGossip {
955+
topic: topic.clone(),
956+
data: tx_data,
957+
response_tx,
958+
};
959+
960+
match cmd_tx.try_send(cmd) {
961+
Ok(_) => {
962+
// Update metrics immediately
963+
self.metrics.record_message_sent(data_len);
964+
self.metrics.record_gossip_published();
965+
self.active_subscriptions.insert(topic.clone(), Instant::now());
966+
967+
// Spawn task to handle async response
968+
tokio::spawn(async move {
969+
match response_rx.await {
970+
Ok(Ok(message_id)) => {
971+
tracing::info!(
972+
message_id = %message_id,
973+
topic = %topic,
974+
"Transaction broadcast successful"
975+
);
976+
}
977+
Ok(Err(e)) => {
978+
tracing::error!(
979+
topic = %topic,
980+
error = %e,
981+
"Transaction broadcast failed"
982+
);
983+
}
984+
Err(_) => {
985+
tracing::error!(
986+
topic = %topic,
987+
"Transaction broadcast response channel closed"
988+
);
989+
}
990+
}
991+
});
992+
993+
// Return immediately with pending status
994+
Ok(NetworkResponse::Broadcasted {
995+
message_id: format!("broadcast-{}", uuid::Uuid::new_v4())
996+
})
997+
}
998+
Err(e) => {
999+
tracing::error!(
1000+
error = ?e,
1001+
"Failed to send broadcast command"
1002+
);
1003+
Err(NetworkError::Internal(format!("Failed to send command: {}", e)))
1004+
}
1005+
}
8421006
}
8431007

8441008
NetworkMessage::ConnectToPeer { peer_addr } => {
@@ -952,9 +1116,82 @@ impl Handler<NetworkMessage> for NetworkActor {
9521116
return Err(NetworkError::Protocol(format!("Invalid AuxPoW format: {}", e)));
9531117
}
9541118

955-
// TODO: Phase 2 Task 2.1 - Implement via SwarmCommand channel
956-
tracing::warn!("AuxPoW broadcast not yet implemented with SwarmCommand - Phase 2 Task 2.1");
957-
Ok(NetworkResponse::AuxPowBroadcasted { peer_count })
1119+
// Phase 2 Task 2.1: Real gossipsub broadcasting via SwarmCommand channel
1120+
1121+
// Get command channel
1122+
let cmd_tx = match self.swarm_cmd_tx.as_ref() {
1123+
Some(tx) => tx.clone(),
1124+
None => {
1125+
tracing::error!(correlation_id = %correlation_id, "Swarm command channel not available");
1126+
return Err(NetworkError::Internal("Command channel not available".to_string()));
1127+
}
1128+
};
1129+
1130+
let topic = "alys/auxpow".to_string();
1131+
1132+
tracing::debug!(
1133+
correlation_id = %correlation_id,
1134+
topic = %topic,
1135+
peer_count = peer_count,
1136+
"Broadcasting AuxPoW via gossipsub"
1137+
);
1138+
1139+
// Create oneshot channel for response
1140+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1141+
1142+
// Send publish command (non-blocking)
1143+
let cmd = SwarmCommand::PublishGossip {
1144+
topic: topic.clone(),
1145+
data: auxpow_data,
1146+
response_tx,
1147+
};
1148+
1149+
match cmd_tx.try_send(cmd) {
1150+
Ok(_) => {
1151+
// Update active subscriptions
1152+
self.active_subscriptions.insert(topic.clone(), Instant::now());
1153+
1154+
// Spawn task to handle async response
1155+
tokio::spawn(async move {
1156+
match response_rx.await {
1157+
Ok(Ok(message_id)) => {
1158+
tracing::info!(
1159+
correlation_id = %correlation_id,
1160+
message_id = %message_id,
1161+
topic = %topic,
1162+
"AuxPoW broadcast successful"
1163+
);
1164+
}
1165+
Ok(Err(e)) => {
1166+
tracing::error!(
1167+
correlation_id = %correlation_id,
1168+
topic = %topic,
1169+
error = %e,
1170+
"AuxPoW broadcast failed"
1171+
);
1172+
}
1173+
Err(_) => {
1174+
tracing::error!(
1175+
correlation_id = %correlation_id,
1176+
topic = %topic,
1177+
"AuxPoW broadcast response channel closed"
1178+
);
1179+
}
1180+
}
1181+
});
1182+
1183+
// Return immediately with success
1184+
Ok(NetworkResponse::AuxPowBroadcasted { peer_count })
1185+
}
1186+
Err(e) => {
1187+
tracing::error!(
1188+
correlation_id = %correlation_id,
1189+
error = ?e,
1190+
"Failed to send AuxPoW broadcast command"
1191+
);
1192+
Err(NetworkError::Internal(format!("Failed to send command: {}", e)))
1193+
}
1194+
}
9581195
}
9591196

9601197
NetworkMessage::RequestBlocks { start_height, count, correlation_id } => {

0 commit comments

Comments
 (0)