Skip to content

Commit 65e9eac

Browse files
committed
redis for solr core creation.
1 parent f0921d6 commit 65e9eac

File tree

10 files changed

+141
-10
lines changed

10 files changed

+141
-10
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ bower_components
55
# Byte-compiled / optimized / DLL files
66
__pycache__/
77
*.py[cod]
8+
**/venv/**
89

910
# C extensions
1011
*.so

ckanext/datastore_search/backend/solr.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
import json
33
import requests
44
import re
5+
import rq
56
from logging import getLogger
67

78
from typing import Any, Optional, Dict, cast, List
89
from ckan.types import Context, DataDict
910

1011
from ckan.plugins.toolkit import _, config, get_action
12+
from ckan.lib.redis import connect_to_redis
1113

1214
from ckanext.datastore.logic.action import datastore_search_sql
1315
from ckanext.datastore.backend.postgres import identifier
@@ -20,10 +22,13 @@
2022

2123
MAX_ERR_LEN = 1000
2224
PSQL_TO_SOLR_WILCARD_MATCH = re.compile('^_?|_?$')
25+
REDIS_QUEUE_NAME = 'ckan_ds_solr_core_create'
2326

2427
log = getLogger(__name__)
2528
DEBUG = config.get('debug', False)
2629

30+
_ds_solr_queues: Dict[str, rq.Queue] = {}
31+
2732

2833
class DatastoreSolrBackend(DatastoreSearchBackend):
2934
"""
@@ -277,17 +282,28 @@ def create(self,
277282
# solr create -c core_name -d configsets/datastore_resource
278283
# then does SOLR server need to send a signal back?
279284
# or can we just keep retrying a couple of times??
285+
global _ds_solr_queues
286+
redis_queue = None
280287
errmsg = _('Could not create SOLR core %s') % core_name
281-
req_body = {'create': [{'name': core_name,
282-
'configSet': 'datastore_resource'}]}
283-
resp = self._send_api_request(method='POST',
284-
endpoint='cores',
285-
body=req_body)
286-
if 'error' in resp:
287-
raise DatastoreSearchException(
288-
errmsg if not DEBUG
289-
else resp['error'].get('msg', errmsg)[:MAX_ERR_LEN])
290-
log.debug('Created SOLR Core for DataStore Resource %s' % resource_id)
288+
if REDIS_QUEUE_NAME in _ds_solr_queues:
289+
redis_queue = _ds_solr_queues[REDIS_QUEUE_NAME]
290+
else:
291+
redis_conn = connect_to_redis()
292+
redis_queue = _ds_solr_queues[REDIS_QUEUE_NAME] = \
293+
rq.Queue(REDIS_QUEUE_NAME, connection=redis_conn)
294+
if not redis_queue:
295+
raise DatastoreSearchException(errmsg)
296+
job = redis_queue.enqueue_call(
297+
'create_solr_core.proc._create_solr_core',
298+
args=[core_name, 'datastore_resource'],
299+
timeout=60)
300+
if not job.meta:
301+
job.meta = {}
302+
job.meta['title'] = 'SOLR Core creation %s' % core_name
303+
job.save()
304+
log.debug('Enqueued SOLR Core creation for DataStore Resource %s ' %
305+
resource_id)
306+
# TODO: await or retry here???
291307
conn = self._make_connection(resource_id)
292308
if not conn:
293309
raise DatastoreSearchException(
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# encoding: utf-8
2+
__all__ = ['proc', 'worker']
3+
import create_solr_core.proc as mod_proc
4+
import create_solr_core.worker as mod_worker
5+
6+
proc = mod_proc
7+
worker = mod_worker
8+
9+
# this is a namespace package
10+
try:
11+
import pkg_resources
12+
pkg_resources.declare_namespace(__name__)
13+
except ImportError:
14+
import pkgutil
15+
__path__ = pkgutil.extend_path(__path__, __name__)
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import subprocess
2+
3+
def _create_solr_core(core_name, config_set):
4+
subprocess.run(
5+
['solr', 'create', '-c', core_name, '-d', config_set],
6+
stdout=subprocess.PIPE,
7+
stderr=subprocess.STDOUT,
8+
timeout=60)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import rq
2+
from redis import Redis, ConnectionPool
3+
import click
4+
from logging import getLogger
5+
6+
from typing import Dict, Optional, Tuple
7+
8+
9+
log = getLogger(__name__)
10+
11+
_solr_queues: Dict[str, rq.Queue] = {}
12+
_redis_connection_pool = None
13+
14+
15+
@click.command(short_help="Work the given queues.")
16+
@click.option('-r', '--redis-url', required=True,
17+
type=click.STRING, default=None,
18+
help='The REDIS connection URI.')
19+
@click.option('-q', '--queues', default=None, required=True,
20+
multiple=True, help='Queue names to work on.')
21+
@click.option('-b', '--burst', is_flag=True, default=False,
22+
type=click.BOOL, help='Burst the worker.')
23+
@click.option('-v', '--verbose', is_flag=True, default=False,
24+
type=click.BOOL, help='Increase verbosity.')
25+
def _worker(redis_url: Optional[str] = None,
26+
queues: Optional[Tuple[str]] = None,
27+
burst: bool = False,
28+
verbose: bool = False):
29+
"""
30+
Work the given queues.
31+
"""
32+
global _solr_queues
33+
global _redis_connection_pool
34+
35+
if _redis_connection_pool is None:
36+
if verbose:
37+
click.echo('Using Redis at %s' % redis_url)
38+
_redis_connection_pool = ConnectionPool.from_url(redis_url)
39+
redis_conn = Redis(connection_pool=_redis_connection_pool)
40+
41+
redis_queues = {}
42+
for queue in queues:
43+
if queue in _solr_queues:
44+
redis_queue = _solr_queues[queue]
45+
else:
46+
redis_queue = _solr_queues[queue] = \
47+
rq.Queue(queue, connection=redis_conn)
48+
redis_queues[queue] = redis_queue
49+
50+
worker = rq.Worker(queues=redis_queues.values(),
51+
connection=redis_conn)
52+
worker.work(burst=burst)
53+
54+
55+
if __name__ == '__main__':
56+
_worker()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
rq==2.0.0
2+
redis==5.2.0
3+
click==8.1.8
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[metadata]
2+
name = create_solr_core
3+
version = 0.0.1
4+
description = Python module to create SOLR cores.
5+
url = https://github.com/JVickery-TBS/ckanext-datastore-search
6+
author = Jesse Vickery
7+
author_email = jesse.vickery@tbs-sct.gc.ca
8+
license = AGPL
9+
classifiers =
10+
Development Status :: 4 - Beta
11+
License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)
12+
Programming Language :: Python :: 3.7
13+
Programming Language :: Python :: 3.8
14+
Programming Language :: Python :: 3.9
15+
Programming Language :: Python :: 3.10
16+
keywords = CKAN Datastore SOLR search index indices
17+
18+
[options]
19+
packages = find_namespace:
20+
package_dir =
21+
=create_solr_core
22+
install_requires =
23+
include_package_data = True
24+
25+
[options.packages.find]
26+
where=create_solr_core
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# -*- coding: utf-8 -*-
2+
from setuptools import setup
3+
4+
setup()

ckanext/datastore_search/logic/__init__.py

Whitespace-only changes.

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
pysolr==3.10.0
22
requests==2.32.3
3+
rq==2.0.0
4+
redis==5.2.0

0 commit comments

Comments
 (0)