Skip to content

Commit baedfb0

Browse files
committed
Week 3 Day 21: Implement CRDT-based conflict resolution and full system validation
1 parent d5ffd92 commit baedfb0

File tree

3 files changed

+482
-7
lines changed

3 files changed

+482
-7
lines changed

mobile_app/lib/data/repositories/incident_repository.dart

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,24 @@ class IncidentRepository {
138138
);
139139
}
140140

141+
int _getIncidentStatePriority(String state) {
142+
switch (state.toUpperCase()) {
143+
case 'RESOLVED':
144+
return 3;
145+
case 'ASSIGNED':
146+
return 2;
147+
case 'PENDING':
148+
return 1;
149+
default:
150+
return 0; // e.g. 'new' or unknown
151+
}
152+
}
153+
141154
/// Handles an incoming P2P incident from the network.
142155
///
143156
/// Uses the incident_id from the envelope payload as the DB primary key
144157
/// to enable DB-level deduplication. If an incident with the same ID already
145-
/// exists, the insert is skipped.
158+
/// exists, the CRDT merge logic is applied.
146159
Future<void> _handleIncomingP2PIncident(domain.IncidentCreateDto dto) async {
147160
// Extract incident_id from envelope metadata (passed through dto.data)
148161
final String incidentId = dto.data?['incident_id'] as String? ??
@@ -154,8 +167,44 @@ class IncidentRepository {
154167
.get();
155168

156169
if (existing.isNotEmpty) {
157-
debugPrint(
158-
'[IncidentRepo] Duplicate incident skipped: $incidentId (already in DB)');
170+
final current = incidentFromDb(existing.first);
171+
final currentPriority = _getIncidentStatePriority(current.status);
172+
final incomingPriority = _getIncidentStatePriority(dto.status);
173+
174+
bool shouldUpdate = false;
175+
if (incomingPriority > currentPriority) {
176+
shouldUpdate = true;
177+
} else if (incomingPriority == currentPriority) {
178+
if (dto.sequenceNum > current.sequenceNum) {
179+
shouldUpdate = true;
180+
}
181+
}
182+
183+
if (shouldUpdate) {
184+
debugPrint(
185+
'[IncidentRepo] CRDT_MERGE_APPLIED: Updating $incidentId state to ${dto.status}');
186+
debugPrint(
187+
'[IncidentRepo] STATE_UPDATED: $incidentId from ${current.status} to ${dto.status}');
188+
debugPrint('[IncidentRepo] CONFLICT_RESOLVED: incoming wins');
189+
190+
await (_db.update(_db.incidents)..where((t) => t.id.equals(incidentId)))
191+
.write(
192+
db.IncidentsCompanion(
193+
statusEnum: Value(dto.status),
194+
sequenceNum: Value(dto.sequenceNum),
195+
updatedAt: Value(DateTime.now()),
196+
lat: Value(dto.lat),
197+
lon: Value(dto.lon),
198+
priority: Value(dto.priority),
199+
type: Value(dto.type),
200+
clientId: Value(dto.clientId),
201+
),
202+
);
203+
} else {
204+
debugPrint(
205+
'[IncidentRepo] CRDT_MERGE_APPLIED: Local state kept for $incidentId');
206+
debugPrint('[IncidentRepo] CONFLICT_RESOLVED: local wins');
207+
}
159208
return;
160209
}
161210

@@ -228,9 +277,9 @@ class IncidentRepository {
228277
lat: (incMap['lat'] as num?)?.toDouble() ?? 0.0,
229278
lon: (incMap['lon'] as num?)?.toDouble() ?? 0.0,
230279
priority: incMap['priority'] as String? ?? 'medium',
231-
status: incMap['status'] as String? ?? 'new',
280+
status: incMap['status'] as String? ?? incMap['state'] as String? ?? 'new',
232281
clientId: incMap['reporter_id'] as String? ?? 'synced_peer',
233-
sequenceNum: 1,
282+
sequenceNum: incMap['clock'] as int? ?? 1,
234283
data: {'incident_id': incidentId},
235284
);
236285

mobile_app/lib/services/p2p_service.dart

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,9 +415,9 @@ class P2PService {
415415
lat: (payload['lat'] as num?)?.toDouble() ?? 0.0,
416416
lon: (payload['lon'] as num?)?.toDouble() ?? 0.0,
417417
priority: payload['priority'] as String? ?? 'medium',
418-
status: 'new',
418+
status: payload['status'] as String? ?? payload['state'] as String? ?? 'new',
419419
clientId: payload['device_id'] as String? ?? envelope.originPeer,
420-
sequenceNum: 1,
420+
sequenceNum: payload['clock'] as int? ?? envelope.clock,
421421
data: {
422422
'msg_id': envelope.msgId,
423423
'origin_peer': envelope.originPeer,
@@ -478,8 +478,10 @@ class P2PService {
478478
'lon': incident.lon,
479479
'priority': incident.priority,
480480
'status': incident.status,
481+
'state': incident.status,
481482
'reporter_id': incident.reporterId,
482483
'timestamp': incident.updatedAt.millisecondsSinceEpoch ~/ 1000,
484+
'clock': incident.sequenceNum,
483485
}).toList();
484486

485487
final envelope = NetworkEnvelope(
@@ -580,8 +582,11 @@ class P2PService {
580582
'lat': incident.lat,
581583
'lon': incident.lon,
582584
'priority': incident.priority,
585+
'status': incident.status,
586+
'state': incident.status,
583587
'timestamp': incident.updatedAt.millisecondsSinceEpoch ~/ 1000,
584588
'device_id': incident.reporterId,
589+
'clock': incident.sequenceNum,
585590
},
586591
);
587592

0 commit comments

Comments
 (0)