@@ -22,8 +22,8 @@ use db3_event::event_processor::EventProcessor;
22
22
use db3_event:: event_processor:: EventProcessorConfig ;
23
23
use db3_proto:: db3_indexer_proto:: indexer_node_server:: IndexerNode ;
24
24
use db3_proto:: db3_indexer_proto:: {
25
- ContractSyncStatus , GetContractSyncStatusRequest , GetContractSyncStatusResponse ,
26
- RunQueryRequest , RunQueryResponse ,
25
+ ContractSyncStatus , GetCollectionOfDatabaseRequest , GetCollectionOfDatabaseResponse ,
26
+ GetContractSyncStatusRequest , GetContractSyncStatusResponse , RunQueryRequest , RunQueryResponse ,
27
27
} ;
28
28
use db3_proto:: db3_mutation_v2_proto:: MutationAction ;
29
29
use db3_proto:: db3_storage_proto:: block_response:: MutationWrapper ;
@@ -201,9 +201,10 @@ impl IndexerNodeImpl {
201
201
contract_address : & str ,
202
202
start_block : u64 ,
203
203
) -> Result < ( ) > {
204
+ let db_addr = db. to_hex ( ) ;
204
205
let config = EventProcessorConfig {
205
206
evm_node_url : evm_node_url. to_string ( ) ,
206
- db_addr : db . to_hex ( ) ,
207
+ db_addr : db_addr . to_string ( ) ,
207
208
abi : abi. to_string ( ) ,
208
209
target_events : tables. iter ( ) . map ( |t| t. to_string ( ) ) . collect ( ) ,
209
210
contract_addr : contract_address. to_string ( ) ,
@@ -217,14 +218,10 @@ impl IndexerNodeImpl {
217
218
match self . processor_mapping . lock ( ) {
218
219
Ok ( mut mapping) => {
219
220
//TODO limit the total count
220
- if mapping. contains_key ( contract_address) {
221
- warn ! ( "contract addr {} exist" , contract_address) ;
222
- return Err ( DB3Error :: WriteStoreError ( format ! (
223
- "contract_addr {} exist" ,
224
- contract_address
225
- ) ) ) ;
221
+ if mapping. contains_key ( db_addr. as_str ( ) ) {
222
+ return Err ( DB3Error :: DatabaseAlreadyExist ( db_addr. to_string ( ) ) ) ;
226
223
}
227
- mapping. insert ( contract_address . to_string ( ) , processor. clone ( ) ) ;
224
+ mapping. insert ( db_addr . to_string ( ) , processor. clone ( ) ) ;
228
225
}
229
226
_ => todo ! ( ) ,
230
227
}
@@ -241,6 +238,22 @@ impl IndexerNodeImpl {
241
238
Ok ( ( ) )
242
239
}
243
240
241
+ fn close_event_task ( & self , db : & DB3Address ) -> Result < ( ) > {
242
+ let addr = db. to_hex ( ) ;
243
+ match self . processor_mapping . lock ( ) {
244
+ Ok ( mut mapping) => match mapping. remove ( addr. as_str ( ) ) {
245
+ Some ( task) => {
246
+ task. close ( ) ;
247
+ }
248
+ None => {
249
+ return Err ( DB3Error :: DatabaseNotFound ( addr. to_string ( ) ) ) ;
250
+ }
251
+ } ,
252
+ _ => todo ! ( ) ,
253
+ }
254
+ Ok ( ( ) )
255
+ }
256
+
244
257
async fn parse_and_apply_mutations ( & self , mutations : & Vec < MutationWrapper > ) -> Result < ( ) > {
245
258
for mutation in mutations. iter ( ) {
246
259
let header = mutation. header . as_ref ( ) . unwrap ( ) ;
@@ -252,14 +265,16 @@ impl IndexerNodeImpl {
252
265
let action = MutationAction :: from_i32 ( dm. action ) . ok_or ( DB3Error :: WriteStoreError (
253
266
"fail to convert action type" . to_string ( ) ,
254
267
) ) ?;
268
+
255
269
let ( block, order, doc_ids_map_str) = match & mutation. header {
256
270
Some ( header) => Ok ( ( header. block_id , header. order_id , & header. doc_ids_map ) ) ,
257
271
_ => Err ( DB3Error :: WriteStoreError (
258
272
"invalid mutation header" . to_string ( ) ,
259
273
) ) ,
260
274
} ?;
275
+
261
276
let doc_ids_map = MutationUtil :: convert_doc_ids_map_to_vec ( doc_ids_map_str) ?;
262
- self . db_store . apply_mutation (
277
+ let extra_items = self . db_store . apply_mutation (
263
278
action,
264
279
dm,
265
280
& address,
@@ -269,6 +284,39 @@ impl IndexerNodeImpl {
269
284
order,
270
285
& doc_ids_map,
271
286
) ?;
287
+ match action {
288
+ MutationAction :: CreateEventDb => {
289
+ if extra_items. len ( ) > 0 && extra_items[ 0 ] . key . as_str ( ) == "db_addr" {
290
+ let addr = DB3Address :: from_hex ( extra_items[ 0 ] . value . as_str ( ) ) ?;
291
+ let ( collections, _) = self . db_store . get_collection_of_database ( & addr) ?;
292
+ let tables = collections. iter ( ) . map ( |c| c. name . to_string ( ) ) . collect ( ) ;
293
+ if let Some ( database) = self . db_store . get_event_db ( & addr) ? {
294
+ if let Err ( e) = self
295
+ . start_an_event_task (
296
+ & addr,
297
+ database. evm_node_url . as_str ( ) ,
298
+ database. events_json_abi . as_str ( ) ,
299
+ & tables,
300
+ database. contract_address . as_str ( ) ,
301
+ 0 ,
302
+ )
303
+ . await
304
+ {
305
+ info ! ( "start the event db {} with error {e}" , addr. to_hex( ) ) ;
306
+ } else {
307
+ info ! ( "start event db {} done" , addr. to_hex( ) ) ;
308
+ }
309
+ }
310
+ }
311
+ }
312
+ MutationAction :: DeleteEventDb => {
313
+ if extra_items. len ( ) > 0 && extra_items[ 0 ] . key . as_str ( ) == "db_addr" {
314
+ let addr = DB3Address :: from_hex ( extra_items[ 0 ] . value . as_str ( ) ) ?;
315
+ self . close_event_task ( & addr) ?;
316
+ }
317
+ }
318
+ _ => { }
319
+ }
272
320
}
273
321
Ok ( ( ) )
274
322
}
@@ -295,6 +343,29 @@ impl IndexerNode for IndexerNodeImpl {
295
343
Ok ( Response :: new ( GetContractSyncStatusResponse { status_list } ) )
296
344
}
297
345
346
+ async fn get_collection_of_database (
347
+ & self ,
348
+ request : Request < GetCollectionOfDatabaseRequest > ,
349
+ ) -> std:: result:: Result < Response < GetCollectionOfDatabaseResponse > , Status > {
350
+ let r = request. into_inner ( ) ;
351
+ let addr = DB3Address :: from_hex ( r. db_addr . as_str ( ) )
352
+ . map_err ( |e| Status :: invalid_argument ( format ! ( "invalid database address {e}" ) ) ) ?;
353
+ let ( collections, collection_states) = self
354
+ . db_store
355
+ . get_collection_of_database ( & addr)
356
+ . map_err ( |e| Status :: internal ( format ! ( "fail to get collect of database {e}" ) ) ) ?;
357
+
358
+ info ! (
359
+ "query collection count {} with database {}" ,
360
+ collections. len( ) ,
361
+ r. db_addr. as_str( )
362
+ ) ;
363
+ Ok ( Response :: new ( GetCollectionOfDatabaseResponse {
364
+ collections,
365
+ states : collection_states,
366
+ } ) )
367
+ }
368
+
298
369
async fn run_query (
299
370
& self ,
300
371
request : Request < RunQueryRequest > ,
0 commit comments