Commit 61d2ba5

add elastic plugin
1 parent 976b5d7 commit 61d2ba5

File tree

1 file changed: 304 additions & 0 deletions
@@ -0,0 +1,304 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2025 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

import json
import logging
from datetime import datetime
from urllib.parse import urlencode

from elasticsearch import Elasticsearch
from elasticsearch_dsl import connections, Document, Index, Search, Text
from pygeofilter.backends.elasticsearch import to_filter
import requests

LOGGER = logging.getLogger(__name__)

ES_VERSION = '8.3'


class Record(Document):
    identifier = Text()


class ElasticsearchRepository:
    """
    Class to interact with an Elasticsearch metadata repository
    """

    def __init__(self, repo_object: dict, context):
        """
        Initialize repository
        """

        self.database = None
        self.filter = repo_object.get('filter')
        self.context = context
        self.fts = False
        self.label = 'Elasticsearch'
        self.local_ingest = True
        self.dbtype = self.label
        self.username = repo_object.get('username')
        self.password = repo_object.get('password')
        self.authentication = None

        self.query_mappings = {
            # OGC API - Records mappings
            'identifier': 'identifier',
            'type': 'type',
            'typename': 'typename',
            'parentidentifier': 'parentidentifier',
            'collections': 'parentidentifier',
            'updated': 'insert_date',
            'title': 'title',
            'description': 'abstract',
            'keywords': 'keywords',
            'edition': 'edition',
            'anytext': 'anytext',
            'bbox': 'wkt_geometry',
            'date': 'date',
            'datetime': 'date',
            'time_begin': 'time_begin',
            'time_end': 'time_end',
            'platform': 'platform',
            'instrument': 'instrument',
            'sensortype': 'sensortype',
            'off_nadir': 'illuminationelevationangle'
        }

        # filter is expected as <es_host>/<index_name>
        self.es_host, self.index_name = self.filter.rsplit('/', 1)
        self.es_search_url = f'{self.filter}/_search'
        self.es = Elasticsearch(self.es_host)

        # generate core queryables db and obj bindings
        self.queryables = {}

        for tname in self.context.model['typenames']:
            for qname in self.context.model['typenames'][tname]['queryables']:
                self.queryables[qname] = {}
                items = self.context.model['typenames'][tname]['queryables'][
                    qname
                ].items()

                for qkey, qvalue in items:
                    self.queryables[qname][qkey] = qvalue

        # flatten all queryables
        self.queryables['_all'] = {}
        for qbl in self.queryables:
            self.queryables['_all'].update(self.queryables[qbl])
        self.queryables['_all'].update(self.context.md_core_model['mappings'])

        LOGGER.debug('Connecting to Elasticsearch')
        if not self.es.ping():
            msg = f'Cannot connect to Elasticsearch: {self.es_host}'
            LOGGER.error(msg)
            raise RuntimeError(msg)

    def describe(self) -> dict:
        """Derive table columns and types"""

        properties = {
            'geometry': {
                '$ref': 'https://geojson.org/schema/Polygon.json',
                'x-ogc-role': 'primary-geometry'
            }
        }

        i = Index(name=self.index_name, using=self.es)
        mapping = i.get_mapping()[self.index_name]['mappings']['properties']

        LOGGER.debug(f'Response: {mapping}')

        for key, value in mapping.items():
            properties[key] = {
                'type': value.get('type', 'string')
            }

        return properties

    def dataset(self, record):
        """
        Stub to mock a pycsw dataset object for Transactions
        """

        return type('dataset', (object,), record)

    def query_ids(self, ids: list) -> list:
        """
        Query by list of identifiers
        """

        results = []

        s = Search(using=self.es, index=self.index_name)
        s = s.query('ids', values=ids)
        response = s.execute()

        for hit in response:
            results.append(self._doc2record(hit))

        return results

    def query_collections(self, filters=None, limit=10) -> list:
        """
        Query for parent collections
        """

        results = []
        params = {}

        try:
            response = requests.get(self.es_search_url, params=params,
                                    auth=self.authentication)
            response.raise_for_status()
            response = response.json()
        except requests.exceptions.HTTPError as err:
            msg = f'Elasticsearch query error: {err.response.text}'
            LOGGER.error(msg)
            raise RuntimeError(msg)

        for hit in response['hits']['hits']:
            if hit['_source']['type'] in ['collection', 'stac:Collection']:
                results.append(self._doc2record(hit['_source']))

        return results

    def query_domain(self, domain, typenames, domainquerytype='list',
                     count=False) -> list:
        """
        Query by property domain values
        """

        results = []

        params = {
            'q': '*:*',
            'rows': 0,
            'facet': 'true',
            'facet.query': 'distinct',
            'facet.type': 'terms',
            'facet.field': domain,
            'fq': ['metadata_status:Active']
        }

        try:
            response = requests.get(f'{self.filter}/select', params=params,
                                    auth=self.authentication)
            response.raise_for_status()
            response = response.json()
        except requests.exceptions.HTTPError as err:
            msg = f'Solr query error: {err.response.text}'
            LOGGER.error(msg)
            raise RuntimeError(msg)

        counts = response['facet_counts']['facet_fields'][domain]

        # facet results are a flat list of alternating term/count values
        for term in zip(*([iter(counts)] * 2)):
            LOGGER.debug(f'Term: {term}')
            results.append(term)

        return results

    def query_insert(self, direction='max') -> str:
        """
        Query to get latest (default) or earliest update to repository
        """

        if direction == 'min':
            sort_order = 'asc'
        else:
            sort_order = 'desc'

        params = {
            'q': '*:*',
            'q.op': 'OR',
            'fl': 'timestamp',
            'sort': f'timestamp {sort_order}',
            'fq': ['metadata_status:Active'],
        }

        try:
            response = requests.get(f'{self.filter}/select', params=params,
                                    auth=self.authentication)
            response.raise_for_status()
            response = response.json()
        except requests.exceptions.HTTPError as err:
            msg = f'Solr query error: {err.response.text}'
            LOGGER.error(msg)
            raise RuntimeError(msg)

        try:
            timestamp = datetime.strptime(
                response['response']['docs'][0]['timestamp'],
                '%Y-%m-%dT%H:%M:%S.%fZ'
            )
        except IndexError:
            timestamp = datetime.now()

        return timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')

    def query_source(self, source):
        """
        Query by source
        """

        raise NotImplementedError()

    def query(self, constraint=None, sortby=None, typenames=None,
              maxrecords=10, startposition=0) -> tuple:
        """
        Query records from underlying repository
        """

        results = []
        record = Record()

        es_query = record.search(using=self.es, index=self.index_name)
        es_query = es_query.extra(track_total_hits=True)

        if constraint and constraint.get('ast') is not None:
            LOGGER.debug('Applying filter')
            es_query = es_query.query(
                to_filter(constraint['ast'],
                          {'ows:BoundingBox': 'geometry'},
                          version=ES_VERSION))

        # apply paging
        es_query = es_query[startposition:startposition + maxrecords]

        es_response = es_query.execute()

        total = es_response.hits.total.value
        LOGGER.debug(f'Found: {total}')

        for doc in es_response:
            results.append(self._doc2record(doc))

        return total, results

    def _doc2record(self, doc):
        """
        Transform an Elasticsearch doc into a pycsw dataset object
        """

        # accept either a raw _source dict or an elasticsearch_dsl hit
        record = doc if isinstance(doc, dict) else doc.to_dict()
        record['metadata'] = json.dumps(record)
        record['metadata_type'] = 'application/geo+json'
        return self.dataset(record)
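
For context, a minimal sketch of how the new repository class might be exercised on its own. The module name, host and index name below are illustrative (the commit does not show the file path); the context object is assumed to be pycsw's StaticContext.

# hypothetical usage sketch; module path, host and index name are illustrative
from pycsw.core.config import StaticContext

from elasticsearch_repository import ElasticsearchRepository  # module name assumed

repo_object = {
    'filter': 'http://localhost:9200/metadata_records',  # <es_host>/<index_name>
    'username': None,
    'password': None
}

repo = ElasticsearchRepository(repo_object, StaticContext())

# index field names/types, expressed as queryable properties
print(repo.describe())

# fetch records by identifier (identifier value is illustrative)
records = repo.query_ids(['rec-001'])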
