This repository was archived by the owner on Oct 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathclio_lite.py
More file actions
359 lines (328 loc) · 14.3 KB
/
clio_lite.py
File metadata and controls
359 lines (328 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
from collections import defaultdict
import json
import logging
import math
import os
import requests
from stop_words import get_stop_words
import urllib
from clio_utils import try_pop
from clio_utils import set_headers
from clio_utils import make_endpoint
from clio_utils import extract_docs
from clio_utils import extract_keywords
from clio_utils import assert_fraction
"""
Stop words: Feel free to add to these
e.g. :obj:`from clio_lite import STOP_WORDS; STOP_WORDS += ['water']`
"""
STOP_WORDS = get_stop_words('english')
"""
Maximum chunksize for doc iterator: Feel free to change.
e.g. :obj:`from clio_lite import MAX_CHUNKSIZE; MAX_CHUNKSIZE = 10`
"""
MAX_CHUNKSIZE = 10000
def combined_score(keyword_scores):
    """Combine Lucene keyword scores according to my own recipe,
    which is calculate a weighted combination of the scores,
    combined in quadrature (i.e. assuming that the scores are
    orthogonal).

    Args:
        keyword_scores (list): Rows of dicts, each with a numeric
            ``score`` and ``bg_count`` (the latter acts as the weight).
    Returns:
        float: The weighted quadrature-combined score. Returns 0.0 when
        there are no rows (or every weight is zero) rather than raising
        :obj:`ZeroDivisionError`.
    """
    numerator, denominator = 0, 0
    for row in keyword_scores:
        s2 = math.pow(row['score'], 2)
        b2 = math.pow(row['bg_count'], 2)  # includes doc_count
        numerator += s2*b2
        denominator += b2
    # Guard against empty input or all-zero weights, which would
    # otherwise raise ZeroDivisionError
    if denominator == 0:
        return 0.0
    return math.sqrt(numerator/denominator)
def simple_query(endpoint, query, fields, filters,
                 size=None, aggregations=None,
                 response_mode=False,
                 **kwargs):
    """Perform a simple query on Elasticsearch.

    Args:
        endpoint (str): The Elasticsearch endpoint.
        query (str or dict): The query to make to ES. A dict is passed
            through verbatim as the ES query body; a string is wrapped in
            a lower-cased multi_match query over :obj:`fields`.
        fields (list): List of fields to query.
        filters (list): List of ES filters.
        size (int): Number of documents to return.
        aggregations: Do not use this directly. See :obj:`clio_keywords`.
        response_mode: Do not use this directly.
            See :obj:`clio_lite_searchkit_lambda`.
        **kwargs: Passed through to :obj:`requests.post`.
    Returns:
        {total, docs} (tuple): {total number of docs}, {top :obj:`size` docs}
    """
    _query = {"_source": False}
    if isinstance(query, dict):  # pre-built ES query body
        _query['query'] = query
    else:
        _query['query'] = {
            "bool": {
                "must": [{"multi_match": {"query": query.lower(),
                                          "fields": fields}}],
                "filter": filters
            }
        }
    # Assume that if you want aggregations, you don't want anything else
    if aggregations is not None:
        _query['aggregations'] = aggregations
        _query['size'] = 0
        _query.pop('_source')
    elif size is not None:
        _query['size'] = size
    # Make the query
    logging.debug(_query)
    r = requests.post(url=endpoint, data=json.dumps(_query),
                      params={"search_type": "dfs_query_then_fetch"},
                      **kwargs)
    # "Aggregation mode"
    if aggregations is not None:
        return extract_keywords(r)
    total, docs = extract_docs(r)
    # "Response mode": hand back the raw response so the caller can
    # inspect/forward it when nothing matched
    if response_mode and total == 0:
        return total, r
    return total, docs
def more_like_this(endpoint, docs, fields, limit, offset,
                   min_term_freq, max_query_terms,
                   min_doc_frac, max_doc_frac,
                   min_should_match, total,
                   stop_words=None,
                   filters=None, scroll=None,
                   response_mode=False,
                   post_aggregation=None,
                   **kwargs):
    """Make an MLT query

    Args:
        endpoint (str): URL path to _search endpoint
        docs (list): Document index and ids to expand from.
        fields (list): List of fields to query.
        limit (int): Number of documents to return.
        offset (int): Offset from the highest ranked document.
        min_term_freq (int): Only consider seed terms which occur in all
                             documents with this frequency.
        max_query_terms (int): Maximum number of important terms to
                               identify in the seed documents.
        min_doc_frac (float): Only consider seed terms which appear more
                              than this fraction of the seed docs.
        max_doc_frac (float): Only consider seed terms which appear less
                              than this fraction of the seed docs.
        min_should_match (float): Fraction of important terms from the
                                  seed docs explicitly required to match.
        total (int): Total number of seed documents found upstream.
        stop_words (list): A supplementary list of terms to ignore.
                           Defaults to the module-level :obj:`STOP_WORDS`.
        filters (list): ES filters to supply to the query. Defaults to [].
        scroll (str): ES scroll time window (e.g. '1m').
        response_mode (bool): If True, return the raw response object.
        post_aggregation (dict): Extra top-level body fields to merge into
                                 the request. Defaults to {}.
        **kwargs: Passed through to :obj:`requests.post`.
    Returns:
        {total, docs} (tuple): {total number of docs}, {top :obj:`size` docs}.
    """
    # If there are no documents to expand from
    if total == 0:
        return (0, [])
    # Late-bind mutable defaults: avoids the shared-mutable-default-argument
    # pitfall, and picks up any user edits to the module-level STOP_WORDS
    # made after import time
    if stop_words is None:
        stop_words = STOP_WORDS
    if filters is None:
        filters = []
    if post_aggregation is None:
        post_aggregation = {}
    # Check that the fractions are fractions, to avoid weird behaviour
    assert_fraction(min_should_match)
    assert_fraction(min_doc_frac)
    assert_fraction(max_doc_frac)
    # Formulate the MLT query
    msm = int(min_should_match*100)
    max_doc_freq = int(max_doc_frac*total)
    min_doc_freq = int(min_doc_frac*total)
    mlt = {
        "more_like_this": {
            "fields": fields if fields != [] else None,
            "like": docs,
            "min_term_freq": min_term_freq,
            "max_query_terms": max_query_terms,
            "min_doc_freq": min_doc_freq,
            "max_doc_freq": max_doc_freq,
            "boost_terms": 1,
            "stop_words": stop_words,
            "minimum_should_match": f'{msm}%',
            "include": True,
        }
    }
    _query = {"query": {"bool": {"filter": filters, "must": [mlt]}}}
    params = {"search_type": "dfs_query_then_fetch"}
    # Offset assumes no scrolling (since it would be invalid)
    if offset is not None and offset < total:
        _query['from'] = offset
    # If scrolling was specified
    elif scroll is not None:
        params['scroll'] = scroll
    # The number of docs returned
    if limit is not None:
        _query['size'] = limit
    # Make the query
    logging.debug(_query)
    r = requests.post(url=endpoint,
                      data=json.dumps(dict(**post_aggregation, **_query)),
                      params=params,
                      **kwargs)
    if response_mode:
        return None, r
    # If successful, return
    return extract_docs(r, scroll=scroll, include_score=True)
def clio_keywords(url, index, fields, max_query_terms=10,
                  filters=None, stop_words=None,
                  shard_size=5000,
                  **kwargs):
    """Discover keywords associated with a seed query.

    Args:
        url (str): URL path to bare ES endpoint.
        index (str): Index to query.
        fields (list): List of fields to query.
        max_query_terms (int): Maximum number of important terms to
                               identify in the seed documents.
        filters (list): ES filters to supply to the query. Defaults to [].
        stop_words (list): A supplementary list of terms to ignore.
                           Defaults to the module-level :obj:`STOP_WORDS`.
        shard_size (int): ES shard_size (increases sample doc size).
        **kwargs: Passed through to :obj:`simple_query`.
    Returns:
        keywords (list): A list of keywords and their scores.
    """
    # Late-bind mutable defaults: avoids the shared-mutable-default-argument
    # pitfall, and picks up any user edits to the module-level STOP_WORDS
    if filters is None:
        filters = []
    if stop_words is None:
        stop_words = STOP_WORDS
    set_headers(kwargs)
    endpoint = make_endpoint(url, index)
    # Formulate the aggregation query: a sampler aggregation (to cap the
    # number of docs considered) wrapping a significant_text aggregation
    keyword_agg = {
        "_keywords": {
            "sampler": {"shard_size": shard_size},
            "aggregations": {
                "keywords": {
                    "significant_text": {
                        "size": max_query_terms,
                        "jlh": {}
                    }
                }
            }
        }
    }
    # The aggregation can only be performed once per field,
    # so terms can be given multiple scores across fields.
    data = defaultdict(list)  # Mapping of term to scores for that term
    for field in fields:
        # Set the field and make the query
        (keyword_agg['_keywords']['aggregations']['keywords']
         ['significant_text']['field']) = field
        kws = simple_query(endpoint=endpoint, fields=[field],
                           filters=filters, aggregations=keyword_agg, **kwargs)
        # Append keywords if not stop words
        for kw in kws:
            word = kw.pop('key')
            if word in stop_words:
                continue
            data[word].append(kw)
    # Calculate a combined score for each word, and sort by score
    keywords = sorted((dict(key=word, score=combined_score(info))
                       for word, info in data.items()),
                      key=lambda kw: kw['score'], reverse=True)
    return keywords
def clio_search(url, index, query,
                fields=None, n_seed_docs=None,
                limit=None, offset=None,
                min_term_freq=1, max_query_terms=10,
                min_doc_frac=0.001, max_doc_frac=0.9,
                min_should_match=0.1, pre_filters=None,
                post_filters=None, stop_words=None,
                scroll=None, post_aggregation=None, **kwargs):
    """Perform a contextual search of Elasticsearch data.

    Args:
        url (str): URL path to bare ES endpoint.
        index (str): Index to query.
        query (str): The simple text query to Elasticsearch.
        fields (list): List of fields to query. Defaults to [].
        n_seed_docs (int): Use a maximum of this many seed documents.
        limit (int): Number of documents to return.
        offset (int): Offset from the highest ranked document.
        min_term_freq (int): Only consider seed terms which occur in all
                             documents with this frequency.
        max_query_terms (int): Maximum number of important terms to
                               identify in the seed documents.
        min_doc_frac (float): Only consider seed terms which appear more
                              than this fraction of the seed docs.
        max_doc_frac (float): Only consider seed terms which appear less
                              than this fraction of the seed docs.
        min_should_match (float): Fraction of important terms from the
                                  seed docs explicitly required to match.
        {pre,post}_filters (list): ES filters to supply to the
                                   {seed,expanded} queries. Default to [].
        stop_words (list): A supplementary list of terms to ignore.
                           Defaults to the module-level :obj:`STOP_WORDS`.
        scroll (str): ES scroll time window (e.g. '1m').
        post_aggregation (dict): Extra body fields for the expanded query.
    Returns:
        {total, docs} (tuple): {total number of docs}, {top :obj:`size` docs}.
    """
    # Late-bind mutable defaults to avoid the shared-mutable-default pitfall
    if fields is None:
        fields = []
    if pre_filters is None:
        pre_filters = []
    if post_filters is None:
        post_filters = []
    if post_aggregation is None:
        post_aggregation = {}
    if stop_words is None:
        stop_words = STOP_WORDS
    set_headers(kwargs)
    endpoint = make_endpoint(url, index)
    # Make the seed query
    total, docs = simple_query(endpoint=endpoint,
                               query=query,
                               fields=fields,
                               size=n_seed_docs,
                               filters=pre_filters,
                               **kwargs)
    # May as well break out early if there aren't any hits
    if total == 0:
        return total, docs
    # Make the expanded search query
    total, docs = more_like_this(endpoint=endpoint,
                                 docs=docs, fields=fields,
                                 limit=limit, offset=offset,
                                 min_term_freq=min_term_freq,
                                 max_query_terms=max_query_terms,
                                 min_doc_frac=min_doc_frac,
                                 max_doc_frac=max_doc_frac,
                                 min_should_match=min_should_match,
                                 total=total,
                                 stop_words=stop_words,
                                 filters=post_filters,
                                 post_aggregation=post_aggregation,
                                 scroll=scroll,
                                 **kwargs)
    return total, docs
def clio_search_iter(url, index, chunksize=1000, scroll='1m', **kwargs):
    """Perform a *bulk* (streamed) contextual search of Elasticsearch data.

    Args:
        url (str): URL path to bare ES endpoint.
        index (str): Index to query.
        chunksize (int): Chunk size to retrieve from Elasticsearch,
                         capped at :obj:`MAX_CHUNKSIZE`.
        scroll (str): ES scroll time window (e.g. '1m').
        **kwargs: Passed through to :obj:`clio_search` (``limit`` and
                  ``offset`` are ignored, since they are incompatible
                  with scrolling).
    Yields:
        Single rows of data
    """
    try_pop(kwargs, 'limit')  # Ignore limit and offset
    try_pop(kwargs, 'offset')
    if chunksize > MAX_CHUNKSIZE:
        logging.warning(f'Will not consider chunksize greater than {MAX_CHUNKSIZE}. '
                        f'Reverting to chunksize={MAX_CHUNKSIZE}.')
        # Actually apply the cap that the warning promises (previously the
        # oversized chunksize was still sent to Elasticsearch)
        chunksize = MAX_CHUNKSIZE
    # First search
    scroll_id, docs = clio_search(url=url, index=index,
                                  limit=chunksize, scroll=scroll, **kwargs)
    for row in docs:
        yield row
    # Keep scrolling if required: a full page implies more docs may remain
    endpoint = urllib.parse.urljoin(f'{url}/', '_search/scroll')
    while len(docs) == chunksize:
        r = requests.post(endpoint,
                          data=json.dumps({'scroll': scroll,
                                           'scroll_id': scroll_id}),
                          headers={'Content-Type': 'application/json'})
        _, docs = extract_docs(r)
        for row in docs:
            yield row