Skip to content

Commit f5f429d

Browse files
committed
Week 3 Day 17: Implement peer state sync with sync_request/response and late-join consistency
1 parent 1404011 commit f5f429d

5 files changed

Lines changed: 328 additions & 27 deletions

File tree

PROJECT_TIMELINE.txt

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,3 +939,82 @@ Verification:
939939
Flutter analyze → PASS (only pre-existing info-level naming warnings)
940940

941941
------------------------------------------------------------
942+
943+
DAY 17 — Initial State Synchronization Layer
944+
945+
Summary:
946+
Implemented the initial state synchronization layer so that when a new device joins the mesh network, it automatically fetches existing incidents from connected peers. This ensures late-joining devices have a complete view of the current incident landscape without relying on a central server.
947+
948+
Architecture Components Added:
949+
950+
• Peer Connection Hook (Go daemon)
951+
- When a new peer is discovered and connected via mDNS, the daemon
952+
automatically broadcasts a sync_request message to the network.
953+
- PubSubManager is now initialized before mDNS discovery to enable this.
954+
- sync_request is triggered once per peer connection event.
955+
956+
• Sync Request Handling (Flutter)
957+
- P2PService listens for sync_request messages via dedicated stream.
958+
- IncidentRepository responds by fetching all local incidents from the
959+
Drift database and sending them as batched sync_response messages.
960+
961+
• Sync Response Format
962+
- Incidents are sent in batches of 50 per message to avoid network flooding.
963+
- Each sync_response envelope contains:
964+
msg_id, msg_type: "sync_response", origin_peer, payload: { incidents: [...] }
965+
966+
• Sync Response Handling (Flutter)
967+
- P2PService routes sync_response messages to a dedicated stream.
968+
- IncidentRepository parses incident payloads and inserts each one
969+
through the existing _handleIncomingP2PIncident deduplication logic.
970+
- DB-level dedup prevents duplicate insertions.
971+
972+
• Peer Sync State Map
973+
- P2PService maintains a _peerSyncState map (peer_id → sync_completed).
974+
- Prevents responding to duplicate sync_request messages from the same peer.
975+
- Prevents infinite sync loops across the network.
976+
977+
• Deduplication Compatibility
978+
- All sync messages pass through MessageCache (Flutter) and dedupCache (Go).
979+
- sync_response msg_ids are added to the dedup cache before sending.
980+
981+
Message Flow (Late-Join Sync):
982+
Device B connects to mesh via mDNS
983+
984+
Go daemon broadcasts sync_request (SYNC_REQUEST_SENT)
985+
986+
Device A Flutter receives sync_request
987+
988+
IncidentRepository fetches all local incidents
989+
990+
P2PService sends batched sync_response (SYNC_RESPONSE_SENT)
991+
992+
Device B Flutter receives sync_response (SYNC_RESPONSE_RECEIVED)
993+
994+
IncidentRepository merges incidents via dedup logic (INCIDENT_MERGED)
995+
996+
Incidents appear on Device B map
997+
998+
Logging Added:
999+
Go daemon:
1000+
[Discovery] SYNC_REQUEST_SENT
1001+
Flutter:
1002+
[P2P] SYNC_REQUEST_RECEIVED
1003+
[P2P] SYNC_RESPONSE_RECEIVED
1004+
[P2P] SYNC_RESPONSE_SENT
1005+
[IncidentRepo] SYNC_RESPONSE_SENT
1006+
[IncidentRepo] SYNC_RESPONSE_RECEIVED
1007+
[IncidentRepo] INCIDENT_MERGED
1008+
1009+
Files Modified:
1010+
- backend/p2p-node/main.go (reordered PubSub init before Discovery)
1011+
- backend/p2p-node/discovery.go (sync_request broadcast on peer connect)
1012+
- mobile_app/lib/services/p2p_service.dart (sync streams, peer state map, batch response)
1013+
- mobile_app/lib/data/repositories/incident_repository.dart (sync handlers, getAllIncidents)
1014+
- PROJECT_TIMELINE.txt (this update)
1015+
1016+
Verification:
1017+
Go build → PASS
1018+
Flutter analyze → PASS
1019+
1020+
------------------------------------------------------------

