Skip to content

Commit c6d3022

Browse files
committed
better creation backend handling with REDIS queues and REDIS callback queues.
1 parent 65e9eac commit c6d3022

File tree

12 files changed

+144
-198
lines changed

12 files changed

+144
-198
lines changed

README.md

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ This plugin hooks into the creation and insertion of DataStore tables and record
77

88
## Requirements
99

10-
SOLR 9+ running in Stand Alone mode. Currently, this plugin does NOT support Cloud Mode SOLR.
11-
1210
https://github.com/ckan/ckan/pull/8684
1311

12+
**If using the SOLR engine:**
13+
- SOLR 9+ running in Stand Alone mode. Currently, this plugin does NOT support Cloud Mode SOLR.
14+
- [Python SOLR Utils](https://github.com/JVickery-TBS/pysolr-utils) installed and service running on a SOLR server.
15+
- A `ckan -c <INI> jobs worker ckan_ds_create_index_callback` service running on a CKAN server. *See: ckanext.datastore_search.redis.callback_queue_name below*
16+
1417
Compatibility with core CKAN versions:
1518

1619
| CKAN version | Compatible? |
@@ -88,6 +91,26 @@ To install ckanext-datastore-search:
8891
# (optional, default: datastore_).
8992
ckanext.datastore_search.prefix = ds_res_
9093

94+
**ckanext.datastore_search.redis.queue_name** controls the REDIS queue name to enqueue SOLR core creations on.
95+
96+
# (optional, default: ckan_ds_create_index).
97+
ckanext.datastore_search.redis.queue_name = ckan_ds_create_solr_core
98+
99+
**ckanext.datastore_search.redis.callback_queue_name** controls the REDIS queue name to enqueue optional callbacks from the create queue. *Note:* this will be used inside the CKAN framework and have `ckan:<ckan.site_id>:` prefixed to it.
100+
101+
# (optional, default: ckan_ds_create_index_callback).
102+
ckanext.datastore_search.redis.callback_queue_name = ckan_ds_create_solr_core_callback
103+
104+
**ckanext.datastore_search.redis.callback_queue_name** controls the REDIS queue name to enqueue optional callbacks from the create queue. *Note:* this will be used inside the CKAN framework and have `ckan:<ckan.site_id>:` prefixed to it.
105+
106+
# (optional, default: ckan_ds_create_index_callback).
107+
ckanext.datastore_search.redis.callback_queue_name = ckan_ds_create_solr_core_callback
108+
109+
**ckanext.datastore_search.solr.configset** controls the SOLR configset name.
110+
111+
# (optional, default: datastore_resource).
112+
ckanext.datastore_search.solr.configset = ckan_ds_resource
113+
91114
## Tests
92115

93116
To run the tests, do:

ckanext/datastore_search/backend/__init__.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ckan.types import Context, DataDict
1+
from ckan.types import DataDict
22
from typing import Any
33

44
import ckan.plugins as plugins
@@ -27,6 +27,12 @@ class DatastoreSearchBackend:
2727
prefix = plugins.toolkit.config.get(
2828
'ckanext.datastore_search.prefix', 'datastore_')
2929
default_search_fields = ['_id']
30+
redis_queue_name = plugins.toolkit.config.get(
31+
'ckanext.datastore_search.redis_queue',
32+
'ckan_ds_create_index')
33+
redis_callback_queue_name = plugins.toolkit.config.get(
34+
'ckanext.datastore_search.redis.callback_queue_name',
35+
'ckan_ds_create_index_callback')
3036

3137
@classmethod
3238
def register_backends(cls):
@@ -81,10 +87,7 @@ def reindex(self,
8187
"""
8288
raise NotImplementedError()
8389

84-
def create(
85-
self,
86-
context: Context,
87-
data_dict: DataDict) -> Any:
90+
def create(self, data_dict: DataDict) -> Any:
8891
"""Create new resource inside the search index.
8992
9093
Called by `datastore_create`.
@@ -95,7 +98,12 @@ def create(
9598
"""
9699
raise NotImplementedError()
97100

98-
def upsert(self, context: Context, data_dict: DataDict) -> Any:
101+
def create_callback(self, data_dict: DataDict) -> Any:
102+
"""Any special actions to execute during possible created index callbacks.
103+
"""
104+
pass
105+
106+
def upsert(self, data_dict: DataDict) -> Any:
99107
"""Update or create resource depending on data_dict param.
100108
101109
Called by `datastore_upsert`.
@@ -106,7 +114,7 @@ def upsert(self, context: Context, data_dict: DataDict) -> Any:
106114
"""
107115
raise NotImplementedError()
108116

109-
def delete(self, context: Context, data_dict: DataDict) -> Any:
117+
def delete(self, data_dict: DataDict) -> Any:
110118
"""Remove resource from the search index.
111119
112120
Called by `datastore_delete`.
@@ -117,7 +125,7 @@ def delete(self, context: Context, data_dict: DataDict) -> Any:
117125
"""
118126
raise NotImplementedError()
119127

120-
def search(self, context: Context, data_dict: DataDict) -> Any:
128+
def search(self, data_dict: DataDict) -> Any:
121129
"""Base search.
122130
123131
Called by `datastore_search`.

ckanext/datastore_search/backend/solr.py

Lines changed: 64 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
import json
33
import requests
44
import re
5-
import rq
65
from logging import getLogger
76

87
from typing import Any, Optional, Dict, cast, List
98
from ckan.types import Context, DataDict
109

11-
from ckan.plugins.toolkit import _, config, get_action
12-
from ckan.lib.redis import connect_to_redis
10+
from ckan.plugins.toolkit import _, config, get_action, enqueue_job
11+
from ckan.lib.jobs import add_queue_name_prefix
1312

1413
from ckanext.datastore.logic.action import datastore_search_sql
1514
from ckanext.datastore.backend.postgres import identifier
@@ -22,20 +21,19 @@
2221

2322
MAX_ERR_LEN = 1000
2423
PSQL_TO_SOLR_WILCARD_MATCH = re.compile('^_?|_?$')
25-
REDIS_QUEUE_NAME = 'ckan_ds_solr_core_create'
2624

2725
log = getLogger(__name__)
2826
DEBUG = config.get('debug', False)
2927

30-
_ds_solr_queues: Dict[str, rq.Queue] = {}
31-
3228

3329
class DatastoreSolrBackend(DatastoreSearchBackend):
3430
"""
3531
SOLR class for datastore search backend.
3632
"""
3733
timeout = config.get('solr_timeout')
3834
default_search_fields = ['_id', '_version_', 'indexed_ts', '_text_']
35+
configset_name = config.get('ckanext.datastore_search.solr.configset',
36+
'datastore_resource')
3937

4038
@property
4139
def field_type_map(self):
@@ -95,30 +93,6 @@ def _make_connection(self, resource_id: str) -> Optional[pysolr.Solr]:
9593
except pysolr.SolrError:
9694
pass
9795

98-
def _make_or_create_connection(self, resource_id: str) -> Optional[pysolr.Solr]:
99-
"""
100-
Tries to make a SOLR connection to a core,
101-
otherwise tries to creates a new core.
102-
"""
103-
core_name = f'{self.prefix}{resource_id}'
104-
conn = self._make_connection(resource_id)
105-
if conn:
106-
return conn
107-
ds_result = get_action('datastore_search')(
108-
self._get_site_context(), {'resource_id': resource_id,
109-
'limit': 0,
110-
'skip_search_engine': True})
111-
create_dict = {
112-
'resource_id': resource_id,
113-
'fields': [f for f in ds_result['fields'] if
114-
f['id'] not in self.default_search_fields]}
115-
self.create(self._get_site_context(), create_dict)
116-
conn = self._make_connection(resource_id)
117-
if conn:
118-
return conn
119-
raise DatastoreSearchException(
120-
_('Could not connect to SOLR core %s') % core_name)
121-
12296
def _send_api_request(self,
12397
method: str,
12498
endpoint: str,
@@ -158,7 +132,10 @@ def reindex(self,
158132
# DS Resource could take a long time??
159133
context = self._get_site_context()
160134
core_name = f'{self.prefix}{resource_id}'
161-
conn = self._make_or_create_connection(resource_id) if not connection else connection
135+
conn = self._make_connection(resource_id) if not connection else connection
136+
137+
if not conn:
138+
return
162139

163140
errmsg = _('Could not reload SOLR core %s') % core_name
164141
resp = self._send_api_request(method='POST',
@@ -189,9 +166,9 @@ def reindex(self,
189166
indexed_ids = []
190167
while gathering_solr_records:
191168
solr_records = self.search(
192-
context, {'resource_id': resource_id,
193-
'limit': 1000,
194-
'offset': offset},
169+
{'resource_id': resource_id,
170+
'limit': 1000,
171+
'offset': offset},
195172
conn)
196173
if not solr_records:
197174
gathering_solr_records = False
@@ -266,7 +243,6 @@ def _check_counts(self,
266243
self.reindex(resource_id, connection, only_missing=True)
267244

268245
def create(self,
269-
context: Context,
270246
data_dict: DataDict,
271247
connection: Optional[pysolr.Solr] = None) -> Any:
272248
"""
@@ -276,35 +252,24 @@ def create(self,
276252
core_name = f'{self.prefix}{resource_id}'
277253
conn = self._make_connection(resource_id) if not connection else connection
278254
if not conn:
279-
# FIXME: using configSet in API does not copy the configSet
280-
# into the core conf directory. We need to send some type
281-
# of signal to the SOLR server so it can run
282-
# solr create -c core_name -d configsets/datastore_resource
283-
# then does SOLR server need to send a signal back?
284-
# or can we just keep retrying a couple of times??
285-
global _ds_solr_queues
286-
redis_queue = None
287255
errmsg = _('Could not create SOLR core %s') % core_name
288-
if REDIS_QUEUE_NAME in _ds_solr_queues:
289-
redis_queue = _ds_solr_queues[REDIS_QUEUE_NAME]
290-
else:
291-
redis_conn = connect_to_redis()
292-
redis_queue = _ds_solr_queues[REDIS_QUEUE_NAME] = \
293-
rq.Queue(REDIS_QUEUE_NAME, connection=redis_conn)
294-
if not redis_queue:
295-
raise DatastoreSearchException(errmsg)
296-
job = redis_queue.enqueue_call(
297-
'create_solr_core.proc._create_solr_core',
298-
args=[core_name, 'datastore_resource'],
299-
timeout=60)
300-
if not job.meta:
301-
job.meta = {}
302-
job.meta['title'] = 'SOLR Core creation %s' % core_name
303-
job.save()
256+
callback_queue = add_queue_name_prefix(self.redis_callback_queue_name)
257+
enqueue_job(fn='solr_utils.create_solr_core.proc.create_solr_core',
258+
kwargs={
259+
'core_name': core_name,
260+
'config_set': self.configset_name,
261+
'callback_fn': 'ckanext.datastore_search.logic.'
262+
'action.datastore_search_create_callback',
263+
'callback_queue': callback_queue,
264+
'callback_timeout': config.get('ckan.jobs.timeout', 300)},
265+
title='SOLR Core creation %s' % core_name,
266+
queue=self.redis_queue_name,
267+
rq_kwargs={'timeout': 60})
304268
log.debug('Enqueued SOLR Core creation for DataStore Resource %s ' %
305269
resource_id)
306-
# TODO: await or retry here???
307-
conn = self._make_connection(resource_id)
270+
# we return here as we do not know how long the background
271+
# job to create the new SOLR core will take.
272+
return
308273
if not conn:
309274
raise DatastoreSearchException(
310275
_('Could not connect to SOLR core %s') % core_name)
@@ -399,20 +364,46 @@ def create(self,
399364
self.reindex(resource_id, connection=conn)
400365

401366
if 'records' in data_dict:
402-
self.upsert(context, data_dict, connection=conn)
367+
self.upsert(data_dict, connection=conn)
403368

404369
self._check_counts(resource_id, connection=conn)
405370

371+
def create_callback(self, data_dict: DataDict) -> Any:
372+
"""
373+
Callback from the REDIS queue via SOLR server
374+
after successful creation of the SOLR core.
375+
"""
376+
if data_dict.get('exit_code'):
377+
log.debug('SOLR core creation exit_code: %s' % data_dict.get('exit_code'))
378+
if data_dict.get('stdout'):
379+
log.debug('SOLR core creation stdout: %s' % data_dict.get('stdout'))
380+
if data_dict.get('stderr'):
381+
log.debug('SOLR core creation stderr: %s' % data_dict.get('stderr'))
382+
383+
resource_id = data_dict.get('core_name', '').replace(self.prefix, '')
384+
385+
ds_result = get_action('datastore_search')(
386+
self._get_site_context(), {'resource_id': resource_id,
387+
'limit': 0,
388+
'skip_search_engine': True})
389+
create_dict = {
390+
'resource_id': resource_id,
391+
'fields': [f for f in ds_result['fields'] if
392+
f['id'] not in self.default_search_fields]}
393+
self.create(create_dict)
394+
406395
def upsert(self,
407-
context: Context,
408396
data_dict: DataDict,
409397
connection: Optional[pysolr.Solr] = None) -> Any:
410398
"""
411399
Insert records into the SOLR index.
412400
"""
413401
resource_id = data_dict.get('resource_id')
414402
core_name = f'{self.prefix}{resource_id}'
415-
conn = self._make_or_create_connection(resource_id) if not connection else connection
403+
conn = self._make_connection(resource_id) if not connection else connection
404+
405+
if not conn:
406+
return
416407

417408
if data_dict['records']:
418409
for r in data_dict['records']:
@@ -430,7 +421,6 @@ def upsert(self,
430421
self._check_counts(resource_id, connection=conn)
431422

432423
def search(self,
433-
context: Context,
434424
data_dict: DataDict,
435425
connection: Optional[pysolr.Solr] = None) -> Optional[List[Dict[str, Any]]]:
436426
"""
@@ -440,7 +430,11 @@ def search(self,
440430
return
441431

442432
resource_id = data_dict.get('resource_id')
443-
conn = self._make_or_create_connection(resource_id) if not connection else connection
433+
conn = self._make_connection(resource_id) if not connection else connection
434+
435+
if not conn:
436+
raise DatastoreSearchException(
437+
_('SOLR core does not exist for DataStore Resource %s') % resource_id)
444438

445439
query = data_dict.get('q', {})
446440
filters = data_dict.get('filters', {})
@@ -485,15 +479,17 @@ def search(self,
485479
return results.docs
486480

487481
def delete(self,
488-
context: Context,
489482
data_dict: DataDict,
490483
connection: Optional[pysolr.Solr] = None) -> Any:
491484
"""
492485
Removes records from the SOLR index, or deletes the core entirely.
493486
"""
494487
resource_id = data_dict.get('resource_id')
495488
core_name = f'{self.prefix}{resource_id}'
496-
conn = self._make_or_create_connection(resource_id) if not connection else connection
489+
conn = self._make_connection(resource_id) if not connection else connection
490+
491+
if not conn:
492+
return
497493

498494
if not data_dict.get('filters'):
499495
errmsg = _('Could not delete SOLR core %s') % core_name

ckanext/datastore_search/config/redis/create_solr_core/__init__.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

ckanext/datastore_search/config/redis/create_solr_core/proc.py

Lines changed: 0 additions & 8 deletions
This file was deleted.

0 commit comments

Comments
 (0)