Skip to content

Commit f9c499e

Browse files
authored
Merge pull request #840 from aaxelb/feat/multiple-current-indexes
[ENG-6708] IndexStrategy revamp (more than one index per strategy, wow)
2 parents c953024 + 2d267eb commit f9c499e

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

46 files changed

+1907
-1436
lines changed

ARCHITECTURE.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ Multiple records which describe the same item/object are grouped by a
103103
the source repository. In most outward-facing views, default to showing only
104104
the most recent record for each suid.
105105

106+
### Conventions
107+
(an incomplete list)
108+
109+
- functions prefixed `pls_` ("please") are requests for something to happen
110+
106111
## Why this?
107112
inspired by [this writeup](https://matklad.github.io/2021/02/06/ARCHITECTURE.md.html)
108113
and [this example architecture document](https://github.com/rust-analyzer/rust-analyzer/blob/d7c99931d05e3723d878bea5dc26766791fa4e69/docs/dev/architecture.md)

api/search/views.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ def post(self, request):
2828

2929
def _handle_request(self, request):
3030
queryparams = request.query_params.dict()
31-
requested_index_strategy = queryparams.pop('indexStrategy', None)
31+
requested_index_strategy = queryparams.get('indexStrategy', None)
3232
if 'scroll' in queryparams:
3333
return http.HttpResponseForbidden(reason='Scroll is not supported.')
3434
try:
35-
specific_index = index_strategy.get_index_for_sharev2_search(requested_index_strategy)
35+
_index_strategy = index_strategy.get_strategy_for_sharev2_search(requested_index_strategy)
3636
except exceptions.IndexStrategyError as error:
3737
raise http.Http404(str(error))
3838
try:
39-
response_json = specific_index.pls_handle_search__sharev2_backcompat(
39+
response_json = _index_strategy.pls_handle_search__passthru(
4040
request_body=request.data,
4141
request_queryparams=queryparams,
4242
)

api/views/feeds.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class MetadataRecordsRSS(Feed):
3434
description = 'Updates to the SHARE open dataset'
3535
author_name = 'SHARE'
3636

37-
_search_index: index_strategy.IndexStrategy.SpecificIndex
37+
_search_strategy: index_strategy.IndexStrategy
3838

3939
def title(self, obj):
4040
query = json.dumps(obj.get('query', 'All'))
@@ -43,7 +43,7 @@ def title(self, obj):
4343
def get_object(self, request):
4444
self._order = request.GET.get('order')
4545
elastic_query = request.GET.get('elasticQuery')
46-
self._search_index = index_strategy.get_index_for_sharev2_search(request.GET.get('indexStrategy'))
46+
self._search_strategy = index_strategy.get_strategy_for_sharev2_search(request.GET.get('indexStrategy'))
4747

4848
if self._order not in {'date_modified', 'date_updated', 'date_created', 'date_published'}:
4949
self._order = 'date_modified'
@@ -64,7 +64,7 @@ def get_object(self, request):
6464

6565
def items(self, obj):
6666
try:
67-
json_response = self._search_index.pls_handle_search__sharev2_backcompat(
67+
json_response = self._search_strategy.pls_handle_search__passthru(
6868
request_body=obj,
6969
)
7070
except IndexStrategyError:

share/admin/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,15 +318,15 @@ class FormattedMetadataRecordAdmin(admin.ModelAdmin):
318318
class IndexBackfillAdmin(admin.ModelAdmin):
319319
readonly_fields = (
320320
'index_strategy_name',
321-
'specific_indexname',
321+
'strategy_checksum',
322322
'error_type',
323323
'error_message',
324324
'error_context',
325325
)
326326
paginator = TimeLimitedPaginator
327-
list_display = ('index_strategy_name', 'backfill_status', 'created', 'modified', 'specific_indexname')
327+
list_display = ('index_strategy_name', 'backfill_status', 'created', 'modified', 'strategy_checksum')
328328
show_full_result_count = False
329-
search_fields = ('index_strategy_name', 'specific_indexname',)
329+
search_fields = ('index_strategy_name', 'strategy_checksum',)
330330
actions = ('reset_to_initial',)
331331

332332
def reset_to_initial(self, request, queryset):

share/admin/search.py

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@
77
from share.admin.util import admin_url
88
from share.models.index_backfill import IndexBackfill
99
from share.search.index_messenger import IndexMessenger
10-
from share.search import index_strategy
10+
from share.search.index_strategy import (
11+
IndexStrategy,
12+
all_strategy_names,
13+
each_strategy,
14+
parse_strategy_name,
15+
parse_specific_index_name,
16+
)
1117

1218

1319
logger = logging.getLogger(__name__)
@@ -25,19 +31,15 @@ def search_indexes_view(request):
2531
},
2632
)
2733
if request.method == 'POST':
28-
_specific_index = index_strategy.get_specific_index(request.POST['specific_indexname'])
34+
_index_strategy = parse_strategy_name(request.POST['strategy_name'])
2935
_pls_doer = PLS_DOERS[request.POST['pls_do']]
30-
_pls_doer(_specific_index)
31-
_redirect_id = (
32-
_specific_index.index_strategy.name
33-
if _pls_doer is _pls_delete
34-
else _specific_index.indexname
35-
)
36+
_pls_doer(_index_strategy)
37+
_redirect_id = _index_strategy.strategy_name
3638
return HttpResponseRedirect('#'.join((request.path, _redirect_id)))
3739

3840

3941
def search_index_mappings_view(request, index_name):
40-
_specific_index = index_strategy.get_specific_index(index_name)
42+
_specific_index = parse_specific_index_name(index_name)
4143
_mappings = _specific_index.pls_get_mappings()
4244
return JsonResponse(_mappings)
4345

@@ -52,30 +54,23 @@ def _mappings_url_prefix():
5254

5355

5456
def _index_status_by_strategy():
55-
backfill_by_indexname: dict[str, IndexBackfill] = {
56-
backfill.specific_indexname: backfill
57-
for backfill in (
57+
_backfill_by_checksum: dict[str, IndexBackfill] = {
58+
_backfill.strategy_checksum: _backfill
59+
for _backfill in (
5860
IndexBackfill.objects
59-
.filter(index_strategy_name__in=index_strategy.all_index_strategies().keys())
61+
.filter(index_strategy_name__in=all_strategy_names())
6062
)
6163
}
6264
status_by_strategy = {}
6365
_messenger = IndexMessenger()
64-
for _index_strategy in index_strategy.all_index_strategies().values():
65-
current_index = _index_strategy.for_current_index()
66-
status_by_strategy[_index_strategy.name] = {
67-
'current': {
68-
'status': current_index.pls_get_status(),
69-
'backfill': _serialize_backfill(
70-
current_index,
71-
backfill_by_indexname.get(current_index.indexname),
72-
),
73-
},
74-
'prior': sorted((
75-
specific_index.pls_get_status()
76-
for specific_index in _index_strategy.each_specific_index()
77-
if not specific_index.is_current
78-
), reverse=True),
66+
for _index_strategy in each_strategy():
67+
_current_backfill = (
68+
_backfill_by_checksum.get(str(_index_strategy.CURRENT_STRATEGY_CHECKSUM))
69+
or _backfill_by_checksum.get(_index_strategy.indexname_prefix) # backcompat
70+
)
71+
status_by_strategy[_index_strategy.strategy_name] = {
72+
'status': _index_strategy.pls_get_strategy_status(),
73+
'backfill': _serialize_backfill(_index_strategy, _current_backfill),
7974
'queues': [
8075
{
8176
'name': _queue_name,
@@ -91,14 +86,14 @@ def _index_status_by_strategy():
9186

9287

9388
def _serialize_backfill(
94-
specific_index: index_strategy.IndexStrategy.SpecificIndex,
89+
strategy: IndexStrategy,
9590
backfill: IndexBackfill | None,
9691
):
97-
if not specific_index.is_current:
92+
if not strategy.is_current:
9893
return {}
9994
if not backfill:
10095
return {
101-
'can_start_backfill': specific_index.pls_check_exists(),
96+
'can_start_backfill': strategy.pls_check_exists(),
10297
}
10398
return {
10499
'backfill_status': backfill.backfill_status,
@@ -109,35 +104,35 @@ def _serialize_backfill(
109104
}
110105

111106

112-
def _pls_setup(specific_index):
113-
assert specific_index.is_current
114-
specific_index.pls_setup()
107+
def _pls_setup(index_strategy: IndexStrategy):
108+
assert index_strategy.is_current
109+
index_strategy.pls_setup()
115110

116111

117-
def _pls_start_keeping_live(specific_index):
118-
specific_index.pls_start_keeping_live()
112+
def _pls_start_keeping_live(index_strategy: IndexStrategy):
113+
index_strategy.pls_start_keeping_live()
119114

120115

121-
def _pls_stop_keeping_live(specific_index):
122-
specific_index.pls_stop_keeping_live()
116+
def _pls_stop_keeping_live(index_strategy: IndexStrategy):
117+
index_strategy.pls_stop_keeping_live()
123118

124119

125-
def _pls_start_backfill(specific_index):
126-
assert specific_index.is_current
127-
specific_index.index_strategy.pls_start_backfill()
120+
def _pls_start_backfill(index_strategy: IndexStrategy):
121+
assert index_strategy.is_current
122+
index_strategy.pls_start_backfill()
128123

129124

130-
def _pls_mark_backfill_complete(specific_index):
131-
specific_index.index_strategy.pls_mark_backfill_complete()
125+
def _pls_mark_backfill_complete(index_strategy: IndexStrategy):
126+
index_strategy.pls_mark_backfill_complete()
132127

133128

134-
def _pls_make_default_for_searching(specific_index):
135-
specific_index.index_strategy.pls_make_default_for_searching(specific_index)
129+
def _pls_make_default_for_searching(index_strategy: IndexStrategy):
130+
index_strategy.pls_make_default_for_searching()
136131

137132

138-
def _pls_delete(specific_index):
139-
assert not specific_index.is_current
140-
specific_index.pls_delete()
133+
def _pls_delete(index_strategy: IndexStrategy):
134+
assert not index_strategy.is_current
135+
index_strategy.pls_teardown()
141136

142137

143138
PLS_DOERS = {

share/bin/search.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ def search(args, argv):
2626
@search.subcommand('Drop the Elasticsearch index')
2727
def purge(args, argv):
2828
"""
29-
Usage: {0} search purge <index_names>...
29+
Usage: {0} search purge <strategy_names>...
3030
"""
31-
for index_name in args['<index_names>']:
32-
specific_index = index_strategy.get_specific_index(index_name)
33-
specific_index.pls_delete()
31+
for _strategy_name in args['<strategy_names>']:
32+
_strategy = index_strategy.parse_strategy_name(_strategy_name)
33+
_strategy.pls_teardown()
3434

3535

3636
@search.subcommand('Create indicies and apply mappings')
@@ -41,25 +41,16 @@ def setup(args, argv):
4141
"""
4242
_is_initial = args.get('--initial')
4343
if _is_initial:
44-
_specific_indexes = [
45-
_index_strategy.for_current_index()
46-
for _index_strategy in index_strategy.all_index_strategies().values()
47-
]
44+
for _index_strategy in index_strategy.each_strategy():
45+
_index_strategy.pls_setup()
4846
else:
4947
_index_or_strategy_name = args['<index_or_strategy_name>']
5048
try:
51-
_specific_indexes = [index_strategy.get_specific_index(_index_or_strategy_name)]
49+
_strategy = index_strategy.get_strategy(_index_or_strategy_name)
5250
except IndexStrategyError:
53-
try:
54-
_specific_indexes = [
55-
index_strategy.get_specific_index(_index_or_strategy_name),
56-
]
57-
except IndexStrategyError:
58-
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
59-
for _specific_index in _specific_indexes:
60-
_specific_index.pls_setup(
61-
skip_backfill=_is_initial, # for initial setup, there's nothing back to fill
62-
)
51+
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
52+
else:
53+
_strategy.pls_setup()
6354

6455

6556
@search.subcommand('Start the search indexing daemon')

share/checks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ def check_all_index_strategies_current(app_configs, **kwargs):
55
from share.search import index_strategy
66
from share.search.exceptions import IndexStrategyError
77
errors = []
8-
for _index_strategy in index_strategy.all_index_strategies().values():
8+
for _index_strategy in index_strategy.each_strategy():
99
try:
1010
_index_strategy.assert_strategy_is_current()
1111
except IndexStrategyError as exception:

share/models/index_backfill.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ def __repr__(self):
6868
def __str__(self):
6969
return repr(self)
7070

71+
@property
72+
def strategy_checksum(self):
73+
# back-compat alias for specific_indexname (may be removed if that's renamed via migration)
74+
return self.specific_indexname # for backcompat
75+
76+
@strategy_checksum.setter
77+
def strategy_checksum(self, value):
78+
# back-compat alias for specific_indexname (may be removed if that's renamed via migration)
79+
self.specific_indexname = value
80+
7181
@contextlib.contextmanager
7282
def mutex(self):
7383
with IndexBackfill.objects.get_with_mutex(pk=self.pk) as index_backfill:
@@ -76,14 +86,14 @@ def mutex(self):
7686

7787
def pls_start(self, index_strategy):
7888
with self.mutex() as locked_self:
79-
assert locked_self.index_strategy_name == index_strategy.name
80-
current_index = index_strategy.for_current_index()
81-
if locked_self.specific_indexname == current_index.indexname:
89+
assert locked_self.index_strategy_name == index_strategy.strategy_name
90+
_current_checksum = str(index_strategy.CURRENT_STRATEGY_CHECKSUM)
91+
if locked_self.strategy_checksum == _current_checksum:
8292
# what is "current" has not changed -- should be INITIAL
8393
assert locked_self.backfill_status == IndexBackfill.INITIAL
8494
else:
8595
# what is "current" has changed! disregard backfill_status
86-
locked_self.specific_indexname = current_index.indexname
96+
locked_self.strategy_checksum = _current_checksum
8797
locked_self.backfill_status = IndexBackfill.INITIAL
8898
locked_self.__update_error(None)
8999
try:

share/search/daemon.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def start_daemonthreads_for_strategy(self, index_strategy):
6868
return _daemon
6969

7070
def start_all_daemonthreads(self):
71-
for _index_strategy in index_strategy.all_index_strategies().values():
71+
for _index_strategy in index_strategy.each_strategy():
7272
self.start_daemonthreads_for_strategy(_index_strategy)
7373

7474
def stop_daemonthreads(self, *, wait=False):
@@ -119,7 +119,7 @@ def get_consumers(self, Consumer, channel):
119119
]
120120

121121
def __repr__(self):
122-
return '<{}({})>'.format(self.__class__.__name__, self.__index_strategy.name)
122+
return '<{}({})>'.format(self.__class__.__name__, self.__index_strategy.strategy_name)
123123

124124
def consume(self, *args, **kwargs):
125125
# wrap `consume` in `kombu.Connection.ensure`, following guidance from
@@ -191,7 +191,7 @@ def on_message(self, body, message):
191191
continue
192192

193193
def __repr__(self):
194-
return '<{}({})>'.format(self.__class__.__name__, self.index_strategy.name)
194+
return '<{}({})>'.format(self.__class__.__name__, self.index_strategy.strategy_name)
195195

196196

197197
@dataclasses.dataclass
@@ -232,11 +232,12 @@ def _the_loop_itself(self):
232232
def _raise_if_backfill_noncurrent(self):
233233
if self.message_type.is_backfill:
234234
index_backfill = self.index_strategy.get_or_create_backfill()
235-
if index_backfill.specific_indexname != self.index_strategy.current_indexname:
235+
_current_checksum = str(self.index_strategy.CURRENT_STRATEGY_CHECKSUM)
236+
if index_backfill.strategy_checksum != _current_checksum:
236237
raise exceptions.DaemonSetupError(
237238
'IndexerDaemon observes conflicting currence:'
238-
f'\n\tIndexBackfill (from database) says current is "{index_backfill.specific_indexname}"'
239-
f'\n\tIndexStrategy (from static code) says current is "{self.index_strategy.current_indexname}"'
239+
f'\n\tIndexBackfill (from database) says current is "{index_backfill.strategy_checksum}"'
240+
f'\n\tIndexStrategy (from static code) says current is "{_current_checksum}"'
240241
'\n\t(may be the daemon is running old code -- will die and retry,'
241242
' but if this keeps happening you may need to reset backfill_status'
242243
' to INITIAL and restart the backfill)'

share/search/index_messenger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self, *, celery_app=None, index_strategys=None):
3232
if celery_app is None
3333
else celery_app
3434
)
35-
self.index_strategys = index_strategys or tuple(index_strategy.all_index_strategies().values())
35+
self.index_strategys = index_strategys or tuple(index_strategy.each_strategy())
3636

3737
def notify_indexcard_update(self, indexcards, *, urgent=False):
3838
self.send_messages_chunk(

0 commit comments

Comments
 (0)