Skip to content

Commit 2943c87

Browse files
ES|QL query builder integration with the DSL module
1 parent 4761d56 commit 2943c87

File tree

4 files changed

+212
-1
lines changed

4 files changed

+212
-1
lines changed

elasticsearch/dsl/_async/document.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
TYPE_CHECKING,
2121
Any,
2222
AsyncIterable,
23+
AsyncIterator,
2324
Dict,
2425
List,
2526
Optional,
@@ -43,6 +44,8 @@
4344
if TYPE_CHECKING:
4445
from elasticsearch import AsyncElasticsearch
4546

47+
from ...esql.esql import ESQLBase
48+
4649

4750
class AsyncIndexMeta(DocumentMeta):
4851
_index: AsyncIndex
@@ -520,3 +523,83 @@ async def __anext__(self) -> Dict[str, Any]:
520523
return action
521524

522525
return await async_bulk(es, Generate(actions), **kwargs)
526+
527+
@classmethod
528+
async def esql_execute(
529+
cls,
530+
query: "ESQLBase",
531+
return_additional: bool = False,
532+
ignore_missing_fields: bool = False,
533+
using: Optional[AsyncUsingType] = None,
534+
**kwargs: Any,
535+
) -> AsyncIterator[Union[Self, Tuple[Self, Dict[str, Any]]]]:
536+
"""
537+
Execute the given ES|QL query and return an iterator of 2-element tuples,
538+
where the first element is an instance of this ``Document`` and the
539+
second a dictionary with any remaining columns requested in the query.
540+
541+
:arg query: an ES|QL query object created with the ``esql_from()`` method.
542+
:arg return_additional: if ``False`` (the default), this method returns
543+
document objects. If set to ``True``, the method returns tuples with
544+
a document in the first element and a dictionary with any additional
545+
columns returned by the query in the second element.
546+
:arg ignore_missing_fields: if ``False`` (the default), all the fields of
547+
the document must be present in the query, or else an exception is
548+
raised. Set to ``True`` to allow missing fields, which will result in
549+
partially initialized document objects.
550+
:arg using: connection alias to use, defaults to ``'default'``
551+
:arg kwargs: additional options for the ``client.esql.query()`` function.
552+
"""
553+
es = cls._get_connection(using)
554+
response = await es.esql.query(query=str(query), **kwargs)
555+
query_columns = [col["name"] for col in response.body.get("columns", [])]
556+
557+
# Here we get the list of columns defined in the document, which are the
558+
# columns that we will take from each result to assemble the document
559+
# object.
560+
# When `for_esql=False` is passed below by default, the list will include
561+
# nested fields, which ES|QL does not return, causing an error. When passing
562+
# `ignore_missing_fields=True` the list will be generated with
563+
# `for_esql=True`, so the error will not occur, but the documents will
564+
# not have any nested objects in them.
565+
doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields))
566+
if not ignore_missing_fields and not doc_fields.issubset(set(query_columns)):
567+
raise ValueError(
568+
f"Not all fields of {cls.__name__} were returned by the query. "
569+
"Make sure your document does not use Nested fields, which are "
570+
"currently not supported in ES|QL."
571+
)
572+
non_doc_fields: set[str] = set(query_columns) - doc_fields - {"_id"}
573+
index_id = query_columns.index("_id")
574+
575+
results = response.body.get("values", [])
576+
for column_values in results:
577+
# create a dictionary with all the document fields, expanding the
578+
# dot notation returned by ES|QL into the recursive dictionaries
579+
# used by Document.from_dict()
580+
doc_dict: Dict[str, Any] = {}
581+
for col, val in zip(query_columns, column_values):
582+
if col in doc_fields:
583+
cols = col.split(".")
584+
d = doc_dict
585+
for c in cols[:-1]:
586+
if c not in d:
587+
d[c] = {}
588+
d = d[c]
589+
d[cols[-1]] = val
590+
591+
# create the document instance
592+
obj = cls(meta={"_id": column_values[index_id]})
593+
obj._from_dict(doc_dict)
594+
595+
if return_additional:
596+
# build a dict with any other values included in the response
597+
other = {
598+
col: val
599+
for col, val in zip(query_columns, column_values)
600+
if col in non_doc_fields
601+
}
602+
603+
yield obj, other
604+
else:
605+
yield obj

elasticsearch/dsl/_sync/document.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
Any,
2222
Dict,
2323
Iterable,
24+
Iterator,
2425
List,
2526
Optional,
2627
Tuple,
@@ -43,6 +44,8 @@
4344
if TYPE_CHECKING:
4445
from elasticsearch import Elasticsearch
4546

47+
from ...esql.esql import ESQLBase
48+
4649

4750
class IndexMeta(DocumentMeta):
4851
_index: Index
@@ -512,3 +515,83 @@ def __next__(self) -> Dict[str, Any]:
512515
return action
513516

