Skip to content

Commit 17a76d2

Browse files
nickmeinhold and claude authored
feat(crdt): Firestore sync_log transport + DualWrite removal (#41) (#126)
* feat(crdt): Firestore sync_log transport + DualWrite removal (#41)

  Replace fire-and-forget DualWriteGraphRepository with proper CRDT changeset sync via Firestore sync_log subcollection. Each device pushes serialized GraphChangeset docs; other devices pull and merge using existing LWW merge infrastructure.

  - Add maxHlc computed property to GraphChangeset
  - Add sync metadata CRUD (getLastSyncedHlc/updateLastSyncedHlc)
  - Create FirestoreSyncTransport (push/pull/cleanup)
  - Create CrdtSyncState + CrdtSyncNotifier with periodic sync
  - Rewire graphRepositoryProvider to always return DriftGraphRepository
  - Extract seedFromFirestoreIfNeeded for pre-sync user migration
  - Wire sync lifecycle into NavigationShell (init/resume/pause)
  - Add sync_log Firestore security rules
  - Delete DualWriteGraphRepository and its tests

  Co-Authored-By: Claude <noreply@anthropic.com>

* fix(crdt): use maxHlc ordering in pullChangesets query

  Firestore requires the orderBy field to match the range filter field. The previous query ordered by createdAt but filtered on maxHlc, which would require a composite index in production (fake_cloud_firestore doesn't enforce this). Switching to orderBy('maxHlc') is actually better — HLC strings are lexicographically ordered, so this gives correct causal ordering without needing a composite index.

  Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 6e05196 commit 17a76d2

16 files changed

+1112
-590
lines changed

firestore.rules

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ service cloud.firestore {
2020
match /friends/{friendId} {
2121
allow read, write: if request.auth != null && request.auth.uid == userId;
2222
}
23+
24+
// CRDT sync log: every operation covered by `read` and `write` on a sync_log
// entry is allowed only for a signed-in caller whose auth uid equals `userId`.
// NOTE(review): `userId` is bound by an enclosing match that is outside this
// hunk — presumably `/users/{userId}`; confirm against the full rules file.
25+
match /sync_log/{entryId} {
26+
allow read, write: if request.auth != null && request.auth.uid == userId;
27+
}
2328
}
2429

2530
// --- Wiki groups ---
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import 'package:cloud_firestore/cloud_firestore.dart';
2+
3+
import 'graph_changeset.dart';
4+
5+
/// Transport layer that exchanges [GraphChangeset]s between devices via a
/// Firestore `sync_log` subcollection.
///
/// Each device pushes serialized changesets as documents; other devices pull
/// and merge them. Entries written by the local device are filtered out by
/// `nodeId` in Dart (rather than with a compound Firestore query) so that no
/// composite index has to be managed; the log is per-user and is expected to
/// hold entries from only a few devices.
///
/// ```
/// users/{uid}/sync_log/{auto-id}
///   nodeId: String
///   changeset: Map<String, dynamic>
///   maxHlc: String
///   createdAt: Timestamp (server)
/// ```
///
/// This is a plain Dart class (no Riverpod) — testable with
/// `fake_cloud_firestore`.
class FirestoreSyncTransport {
  /// Creates a transport bound to `users/{userId}/sync_log` in [firestore].
  FirestoreSyncTransport({
    required FirebaseFirestore firestore,
    required String userId,
  }) : _syncLog = firestore
            .collection('users')
            .doc(userId)
            .collection('sync_log');

  /// Upper bound on deletes committed in a single [WriteBatch].
  ///
  /// Firestore has historically capped a batched write at 500 operations;
  /// staying at or below that keeps [cleanup] working on large backlogs.
  static const _batchLimit = 500;

  /// The per-user `sync_log` collection all operations act on.
  final CollectionReference<Map<String, dynamic>> _syncLog;

  /// Pushes [changeset] to the sync_log, tagged with the writer's [nodeId].
  ///
  /// Does nothing when the changeset is empty. The entry stores the
  /// serialized changeset, its `maxHlc` (the field [pullChangesets] filters
  /// and orders by) and a server-assigned `createdAt` timestamp. `createdAt`
  /// is not used for ordering by this class.
  Future<void> pushChangeset({
    required GraphChangeset changeset,
    required String nodeId,
  }) async {
    if (changeset.isEmpty) return;

    await _syncLog.add({
      'nodeId': nodeId,
      'changeset': changeset.toJson(),
      'maxHlc': changeset.maxHlc,
      'createdAt': FieldValue.serverTimestamp(),
    });
  }

  /// Pulls changesets written by other devices whose `maxHlc` is greater
  /// than [sinceHlc], in ascending `maxHlc` order.
  ///
  /// An empty [sinceHlc] means "no bookmark" and returns the whole log.
  /// Entries whose `nodeId` equals [localNodeId] are dropped in Dart rather
  /// than via a compound query.
  ///
  /// Throws if an entry's `changeset` or `maxHlc` field does not have the
  /// expected type.
  ///
  /// NOTE(review): an entry pushed late by another device with a `maxHlc`
  /// at or below [sinceHlc] is never returned by this query — confirm the
  /// caller's bookmark handling tolerates that.
  Future<List<PulledChangeset>> pullChangesets({
    required String sinceHlc,
    required String localNodeId,
  }) async {
    // Ordering and range-filtering on the same field (`maxHlc`) keeps this a
    // single-field query.
    Query<Map<String, dynamic>> query = _syncLog.orderBy('maxHlc');
    if (sinceHlc.isNotEmpty) {
      query = query.where('maxHlc', isGreaterThan: sinceHlc);
    }

    final snapshot = await query.get();

    final results = <PulledChangeset>[];
    for (final doc in snapshot.docs) {
      final data = doc.data();

      // Skip our own entries (filtered in Dart, not Firestore).
      if (data['nodeId'] == localNodeId) continue;

      final changesetJson = data['changeset'] as Map<String, dynamic>;
      results.add(PulledChangeset(
        changeset: GraphChangeset.fromJson(changesetJson),
        maxHlc: data['maxHlc'] as String,
      ));
    }

    return results;
  }

  /// Deletes sync_log entries with `maxHlc <= beforeHlc` and returns how
  /// many were deleted.
  ///
  /// Call periodically to prevent unbounded growth of the sync_log. This
  /// method does not check which devices have consumed an entry: the caller
  /// must pass a [beforeHlc] that every device has already synced past.
  ///
  /// Deletes are committed in chunks of at most [_batchLimit], so a failure
  /// part-way through can leave earlier chunks already deleted; calling
  /// again simply removes the remainder.
  Future<int> cleanup({required String beforeHlc}) async {
    final snapshot = await _syncLog
        .where('maxHlc', isLessThanOrEqualTo: beforeHlc)
        .get();
    final staleDocs = snapshot.docs;

    for (var start = 0; start < staleDocs.length; start += _batchLimit) {
      final batch = _syncLog.firestore.batch();
      for (final doc in staleDocs.skip(start).take(_batchLimit)) {
        batch.delete(doc.reference);
      }
      await batch.commit();
    }

    return staleDocs.length;
  }
}
112+
113+
/// Pairs a [GraphChangeset] read from the sync_log with the `maxHlc` value
/// stored on the same log entry, which serves as the pull bookmark.
class PulledChangeset {
  /// Creates a pulled changeset from its two required parts.
  const PulledChangeset({required this.changeset, required this.maxHlc});

  /// The deserialized changeset carried by the log entry.
  final GraphChangeset changeset;

  /// The `maxHlc` field of the log entry this changeset came from.
  final String maxHlc;
}

lib/src/crdt/graph_changeset.dart

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,37 @@ class GraphChangeset {
6565
topics.length +
6666
topicDocuments.length;
6767

68+
/// The greatest HLC (by [String.compareTo]) found on any row of this
/// changeset, or the empty string when the changeset is empty.
///
/// The sync transport layer uses this as a bookmark: the Firestore sync_log
/// stores it next to each changeset so a pull can filter on
/// `maxHlc > sinceHlc` without deserializing the full changeset.
String get maxHlc {
  if (isEmpty) return '';

  // Gather every row's HLC once, then reduce to the largest.
  final hlcs = <String>[
    for (final concept in concepts) concept.hlc,
    for (final relationship in relationships) relationship.hlc,
    for (final quizItem in quizItems) quizItem.hlc,
    for (final document in documents) document.hlc,
    for (final topic in topics) topic.hlc,
    for (final topicDocument in topicDocuments) topicDocument.hlc,
  ];

  // Seeding with '' mirrors the original "best so far" starting value.
  return hlcs.fold<String>(
    '',
    (highest, hlc) => hlc.compareTo(highest) > 0 ? hlc : highest,
  );
}
98+
6899
/// Serializes to wire format using SQL column names.
69100
///
70101
/// TypeConverter columns (`tags`, `type`) are serialized through their
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import 'package:meta/meta.dart';
2+
3+
/// The phase a CRDT sync cycle is currently in.
///
/// Declaration order is part of the contract: it fixes each value's `index`.
enum CrdtSyncPhase {
  idle,
  pushing,
  pulling,
  merging,
  error,
}
5+
6+
/// Immutable state for the CRDT changeset sync process.
///
/// Tracks the current phase, last successful sync time, and aggregate
/// counts for push/pull/merge operations (useful for diagnostics and
/// dashboard display).
///
/// Two states are equal when all of their fields are equal, so identical
/// snapshots compare equal in tests and in `==`-based change detection.
@immutable
class CrdtSyncState {
  /// Creates a state; every field defaults to its "never synced, idle" value.
  const CrdtSyncState({
    this.phase = CrdtSyncPhase.idle,
    this.lastSyncedAt,
    this.pushedCount = 0,
    this.pulledCount = 0,
    this.mergedCount = 0,
    this.errorMessage = '',
  });

  /// The all-defaults state: idle, never synced, zero counts, no error.
  static const initial = CrdtSyncState();

  /// The phase the sync cycle is currently in.
  final CrdtSyncPhase phase;

  /// When the last successful sync completed (null if never synced).
  final DateTime? lastSyncedAt;

  /// Number of rows pushed in the last sync cycle.
  final int pushedCount;

  /// Number of changesets pulled in the last sync cycle.
  final int pulledCount;

  /// Number of rows actually merged (won LWW) in the last sync cycle.
  final int mergedCount;

  /// Error message from the last failed sync, or empty string.
  final String errorMessage;

  /// Whether a sync is currently in progress, i.e. [phase] is one of
  /// pushing, pulling or merging.
  bool get isSyncing =>
      phase == CrdtSyncPhase.pushing ||
      phase == CrdtSyncPhase.pulling ||
      phase == CrdtSyncPhase.merging;

  /// Returns a copy with the given fields replaced.
  ///
  /// Omitted (or null) arguments keep the current value. As a consequence
  /// this method cannot reset [lastSyncedAt] back to null; an error message
  /// is cleared by passing an empty string.
  CrdtSyncState copyWith({
    CrdtSyncPhase? phase,
    DateTime? lastSyncedAt,
    int? pushedCount,
    int? pulledCount,
    int? mergedCount,
    String? errorMessage,
  }) {
    return CrdtSyncState(
      phase: phase ?? this.phase,
      lastSyncedAt: lastSyncedAt ?? this.lastSyncedAt,
      pushedCount: pushedCount ?? this.pushedCount,
      pulledCount: pulledCount ?? this.pulledCount,
      mergedCount: mergedCount ?? this.mergedCount,
      errorMessage: errorMessage ?? this.errorMessage,
    );
  }

  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is CrdtSyncState &&
          other.phase == phase &&
          other.lastSyncedAt == lastSyncedAt &&
          other.pushedCount == pushedCount &&
          other.pulledCount == pulledCount &&
          other.mergedCount == mergedCount &&
          other.errorMessage == errorMessage;

  @override
  int get hashCode => Object.hash(
        phase,
        lastSyncedAt,
        pushedCount,
        pulledCount,
        mergedCount,
        errorMessage,
      );

  @override
  String toString() => 'CrdtSyncState('
      'phase: $phase, '
      'lastSyncedAt: $lastSyncedAt, '
      'pushedCount: $pushedCount, '
      'pulledCount: $pulledCount, '
      'mergedCount: $mergedCount, '
      'errorMessage: $errorMessage)';
}

0 commit comments

Comments
 (0)