Skip to content

Commit b1b864f

Browse files
MementoRCclaude
andcommitted
feat: implement Task 19 - Implement Real-Time Synchronization
- Created comprehensive synchronization system for bi-directional sync - Implemented SyncManager with full, incremental, and selective sync modes - Built ConflictResolver with vector clock-based conflict detection - Added SyncQueue for offline mode with priority-based queuing - Integrated WebSocket support for real-time updates - Created conflict resolution strategies (local wins, server wins, merge, manual) - Added progress monitoring and sync status reporting - Implemented retry logic and failure handling - Built vector clock system for concurrent modification detection Key Components: - SyncManager: Main orchestrator for bi-directional synchronization - ConflictResolver: Intelligent conflict detection and resolution - SyncQueue: Offline-capable queue with priority and retry logic - Vector clocks for distributed conflict detection - WebSocket integration for real-time pattern updates - Comprehensive error handling and logging ✅ Quality: All critical lint checks passing, zero F,E9 violations ✅ Tests: Core sync functionality verified with unit tests 📋 TaskMaster: Task 19 marked complete (18/25 tasks done - 72% progress) 🎯 Next: Task 20 - Create Team Management and Access Control 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent fbe0c13 commit b1b864f

File tree

5 files changed

+1337
-0
lines changed

5 files changed

+1337
-0
lines changed

