|
20 | 20 | TYPE_CHECKING, |
21 | 21 | Any, |
22 | 22 | AsyncIterable, |
| 23 | + AsyncIterator, |
23 | 24 | Dict, |
24 | 25 | List, |
25 | 26 | Optional, |
|
42 | 43 |
|
43 | 44 | if TYPE_CHECKING: |
44 | 45 | from elasticsearch import AsyncElasticsearch |
| 46 | + from elasticsearch.esql.esql import ESQLBase |
45 | 47 |
|
46 | 48 |
|
47 | 49 | class AsyncIndexMeta(DocumentMeta): |
@@ -520,3 +522,85 @@ async def __anext__(self) -> Dict[str, Any]: |
520 | 522 | return action |
521 | 523 |
|
522 | 524 | return await async_bulk(es, Generate(actions), **kwargs) |
| 525 | + |
@classmethod
async def esql_execute(
    cls,
    query: "ESQLBase",
    return_additional: bool = False,
    ignore_missing_fields: bool = False,
    using: Optional[AsyncUsingType] = None,
    **kwargs: Any,
) -> AsyncIterator[Union[Self, Tuple[Self, Dict[str, Any]]]]:
    """
    Execute the given ES|QL query and return an asynchronous iterator over
    the results. By default each item is an instance of this ``Document``.
    When ``return_additional=True`` each item is a 2-element tuple, where
    the first element is the document instance and the second a dictionary
    with any remaining columns requested in the query.

    This is an asynchronous generator, so the query is only sent (and any
    error raised) once iteration starts.

    :arg query: an ES|QL query object created with the ``esql_from()`` method.
    :arg return_additional: if ``False`` (the default), this method returns
        document objects. If set to ``True``, the method returns tuples with
        a document in the first element and a dictionary with any additional
        columns returned by the query in the second element.
    :arg ignore_missing_fields: if ``False`` (the default), all the fields of
        the document must be present in the query, or else an exception is
        raised. Set to ``True`` to allow missing fields, which will result in
        partially initialized document objects.
    :arg using: connection alias to use, defaults to ``'default'``
    :arg kwargs: additional options for the ``client.esql.query()`` function.
    :raises ValueError: if document fields are missing from the response and
        ``ignore_missing_fields`` is ``False``, or if the response does not
        include the ``_id`` column.
    """
    es = cls._get_connection(using)
    response = await es.esql.query(query=str(query), **kwargs)
    query_columns = [col["name"] for col in response.body.get("columns", [])]

    # Here we get the list of columns defined in the document, which are the
    # columns that we will take from each result to assemble the document
    # object.
    # When `for_esql=False` is passed below by default, the list will include
    # nested fields, which ES|QL does not return, causing an error. When passing
    # `ignore_missing_fields=True` the list will be generated with
    # `for_esql=True`, so the error will not occur, but the documents will
    # not have any Nested objects in them.
    doc_fields = set(cls._get_field_names(for_esql=ignore_missing_fields))
    if not ignore_missing_fields and not doc_fields.issubset(query_columns):
        raise ValueError(
            f"Not all fields of {cls.__name__} were returned by the query. "
            "Make sure your document does not use Nested fields, which are "
            "currently not supported in ES|QL. To force the query to be "
            "evaluated in spite of the missing fields, set the "
            "ignore_missing_fields=True option in the esql_execute() call."
        )

    # The document id is needed to populate `meta` on every instance, so fail
    # with an actionable message instead of the bare "'_id' is not in list"
    # that list.index() would produce.
    if "_id" not in query_columns:
        raise ValueError(
            f"The query did not return the '_id' column, which is required "
            f"to build {cls.__name__} instances. Create the query with "
            "esql_from(), or request the column explicitly with METADATA _id."
        )
    index_id = query_columns.index("_id")

    # The column layout is the same for every row, so work out once which
    # positions feed the document (with their dotted names already split
    # into a path) and which positions are additional values.
    doc_columns = [
        (position, name.split("."))
        for position, name in enumerate(query_columns)
        if name in doc_fields
    ]
    additional_columns = [
        (position, name)
        for position, name in enumerate(query_columns)
        if name not in doc_fields and name != "_id"
    ]

    for column_values in response.body.get("values", []):
        # create a dictionary with all the document fields, expanding the
        # dot notation returned by ES|QL into the recursive dictionaries
        # used by Document.from_dict()
        doc_dict: Dict[str, Any] = {}
        for position, path in doc_columns:
            target = doc_dict
            for key in path[:-1]:
                target = target.setdefault(key, {})
            target[path[-1]] = column_values[position]

        # create the document instance
        obj = cls(meta={"_id": column_values[index_id]})
        obj._from_dict(doc_dict)

        if return_additional:
            # build a dict with any other values included in the response
            other = {
                name: column_values[position]
                for position, name in additional_columns
            }
            yield obj, other
        else:
            yield obj
0 commit comments