@@ -170,6 +170,7 @@ def reindex(self,
170170 gathering_solr_records = True
171171 offset = 0
172172 indexed_ids = []
173+ indexed_records = []
173174 while gathering_solr_records :
174175 solr_records = self .search (
175176 {'resource_id' : resource_id ,
@@ -180,51 +181,68 @@ def reindex(self,
180181 gathering_solr_records = False
181182 # type_ignore_reason: checking solr_records
182183 indexed_ids += [r ['_id' ] for r in solr_records ] # type: ignore
184+ if self .only_use_engine :
185+ indexed_records += solr_records
183186 offset += 1000
184- gathering_ds_records = True
185- offset = 0
186- table_name = identifier (resource_id )
187- where_statement = 'WHERE _id NOT IN ({indexed_ids})' .format (
188- indexed_ids = ',' .join (indexed_ids )) if \
189- only_missing and indexed_ids and int (solr_total ) <= int (ds_total ) else ''
190187 core_name = f'{ self .prefix } { resource_id } '
191188 errmsg = _ ('Failed to reindex records for %s' % core_name )
192- existing_ids = []
193- while gathering_ds_records :
194- sql_string = '''
195- SELECT {columns} FROM {table} {where_statement}
196- LIMIT 1000 OFFSET {offset}
197- ''' .format (columns = ds_columns ,
198- table = table_name ,
199- where_statement = where_statement ,
200- offset = offset )
201- ds_result = datastore_search_sql (
202- context , {'sql' : sql_string })
203- if not ds_result ['records' ]:
204- gathering_ds_records = False
205- for r in ds_result ['records' ]:
206- existing_ids .append (str (r ['_id' ]))
207- if only_missing and indexed_ids and str (r ['_id' ]) in indexed_ids :
208- continue
189+ if not self .only_use_engine :
190+ gathering_ds_records = True
191+ offset = 0
192+ table_name = identifier (resource_id )
193+ where_statement = 'WHERE _id NOT IN ({indexed_ids})' .format (
194+ indexed_ids = ',' .join (indexed_ids )) if \
195+ only_missing and indexed_ids and \
196+ int (solr_total ) <= int (ds_total ) else ''
197+ existing_ids = []
198+ while gathering_ds_records :
199+ sql_string = '''
200+ SELECT {columns} FROM {table} {where_statement}
201+ LIMIT 1000 OFFSET {offset}
202+ ''' .format (columns = ds_columns ,
203+ table = table_name ,
204+ where_statement = where_statement ,
205+ offset = offset )
206+ ds_result = datastore_search_sql (
207+ context , {'sql' : sql_string })
208+ if not ds_result ['records' ]:
209+ gathering_ds_records = False
210+ for r in ds_result ['records' ]:
211+ existing_ids .append (str (r ['_id' ]))
212+ if only_missing and indexed_ids and str (r ['_id' ]) in indexed_ids :
213+ continue
214+ try :
215+ conn .add (docs = [r ], commit = False )
216+ if DEBUG :
217+ log .debug ('Indexed DataStore record '
218+ '_id=%s for Resource %s' %
219+ (r ['_id' ], resource_id ))
220+ except pysolr .SolrError as e :
221+ raise DatastoreSearchException (
222+ errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
223+ offset += 1000
224+ orphan_ids = set (indexed_ids ) - set (existing_ids )
225+ for orphan_id in orphan_ids :
226+ try :
227+ conn .delete (q = '_id:%s' % orphan_id , commit = False )
228+ if DEBUG :
229+ log .debug ('Unindexed DataStore record '
230+ '_id=%s for Resource %s' %
231+ (orphan_id , resource_id ))
232+ except pysolr .SolrError as e :
233+ raise DatastoreSearchException (
234+ errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
235+ else :
236+ for r in indexed_records :
209237 try :
210238 conn .add (docs = [r ], commit = False )
211239 if DEBUG :
212- log .debug ('Indexed DataStore record _id=%s for Resource %s' %
240+ log .debug ('Indexed DataStore record '
241+ '_id=%s for Resource %s' %
213242 (r ['_id' ], resource_id ))
214243 except pysolr .SolrError as e :
215244 raise DatastoreSearchException (
216245 errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
217- offset += 1000
218- orphan_ids = set (indexed_ids ) - set (existing_ids )
219- for orphan_id in orphan_ids :
220- try :
221- conn .delete (q = '_id:%s' % orphan_id , commit = False )
222- if DEBUG :
223- log .debug ('Unindexed DataStore record _id=%s for Resource %s' %
224- (orphan_id , resource_id ))
225- except pysolr .SolrError as e :
226- raise DatastoreSearchException (
227- errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
228246 conn .commit (waitSearcher = False )
229247 log .debug ('Reindexed SOLR Core for DataStore Resource %s' % resource_id )
230248
@@ -270,6 +288,7 @@ def create(self,
270288 if not conn :
271289 errmsg = _ ('Could not create SOLR core %s' ) % core_name
272290 callback_queue = add_queue_name_prefix (self .redis_callback_queue_name )
291+ # TODO: add callback_extras to pass records if self.only_use_engine
273292 enqueue_job (
274293 # type_ignore_reason: incomplete typing
275294 fn = 'solr_utils.create_solr_core.proc.create_solr_core' , # type: ignore
@@ -279,7 +298,9 @@ def create(self,
279298 'callback_fn' : 'ckanext.datastore_search.logic.'
280299 'action.datastore_search_create_callback' ,
281300 'callback_queue' : callback_queue ,
282- 'callback_timeout' : config .get ('ckan.jobs.timeout' , 300 )},
301+ 'callback_timeout' : config .get ('ckan.jobs.timeout' , 300 ),
302+ 'callback_extras' : {'records' : data_dict .get ('records' , None )}
303+ if self .only_use_engine else None },
283304 title = 'SOLR Core creation %s' % core_name ,
284305 queue = self .redis_queue_name ,
285306 rq_kwargs = {'timeout' : 60 })
@@ -382,10 +403,11 @@ def create(self,
382403 if new_fields or updated_fields or remove_fields :
383404 self .reindex (resource_id , connection = conn )
384405
385- if 'records' in data_dict :
406+ if 'records' in data_dict and not self . only_use_engine :
386407 self .upsert (data_dict , connection = conn )
387408
388- self ._check_counts (resource_id , connection = conn )
409+ if not self .only_use_engine :
410+ self ._check_counts (resource_id , connection = conn )
389411
390412 def create_callback (self , data_dict : DataDict ) -> Any :
391413 """
@@ -400,17 +422,23 @@ def create_callback(self, data_dict: DataDict) -> Any:
400422 log .debug ('SOLR core creation stderr: %s' % data_dict .get ('stderr' ))
401423
402424 resource_id = data_dict .get ('core_name' , '' ).replace (self .prefix , '' )
403-
425+ context = self . _get_site_context ()
404426 ds_result = get_action ('datastore_search' )(
405- self . _get_site_context () , {'resource_id' : resource_id ,
406- 'limit' : 0 ,
407- 'skip_search_engine' : True })
427+ context , {'resource_id' : resource_id ,
428+ 'limit' : 0 ,
429+ 'skip_search_engine' : True })
408430 create_dict = {
409431 'resource_id' : resource_id ,
410432 'fields' : [f for f in ds_result ['fields' ] if
411433 f ['id' ] not in self .default_search_fields ]}
412434 self .create (create_dict )
413435
436+ if self .only_use_engine and data_dict .get ('extras' , {}).get ('records' ):
437+ get_action ('datastore_upsert' )(
438+ context , {'resource_id' : resource_id ,
439+ 'records' : data_dict .get ('extras' , {}).get ('records' ),
440+ 'method' : 'insert' })
441+
414442 def upsert (self ,
415443 data_dict : DataDict ,
416444 connection : Optional [pysolr .Solr ] = None ) -> Any :
@@ -438,7 +466,8 @@ def upsert(self,
438466 errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
439467 conn .commit (waitSearcher = False )
440468
441- self ._check_counts (resource_id , connection = conn )
469+ if not self .only_use_engine :
470+ self ._check_counts (resource_id , connection = conn )
442471
443472 def search (self ,
444473 data_dict : DataDict ,
@@ -505,6 +534,7 @@ def delete(self,
505534 """
506535 Removes records from the SOLR index, or deletes the core entirely.
507536 """
537+ # FIXME: delete core if count less than min_rows_for_index??
508538 resource_id = data_dict .get ('resource_id' )
509539 core_name = f'{ self .prefix } { resource_id } '
510540 conn = self ._make_connection (resource_id ) if not connection else connection
@@ -533,6 +563,37 @@ def delete(self,
533563 log .debug ('Unloaded SOLR Core for DataStore Resource %s' % resource_id )
534564 return
535565
566+ if self .only_use_engine :
567+ fq = []
568+ for key , value in data_dict .get ('filters' , {}).items ():
569+ fq .append ('%s:%s' % (key , value ))
570+ errmsg = _ ('Could not delete DataStore record(s) '
571+ 'q=%s in SOLR core %s' ) % \
572+ (' AND ' .join (fq ), core_name )
573+ collecting_deleted_records = True
574+ offset = 0
575+ deleted_records = []
576+ try :
577+ while collecting_deleted_records :
578+ results = conn .search (q = ' AND ' .join (fq ),
579+ start = offset ,
580+ limit = 1000 ,
581+ sort = '_id asc' )
582+ if not results :
583+ collecting_deleted_records = False
584+ deleted_records += results
585+ offset += 1000
586+ conn .delete (q = ' AND ' .join (fq ), commit = False )
587+ if DEBUG :
588+ log .debug ('Unindexed DataStore record(s) q=%s for Resource %s' %
589+ (' AND ' .join (fq ), resource_id ))
590+ except pysolr .SolrError as e :
591+ raise DatastoreSearchException (
592+ errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
593+ conn .commit (waitSearcher = False )
594+ data_dict ['deleted_records' ] = deleted_records
595+ return
596+
536597 for record in data_dict .get ('deleted_records' , []):
537598 errmsg = _ ('Could not delete DataStore record _id=%s in SOLR core %s' ) % \
538599 (record ['_id' ], core_name )
@@ -546,4 +607,5 @@ def delete(self,
546607 errmsg if not DEBUG else e .args [0 ][:MAX_ERR_LEN ])
547608 conn .commit (waitSearcher = False )
548609
549- self ._check_counts (resource_id , connection = conn )
610+ if not self .only_use_engine :
611+ self ._check_counts (resource_id , connection = conn )
0 commit comments