Skip to content

Commit 68d8731

Browse files
[Backport 8.19] ES|QL query builder integration with the DSL module (elastic#3056)
* ES|QL query builder integration with the DSL module (elastic#3048) * ES|QL query builder integration with the DSL module * esql DSL tests * more esql DSL tests * documentation * add esql+dsl example * review feedback (cherry picked from commit 228e66c) * python 3.8 typing fix --------- Co-authored-by: Miguel Grinberg <[email protected]>
1 parent 25f3273 commit 68d8731

File tree

16 files changed

+920
-49
lines changed

16 files changed

+920
-49
lines changed

docs/guide/dsl/howto.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ include::search_dsl.asciidoc[]
44
include::persistence.asciidoc[]
55
include::faceted_search.asciidoc[]
66
include::update_by_query.asciidoc[]
7+
include::esql.asciidoc[]
78
include::asyncio.asciidoc[]

docs/guide/dsl/tutorials.asciidoc

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,17 +83,17 @@ system:
8383
[source,python]
8484
----
8585
from datetime import datetime
86-
from elasticsearch.dsl import Document, Date, Integer, Keyword, Text, connections
86+
from elasticsearch.dsl import Document, Date, Integer, Keyword, Text, connections, mapped_field
8787
8888
# Define a default Elasticsearch client
8989
connections.create_connection(hosts="https://localhost:9200")
9090
9191
class Article(Document):
92-
title = Text(analyzer='snowball', fields={'raw': Keyword()})
93-
body = Text(analyzer='snowball')
94-
tags = Keyword()
95-
published_from = Date()
96-
lines = Integer()
92+
title: str = mapped_field(Text(analyzer='snowball', fields={'raw': Keyword()}))
93+
body: str = mapped_field(Text(analyzer='snowball'))
94+
tags: list[str] = mapped_field(Keyword())
95+
published_from: datetime
96+
lines: int
9797
9898
class Index:
9999
name = 'blog'
@@ -229,13 +229,31 @@ savings offered by the `Search` object, and additionally allows one to
229229
update the results of the search based on a script assigned in the same
230230
manner.
231231

232+
==== ES|QL Queries
233+
234+
The DSL module features an integration with the ES|QL query builder, consisting of two methods available in all `Document` sub-classes: `esql_from()` and `esql_execute()`. Using the `Article` document from above, we can search for up to ten articles that include `"world"` in their titles with the following ES|QL query:
235+
236+
[source,python]
237+
----
238+
from elasticsearch.esql import functions
239+
240+
query = Article.esql_from().where(functions.match(Article.title, 'world')).limit(10)
241+
for a in Article.esql_execute(query):
242+
print(a.title)
243+
----
244+
245+
Review the ES|QL Query Builder section to learn more about building ES|QL queries in Python.
246+
232247
==== Migration from the standard client
233248

249+
<<<<<<< HEAD:docs/guide/dsl/tutorials.asciidoc
234250
You don't have to port your entire application to get the benefits of
235251
the DSL module, you can start gradually by creating a `Search` object
236252
from your existing `dict`, modifying it using the API and serializing it
237253
back to a `dict`:
238254

255+
==== Migration from the standard client
256+
239257
[source,python]
240258
----
241259
body = {...} # insert complicated query here

docs/guide/esql-query-builder.asciidoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,21 @@ You can then see the assembled ES|QL query by printing the resulting query objec
2121

2222
[source, python]
2323
----------------------------
24-
>>> query
24+
>>> print(query)
2525
FROM employees
2626
| SORT emp_no
2727
| KEEP first_name, last_name, height
2828
| EVAL height_feet = height * 3.281, height_cm = height * 100
2929
| LIMIT 3
3030
----------------------------
3131

32-
To execute this query, you can cast it to a string and pass the string to the `client.esql.query()` endpoint:
32+
To execute this query, you can pass it to the `client.esql.query()` endpoint:
3333

3434
[source, python]
3535
----------------------------
3636
>>> from elasticsearch import Elasticsearch
3737
>>> client = Elasticsearch(hosts=[os.environ['ELASTICSEARCH_URL']])
38-
>>> response = client.esql.query(query=str(query))
38+
>>> response = client.esql.query(query=query)
3939
----------------------------
4040

4141
The response body contains a `columns` attribute with the list of columns included in the results, and a `values` attribute with the list of results for the query, each given as a list of column values. Here is a possible response body returned by the example query given above:
@@ -228,7 +228,7 @@ def find_employee_by_name(name):
228228
.keep("first_name", "last_name", "height")
229229
.where(E("first_name") == E("?"))
230230
)
231-
return client.esql.query(query=str(query), params=[name])
231+
return client.esql.query(query=query, params=[name])
232232
----------------------------
233233

234234
Here the part of the query in which the untrusted data needs to be inserted is replaced with a parameter, which in ES|QL is defined by the question mark. When using Python expressions, the parameter must be given as `E("?")` so that it is treated as an expression and not as a literal string.

elasticsearch/dsl/_async/document.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
TYPE_CHECKING,
2121
Any,
2222
AsyncIterable,
23+
AsyncIterator,
2324
Dict,
2425
List,
2526
Optional,
@@ -42,6 +43,7 @@
4243

4344
if TYPE_CHECKING:
4445
from elasticsearch import AsyncElasticsearch
46+
from elasticsearch.esql.esql import ESQLBase
4547

4648

4749
class AsyncIndexMeta(DocumentMeta):
@@ -520,3 +522,85 @@ async def __anext__(self) -> Dict[str, Any]:
520522
return action
521523

522524
return await async_bulk(es, Generate(actions), **kwargs)
525+
526+
@classmethod
527+
async def esql_execute(
528+
cls,
529+
query: "ESQLBase",
530+
return_additional: bool = False,
531+
ignore_missing_fields: bool = False,
532+
using: Optional[AsyncUsingType] = None,
533+
**kwargs: Any,
534+
) -> AsyncIterator[Union[Self, Tuple[Self, Dict[str, Any]]]]:
535+
"""
536+
Execute the given ES|QL query and return an iterator of 2-element tuples,
537+
where the first element is an instance of this ``Document`` and the
538+
second a dictionary with any remaining columns requested in the query.
539+
540+
:arg query: an ES|QL query object created with the ``esql_from()`` method.
541+
:arg return_additional: if ``False`` (the default), this method returns
542+
document objects. If set to ``True``, the method returns tuples with
543+
a document in the first element and a dictionary with any additional
544+
columns returned by the query in the second element.
545+
:arg ignore_missing_fields: if ``False`` (the default), all the fields of
546+
the document must be present in the query, or else an exception is
547+
raised. Set to ``True`` to allow missing fields, which will result in
548+
partially initialized document objects.
549+
:arg using: connection alias to use, defaults to ``'default'``
550+
:arg kwargs: additional options for the ``client.esql.query()`` function.
551+
"""
552+
es = cls._get_connection(using)
553+
response = await es.esql.query(query=str(query), **kwargs)
554+
query_columns = [col["name"] for col in response.body.get("columns", [])]
555+
556+
# Here we get the list of columns defined in the document, which are the
557+
# columns that we will take from each result to assemble the document
558+
# object.
559+
# When `for_esql=False` is passed below by default, the list will include
560+
# nested fields, which ES|QL does not return, causing an error. When passing
561+
# `ignore_missing_fields=True` the list will be generated with
562+
# `for_esql=True`, so the error will not occur, but the documents will
563+
# not have any Nested objects in them.
564+
doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields))
565+
if not ignore_missing_fields and not doc_fields.issubset(set(query_columns)):
566+
raise ValueError(
567+
f"Not all fields of {cls.__name__} were returned by the query. "
568+
"Make sure your document does not use Nested fields, which are "
569+
"currently not supported in ES|QL. To force the query to be "
570+
"evaluated in spite of the missing fields, pass set the "
571+
"ignore_missing_fields=True option in the esql_execute() call."
572+
)
573+
non_doc_fields: set[str] = set(query_columns) - doc_fields - {"_id"}
574+
index_id = query_columns.index("_id")
575+
576+
results = response.body.get("values", [])
577+
for column_values in results:
578+
# create a dictionary with all the document fields, expanding the
579+
# dot notation returned by ES|QL into the recursive dictionaries
580+
# used by Document.from_dict()
581+
doc_dict: Dict[str, Any] = {}
582+
for col, val in zip(query_columns, column_values):
583+
if col in doc_fields:
584+
cols = col.split(".")
585+
d = doc_dict
586+
for c in cols[:-1]:
587+
if c not in d:
588+
d[c] = {}
589+
d = d[c]
590+
d[cols[-1]] = val
591+
592+
# create the document instance
593+
obj = cls(meta={"_id": column_values[index_id]})
594+
obj._from_dict(doc_dict)
595+
596+
if return_additional:
597+
# build a dict with any other values included in the response
598+
other = {
599+
col: val
600+
for col, val in zip(query_columns, column_values)
601+
if col in non_doc_fields
602+
}
603+
604+
yield obj, other
605+
else:
606+
yield obj

