Skip to content

Commit 8666fd5

Browse files
add TS, FUSE and INLINE STATS commands
1 parent 8820a00 commit 8666fd5

File tree

2 files changed

+344
-7
lines changed

2 files changed

+344
-7
lines changed

elasticsearch/esql/esql.py

Lines changed: 180 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import json
1919
import re
2020
from abc import ABC, abstractmethod
21-
from typing import Any, Dict, Optional, Tuple, Type, Union
21+
from typing import Any, Dict, List, Optional, Tuple, Type, Union
2222

2323
from ..dsl.document_base import DocumentBase, InstrumentedExpression, InstrumentedField
2424

@@ -78,6 +78,22 @@ def show(item: str) -> "Show":
7878
"""
7979
return Show(item)
8080

81+
@staticmethod
def ts(*indices: IndexType) -> "TS":
    """The ``TS`` source command is similar to ``FROM``, but for time series indices.

    :param indices: A list of indices, data streams or aliases. Supports wildcards and date math.

    Examples::

        query = (
            ESQL.ts("metrics")
            .where("@timestamp >= now() - 1 day")
            .stats("SUM(AVG_OVER_TIME(memory_usage))").by("host", "TBUCKET(1 hour)")
        )
    """
    return TS(*indices)
96+
8197
@staticmethod
8298
def branch() -> "Branch":
8399
"""This method can only be used inside a ``FORK`` command to create each branch.
@@ -314,6 +330,51 @@ def fork(
314330
raise ValueError("a query can only have one fork")
315331
return Fork(self, fork1, fork2, fork3, fork4, fork5, fork6, fork7, fork8)
316332

333+
def fuse(self, method: Optional[str] = None) -> "Fuse":
    """Append a ``FUSE`` processing command, which merges rows from multiple
    result sets and assigns new relevance scores.

    :param method: Designates which method to use to assign new relevance scores.
                   Can be one of ``RRF`` (for Reciprocal Rank Fusion) or ``LINEAR``
                   (for linear combination of scores). Defaults to ``RRF``.

    Examples::

        query1 = (
            ESQL.from_("books").metadata("_id", "_index", "_score")
            .fork(
                ESQL.branch().where('title:"Shakespeare"').sort("_score DESC"),
                ESQL.branch().where('semantic_title:"Shakespeare"').sort("_score DESC"),
            )
            .fuse()
        )
        query2 = (
            ESQL.from_("books").metadata("_id", "_index", "_score")
            .fork(
                ESQL.branch().where('title:"Shakespeare"').sort("_score DESC"),
                ESQL.branch().where('semantic_title:"Shakespeare"').sort("_score DESC"),
            )
            .fuse("linear")
        )
        query3 = (
            ESQL.from_("books").metadata("_id", "_index", "_score")
            .fork(
                ESQL.branch().where('title:"Shakespeare"').sort("_score DESC"),
                ESQL.branch().where('semantic_title:"Shakespeare"').sort("_score DESC"),
            )
            .fuse("linear").by("title", "description")
        )
        query4 = (
            ESQL.from_("books").metadata("_id", "_index", "_score")
            .fork(
                ESQL.branch().where('title:"Shakespeare"').sort("_score DESC"),
                ESQL.branch().where('semantic_title:"Shakespeare"').sort("_score DESC"),
            )
            .fuse("linear").with_(normalizer="minmax")
        )
    """
    # The current query becomes the parent of the new command so that the
    # chain can keep growing from the returned object.
    return Fuse(parent=self, method=method)
377+
317378
def grok(self, input: FieldType, pattern: str) -> "Grok":
318379
"""``GROK`` enables you to extract structured data out of a string.
319380
@@ -348,6 +409,58 @@ def grok(self, input: FieldType, pattern: str) -> "Grok":
348409
"""
349410
return Grok(self, input, pattern)
350411

412+
def inline_stats(
    self, *expressions: ExpressionType, **named_expressions: ExpressionType
) -> "InlineStats":
    """The ``INLINE STATS`` processing command groups rows according to a common value
    and calculates one or more aggregated values over the grouped rows.

    The command is identical to ``STATS`` except that it preserves all the columns from
    the input table.

    :param expressions: A list of expressions, given as positional arguments.
    :param named_expressions: A list of expressions, given as keyword arguments. The
                              argument names are used for the returned aggregated values.

    Note that ``expressions`` and ``named_expressions`` are alternatives: provide one
    or the other, not both.

    Examples::

        query1 = (
            ESQL.from_("employees")
            .keep("emp_no", "languages", "salary")
            .inline_stats(max_salary=functions.max(E("salary"))).by("languages")
        )
        query2 = (
            ESQL.from_("employees")
            .keep("emp_no", "languages", "salary")
            .inline_stats(max_salary=functions.max(E("salary")))
        )
        query3 = (
            ESQL.from_("employees")
            .where("still_hired")
            .keep("emp_no", "languages", "salary", "hire_date")
            .eval(tenure=functions.date_diff("year", E("hire_date"), "2025-09-18T00:00:00"))
            .drop("hire_date")
            .inline_stats(
                avg_salary=functions.avg(E("salary")),
                count=functions.count(E("*")),
            )
            .by("languages", "tenure")
        )
        query4 = (
            ESQL.from_("employees")
            .keep("emp_no", "salary")
            .inline_stats(
                avg_lt_50=functions.round(functions.avg(E("salary"))).where(E("salary") < 50000),
                avg_lt_60=functions.round(functions.avg(E("salary"))).where(E("salary") >= 50000, E("salary") < 60000),
                avg_gt_60=functions.round(functions.avg(E("salary"))).where(E("salary") >= 60000),
            )
        )
    """
    # ``InlineStats`` subclasses ``Stats``, so callers that only rely on the
    # ``Stats`` interface (for example ``.by()``) keep working unchanged.
    return InlineStats(self, *expressions, **named_expressions)
463+
351464
def keep(self, *columns: FieldType) -> "Keep":
352465
"""The ``KEEP`` processing command enables you to specify what columns are returned
353466
and the order in which they are returned.
@@ -629,13 +742,15 @@ class From(ESQLBase):
629742
in a single expression.
630743
"""
631744

745+
command_name = "FROM"
746+
632747
def __init__(self, *indices: IndexType):
633748
super().__init__()
634749
self._indices = indices
635750
self._metadata_fields: Tuple[FieldType, ...] = tuple()
636751

637752
def metadata(self, *fields: FieldType) -> "From":
638-
"""Continuation of the ``FROM`` source command.
753+
"""Continuation of the ``FROM`` and ``TS`` source commands.
639754
640755
:param fields: metadata fields to retrieve, given as positional arguments.
641756
"""
@@ -644,7 +759,7 @@ def metadata(self, *fields: FieldType) -> "From":
644759

645760
def _render_internal(self) -> str:
646761
indices = [self._format_index(index) for index in self._indices]
647-
s = f'{self.__class__.__name__.upper()} {", ".join(indices)}'
762+
s = f'{self.command_name} {", ".join(indices)}'
648763
if self._metadata_fields:
649764
s = (
650765
s
@@ -692,6 +807,17 @@ def _render_internal(self) -> str:
692807
return f"SHOW {self._format_id(self._item)}"
693808

694809

810+
class TS(From):
    """Implementation of the ``TS`` source command.

    This class derives from :class:`From`, and through it from
    :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`, to make it possible
    to chain all the commands that belong to an ES|QL query in a single
    expression. The only difference from :class:`From` is the value of
    ``command_name``.
    """

    # Keyword emitted at the start of the rendered source command.
    command_name = "TS"
819+
820+
695821
class Branch(ESQLBase):
696822
"""Implementation of a branch inside a ``FORK`` processing command.
697823
@@ -978,6 +1104,39 @@ def _render_internal(self) -> str:
9781104
return f"FORK {cmds}"
9791105

9801106

1107+
class Fuse(ESQLBase):
    """Implementation of the ``FUSE`` processing command.

    This class inherits from :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`,
    to make it possible to chain all the commands that belong to an ES|QL query
    in a single expression.
    """

    def __init__(self, parent: ESQLBase, method: Optional[str] = None):
        super().__init__(parent)
        # Optional fusion method; upper-cased when the command is rendered.
        self.method = method
        # Columns collected through ``by()``, in call order.
        self.by_columns: List[FieldType] = []
        # Keyword options captured by the most recent ``with_()`` call.
        self.options: Dict[str, Any] = {}

    def by(self, *columns: FieldType) -> "Fuse":
        """Continuation of the ``FUSE`` command.

        :param columns: Columns to append to the command. Each one is rendered
                        as its own ``BY`` clause, and repeated calls accumulate.
        """
        self.by_columns.extend(columns)
        return self

    def with_(self, **options: Any) -> "Fuse":
        """Continuation of the ``FUSE`` command.

        :param options: Options, given as keyword arguments, rendered as a JSON
                        object after ``WITH``. A new call replaces the options
                        stored by a previous one.
        """
        self.options = options
        return self

    def _render_internal(self) -> str:
        # Gather only the clauses that are present, then join them with single
        # spaces so that absent clauses leave no stray whitespace.
        clauses = ["FUSE"]
        if self.method:
            clauses.append(self.method.upper())
        clauses.extend(f"BY {column}" for column in self.by_columns)
        if self.options:
            clauses.append("WITH " + json.dumps(self.options))
        return " ".join(clauses)
1138+
1139+
9811140
class Grok(ESQLBase):
9821141
"""Implementation of the ``GROK`` processing command.
9831142
@@ -1201,6 +1360,8 @@ class Stats(ESQLBase):
12011360
in a single expression.
12021361
"""
12031362

1363+
command_name = "STATS"
1364+
12041365
def __init__(
12051366
self,
12061367
parent: ESQLBase,
@@ -1216,7 +1377,7 @@ def __init__(
12161377
self._grouping_expressions: Optional[Tuple[ExpressionType, ...]] = None
12171378

12181379
def by(self, *grouping_expressions: ExpressionType) -> "Stats":
1219-
"""Continuation of the ``STATS`` command.
1380+
"""Continuation of the ``STATS`` and ``INLINE STATS`` commands.
12201381
12211382
:param grouping_expressions: Expressions that output the values to group by.
12221383
If their names coincide with one of the computed
@@ -1233,13 +1394,25 @@ def _render_internal(self) -> str:
12331394
]
12341395
else:
12351396
exprs = [f"{self._format_expr(expr)}" for expr in self._expressions]
1236-
expression_separator = ",\n "
1397+
indent = " " * (len(self.command_name) + 3)
1398+
expression_separator = f",\n{indent}"
12371399
by = (
12381400
""
12391401
if self._grouping_expressions is None
1240-
else f'\n BY {", ".join([f"{self._format_expr(expr)}" for expr in self._grouping_expressions])}'
1402+
else f'\n{indent}BY {", ".join([f"{self._format_expr(expr)}" for expr in self._grouping_expressions])}'
12411403
)
1242-
return f'STATS {expression_separator.join([f"{expr}" for expr in exprs])}{by}'
1404+
return f'{self.command_name} {expression_separator.join([f"{expr}" for expr in exprs])}{by}'
1405+
1406+
1407+
class InlineStats(Stats):
    """Implementation of the ``INLINE STATS`` processing command.

    This class derives from :class:`Stats`, and through it from
    :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`, to make it possible
    to chain all the commands that belong to an ES|QL query in a single
    expression. The only difference from :class:`Stats` is the value of
    ``command_name``.
    """

    # Keyword emitted at the start of the rendered command; its length also
    # determines the indentation of continuation lines.
    command_name = "INLINE STATS"
12431416

12441417

12451418
class Where(ESQLBase):

0 commit comments

Comments
 (0)