514517
return bulk(es, Generate(actions), **kwargs)
518+
519+
@classmethod
520+
def esql_execute(
521+
cls,
522+
query: "ESQLBase",
523+
return_additional: bool = False,
524+
ignore_missing_fields: bool = False,
525+
using: Optional[UsingType] = None,
526+
**kwargs: Any,
527+
) -> Iterator[Union[Self, Tuple[Self, Dict[str, Any]]]]:
528+
"""
529+
Execute the given ES|QL query and return an iterator of 2-element tuples,
530+
where the first element is an instance of this ``Document`` and the
531+
second a dictionary with any remaining columns requested in the query.
532+
533+
:arg query: an ES|QL query object created with the ``esql_from()`` method.
534+
:arg return_additional: if ``False`` (the default), this method returns
535+
document objects. If set to ``True``, the method returns tuples with
536+
a document in the first element and a dictionary with any additional
537+
columns returned by the query in the second element.
538+
:arg ignore_missing_fields: if ``False`` (the default), all the fields of
539+
the document must be present in the query, or else an exception is
540+
raised. Set to ``True`` to allow missing fields, which will result in
541+
partially initialized document objects.
542+
:arg using: connection alias to use, defaults to ``'default'``
543+
:arg kwargs: additional options for the ``client.esql.query()`` function.
544+
"""
545+
es = cls._get_connection(using)
546+
response = es.esql.query(query=str(query), **kwargs)
547+
query_columns = [col["name"] for col in response.body.get("columns", [])]
548+
549+
# Here we get the list of columns defined in the document, which are the
550+
# columns that we will take from each result to assemble the document
551+
# object.
552+
# When `for_esql=False` is passed below by default, the list will include
553+
# nested fields, which ES|QL does not return, causing an error. When passing
554+
# `ignore_missing_fields=True` the list will be generated with
555+
# `for_esql=True`, so the error will not occur, but the documents will
556+
# not have any nested objects in them.
557+
doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields))
558+
if not ignore_missing_fields and not doc_fields.issubset(set(query_columns)):
559+
raise ValueError(
560+
f"Not all fields of {cls.__name__} were returned by the query. "
561+
"Make sure your document does not use Nested fields, which are "
562+
"currently not supported in ES|QL."
563+
)
564+
non_doc_fields: set[str] = set(query_columns) - doc_fields - {"_id"}
565+
index_id = query_columns.index("_id")
566+
567+
results = response.body.get("values", [])
568+
for column_values in results:
569+
# create a dictionary with all the document fields, expanding the
570+
# dot notation returned by ES|QL into the recursive dictionaries
571+
# used by Document.from_dict()
572+
doc_dict: Dict[str, Any] = {}
573+
for col, val in zip(query_columns, column_values):
574+
if col in doc_fields:
575+
cols = col.split(".")
576+
d = doc_dict
577+
for c in cols[:-1]:
578+
if c not in d:
579+
d[c] = {}
580+
d = d[c]
581+
d[cols[-1]] = val
582+
583+
# create the document instance
584+
obj = cls(meta={"_id": column_values[index_id]})
585+
obj._from_dict(doc_dict)
586+
587+
if return_additional:
588+
# build a dict with any other values included in the response
589+
other = {
590+
col: val
591+
for col, val in zip(query_columns, column_values)
592+
if col in non_doc_fields
593+
}
594+
595+
yield obj, other
596+
else:
597+
yield obj

elasticsearch/dsl/document_base.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
if TYPE_CHECKING:
5050
from elastic_transport import ObjectApiResponse
5151

52+
from ..esql.esql import ESQLBase
5253
from .index_base import IndexBase
5354

5455

@@ -602,3 +603,44 @@ def to_dict(self, include_meta: bool = False, skip_empty: bool = True) -> Dict[s
602603

603604
meta["_source"] = d
604605
return meta
606+
607+
@classmethod
608+
def _get_field_names(
609+
cls, for_esql: bool = False, nested_class: Optional[type[InnerDoc]] = None
610+
) -> List[str]:
611+
"""Return the list of field names used by this document.
612+
If the document has nested objects, their fields are reported using dot
613+
notation. If the ``for_esql`` argument is set to ``True``, the list omits
614+
nested fields, which are currently unsupported in ES|QL.
615+
"""
616+
fields = []
617+
class_ = nested_class or cls
618+
for field_name in class_._doc_type.mapping:
619+
field = class_._doc_type.mapping[field_name]
620+
if isinstance(field, Object):
621+
if for_esql and isinstance(field, Nested):
622+
# ES|QL does not recognize Nested fields at this time
623+
continue
624+
sub_fields = cls._get_field_names(
625+
for_esql=for_esql, nested_class=field._doc_class
626+
)
627+
for sub_field in sub_fields:
628+
fields.append(f"{field_name}.{sub_field}")
629+
else:
630+
fields.append(field_name)
631+
return fields
632+
633+
@classmethod
634+
def esql_from(cls) -> "ESQLBase":
635+
"""Return a base ES|QL query for instances of this document class.
636+
637+
The returned query is initialized with ``FROM`` and ``KEEP`` statements,
638+
and can be completed as desired.
639+
"""
640+
from ..esql import ESQL # here to avoid circular imports
641+
642+
return (
643+
ESQL.from_(cls)
644+
.metadata("_id")
645+
.keep("_id", *tuple(cls._get_field_names(for_esql=True)))
646+
)

elasticsearch/esql/esql.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ class ESQLBase(ABC):
9797
commands, used to build ES|QL queries.
9898
"""
9999

100-
def __init__(self, parent: Optional["ESQLBase"] = None):
100+
def __init__(
101+
self,
102+
parent: Optional["ESQLBase"] = None,
103+
):
101104
self._parent = parent
102105

103106
def __repr__(self) -> str:

0 commit comments

Comments
 (0)