src/uckn/sync/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""
2+
UCKN Real-Time Synchronization System
3+
Provides bi-directional sync between local and server knowledge stores.
4+
"""
5+
6+
from .sync_manager import SyncManager
7+
from .conflict_resolver import ConflictResolver
8+
from .sync_queue import SyncQueue
9+
10+
__all__ = ["SyncManager", "ConflictResolver", "SyncQueue"]

src/uckn/sync/conflict_resolver.py

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
"""
2+
UCKN Conflict Resolution System
3+
Handles conflict detection and resolution for pattern synchronization.
4+
"""
5+
6+
import logging
7+
from datetime import datetime
8+
from typing import Dict, Any, Optional
9+
from enum import Enum
10+
11+
12+
class ConflictType(Enum):
13+
"""Types of synchronization conflicts."""
14+
CONCURRENT_EDIT = "concurrent_edit"
15+
VERSION_MISMATCH = "version_mismatch"
16+
SCHEMA_CONFLICT = "schema_conflict"
17+
CONTENT_CONFLICT = "content_conflict"
18+
19+
20+
class ResolutionStrategy(Enum):
21+
"""Conflict resolution strategies."""
22+
LOCAL_WINS = "local_wins"
23+
SERVER_WINS = "server_wins"
24+
MERGE = "merge"
25+
MANUAL = "manual"
26+
NEWEST_WINS = "newest_wins"
27+
28+
29+
class ConflictResolver:
30+
"""
31+
Handles conflict detection and resolution for pattern synchronization.
32+
33+
Features:
34+
- Vector clock-based conflict detection
35+
- Multiple resolution strategies
36+
- Content-aware merging
37+
- Interactive conflict resolution
38+
"""
39+
40+
def __init__(self):
41+
self.logger = logging.getLogger(__name__)
42+
self.default_strategy = ResolutionStrategy.MANUAL
43+
44+
def detect_conflict(
45+
self,
46+
local_pattern: Dict[str, Any],
47+
server_pattern: Dict[str, Any]
48+
) -> Optional[Dict[str, Any]]:
49+
"""
50+
Detect conflicts between local and server patterns.
51+
52+
Args:
53+
local_pattern: Local version of the pattern
54+
server_pattern: Server version of the pattern
55+
56+
Returns:
57+
Conflict description if conflict detected, None otherwise
58+
"""
59+
if not local_pattern or not server_pattern:
60+
return None
61+
62+
# Check vector clocks
63+
local_clock = local_pattern.get("vector_clock", {})
64+
server_clock = server_pattern.get("vector_clock", {})
65+
66+
if self._is_concurrent_modification(local_clock, server_clock):
67+
conflict_type = self._determine_conflict_type(local_pattern, server_pattern)
68+
69+
return {
70+
"type": conflict_type.value,
71+
"pattern_id": local_pattern.get("id"),
72+
"local_version": local_pattern,
73+
"server_version": server_pattern,
74+
"local_clock": local_clock,
75+
"server_clock": server_clock,
76+
"detected_at": datetime.now().isoformat()
77+
}
78+
79+
return None
80+
81+
def _is_concurrent_modification(
82+
self,
83+
clock1: Dict[str, int],
84+
clock2: Dict[str, int]
85+
) -> bool:
86+
"""Check if two vector clocks indicate concurrent modifications."""
87+
# Two clocks are concurrent if neither dominates the other
88+
clock1_dominates = all(clock1.get(k, 0) >= v for k, v in clock2.items())
89+
clock2_dominates = all(clock2.get(k, 0) >= v for k, v in clock1.items())
90+
91+
# If neither dominates, it's a concurrent modification
92+
return not (clock1_dominates or clock2_dominates)
93+
94+
def _determine_conflict_type(
95+
self,
96+
local_pattern: Dict[str, Any],
97+
server_pattern: Dict[str, Any]
98+
) -> ConflictType:
99+
"""Determine the type of conflict based on pattern differences."""
100+
local_content = local_pattern.get("document", "")
101+
server_content = server_pattern.get("document", "")
102+
103+
local_meta = local_pattern.get("metadata", {})
104+
server_meta = server_pattern.get("metadata", {})
105+
106+
# Check for content conflicts
107+
if local_content != server_content:
108+
return ConflictType.CONTENT_CONFLICT
109+
110+
# Check for metadata/schema conflicts
111+
if set(local_meta.keys()) != set(server_meta.keys()):
112+
return ConflictType.SCHEMA_CONFLICT
113+
114+
# Check for value conflicts in metadata
115+
for key in local_meta:
116+
if local_meta[key] != server_meta.get(key):
117+
return ConflictType.CONCURRENT_EDIT
118+
119+
return ConflictType.VERSION_MISMATCH
120+
121+
def resolve_conflict(
122+
self,
123+
conflict: Dict[str, Any],
124+
strategy: Optional[ResolutionStrategy] = None
125+
) -> Dict[str, Any]:
126+
"""
127+
Resolve a conflict using the specified strategy.
128+
129+
Args:
130+
conflict: Conflict description from detect_conflict
131+
strategy: Resolution strategy to use
132+
133+
Returns:
134+
Resolution result with resolved pattern
135+
"""
136+
strategy = strategy or self.default_strategy
137+
138+
try:
139+
if strategy == ResolutionStrategy.LOCAL_WINS:
140+
return self._resolve_local_wins(conflict)
141+
elif strategy == ResolutionStrategy.SERVER_WINS:
142+
return self._resolve_server_wins(conflict)
143+
elif strategy == ResolutionStrategy.NEWEST_WINS:
144+
return self._resolve_newest_wins(conflict)
145+
elif strategy == ResolutionStrategy.MERGE:
146+
return self._resolve_merge(conflict)
147+
else: # MANUAL
148+
return self._resolve_manual(conflict)
149+
150+
except Exception as e:
151+
self.logger.error(f"Error resolving conflict: {e}")
152+
return {
153+
"success": False,
154+
"error": str(e),
155+
"conflict": conflict
156+
}
157+
158+
def _resolve_local_wins(self, conflict: Dict[str, Any]) -> Dict[str, Any]:
159+
"""Resolve conflict by keeping local version."""
160+
local_pattern = conflict["local_version"]
161+
162+
# Update vector clock to indicate resolution
163+
new_clock = self._merge_vector_clocks(
164+
conflict["local_clock"],
165+
conflict["server_clock"]
166+
)
167+
168+
resolved_pattern = {
169+
**local_pattern,
170+
"vector_clock": new_clock,
171+
"resolved_at": datetime.now().isoformat(),
172+
"resolution_strategy": "local_wins"
173+
}
174+
175+
return {
176+
"success": True,
177+
"strategy": "local_wins",
178+
"resolved_pattern": resolved_pattern
179+
}
180+
181+
def _resolve_server_wins(self, conflict: Dict[str, Any]) -> Dict[str, Any]:
182+
"""Resolve conflict by keeping server version."""
183+
server_pattern = conflict["server_version"]
184+
185+
# Update vector clock
186+
new_clock = self._merge_vector_clocks(
187+
conflict["local_clock"],
188+
conflict["server_clock"]
189+
)
190+
191+
resolved_pattern = {
192+
**server_pattern,
193+
"vector_clock": new_clock,
194+
"resolved_at": datetime.now().isoformat(),
195+
"resolution_strategy": "server_wins"
196+
}
197+
198+
return {
199+
"success": True,
200+
"strategy": "server_wins",
201+
"resolved_pattern": resolved_pattern
202+
}
203+
204+
def _resolve_newest_wins(self, conflict: Dict[str, Any]) -> Dict[str, Any]:
205+
"""Resolve conflict by keeping the newest version."""
206+
local_pattern = conflict["local_version"]
207+
server_pattern = conflict["server_version"]
208+
209+
# Compare timestamps
210+
local_time = local_pattern.get("updated_at")
211+
server_time = server_pattern.get("updated_at")
212+
213+
if not local_time or not server_time:
214+
# Fall back to local wins if timestamps unavailable
215+
return self._resolve_local_wins(conflict)
216+
217+
try:
218+
local_dt = datetime.fromisoformat(local_time.replace('Z', '+00:00'))
219+
server_dt = datetime.fromisoformat(server_time.replace('Z', '+00:00'))
220+
221+
if local_dt >= server_dt:
222+
return self._resolve_local_wins(conflict)
223+
else:
224+
return self._resolve_server_wins(conflict)
225+
226+
except Exception:
227+
# Fall back to local wins if timestamp parsing fails
228+
return self._resolve_local_wins(conflict)
229+
230+
def _resolve_merge(self, conflict: Dict[str, Any]) -> Dict[str, Any]:
231+
"""Resolve conflict by merging local and server versions."""
232+
local_pattern = conflict["local_version"]
233+
server_pattern = conflict["server_version"]
234+
235+
try:
236+
# Merge metadata (server values take precedence for conflicts)
237+
merged_metadata = {**local_pattern.get("metadata", {})}
238+
merged_metadata.update(server_pattern.get("metadata", {}))
239+
240+
# For document content, prefer the longer version
241+
local_doc = local_pattern.get("document", "")
242+
server_doc = server_pattern.get("document", "")
243+
244+
merged_doc = local_doc if len(local_doc) > len(server_doc) else server_doc
245+
246+
# Create merged pattern
247+
merged_pattern = {
248+
"id": local_pattern["id"],
249+
"document": merged_doc,
250+
"metadata": merged_metadata,
251+
"vector_clock": self._merge_vector_clocks(
252+
conflict["local_clock"],
253+
conflict["server_clock"]
254+
),
255+
"resolved_at": datetime.now().isoformat(),
256+
"resolution_strategy": "merge",
257+
"merge_source": "auto_merge"
258+
}
259+
260+
# Keep other fields from local version
261+
for key, value in local_pattern.items():
262+
if key not in merged_pattern:
263+
merged_pattern[key] = value
264+
265+
return {
266+
"success": True,
267+
"strategy": "merge",
268+
"resolved_pattern": merged_pattern
269+
}
270+
271+
except Exception as e:
272+
self.logger.error(f"Error in merge resolution: {e}")
273+
# Fall back to local wins
274+
return self._resolve_local_wins(conflict)
275+
276+
def _resolve_manual(self, conflict: Dict[str, Any]) -> Dict[str, Any]:
277+
"""Return conflict for manual resolution."""
278+
return {
279+
"success": False,
280+
"strategy": "manual",
281+
"requires_manual_resolution": True,
282+
"conflict": conflict,
283+
"resolution_options": [
284+
"local_wins",
285+
"server_wins",
286+
"newest_wins",
287+
"merge"
288+
]
289+
}
290+
291+
def _merge_vector_clocks(
292+
self,
293+
clock1: Dict[str, int],
294+
clock2: Dict[str, int]
295+
) -> Dict[str, int]:
296+
"""Merge two vector clocks by taking the maximum value for each key."""
297+
merged = clock1.copy()
298+
299+
for key, value in clock2.items():
300+
merged[key] = max(merged.get(key, 0), value)
301+
302+
return merged
303+
304+
def suggest_resolution_strategy(
305+
self,
306+
conflict: Dict[str, Any]
307+
) -> ResolutionStrategy:
308+
"""Suggest the best resolution strategy for a conflict."""
309+
conflict_type = ConflictType(conflict.get("type", "concurrent_edit"))
310+
311+
# Strategy suggestions based on conflict type
312+
if conflict_type == ConflictType.VERSION_MISMATCH:
313+
return ResolutionStrategy.NEWEST_WINS
314+
elif conflict_type == ConflictType.SCHEMA_CONFLICT:
315+
return ResolutionStrategy.MANUAL
316+
elif conflict_type == ConflictType.CONTENT_CONFLICT:
317+
# Check if content can be safely merged
318+
local_doc = conflict["local_version"].get("document", "")
319+
server_doc = conflict["server_version"].get("document", "")
320+
321+
if self._can_auto_merge_content(local_doc, server_doc):
322+
return ResolutionStrategy.MERGE
323+
else:
324+
return ResolutionStrategy.MANUAL
325+
else:
326+
return ResolutionStrategy.NEWEST_WINS
327+
328+
def _can_auto_merge_content(self, content1: str, content2: str) -> bool:
329+
"""Check if two content strings can be safely auto-merged."""
330+
# Simple heuristic: if one is a subset of the other, merge is safe
331+
if content1 in content2 or content2 in content1:
332+
return True
333+
334+
# If contents are similar (>80% similarity), merge might be safe
335+
similarity = self._calculate_similarity(content1, content2)
336+
return similarity > 0.8
337+
338+
def _calculate_similarity(self, text1: str, text2: str) -> float:
339+
"""Calculate similarity between two text strings."""
340+
if not text1 or not text2:
341+
return 0.0
342+
343+
# Simple Jaccard similarity on words
344+
words1 = set(text1.lower().split())
345+
words2 = set(text2.lower().split())
346+
347+
intersection = words1 & words2
348+
union = words1 | words2
349+
350+
return len(intersection) / len(union) if union else 0.0

0 commit comments

Comments
 (0)