elasticsearch/dsl/_sync/document.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
Any,
2222
Dict,
2323
Iterable,
24+
Iterator,
2425
List,
2526
Optional,
2627
Tuple,
@@ -42,6 +43,7 @@
4243

4344
if TYPE_CHECKING:
4445
from elasticsearch import Elasticsearch
46+
from elasticsearch.esql.esql import ESQLBase
4547

4648

4749
class IndexMeta(DocumentMeta):
@@ -512,3 +514,85 @@ def __next__(self) -> Dict[str, Any]:
512514
return action
513515

514516
return bulk(es, Generate(actions), **kwargs)
517+
518+
@classmethod
519+
def esql_execute(
520+
cls,
521+
query: "ESQLBase",
522+
return_additional: bool = False,
523+
ignore_missing_fields: bool = False,
524+
using: Optional[UsingType] = None,
525+
**kwargs: Any,
526+
) -> Iterator[Union[Self, Tuple[Self, Dict[str, Any]]]]:
527+
"""
528+
Execute the given ES|QL query and return an iterator of 2-element tuples,
529+
where the first element is an instance of this ``Document`` and the
530+
second a dictionary with any remaining columns requested in the query.
531+
532+
:arg query: an ES|QL query object created with the ``esql_from()`` method.
533+
:arg return_additional: if ``False`` (the default), this method returns
534+
document objects. If set to ``True``, the method returns tuples with
535+
a document in the first element and a dictionary with any additional
536+
columns returned by the query in the second element.
537+
:arg ignore_missing_fields: if ``False`` (the default), all the fields of
538+
the document must be present in the query, or else an exception is
539+
raised. Set to ``True`` to allow missing fields, which will result in
540+
partially initialized document objects.
541+
:arg using: connection alias to use, defaults to ``'default'``
542+
:arg kwargs: additional options for the ``client.esql.query()`` function.
543+
"""
544+
es = cls._get_connection(using)
545+
response = es.esql.query(query=str(query), **kwargs)
546+
query_columns = [col["name"] for col in response.body.get("columns", [])]
547+
548+
# Here we get the list of columns defined in the document, which are the
549+
# columns that we will take from each result to assemble the document
550+
# object.
551+
# When `for_esql=False` is passed below by default, the list will include
552+
# nested fields, which ES|QL does not return, causing an error. When passing
553+
# `ignore_missing_fields=True` the list will be generated with
554+
# `for_esql=True`, so the error will not occur, but the documents will
555+
# not have any Nested objects in them.
556+
doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields))
557+
if not ignore_missing_fields and not doc_fields.issubset(set(query_columns)):
558+
raise ValueError(
559+
f"Not all fields of {cls.__name__} were returned by the query. "
560+
"Make sure your document does not use Nested fields, which are "
561+
"currently not supported in ES|QL. To force the query to be "
562+
"evaluated in spite of the missing fields, pass set the "
563+
"ignore_missing_fields=True option in the esql_execute() call."
564+
)
565+
non_doc_fields: set[str] = set(query_columns) - doc_fields - {"_id"}
566+
index_id = query_columns.index("_id")
567+
568+
results = response.body.get("values", [])
569+
for column_values in results:
570+
# create a dictionary with all the document fields, expanding the
571+
# dot notation returned by ES|QL into the recursive dictionaries
572+
# used by Document.from_dict()
573+
doc_dict: Dict[str, Any] = {}
574+
for col, val in zip(query_columns, column_values):
575+
if col in doc_fields:
576+
cols = col.split(".")
577+
d = doc_dict
578+
for c in cols[:-1]:
579+
if c not in d:
580+
d[c] = {}
581+
d = d[c]
582+
d[cols[-1]] = val
583+
584+
# create the document instance
585+
obj = cls(meta={"_id": column_values[index_id]})
586+
obj._from_dict(doc_dict)
587+
588+
if return_additional:
589+
# build a dict with any other values included in the response
590+
other = {
591+
col: val
592+
for col, val in zip(query_columns, column_values)
593+
if col in non_doc_fields
594+
}
595+
596+
yield obj, other
597+
else:
598+
yield obj

