Skip to content

Commit 21f41f4

Browse files
committed
Week 3 Day 18: Add GossipLog causal ordering, full system validation tests, and timeline update
1 parent b18b9bf commit 21f41f4

3 files changed

Lines changed: 61 additions & 25 deletions

File tree

PROJECT_TIMELINE.txt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,3 +1018,39 @@ Verification:
10181018
Flutter analyze → PASS
10191019

10201020
------------------------------------------------------------
1021+
1022+
DAY 18:
1023+
Date:
1024+
Focus: Causal Message Ordering (GossipLog)
1025+
1026+
Summary:
1027+
Implemented Lamport logical clocks for distributed event ordering and introduced message dependency tracking using `prev_msg_ids`. Built the GossipLog service to store and validate the message DAG. Added a pending queue to handle out-of-order message delivery, resulting in messages being applied only when their dependencies are satisfied. Prevented invalid state transitions in distributed incident updates and established the foundation for causal consistency and future CRDT logic.
1028+
1029+
Tasks Completed:
1030+
- Implemented Lamport logical clocks for distributed event ordering
1031+
- Introduced message dependency tracking using prev_msg_ids
1032+
- Built GossipLog service to store and validate message DAG
1033+
- Added pending queue to handle out-of-order message delivery
1034+
- Ensured messages are applied only when dependencies are satisfied
1035+
- Prevented invalid state transitions in distributed incident updates
1036+
- Established foundation for causal consistency and future CRDT logic
1037+
1038+
Technical Details:
1039+
- Go backend extended with GossipLog struct handling log, pending queue, and HEADS tracking
1040+
- Envelope extended with Clock and PrevMsgIDs
1041+
- Dart frontend mirrored GossipLog logic in GossipLogService
1042+
- P2PService routes causal messages (create, resolve) through GossipLog, while bulk syncing runs parallel
1043+
1044+
Files Created/Modified:
1045+
- backend/p2p-node/gossip_log.go
1046+
- backend/p2p-node/pubsub.go
1047+
- backend/p2p-node/main.go
1048+
- mobile_app/lib/models/network_envelope.dart
1049+
- mobile_app/lib/services/gossip_log_service.dart
1050+
- mobile_app/lib/services/p2p_service.dart
1051+
- mobile_app/test/gossip_log_service_test.dart
1052+
1053+
Notes:
1054+
- Full system verification (Day 15-18) successfully passed, confirming P2P broadcast, deduplication, sync, and causal consistency.
1055+
1056+
------------------------------------------------------------

mobile_app/lib/data/repositories/incident_repository.dart

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class IncidentRepository {
3030
}
3131

