Skip to content

Commit 1cfecd1

Browse files
committed
feat(dev): csv file to solr;
- Data pusher plugin support with direct csv to solr.
1 parent 1442da6 commit 1cfecd1

File tree

6 files changed

+290
-43
lines changed

6 files changed

+290
-43
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ To install ckanext-datastore-search:
9797
# (optional, default: False).
9898
ckanext.datastore_search.only_use_engine = True
9999

100-
**ckanext.datastore_search.min_rows_for_index** sets the minimum number of database rows required to create and use a search index (mutually exclusive with only_use_engine):
100+
**ckanext.datastore_search.min_rows_for_index** sets the minimum number of database rows required to create and use a search index (mutually exclusive with `only_use_engine`):
101101

102102
# (optional, default: 100,000).
103103
ckanext.datastore_search.min_rows_for_index = 40000
@@ -122,6 +122,21 @@ To install ckanext-datastore-search:
122122
# (optional, default: datastore_resource).
123123
ckanext.datastore_search.solr.configset = ckan_ds_resource
124124

125+
**ckanext.datastore_search.download_proxy_address** defines the NETLOC address to download files from. This is for XLoader/DataPusher plugin integrations. This is only used when using `only_use_engine`.
126+
127+
# (optional, default: None).
128+
ckanext.datastore_search.download_proxy_address = 127.0.0.1:5001
129+
130+
**ckanext.datastore_search.download_verify_https** specifies whether or not to verify HTTPS when downloading files. This is for XLoader/DataPusher plugin integrations. This is only used when using `only_use_engine`.
131+
132+
# (optional, default: true).
133+
ckanext.datastore_search.download_verify_https = false
134+
135+
**ckanext.datastore_search.always_reupload_file** specifies whether or not to always ignore matching file hashes when downloading files. This is for XLoader/DataPusher plugin integrations.
136+
137+
# (optional, default: false).
138+
ckanext.datastore_search.always_reupload_file = true
139+
125140
## Tests
126141

127142
To run the tests, do:

ckanext/datastore_search/backend/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class DatastoreSearchBackend:
2222

2323
_backends = {}
2424
_active_backend: "DatastoreSearchBackend"
25+
supported_file_formats = ['csv', 'tsv']
2526
url = plugins.toolkit.config.get(
2627
'ckanext.datastore_search.url')
2728
prefix = plugins.toolkit.config.get(
@@ -38,6 +39,14 @@ class DatastoreSearchBackend:
3839
'ckanext.datastore_search.only_use_engine', False))
3940
min_rows_for_index = int(plugins.toolkit.config.get(
4041
'ckanext.datastore_search.min_rows_for_index', 100000))
42+
download_proxy_address = plugins.toolkit.config.get(
43+
'ckanext.datastore_search.download_proxy_address')
44+
download_verify_https = plugins.toolkit.asbool(
45+
plugins.toolkit.config.get(
46+
'ckanext.datastore_search.download_verify_https', True))
47+
always_reupload_file = plugins.toolkit.asbool(
48+
plugins.toolkit.config.get(
49+
'ckanext.datastore_search.always_reupload_file', False))
4150

4251
@classmethod
4352
def register_backends(cls):

ckanext/datastore_search/backend/solr.py

Lines changed: 128 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
import requests
44
import re
55
from logging import getLogger
6+
from urllib.parse import urlsplit
7+
import time
8+
import hashlib
9+
import tempfile
10+
import math
611

712
from typing import Any, Optional, Dict, cast, List
813
from ckan.types import Context, DataDict
@@ -12,6 +17,7 @@
1217

1318
from ckanext.datastore.logic.action import datastore_search_sql
1419
from ckanext.datastore.backend.postgres import identifier
20+
from ckanext.datastore.blueprint import dump_to
1521

