Skip to content

Commit 28a048c

Browse files
authored
Merge pull request #94 from aaxelb/9696-timeseries-search
[ENG-9696] timeseries search
2 parents aef1276 + 7a7f664 commit 28a048c

File tree

11 files changed

+580
-157
lines changed

11 files changed

+580
-157
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,5 @@ pc up testbox
105105

106106
example devloop -- build with current code, lint and run tests with debugger on error, stop on failure
107107
```
108-
pc run --build --rm --no-deps testbox poetry run python -m elasticsearch_metrics.tests --devloop
108+
pc run --build --rm --no-deps testbox poetry run python -m elasticsearch_metrics.tests --failfast --pdb
109109
```

README.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ from elasticsearch_metrics.imps.elastic8 import EventRecord
5757
class UsageRecord(EventRecord):
5858
item_id: int
5959

60-
class Meta:
61-
djelme_backend = "my-es8-backend" # optional if only one backend
60+
class Index:
61+
using = "my-es8-backend" # optional if only one backend
6262
```
6363

6464
Either enable autosetup...
@@ -87,8 +87,13 @@ UsageRecord.record(item_id='my.item.id')
8787
Go forth and search!
8888

8989
```python
90-
# search across all timeseries indexes -- get an `elasticsearch8.dsl.Search` object
90+
# get an instance of `elasticsearch8.dsl.Search` that queries all timeseries indexes of this type:
9191
UsageRecord.search()
92+
93+
# or get a `Search` for a given time range (from_when <= timestamp < until_when)
94+
UsageRecord.search_timeseries_range((1999,), (2001,)) # in or after 1999; before 2001
95+
UsageRecord.search_timeseries_range((2050, 12), (2051,)) # in 2050-12
96+
UsageRecord.search_timeseries_range(datetime.date(2030, 1, 1), datetime.date(2030, 2, 1)) # in 2030-01
9297
```
9398

9499
## Timeseries indexes

elasticsearch_metrics/apps.py

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
from django.apps import AppConfig
44
from django.conf import settings
5-
from django.core.exceptions import ImproperlyConfigured
65
from django.utils.module_loading import autodiscover_modules
76

87
from elasticsearch_metrics.registry import djelme_registry
98

10-
BACKENDS_SETTING = "DJELME_BACKENDS"
119
AUTOSETUP_SETTING = "DJELME_AUTOSETUP"
1210

1311

@@ -17,14 +15,15 @@ class ElasticsearchMetricsConfig(AppConfig):
1715
def ready(self) -> None:
1816
# load backends settings
1917
_backend_names_by_module = collections.defaultdict(list)
20-
for _backend_name, _imp_module_name, _imp_kwargs in _each_backend_setting():
18+
for (
19+
_backend_name,
20+
_imp_module_name,
21+
_,
22+
) in djelme_registry.each_backend_settings():
2123
_backend_names_by_module[_imp_module_name].append(_backend_name)
22-
djelme_registry.register_backend(
23-
_backend_name, _imp_module_name, _imp_kwargs
24-
)
2524
# discover any `foo.metrics` in installed apps
2625
autodiscover_modules("metrics")
27-
# call `djelme_when_ready` on each imp module
26+
# call `djelme_when_ready` for each imp module (only once)
2827
for _imp_module_name, _backend_names in _backend_names_by_module.items():
2928
_imp_module = djelme_registry.get_imp_module(_imp_module_name)
3029
_imp_module.djelme_when_ready(
@@ -39,31 +38,3 @@ def ready(self) -> None:
3938
_recordtypes,
4039
) in djelme_registry.each_recordtype_by_backend():
4140
djelme_registry.get_backend(_backend_name).djelme_setup(_recordtypes)
42-
43-
44-
###
45-
# accessing django settings
46-
47-
48-
def _each_backend_setting() -> (
49-
collections.abc.Iterator[tuple[str, str, dict[str, str]]]
50-
):
51-
for _backend_name, _backend_imps in getattr(settings, BACKENDS_SETTING, {}).items():
52-
if len(_backend_imps) > 1:
53-
raise ImproperlyConfigured(
54-
BACKENDS_SETTING, "only one imp per backend, at the moment"
55-
)
56-
for _imp_module_name, _imp_kwargs in _backend_imps.items():
57-
if not (
58-
isinstance(_imp_module_name, str)
59-
and isinstance(_imp_kwargs, dict)
60-
and all(
61-
isinstance(_k, str) and isinstance(_v, str)
62-
for _k, _v in _imp_kwargs.items()
63-
)
64-
):
65-
raise ImproperlyConfigured(
66-
BACKENDS_SETTING,
67-
'expected {"mybackend": {"imp.path": {"imp_config": "..."}}}',
68-
)
69-
yield (_backend_name, _imp_module_name, _imp_kwargs)

elasticsearch_metrics/imps/elastic6.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,14 +251,21 @@ def get_timeseries_index_template(cls):
251251
template_name=cls._template_name, pattern=cls._template_pattern
252252
)
253253

254+
@classmethod
def get_timeseries_name_prefix(cls) -> str:
    """Return the prefix to put before timeseries index names.

    An empty string means "no prefix".
    """
    _no_prefix = ""
    return _no_prefix
257+
254258
@classmethod
255259
def get_index_name(cls, date: datetime.date | None = None) -> str:
256260
date = date or timezone.now().date()
257261
dateformat = getattr(
258262
settings, "ELASTICSEARCH_METRICS_DATE_FORMAT", DEFAULT_DATE_FORMAT
259263
)
260264
date_formatted = date.strftime(dateformat)
261-
return "{}_{}".format(cls._template_name, date_formatted)
265+
_name_parts = (cls._template_name, date_formatted)
266+
if _prefix := cls.get_timeseries_name_prefix():
267+
_name_parts = (_prefix, *_name_parts)
268+
return "_".join(_name_parts)
262269

263270

264271
class Metric(Document, BaseMetric):

elasticsearch_metrics/imps/elastic8.py

Lines changed: 98 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,17 @@
2626
from django.conf import settings
2727
from elasticsearch8.exceptions import NotFoundError
2828
from elasticsearch8 import Elasticsearch as Elastic8Client
29-
from elasticsearch8.dsl import (
30-
Document,
31-
connections,
32-
ComposableIndexTemplate,
33-
mapped_field,
34-
Keyword,
35-
)
29+
from elasticsearch8 import dsl as esdsl
3630
from elasticsearch8.dsl._sync.document import IndexMeta
37-
from elasticsearch8.dsl.document_base import DocumentOptions
3831

3932
from elasticsearch_metrics import signals
4033
from elasticsearch_metrics import exceptions
4134
from elasticsearch_metrics.registry import djelme_registry
42-
from elasticsearch_metrics.protocols import ProtoDjelmeBackend, ProtoCountedUsage
35+
from elasticsearch_metrics.protocols import (
36+
ProtoDjelmeBackend,
37+
ProtoCountedUsage,
38+
ProtoDjelmeRecord,
39+
)
4340
from elasticsearch_metrics.util import timeseries_naming
4441
from elasticsearch_metrics.util.unique_together import get_unique_id
4542
from elasticsearch_metrics.util.anon_enough import opaque_sessionhour_id
@@ -55,7 +52,7 @@
5552
# invasive hacky changes to elasticsearch8.dsl
5653

5754
# change default mapping for `str` annotations from Text to Keyword:
58-
DocumentOptions.type_annotation_map[str] = (Keyword, {})
55+
esdsl.document_base.DocumentOptions.type_annotation_map[str] = (esdsl.Keyword, {})
5956

6057

6158
# changes to document metaclass behavior
@@ -117,7 +114,7 @@ def app_label(self) -> str | None:
117114
return djelme_registry.get_recordtype_app_label(self)
118115

119116

120-
class DjelmeRecordtype(Document, metaclass=_DjelmeRecordtypeMetaclass):
117+
class DjelmeRecordtype(esdsl.Document, metaclass=_DjelmeRecordtypeMetaclass):
121118
"""a subclass of elasticsearch8.dsl.Document, with conveniences
122119
123120
>>> class MyAbstractRecord(DjelmeRecordtype):
@@ -145,24 +142,28 @@ class DjelmeRecordtype(Document, metaclass=_DjelmeRecordtypeMetaclass):
145142
"""
146143

147144
UNIQUE_TOGETHER_FIELDS: typing.ClassVar[collections.abc.Iterable[str]] = ()
148-
unique_id: str = mapped_field(Keyword(), default="") # filled on save
145+
unique_id: str = esdsl.mapped_field(esdsl.Keyword(), default="") # filled on save
149146

150147
class Meta:
151148
abstract = True
152149

153150
@classmethod
154-
def record(cls, *, using=None, **kwargs):
151+
def record(
152+
cls, *, using: str | None = None, **kwargs: typing.Any
153+
) -> "typing.Self": # typing.Self added in py 3.11 -- str annotation until 3.10 eol
155154
"""Persist a record in Elasticsearch."""
156155
_instance = cls(**kwargs)
157156
_instance.save(using=using)
158157
return _instance
159158

160159
@classmethod
161-
def check_djelme_setup(cls, using: str | Elastic8Client | None = None) -> bool:
160+
def check_djelme_setup(cls, using: str | None = None) -> bool:
161+
# this base class has only a single index -- does it exist?
162162
return bool(cls._index.get(using=using))
163163

164164
@classmethod
def _djelme_teardown(cls, es_client):
    """Delete this recordtype's index, using the given client.

    This base class has only a single index, so that one delete is all
    the teardown there is.
    """
    _only_index = cls._index
    _only_index.delete(using=es_client)
167168

168169
@classmethod
@@ -229,9 +230,14 @@ def _get_unique_id(self) -> str | None:
229230

230231

231232
class TimeseriesRecord(DjelmeRecordtype):
232-
timestamp: datetime.datetime = mapped_field(
233+
timestamp: datetime.datetime = esdsl.mapped_field(
233234
default_factory=lambda: django.utils.timezone.now()
234235
)
236+
# the 'version' field type allows range queries on semver-like strings
237+
# that fit perfectly with "timeparts" representation of a UTC datetime
238+
# as a sequence of integers -- helps to avoid time zones and date math
239+
# (e.g. '2000' < '2000.5.10' < '2000.5.20.20.20' < '2000.11' < '2001')
240+
timestamp_parts: str = esdsl.mapped_field(esdsl.Version(), default="")
235241

236242
class Meta:
237243
abstract = True
@@ -240,18 +246,57 @@ class Meta:
240246
# class methods
241247

242248
@classmethod
243-
def init(cls, index=None, using=None):
244-
"""Create the index and populate the mappings in elasticsearch.
249+
def init(cls, index=None, using=None) -> None:
250+
"""Create an index template with mappings for timeseries indexes
245251
246252
overrides elasticsearch.Document.init
253+
(but doesn't call super().init(), which would create a "now" index)
247254
"""
248255
assert not cls.is_abstract
249-
# to init timeseries indexes, create only the template
250256
cls.sync_index_template(using=using)
251-
return super().init(
252-
index=(index or cls.format_timeseries_index_name()),
253-
using=cls._get_using(using),
257+
258+
@classmethod
def search(cls, *, index=None, **kwargs):
    """Build a search over this recordtype's timeseries indexes.

    When `index` is empty or not given, the search targets the pattern
    from `format_timeseries_index_pattern()`; all other keyword arguments
    are passed unchanged to the parent class's `search`.
    """
    _search_index = index if index else cls.format_timeseries_index_pattern()
    return super().search(index=_search_index, **kwargs)
264+
265+
@classmethod
def search_timeseries_range(
    cls,
    from_when: tuple[int, ...] | datetime.date,
    until_when: tuple[int, ...] | datetime.date,
    **kwargs: typing.Any,
) -> typing.Any:
    """Build a search limited to a half-open time range.

    Matches records where `from_when <= timestamp_parts < until_when`,
    and queries only the index pattern given by
    `format_timeseries_index_pattern_for_range` for those bounds.

    :param from_when: inclusive lower bound, as timeparts or a date
    :param until_when: exclusive upper bound, as timeparts or a date
    :param kwargs: forwarded to `cls.search`
    :returns: the search from `cls.search`, filtered by the range query
    """
    _index_pattern = cls.format_timeseries_index_pattern_for_range(
        from_when, until_when
    )
    _timestamp_q = esdsl.query.Range(
        timestamp_parts={
            "gte": timeseries_naming.full_semverlike_timeparts(from_when),
            "lt": timeseries_naming.full_semverlike_timeparts(until_when),
        }
    )
    # pass caller-supplied options through -- they used to be accepted by
    # this signature but silently dropped before reaching `cls.search`
    return cls.search(index=_index_pattern, **kwargs).filter(_timestamp_q)
282+
283+
@classmethod
def refresh_timeseries_indexes(cls, using: str | None = None) -> None:
    """Ask elasticsearch to refresh every index matching this recordtype's
    timeseries index pattern.

    :param using: passed to `_get_connection` to choose the connection
    """
    _es_client = cls._get_connection(using)
    _all_timeseries_indexes = cls.format_timeseries_index_pattern()
    _es_client.indices.refresh(index=_all_timeseries_indexes)
288+
289+
@classmethod
def each_timeseries_index(
    cls, using: str | None = None
) -> collections.abc.Iterator[tuple[str, dict[str, typing.Any]]]:
    """Yield `(index_name, index_info)` for each index that matches this
    recordtype's timeseries index pattern.

    :param using: passed to `_get_connection` to choose the connection
    """
    _es_client = cls._get_connection(using)
    _indexes_by_name = _es_client.indices.get(
        index=cls.format_timeseries_index_pattern(),
    )
    for _name, _info in _indexes_by_name.items():
        # narrow the loosely-typed response for static type-checking
        assert isinstance(_name, str)
        assert isinstance(_info, dict)
        yield (_name, _info)
255300

256301
@classmethod
257302
def _djelme_teardown(cls, es8_client: Elastic8Client) -> None:
@@ -277,7 +322,7 @@ def format_timeseries_index_name(
277322
)
278323

279324
@classmethod
280-
def get_timeseries_template(cls) -> ComposableIndexTemplate:
325+
def get_timeseries_template(cls) -> esdsl.ComposableIndexTemplate:
281326
return cls._index.as_composable_template(
282327
template_name=cls.get_timeseries_template_name(),
283328
pattern=cls.format_timeseries_index_pattern(),
@@ -312,6 +357,20 @@ def format_timeseries_index_pattern(cls, timeparts: tuple[int, ...] = ()) -> str
312357
max_timedepth=cls.get_timedepth(),
313358
)
314359

360+
@classmethod
def format_timeseries_index_pattern_for_range(
    cls,
    from_when: tuple[int, ...] | datetime.date,
    until_when: tuple[int, ...] | datetime.date,
) -> str:
    """Format an index pattern for the given time range.

    Gathers this recordtype's name prefix, recordtype name, and timedepth,
    then defers to `timeseries_naming.format_index_pattern_for_range`.

    :param from_when: start of the range, as timeparts or a date
    :param until_when: end of the range, as timeparts or a date
    """
    _format_pattern = timeseries_naming.format_index_pattern_for_range
    _name_prefix = cls.get_timeseries_name_prefix()
    _recordtype_name = cls.get_timeseries_recordtype_name()
    _timedepth = cls.get_timedepth()
    return _format_pattern(
        _name_prefix,
        _recordtype_name,
        from_when,
        until_when,
        timedepth=_timedepth,
    )
373+
315374
@classmethod
316375
def get_timedepth(cls) -> int:
317376
_default_timedepth = getattr(
@@ -404,6 +463,13 @@ def check_djelme_setup(cls, using: str | Elastic8Client | None = None) -> bool:
404463
###
405464
# instance methods
406465

466+
def __init__(self, *args, **kwargs):
    """Initialize as the parent class does, then fill `timestamp_parts`.

    `timestamp_parts` is always set from `_build_timestamp_parts()` here,
    replacing whatever value the parent initializer left in place.
    """
    super().__init__(*args, **kwargs)
    _timestamp_parts = self._build_timestamp_parts()
    self.timestamp_parts = _timestamp_parts
469+
470+
def _build_timestamp_parts(self) -> str:
    """Render this record's `timestamp` as a semver-like timeparts string
    (via `timeseries_naming.full_semverlike_timeparts`).
    """
    _to_timeparts = timeseries_naming.full_semverlike_timeparts
    return _to_timeparts(self.timestamp)
472+
407473
def djelme_index_name(self) -> str:
408474
assert self.timestamp is not None
409475
return self.format_timeseries_index_name(self.timestamp)
@@ -444,11 +510,11 @@ class CountedUsageRecord(EventRecord):
444510
"""
445511

446512
# for ProtoCountedUsage:
447-
platform_iri: str = mapped_field(required=True, default="")
448-
database_iri: str = mapped_field(required=True, default="")
449-
item_iri: str = mapped_field(required=True, default="")
450-
sessionhour_id: str = mapped_field(Keyword(), default="")
451-
within_iris: list[str] = mapped_field(Keyword(), default_factory=list)
513+
platform_iri: str = esdsl.mapped_field(required=True, default="")
514+
database_iri: str = esdsl.mapped_field(required=True, default="")
515+
item_iri: str = esdsl.mapped_field(required=True, default="")
516+
sessionhour_id: str = esdsl.mapped_field(esdsl.Keyword(), default="")
517+
within_iris: list[str] = esdsl.mapped_field(esdsl.Keyword(), default_factory=list)
452518

453519
class Meta:
454520
abstract = True
@@ -457,9 +523,9 @@ class Meta:
457523
def record(
458524
cls,
459525
*,
460-
# each usage record needs a sessionhour_id -- for migrating old data, can set explicitly
526+
# each usage record needs a sessionhour_id -- for migrating old data, can set explicitly...
461527
sessionhour_id: str = "",
462-
# ...but when saving new data, give either the dirty identifying strings
528+
# ...but when saving new data, give either the dirty identifying strings:
463529
user_id: str = "",
464530
session_id: str = "",
465531
request_host: str = "",
@@ -518,7 +584,7 @@ def djelme_imp_kwargs(self) -> dict[str, str]: # for ProtoDjelmeBackend
518584
@property
519585
def elastic8_client(self) -> Elastic8Client:
520586
# assumes `connections.configure` was already called
521-
return connections.get_connection(self._elastic8dsl_connection_name)
587+
return esdsl.connections.get_connection(self._elastic8dsl_connection_name)
522588

523589
@property
524590
def _elastic8dsl_connection_name(self) -> str:
@@ -546,6 +612,7 @@ def djelme_teardown(self, recordtypes: collections.abc.Iterable[type]) -> None:
546612
# for static type-checking; verify intent
547613
_: type[ProtoCountedUsage] = CountedUsageRecord
548614
__: type[ProtoDjelmeBackend] = DjelmeElastic8Backend
615+
___: type[ProtoDjelmeRecord] = DjelmeRecordtype
549616

550617
###
551618
# names expected by ProtoDjelmeImp
@@ -556,7 +623,7 @@ def djelme_teardown(self, recordtypes: collections.abc.Iterable[type]) -> None:
556623
def djelme_when_ready( # for ProtoDjelmeImp
557624
backends: collections.abc.Iterable[ProtoDjelmeBackend],
558625
) -> None:
559-
connections.configure(
626+
esdsl.connections.configure(
560627
**{
561628
_backend._elastic8dsl_connection_name: _backend._elastic8dsl_connection_kwargs
562629
for _backend in backends

0 commit comments

Comments
 (0)