backend/p2p-node/discovery.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ const discoveryServiceTag = "openrescue.p2p"
1515

1616
// discoveryNotifee gets notified when we find a new peer via mDNS discovery
1717
type discoveryNotifee struct {
18-
h host.Host
18+
h host.Host
19+
psm *PubSubManager
1920
}
2021

2122
// HandlePeerFound connects to peers discovered via mDNS
@@ -36,12 +37,23 @@ func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {
3637
log.Printf("[Discovery] Failed to connect to peer %s: %s", pi.ID.String(), err)
3738
} else {
3839
log.Printf("[Discovery] Peer connected: %s (addrs: %v)", pi.ID.String(), pi.Addrs)
40+
41+
// Broadcast sync_request to the network
42+
syncReq := NetworkEnvelope{
43+
MsgType: "sync_request",
44+
Payload: map[string]interface{}{},
45+
}
46+
47+
log.Printf("[Discovery] SYNC_REQUEST_SENT: Requesting state sync after connecting to peer %s", pi.ID.String())
48+
if err := n.psm.Broadcast(syncReq); err != nil {
49+
log.Printf("[Discovery] Failed to broadcast sync_request: %v", err)
50+
}
3951
}
4052
}
4153

4254
// setupDiscovery creates an mDNS discovery service
43-
func setupDiscovery(h host.Host) error {
44-
s := mdns.NewMdnsService(h, discoveryServiceTag, &discoveryNotifee{h: h})
55+
func setupDiscovery(h host.Host, psm *PubSubManager) error {
56+
s := mdns.NewMdnsService(h, discoveryServiceTag, &discoveryNotifee{h: h, psm: psm})
4557
if s == nil {
4658
return fmt.Errorf("failed creating mDNS service")
4759
}

backend/p2p-node/main.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,20 @@ func main() {
3131
fmt.Printf("Listening on: %s/p2p/%s\n", addr.String(), host.ID().String())
3232
}
3333

34-
// 2. Setup mDNS discovery
35-
if err := setupDiscovery(host); err != nil {
36-
log.Fatalf("Failed to setup mDNS discovery: %v", err)
37-
}
38-
3934
// Channel for messages received from GossipSub to be sent to WebSocket clients
4035
msgChan := make(chan NetworkEnvelope, 100)
4136

42-
// 3. Setup PubSub (GossipSub)
37+
// 2. Setup PubSub (GossipSub) first so we can pass it to discovery
4338
pubSubManager, err := setupPubSub(ctx, host, msgChan)
4439
if err != nil {
4540
log.Fatalf("Failed to setup PubSub: %v", err)
4641
}
4742

43+
// 3. Setup mDNS discovery
44+
if err := setupDiscovery(host, pubSubManager); err != nil {
45+
log.Fatalf("Failed to setup mDNS discovery: %v", err)
46+
}
47+
4848
// 4. Start HTTP/WS API server on port 7000
4949
apiServer := NewAPIServer(7000, pubSubManager, msgChan)
5050
if err := apiServer.Start(); err != nil {

mobile_app/lib/data/repositories/incident_repository.dart

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,20 @@ class IncidentRepository {
1313
final P2PService _p2pService;
1414

1515
IncidentRepository(this._db, this._apiClient, this._p2pService) {
16+
// Day-16: Listen for incoming incident_create messages
1617
_p2pService.incomingIncidents.listen((dto) {
1718
_handleIncomingP2PIncident(dto);
1819
});
20+
21+
// Day-17: Listen for sync_request messages — respond with local incidents
22+
_p2pService.syncRequests.listen((envelope) {
23+
_handleSyncRequest(envelope);
24+
});
25+
26+
// Day-17: Listen for sync_response messages — merge received incidents
27+
_p2pService.syncResponses.listen((envelope) {
28+
_handleSyncResponse(envelope);
29+
});
1930
}
2031

2132
Stream<List<domain.Incident>> watchIncidents() {
@@ -29,6 +40,14 @@ class IncidentRepository {
2940
return incidentFromDb(row);
3041
}
3142

43+
/// Fetches all non-deleted incidents from the local database.
44+
Future<List<domain.Incident>> getAllIncidents() async {
45+
final rows = await (_db.select(_db.incidents)
46+
..where((t) => t.deleted_flag.equals(false)))
47+
.get();
48+
return rows.map((r) => incidentFromDb(r)).toList();
49+
}
50+
3251
Future<void> createIncident(domain.IncidentCreateDto dto) async {
3352
final localDocId = 'temp_${DateTime.now().millisecondsSinceEpoch}';
3453

@@ -159,4 +178,70 @@ class IncidentRepository {
159178
debugPrint(
160179
'[IncidentRepo] P2P incident inserted: $incidentId');
161180
}
181+
182+
// ─── Day-17: State Synchronization Handlers ────────────────────────────
183+
184+
/// Handles a sync_request: fetches all local incidents and sends them back
185+
/// as batched sync_response messages via P2PService.
186+
Future<void> _handleSyncRequest(dynamic envelope) async {
187+
debugPrint('[IncidentRepo] Processing sync_request — fetching local incidents');
188+
189+
try {
190+
final incidents = await getAllIncidents();
191+
debugPrint(
192+
'[IncidentRepo] Found ${incidents.length} local incidents to sync');
193+
194+
// Send the incidents back as a sync_response via P2PService
195+
await _p2pService.sendSyncResponse(incidents);
196+
197+
debugPrint(
198+
'[IncidentRepo] SYNC_RESPONSE_SENT: ${incidents.length} incidents sent');
199+
} catch (e) {
200+
debugPrint('[IncidentRepo] Failed to handle sync_request: $e');
201+
}
202+
}
203+
204+
/// Handles a sync_response: extracts incidents from the payload and inserts
205+
/// them using the existing deduplication logic.
206+
Future<void> _handleSyncResponse(dynamic envelope) async {
207+
try {
208+
final payload = (envelope as dynamic).payload as Map<String, dynamic>;
209+
final incidentsList = payload['incidents'] as List<dynamic>? ?? [];
210+
211+
debugPrint(
212+
'[IncidentRepo] SYNC_RESPONSE_RECEIVED: ${incidentsList.length} incidents in batch');
213+
214+
int mergedCount = 0;
215+
for (final incidentData in incidentsList) {
216+
final Map<String, dynamic> incMap =
217+
incidentData is Map<String, dynamic>
218+
? incidentData
219+
: Map<String, dynamic>.from(incidentData as Map);
220+
221+
final incidentId =
222+
incMap['incident_id'] as String? ??
223+
'sync_${DateTime.now().millisecondsSinceEpoch}_$mergedCount';
224+
225+
// Use IncidentCreateDto and pass through existing dedup logic
226+
final dto = domain.IncidentCreateDto(
227+
type: incMap['type'] as String? ?? 'unknown',
228+
lat: (incMap['lat'] as num?)?.toDouble() ?? 0.0,
229+
lon: (incMap['lon'] as num?)?.toDouble() ?? 0.0,
230+
priority: incMap['priority'] as String? ?? 'medium',
231+
status: incMap['status'] as String? ?? 'new',
232+
client_id: incMap['reporter_id'] as String? ?? 'synced_peer',
233+
sequence_num: 1,
234+
data: {'incident_id': incidentId},
235+
);
236+
237+
await _handleIncomingP2PIncident(dto);
238+
mergedCount++;
239+
}
240+
241+
debugPrint(
242+
'[IncidentRepo] INCIDENT_MERGED: processed $mergedCount incidents from sync_response');
243+
} catch (e) {
244+
debugPrint('[IncidentRepo] Failed to handle sync_response: $e');
245+
}
246+
}
162247
}

0 commit comments

Comments
 (0)