1622
from ckanext.datastore_search.backend import (
1723
DatastoreSearchBackend,
@@ -21,6 +27,8 @@
2127

2228
MAX_ERR_LEN = 1000
2329
PSQL_TO_SOLR_WILCARD_MATCH = re.compile('^_?|_?$')
30+
CHUNK_SIZE = 16 * 1024
31+
DOWNLOAD_TIMEOUT = 30
2432

2533
log = getLogger(__name__)
2634
DEBUG = config.get('debug', False)
@@ -182,7 +190,8 @@ def reindex(self,
182190
# type_ignore_reason: checking solr_records
183191
indexed_ids += [r['_id'] for r in solr_records] # type: ignore
184192
if self.only_use_engine:
185-
indexed_records += solr_records
193+
# type_ignore_reason: checking solr_records
194+
indexed_records += solr_records # type: ignore
186195
offset += 1000
187196
core_name = f'{self.prefix}{resource_id}'
188197
errmsg = _('Failed to reindex records for %s' % core_name)
@@ -193,16 +202,16 @@ def reindex(self,
193202
where_statement = 'WHERE _id NOT IN ({indexed_ids})'.format(
194203
indexed_ids=','.join(indexed_ids)) if \
195204
only_missing and indexed_ids and \
196-
int(solr_total) <= int(ds_total) else ''
205+
int(solr_total) <= int(ds_total) else ''
197206
existing_ids = []
198207
while gathering_ds_records:
199208
sql_string = '''
200209
SELECT {columns} FROM {table} {where_statement}
201210
LIMIT 1000 OFFSET {offset}
202211
'''.format(columns=ds_columns,
203-
table=table_name,
204-
where_statement=where_statement,
205-
offset=offset)
212+
table=table_name,
213+
where_statement=where_statement,
214+
offset=offset)
206215
ds_result = datastore_search_sql(
207216
context, {'sql': sql_string})
208217
if not ds_result['records']:
@@ -283,12 +292,12 @@ def create(self,
283292
Create or update & reload/reindex a core if the fields have changed.
284293
"""
285294
resource_id = data_dict.get('resource_id')
295+
upload_file = data_dict.pop('upload_file', False)
286296
core_name = f'{self.prefix}{resource_id}'
287297
conn = self._make_connection(resource_id) if not connection else connection
288298
if not conn:
289299
errmsg = _('Could not create SOLR core %s') % core_name
290300
callback_queue = add_queue_name_prefix(self.redis_callback_queue_name)
291-
# TODO: add callback_extras to pass records if self.only_use_engine
292301
enqueue_job(
293302
# type_ignore_reason: incomplete typing
294303
fn='solr_utils.create_solr_core.proc.create_solr_core', # type: ignore
@@ -299,8 +308,11 @@ def create(self,
299308
'action.datastore_search_create_callback',
300309
'callback_queue': callback_queue,
301310
'callback_timeout': config.get('ckan.jobs.timeout', 300),
302-
'callback_extras': {'records': data_dict.get('records', None)}
303-
if self.only_use_engine else None},
311+
'callback_extras': {
312+
'records': data_dict.get('records', None)
313+
if self.only_use_engine else None,
314+
'upload_file': upload_file}
315+
},
304316
title='SOLR Core creation %s' % core_name,
305317
queue=self.redis_queue_name,
306318
rq_kwargs={'timeout': 60})
@@ -335,6 +347,7 @@ def create(self,
335347
'name': ds_field['id'],
336348
'type': self.field_type_map[ds_field['type']],
337349
'stored': True,
350+
'multiValued': False,
338351
'indexed': True})
339352
continue
340353
if self.field_type_map[ds_field['type']] == \
@@ -400,6 +413,99 @@ def create(self,
400413
log.debug('Removed SOLR Field %s for DataStore Resource %s' %
401414
(f['name'], resource_id))
402415

416+
if upload_file:
417+
res_dict = get_action('resource_show')(
418+
{'ignore_auth': True}, {'id': resource_id})
419+
if res_dict.get('format', '').lower() not in self.supported_file_formats:
420+
raise DatastoreSearchException('Unsupported file format')
421+
url = res_dict.get('url')
422+
url_parts = urlsplit(url)
423+
scheme = url_parts.scheme
424+
if scheme not in ('http', 'https', 'ftp'):
425+
raise DatastoreSearchException('Only http, https, and ftp '
426+
'resources may be fetched.')
427+
download_uri = url
428+
headers = {}
429+
site_user = get_action('get_site_user')({'ignore_auth': True}, {})
430+
if res_dict.get('url_type') == 'upload':
431+
headers['Authorization'] = site_user['apikey']
432+
download_uri = url_parts._replace(
433+
query='{}&nonce={}'.format(url_parts.query, time.time()),
434+
netloc=(self.download_proxy_address or url_parts.netloc)).geturl()
435+
log.debug('Fetching from: %s' % download_uri)
436+
tmp_file = tempfile.TemporaryFile()
437+
length = 0
438+
m = hashlib.md5()
439+
errmsg = _('Could not load file to SOLR core %s') % core_name
440+
try:
441+
if self.only_use_engine:
442+
response = requests.get(
443+
download_uri,
444+
verify=self.download_verify_https,
445+
stream=True,
446+
headers=headers,
447+
timeout=DOWNLOAD_TIMEOUT)
448+
response.raise_for_status()
449+
for chunk in response.iter_content(CHUNK_SIZE):
450+
length += len(chunk)
451+
tmp_file.write(chunk)
452+
m.update(chunk)
453+
else:
454+
# type_ignore_reason: incomplete typing
455+
for chunk in dump_to(resource_id, # type: ignore
456+
fmt=res_dict['format'].lower(),
457+
offset=0,
458+
limit=None,
459+
options={'bom': False},
460+
sort='_id',
461+
search_params={'skip_search_engine': True},
462+
user=site_user['name']):
463+
length += len(chunk)
464+
tmp_file.write(chunk)
465+
m.update(chunk)
466+
human_length = '0 bytes'
467+
if length != 0:
468+
size_name = ('bytes', 'KB', 'MB', 'GB', 'TB')
469+
i = int(math.floor(math.log(length, 1024)))
470+
p = math.pow(1024, i)
471+
s = round(float(length) / p, 1)
472+
human_length = "%s %s" % (s, size_name[i])
473+
log.debug('Downloaded ok - %s', human_length)
474+
file_hash = m.hexdigest()
475+
tmp_file.seek(0)
476+
if not self.always_reupload_file and res_dict.get('hash') == file_hash:
477+
log.debug('The file hash has not changed, skipping loading into '
478+
'SOLR index for DataStore Resource %s' % resource_id)
479+
return
480+
log.debug('Emptying SOLR index for DataStore Resource %s' %
481+
resource_id)
482+
conn.delete(q='*:*', commit=False)
483+
log.debug('Unindexed all DataStore records for DataStore Resource %s' %
484+
resource_id)
485+
log.debug('Uploading file to SOLR core %s for DataStore Resource %s' %
486+
(core_name, resource_id))
487+
separator = ','
488+
if res_dict['format'].lower() == 'tsv':
489+
separator = '%09'
490+
solr_response = json.loads(conn._send_request(
491+
method='POST',
492+
path='update?commit=false&trim=true&overwrite=false'
493+
'&header=true&separator=%s' % separator,
494+
headers={'Content-Type': 'application/csv'},
495+
body=tmp_file.read()))
496+
if solr_response.get('errors'):
497+
raise DatastoreSearchException(
498+
errmsg if not DEBUG else solr_response.get('errors'))
499+
except pysolr.SolrError as e:
500+
raise DatastoreSearchException(
501+
errmsg if not DEBUG else e.args[0][:MAX_ERR_LEN])
502+
except Exception as e:
503+
raise DatastoreSearchException(e)
504+
finally:
505+
tmp_file.close()
506+
conn.commit(waitSearcher=False)
507+
return
508+
403509
if new_fields or updated_fields or remove_fields:
404510
self.reindex(resource_id, connection=conn)
405511

@@ -422,6 +528,7 @@ def create_callback(self, data_dict: DataDict) -> Any:
422528
log.debug('SOLR core creation stderr: %s' % data_dict.get('stderr'))
423529

424530
resource_id = data_dict.get('core_name', '').replace(self.prefix, '')
531+
upload_file = data_dict.get('extras', {}).get('upload_file', False)
425532
context = self._get_site_context()
426533
ds_result = get_action('datastore_search')(
427534
context, {'resource_id': resource_id,
@@ -430,10 +537,17 @@ def create_callback(self, data_dict: DataDict) -> Any:
430537
create_dict = {
431538
'resource_id': resource_id,
432539
'fields': [f for f in ds_result['fields'] if
433-
f['id'] not in self.default_search_fields]}
540+
f['id'] not in self.default_search_fields],
541+
'upload_file': upload_file}
542+
# call create again to get datastore fields and data types to build schema
434543
self.create(create_dict)
435544

436-
if self.only_use_engine and data_dict.get('extras', {}).get('records'):
545+
if (
546+
not upload_file and
547+
self.only_use_engine and
548+
data_dict.get('extras', {}).get('records')
549+
):
550+
# use datastore_upsert with insert to be able to use dry_run to get _id
437551
get_action('datastore_upsert')(
438552
context, {'resource_id': resource_id,
439553
'records': data_dict.get('extras', {}).get('records'),
@@ -534,7 +648,6 @@ def delete(self,
534648
"""
535649
Removes records from the SOLR index, or deletes the core entirely.
536650
"""
537-
# FIXME: delete core if count less than min_rows_for_index??
538651
resource_id = data_dict.get('resource_id')
539652
core_name = f'{self.prefix}{resource_id}'
540653
conn = self._make_connection(resource_id) if not connection else connection
@@ -547,7 +660,7 @@ def delete(self,
547660
errmsg = _('Could not delete SOLR core %s') % core_name
548661
try:
549662
conn.delete(q='*:*', commit=False)
550-
log.debug('Unindexed all DataStore records for Resource %s' %
663+
log.debug('Unindexed all DataStore records for DataStore Resource %s' %
551664
resource_id)
552665
except pysolr.SolrError as e:
553666
raise DatastoreSearchException(
@@ -564,16 +677,17 @@ def delete(self,
564677
return
565678

566679
if self.only_use_engine:
680+
# will not have deleted_records returned, so need to do this by SOLR query
567681
fq = []
568682
for key, value in data_dict.get('filters', {}).items():
569683
fq.append('%s:%s' % (key, value))
570684
errmsg = _('Could not delete DataStore record(s) '
571-
'q=%s in SOLR core %s') % \
572-
(' AND '.join(fq), core_name)
685+
'q=%s in SOLR core %s') % (' AND '.join(fq), core_name)
573686
collecting_deleted_records = True
574687
offset = 0
575688
deleted_records = []
576689
try:
690+
# do a search before deleting to emulate deleted_records
577691
while collecting_deleted_records:
578692
results = conn.search(q=' AND '.join(fq),
579693
start=offset,

ckanext/datastore_search/config_declaration.yaml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@ groups:
2424
required: false
2525
- key: ckanext.datastore_search.only_use_engine
2626
default: false
27+
type: bool
2728
description: |
2829
Only use the search engine, never inserting records into the database.
2930
Because the DataDictionary saves data types and field descriptions into
3031
the database column comments, the database table and fields will still
3132
be created to maintain this functionality.
32-
validators: ignore_missing
33+
validators: ignore_missing boolean_validator
3334
required: false
3435
- key: ckanext.datastore_search.min_rows_for_index
3536
default: 100000
@@ -60,3 +61,29 @@ groups:
6061
validators: ignore_missing
6162
example: ckan_ds_resource
6263
required: false
64+
- key: ckanext.datastore_search.download_proxy_address
65+
default: None
66+
description: |
67+
NETLOC address for an internal proxy to download files.
68+
This is only used in XLoader & DataPusher integration when using ckanext.datastore_search.only_use_engine.
69+
validators: ignore_missing
70+
example: 127.0.0.1:5001
71+
required: false
72+
- key: ckanext.datastore_search.download_verify_https
73+
default: true
74+
type: bool
75+
description: |
76+
Whether or not to verify HTTPS certificates when downloading files.
77+
This is only used in XLoader & DataPusher integration when using ckanext.datastore_search.only_use_engine.
78+
validators: ignore_missing boolean_validator
79+
example: false
80+
required: false
81+
- key: ckanext.datastore_search.always_reupload_file
82+
default: false
83+
type: bool
84+
description: |
85+
Whether or not to always ignore matching file hashes when downloading files (forcing a re-upload).
86+
This is only used in XLoader & DataPusher integration.
87+
validators: ignore_missing boolean_validator
88+
example: true
89+
required: false

0 commit comments

Comments
 (0)