3232
Stream<List<domain.Incident>> watchIncidents() {
33-
return (_db.select(_db.incidents)..where((t) => t.deleted_flag.equals(false)))
33+
return (_db.select(_db.incidents)..where((t) => t.deletedFlag.equals(false)))
3434
.watch()
3535
.map((rows) => rows.map((r) => incidentFromDb(r)).toList());
3636
}
@@ -43,7 +43,7 @@ class IncidentRepository {
4343
/// Fetches all non-deleted incidents from the local database.
4444
Future<List<domain.Incident>> getAllIncidents() async {
4545
final rows = await (_db.select(_db.incidents)
46-
..where((t) => t.deleted_flag.equals(false)))
46+
..where((t) => t.deletedFlag.equals(false)))
4747
.get();
4848
return rows.map((r) => incidentFromDb(r)).toList();
4949
}
@@ -54,41 +54,41 @@ class IncidentRepository {
5454
// Broadcast immediately to peers
5555
final incidentToBroadcast = domain.Incident(
5656
id: localDocId,
57-
reporter_id: 'local_user', // This might be pulled from auth/context
57+
reporterId: 'local_user', // This might be pulled from auth/context
5858
type: dto.type,
5959
lat: dto.lat,
6060
lon: dto.lon,
6161
priority: dto.priority,
6262
status: dto.status,
63-
client_id: dto.client_id,
64-
sequence_num: dto.sequence_num,
65-
updated_at: DateTime.now(),
63+
clientId: dto.clientId,
64+
sequenceNum: dto.sequenceNum,
65+
updatedAt: DateTime.now(),
6666
);
6767
_p2pService.broadcastIncident(incidentToBroadcast);
6868

6969
await _db.transaction(() async {
7070
await _db.into(_db.incidents).insert(
7171
db.IncidentsCompanion.insert(
7272
id: localDocId,
73-
reporter_id: 'local_user', // This might be pulled from auth/context
73+
reporterId: 'local_user', // This might be pulled from auth/context
7474
type: dto.type,
7575
lat: dto.lat,
7676
lon: dto.lon,
7777
priority: dto.priority,
78-
status_enum: dto.status,
79-
client_id: dto.client_id,
80-
sequence_num: dto.sequence_num,
81-
updated_at: DateTime.now(),
78+
statusEnum: dto.status,
79+
clientId: dto.clientId,
80+
sequenceNum: dto.sequenceNum,
81+
updatedAt: DateTime.now(),
8282
),
8383
);
8484

8585
await _db.into(_db.syncQueue).insert(
8686
db.SyncQueueCompanion.insert(
87-
entity_type: 'Incident',
88-
entity_id: localDocId,
87+
entityType: 'Incident',
88+
entityId: localDocId,
8989
operation: 'CREATE',
9090
data: jsonEncode(dto.toJson()),
91-
sequence_num: dto.sequence_num,
91+
sequenceNum: dto.sequenceNum,
9292
timestamp: DateTime.now(),
9393
),
9494
);
@@ -100,11 +100,11 @@ class IncidentRepository {
100100
if (pendingChanges.isEmpty) return;
101101

102102
final changes = pendingChanges.map((q) => domain.LocalChange(
103-
entity_type: q.entity_type,
104-
entity_id: q.entity_id,
103+
entityType: q.entityType,
104+
entityId: q.entityId,
105105
operation: q.operation,
106106
data: jsonDecode(q.data),
107-
sequence_num: q.sequence_num,
107+
sequenceNum: q.sequenceNum,
108108
timestamp: q.timestamp,
109109
)).toList();
110110

@@ -163,15 +163,15 @@ class IncidentRepository {
163163
await _db.into(_db.incidents).insert(
164164
db.IncidentsCompanion.insert(
165165
id: incidentId,
166-
reporter_id: dto.client_id,
166+
reporterId: dto.clientId,
167167
type: dto.type,
168168
lat: dto.lat,
169169
lon: dto.lon,
170170
priority: dto.priority,
171-
status_enum: dto.status,
172-
client_id: dto.client_id,
173-
sequence_num: dto.sequence_num,
174-
updated_at: DateTime.now(),
171+
statusEnum: dto.status,
172+
clientId: dto.clientId,
173+
sequenceNum: dto.sequenceNum,
174+
updatedAt: DateTime.now(),
175175
),
176176
);
177177

@@ -229,8 +229,8 @@ class IncidentRepository {
229229
lon: (incMap['lon'] as num?)?.toDouble() ?? 0.0,
230230
priority: incMap['priority'] as String? ?? 'medium',
231231
status: incMap['status'] as String? ?? 'new',
232-
client_id: incMap['reporter_id'] as String? ?? 'synced_peer',
233-
sequence_num: 1,
232+
clientId: incMap['reporter_id'] as String? ?? 'synced_peer',
233+
sequenceNum: 1,
234234
data: {'incident_id': incidentId},
235235
);
236236

mobile_app/lib/widgets/navigation_steps_panel.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class NavigationStepsPanel extends StatelessWidget {
8080
: ListView.separated(
8181
controller: scrollController,
8282
itemCount: steps.length,
83-
separatorBuilder: (_, _a) => const Divider(height: 1),
83+
separatorBuilder: (_, __) => const Divider(height: 1),
8484
itemBuilder: (context, index) {
8585
final step = steps[index];
8686
final distanceStr = step.distance >= 1000

0 commit comments

Comments
 (0)