elasticsearch/dsl/document_base.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
List,
2929
Optional,
3030
Tuple,
31+
Type,
3132
TypeVar,
3233
Union,
3334
get_args,
@@ -49,6 +50,7 @@
4950
if TYPE_CHECKING:
5051
from elastic_transport import ObjectApiResponse
5152

53+
from ..esql.esql import ESQLBase
5254
from .index_base import IndexBase
5355

5456

@@ -602,3 +604,44 @@ def to_dict(self, include_meta: bool = False, skip_empty: bool = True) -> Dict[s
602604

603605
meta["_source"] = d
604606
return meta
607+
608+
@classmethod
609+
def _get_field_names(
610+
cls, for_esql: bool = False, nested_class: Optional[Type[InnerDoc]] = None
611+
) -> List[str]:
612+
"""Return the list of field names used by this document.
613+
If the document has nested objects, their fields are reported using dot
614+
notation. If the ``for_esql`` argument is set to ``True``, the list omits
615+
nested fields, which are currently unsupported in ES|QL.
616+
"""
617+
fields = []
618+
class_ = nested_class or cls
619+
for field_name in class_._doc_type.mapping:
620+
field = class_._doc_type.mapping[field_name]
621+
if isinstance(field, Object):
622+
if for_esql and isinstance(field, Nested):
623+
# ES|QL does not recognize Nested fields at this time
624+
continue
625+
sub_fields = cls._get_field_names(
626+
for_esql=for_esql, nested_class=field._doc_class
627+
)
628+
for sub_field in sub_fields:
629+
fields.append(f"{field_name}.{sub_field}")
630+
else:
631+
fields.append(field_name)
632+
return fields
633+
634+
@classmethod
635+
def esql_from(cls) -> "ESQLBase":
636+
"""Return a base ES|QL query for instances of this document class.
637+
638+
The returned query is initialized with ``FROM`` and ``KEEP`` statements,
639+
and can be completed as desired.
640+
"""
641+
from ..esql import ESQL # here to avoid circular imports
642+
643+
return (
644+
ESQL.from_(cls)
645+
.metadata("_id")
646+
.keep("_id", *tuple(cls._get_field_names(for_esql=True)))
647+
)

elasticsearch/esql/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
# under the License.
1717

1818
from ..dsl import E # noqa: F401
19-
from .esql import ESQL, and_, not_, or_ # noqa: F401
19+
from .esql import ESQL, ESQLBase, and_, not_, or_ # noqa: F401

elasticsearch/esql/functions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,11 +661,11 @@ def multi_match(
661661
"""
662662
if options is not None:
663663
return InstrumentedExpression(
664-
f"MULTI_MATCH({_render(query)}, {_render(fields)}, {_render(options)})"
664+
f'MULTI_MATCH({_render(query)}, {", ".join([_render(c) for c in fields])}, {_render(options)})'
665665
)
666666
else:
667667
return InstrumentedExpression(
668-
f"MULTI_MATCH({_render(query)}, {_render(fields)})"
668+
f'MULTI_MATCH({_render(query)}, {", ".join([_render(c) for c in fields])})'
669669
)
670670

671671

0 commit comments

Comments
 (0)