11"""
22Ethereum to NeoFS/IPFS Event Indexer
3- Listens to ERC-8004 registry events and ensures off-chain storage is synchronized
3+ Listens to ERC-8004 IdentityRegistry events and ensures off-chain storage is synchronized
44"""
55
66import time
1414
1515class EthereumNeoFSIndexer :
1616 """
17- Event indexer that syncs Ethereum registry events to NeoFS/IPFS
18- Ensures content hash verification and storage consistency
17+ Event indexer that syncs Ethereum IdentityRegistry events to NeoFS/IPFS.
18+ Ensures content hash verification and storage consistency.
1919 """
2020
2121 def __init__ (
@@ -28,7 +28,7 @@ def __init__(
2828 self .erc8004_client = erc8004_client
2929 self .storage_client = storage_client
3030 self .w3 = erc8004_client .w3
31- self .agent_registry = erc8004_client .agent_registry
31+ self .identity_registry = erc8004_client .identity_registry
3232 self .from_block = from_block
3333 self .current_block = from_block # Track current block position
3434 self .poll_interval = poll_interval
@@ -72,21 +72,19 @@ def start_indexing(self, block_limit: Optional[int] = None):
7272 print (f"Block limit reached ({ block_limit } ), stopping" )
7373 break
7474 max_blocks_in_batch = min (max_blocks_in_batch , remaining_blocks )
75-
75+
7676 # Process blocks in batches
7777 to_block = min (self .current_block + max_blocks_in_batch - 1 , latest_block )
7878
7979 print (f"Processing blocks { self .current_block } to { to_block } " )
8080
81- # Fetch events
82- self ._process_agent_registered_events (self .current_block , to_block )
83- self ._process_uris_updated_events (self .current_block , to_block )
84- self ._process_capabilities_updated_events (self .current_block , to_block )
81+ # Fetch IdentityRegistry events
82+ self ._process_identity_registered_events (self .current_block , to_block )
8583
8684 # Calculate blocks processed before updating current_block
8785 blocks_in_batch = to_block - self .current_block + 1
8886 blocks_processed += blocks_in_batch
89-
87+
9088 self .current_block = to_block + 1
9189
9290 # Stop if block limit reached
@@ -105,115 +103,12 @@ def stop_indexing(self):
105103 """Stop the indexer"""
106104 self .running = False
107105
108- def _process_agent_registered_events (self , from_block : int , to_block : int ):
109- """Process AgentRegistered events"""
110- try :
111- if not hasattr (self .agent_registry .events , 'AgentRegistered' ):
112- # Event not defined in ABI, skip silently
113- return
114-
115- event_filter = self .agent_registry .events .AgentRegistered .create_filter (
116- fromBlock = from_block ,
117- toBlock = to_block
118- )
119-
120- events = event_filter .get_all_entries ()
121-
122- for event in events :
123- try :
124- did_hash = event ['args' ]['didHash' ]
125- controller = event ['args' ]['controller' ]
126- agent_card_uri = event ['args' ]['agentCardURI' ]
127- did_doc_uri = event ['args' ]['didDocURI' ]
128- block_number = event ['blockNumber' ]
129- tx_hash = event ['transactionHash' ].hex ()
130-
131- print (f"AgentRegistered: DID={ did_hash .hex ()} , Controller={ controller } " )
132-
133- # Verify URIs are accessible
134- self ._verify_and_replicate (did_hash .hex (), agent_card_uri , did_doc_uri )
135-
136- # Call custom handler if registered
137- if 'AgentRegistered' in self .event_handlers :
138- self .event_handlers ['AgentRegistered' ](event )
139-
140- except Exception as e :
141- print (f"Error processing AgentRegistered event: { e } " )
142- except Exception as e :
143- # Silently skip if event is not available (e.g., ABI doesn't include events)
144- pass
145-
146- def _process_uris_updated_events (self , from_block : int , to_block : int ):
147- """Process URIsUpdated events"""
148- try :
149- if not hasattr (self .agent_registry .events , 'URIsUpdated' ):
150- # Event not defined in ABI, skip silently
151- return
152-
153- event_filter = self .agent_registry .events .URIsUpdated .create_filter (
154- fromBlock = from_block ,
155- toBlock = to_block
156- )
157-
158- events = event_filter .get_all_entries ()
159-
160- for event in events :
161- try :
162- did_hash = event ['args' ]['didHash' ]
163- agent_card_uri = event ['args' ]['agentCardURI' ]
164- did_doc_uri = event ['args' ]['didDocURI' ]
165-
166- print (f"URIsUpdated: DID={ did_hash .hex ()} " )
167-
168- # Verify and replicate new URIs
169- self ._verify_and_replicate (did_hash .hex (), agent_card_uri , did_doc_uri )
170-
171- if 'URIsUpdated' in self .event_handlers :
172- self .event_handlers ['URIsUpdated' ](event )
173-
174- except Exception as e :
175- print (f"Error processing URIsUpdated event: { e } " )
176- except Exception as e :
177- # Silently skip if event is not available (e.g., ABI doesn't include events)
178- pass
179-
180- def _process_capabilities_updated_events (self , from_block : int , to_block : int ):
181- """Process CapabilitiesUpdated events"""
182- try :
183- if not hasattr (self .agent_registry .events , 'CapabilitiesUpdated' ):
184- # Event not defined in ABI, skip silently
185- return
186-
187- event_filter = self .agent_registry .events .CapabilitiesUpdated .create_filter (
188- fromBlock = from_block ,
189- toBlock = to_block
190- )
191-
192- events = event_filter .get_all_entries ()
193-
194- for event in events :
195- try :
196- did_hash = event ['args' ]['didHash' ]
197- capabilities = event ['args' ]['capabilities' ]
198-
199- print (f"CapabilitiesUpdated: DID={ did_hash .hex ()} , Caps={ capabilities } " )
200-
201- if 'CapabilitiesUpdated' in self .event_handlers :
202- self .event_handlers ['CapabilitiesUpdated' ](event )
203-
204- except Exception as e :
205- print (f"Error processing CapabilitiesUpdated event: { e } " )
206- except Exception as e :
207- # Silently skip if event is not available (e.g., ABI doesn't include events)
208- pass
209-
210106 def _process_identity_registered_events (self , from_block : int , to_block : int ):
211107 """Process IdentityRegistry Registered events"""
212108 try :
213109 if not hasattr (self .identity_registry .events , 'Registered' ):
214- # Event not defined in ABI, skip silently
215110 return
216-
111+
217112 event_filter = self .identity_registry .events .Registered .create_filter (
218113 fromBlock = from_block ,
219114 toBlock = to_block
@@ -229,7 +124,11 @@ def _process_identity_registered_events(self, from_block: int, to_block: int):
229124 block_number = event ['blockNumber' ]
230125 tx_hash = event ['transactionHash' ].hex ()
231126
232- print (f"📝 IdentityRegistry Registered: AgentId={ agent_id } , Owner={ owner } , TokenURI={ token_uri } " )
127+ print (f"Registered: AgentId={ agent_id } , Owner={ owner } , TokenURI={ token_uri } " )
128+
129+ # Verify token URI is accessible and replicate if needed
130+ if token_uri :
131+ self ._verify_and_replicate_uri (token_uri )
233132
234133 # Call custom handler if registered
235134 if 'IdentityRegistered' in self .event_handlers :
@@ -238,34 +137,28 @@ def _process_identity_registered_events(self, from_block: int, to_block: int):
238137 except Exception as e :
239138 print (f"Error processing IdentityRegistry Registered event: { e } " )
240139 except Exception as e :
241- # Silently skip if event is not available (e.g., ABI doesn't include events)
242140 pass
243141
244- def _verify_and_replicate (self , did_hash : str , agent_card_uri : str , did_doc_uri : str ):
142+ def _verify_and_replicate_uri (self , uri : str ):
245143 """
246- Verify URIs are accessible and replicate if needed
144+ Verify a URI is accessible and replicate to IPFS backup if needed.
247145 """
248146 try :
249- # Try to fetch DID document
250- did_doc = self .storage_client .fetch_did_document (did_doc_uri )
251- print (f"✓ DID document verified: { did_doc_uri } " )
252-
253- # Try to fetch agent card
254- agent_card = self .storage_client .fetch_did_document (agent_card_uri )
255- print (f"✓ Agent card verified: { agent_card_uri } " )
147+ content = self .storage_client .fetch_did_document (uri )
148+ print (f" Verified: { uri } " )
256149
257150 # If primary is NeoFS, ensure IPFS backup exists
258- if did_doc_uri .startswith ("neofs://" ):
151+ if uri .startswith ("neofs://" ):
259152 try :
260153 ipfs_cid = self .storage_client ._publish_to_ipfs (
261- json .dumps (did_doc ).encode ('utf-8' )
154+ json .dumps (content ).encode ('utf-8' )
262155 )
263- print (f"✓ IPFS backup created: { ipfs_cid } " )
156+ print (f" IPFS backup created: { ipfs_cid } " )
264157 except Exception as e :
265- print (f"IPFS replication warning: { e } " )
158+ print (f" IPFS replication warning: { e } " )
266159
267160 except Exception as e :
268- print (f"⚠ Content verification/replication failed: { e } " )
161+ print (f" Content verification/replication failed for { uri } : { e } " )
269162
270163 def get_indexer_status (self ) -> Dict :
271164 """Get current indexer status"""
@@ -277,19 +170,3 @@ def get_indexer_status(self) -> Dict:
277170 "sync_lag" : latest_block - self .current_block ,
278171 "blocks_processed" : self .current_block - self .from_block
279172 }
280-
281-
282-
283-
284-
285-
286-
287-
288-
289-
290-
291-
292-
293-
294-
295-
0 commit comments