88Note: This is an OPTIONAL component. Second Brain works perfectly without Cipher.
99"""
1010
11- import asyncio
12- import json
1311from datetime import datetime
14- from typing import List , Dict , Any , Optional
12+ from typing import Any , Dict , List , Optional
13+
1514import httpx
16- from pathlib import Path
1715
16+ from app .core .logging import get_logger
1817from app .interfaces .sync_provider import (
19- ISyncProvider ,
20- SyncProviderConfig ,
21- SyncResult ,
22- SyncStatus ,
18+ ConflictResolution ,
2319 HealthStatus ,
24- ConflictResolution
20+ ISyncProvider ,
21+ SyncProviderConfig ,
22+ SyncResult ,
23+ SyncStatus ,
2524)
26- from app .core .logging import get_logger
27- from app .models .memory import Memory
2825
2926logger = get_logger (__name__ )
3027
@@ -41,130 +38,130 @@ class CipherAdapter(ISyncProvider):
4138
4239 This adapter enables bi-directional sync between Second Brain and Cipher.
4340 """
44-
41+
4542 def __init__ (self , config : SyncProviderConfig ):
4643 super ().__init__ (config )
4744 self .client : Optional [httpx .AsyncClient ] = None
4845 self .mcp_socket : Optional [Any ] = None # MCP WebSocket connection
49-
46+
5047 # Cipher-specific settings
5148 self .cipher_url = config .custom_settings .get (
52- "cipher_url" ,
49+ "cipher_url" ,
5350 "http://localhost:3000" # Default Cipher server
5451 )
5552 self .api_key = config .custom_settings .get ("api_key" )
5653 self .workspace_id = config .custom_settings .get ("workspace_id" )
5754 self .enable_mcp = config .custom_settings .get ("enable_mcp" , True )
58-
55+
5956 @property
6057 def name (self ) -> str :
6158 return "cipher"
62-
59+
6360 async def connect (self ) -> None :
6461 """Connect to Cipher server"""
6562 try :
6663 # Create HTTP client
6764 headers = {}
6865 if self .api_key :
6966 headers ["Authorization" ] = f"Bearer { self .api_key } "
70-
67+
7168 self .client = httpx .AsyncClient (
7269 base_url = self .cipher_url ,
7370 headers = headers ,
7471 timeout = self .config .timeout
7572 )
76-
73+
7774 # Test connection
7875 response = await self .client .get ("/health" )
7976 response .raise_for_status ()
80-
77+
8178 # Connect to MCP WebSocket if enabled
8279 if self .enable_mcp :
8380 await self ._connect_mcp ()
84-
81+
8582 self ._connected = True
8683 logger .info (f"Connected to Cipher at { self .cipher_url } " )
87-
84+
8885 except Exception as e :
8986 logger .error (f"Failed to connect to Cipher: { e } " )
9087 raise ConnectionError (f"Cannot connect to Cipher: { e } " )
91-
88+
9289 async def _connect_mcp (self ) -> None :
9390 """Connect to Cipher's MCP WebSocket for real-time sync"""
9491 # MCP connection implementation would go here
9592 # This is a placeholder for the actual MCP protocol implementation
9693 logger .info ("MCP connection initialized (placeholder)" )
97-
94+
9895 async def disconnect (self ) -> None :
9996 """Disconnect from Cipher"""
10097 if self .client :
10198 await self .client .aclose ()
10299 self .client = None
103-
100+
104101 if self .mcp_socket :
105102 # Close MCP WebSocket
106103 pass
107-
104+
108105 self ._connected = False
109106 logger .info ("Disconnected from Cipher" )
110-
107+
111108 async def push_memory (self , memory : Dict [str , Any ]) -> bool :
112109 """Push a single memory to Cipher"""
113110 if not self .should_sync (memory ):
114111 return True
115-
112+
116113 try :
117114 # Transform to Cipher format
118115 cipher_memory = self ._to_cipher_format (memory )
119-
116+
120117 # Determine which system (1 or 2) based on memory type
121118 endpoint = "/api/memories"
122119 if memory .get ("type" ) == "reasoning" :
123120 endpoint = "/api/reasoning"
124-
121+
125122 response = await self .client .post (
126123 endpoint ,
127124 json = cipher_memory
128125 )
129126 response .raise_for_status ()
130-
127+
131128 return True
132-
129+
133130 except Exception as e :
134131 logger .error (f"Failed to push memory to Cipher: { e } " )
135132 return False
136-
133+
137134 async def push_batch (self , memories : List [Dict [str , Any ]]) -> SyncResult :
138135 """Push multiple memories to Cipher"""
139136 result = SyncResult (status = SyncStatus .SUCCESS )
140-
137+
141138 # Filter memories
142139 memories_to_sync = [m for m in memories if self .should_sync (m )]
143-
140+
144141 # Split into batches
145142 for i in range (0 , len (memories_to_sync ), self .config .batch_size ):
146143 batch = memories_to_sync [i :i + self .config .batch_size ]
147-
144+
148145 try :
149146 # Transform batch
150147 cipher_batch = [self ._to_cipher_format (m ) for m in batch ]
151-
148+
152149 # Bulk upload
153150 response = await self .client .post (
154151 "/api/memories/bulk" ,
155152 json = {"memories" : cipher_batch }
156153 )
157154 response .raise_for_status ()
158-
155+
159156 result .pushed += len (batch )
160-
157+
161158 except Exception as e :
162159 logger .error (f"Failed to push batch to Cipher: { e } " )
163160 result .errors .append (str (e ))
164161 result .status = SyncStatus .PARTIAL
165-
162+
166163 return result
167-
164+
168165 async def pull_changes (self , since : Optional [datetime ] = None ) -> List [Dict [str , Any ]]:
169166 """Pull changes from Cipher"""
170167 try :
@@ -173,30 +170,30 @@ async def pull_changes(self, since: Optional[datetime] = None) -> List[Dict[str,
173170 params ["since" ] = since .isoformat ()
174171 if self .workspace_id :
175172 params ["workspace_id" ] = self .workspace_id
176-
173+
177174 response = await self .client .get (
178175 "/api/memories/changes" ,
179176 params = params
180177 )
181178 response .raise_for_status ()
182-
179+
183180 cipher_memories = response .json ().get ("memories" , [])
184-
181+
185182 # Transform from Cipher format
186183 return [self ._from_cipher_format (m ) for m in cipher_memories ]
187-
184+
188185 except Exception as e :
189186 logger .error (f"Failed to pull changes from Cipher: { e } " )
190187 return []
191-
188+
192189 async def resolve_conflict (
193- self ,
194- local : Dict [str , Any ],
190+ self ,
191+ local : Dict [str , Any ],
195192 remote : Dict [str , Any ]
196193 ) -> Dict [str , Any ]:
197194 """Resolve conflicts between local and remote memories"""
198195 strategy = self .config .conflict_resolution
199-
196+
200197 if strategy == ConflictResolution .LOCAL_WINS :
201198 return local
202199 elif strategy == ConflictResolution .REMOTE_WINS :
@@ -211,22 +208,22 @@ async def resolve_conflict(
211208 return local if local_updated >= remote_updated else remote
212209 else :
213210 # Manual resolution - for now, default to local
214- logger .warning (f "Manual conflict resolution not implemented, using local" )
211+ logger .warning ("Manual conflict resolution not implemented, using local" )
215212 return local
216-
213+
217214 async def health_check (self ) -> HealthStatus :
218215 """Check Cipher connection health"""
219216 if not self ._connected :
220217 return HealthStatus (
221218 healthy = False ,
222219 error_message = "Not connected"
223220 )
224-
221+
225222 try :
226223 start = datetime .now ()
227224 response = await self .client .get ("/health" )
228225 latency_ms = (datetime .now () - start ).total_seconds () * 1000
229-
226+
230227 if response .status_code == 200 :
231228 data = response .json ()
232229 return HealthStatus (
@@ -240,13 +237,13 @@ async def health_check(self) -> HealthStatus:
240237 healthy = False ,
241238 error_message = f"Health check failed: { response .status_code } "
242239 )
243-
240+
244241 except Exception as e :
245242 return HealthStatus (
246243 healthy = False ,
247244 error_message = f"Health check error: { str (e )} "
248245 )
249-
246+
250247 def _to_cipher_format (self , memory : Dict [str , Any ]) -> Dict [str , Any ]:
251248 """Convert Second Brain memory to Cipher format"""
252249 # Map our format to Cipher's expected format
@@ -262,22 +259,22 @@ def _to_cipher_format(self, memory: Dict[str, Any]) -> Dict[str, Any]:
262259 "updated_at" : memory .get ("updated_at" ),
263260 }
264261 }
265-
262+
266263 # Add embedding if available
267264 if "embedding" in memory :
268265 cipher_memory ["embedding" ] = memory ["embedding" ]
269-
266+
270267 # Add workspace ID if configured
271268 if self .workspace_id :
272269 cipher_memory ["workspace_id" ] = self .workspace_id
273-
270+
274271 # Map specific fields based on memory type
275272 if memory .get ("type" ) == "code" :
276273 cipher_memory ["language" ] = memory .get ("metadata" , {}).get ("language" , "unknown" )
277274 cipher_memory ["file_path" ] = memory .get ("metadata" , {}).get ("file_path" )
278-
275+
279276 return cipher_memory
280-
277+
281278 def _from_cipher_format (self , cipher_memory : Dict [str , Any ]) -> Dict [str , Any ]:
282279 """Convert Cipher memory to Second Brain format"""
283280 # Map Cipher format to our format
@@ -291,17 +288,17 @@ def _from_cipher_format(self, cipher_memory: Dict[str, Any]) -> Dict[str, Any]:
291288 "synced_at" : datetime .now ().isoformat (),
292289 }
293290 }
294-
291+
295292 # Preserve timestamps if available
296293 if "created_at" in cipher_memory :
297294 memory ["created_at" ] = cipher_memory ["created_at" ]
298295 if "updated_at" in cipher_memory :
299296 memory ["updated_at" ] = cipher_memory ["updated_at" ]
300-
297+
301298 # Add embedding if available
302299 if "embedding" in cipher_memory :
303300 memory ["embedding" ] = cipher_memory ["embedding" ]
304-
301+
305302 return memory
306303
307304
@@ -337,4 +334,4 @@ def create_cipher_config(
337334 "workspace_id" : workspace_id ,
338335 "enable_mcp" : enable_mcp
339336 }
340- )
337+ )
0 commit comments