Skip to content

Commit 2c2b3f2

Browse files
add FORK command
1 parent ed0d0b0 commit 2c2b3f2

File tree

1 file changed

+161
-61
lines changed

1 file changed

+161
-61
lines changed

elasticsearch/esql/esql.py

Lines changed: 161 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -77,41 +77,61 @@ def show(item: str) -> "Show":
7777
"""
7878
return Show(item)
7979

80+
@staticmethod
81+
def branch() -> "Branch":
82+
"""This method can only be used inside a ``FORK`` command to create each branch.
83+
84+
Examples::
85+
86+
ESQL.from_("employees").fork(
87+
ESQL.branch().where("emp_no == 10001"),
88+
ESQL.branch().where("emp_no == 10002"),
89+
)
90+
"""
91+
return Branch()
92+
8093

8194
class ESQLBase(ABC):
8295
""" """
8396

8497
def __init__(self, parent: Optional["ESQLBase"] = None):
85-
self.parent = parent
98+
self._parent = parent
8699

87100
def __repr__(self) -> str:
88101
return self.render()
89102

90103
def render(self) -> str:
91104
return (
92-
self.parent.render() + "\n| " if self.parent else ""
105+
self._parent.render() + "\n| " if self._parent else ""
93106
) + self._render_internal()
94107

95108
@abstractmethod
96109
def _render_internal(self) -> str:
97110
pass
98111

99-
# def change_point(self, value: FieldType) -> "ChangePoint":
100-
# """`CHANGE_POINT` detects spikes, dips, and change points in a metric.
101-
#
102-
# :param value: The column with the metric in which you want to detect a change point.
103-
#
104-
# Examples::
105-
#
106-
# (
107-
# ESQL.row(key=list(range(1, 26)))
108-
# .mv_expand("key")
109-
# .eval(value="CASE(key<13, 0, 42)")
110-
# .change_point("value").on("key")
111-
# .where("type IS NOT NULL")
112-
# )
113-
# """
114-
# return ChangePoint(self, value)
112+
def _is_forked(self) -> bool:
113+
if self.__class__.__name__ == "Fork":
114+
return True
115+
if self._parent:
116+
return self._parent._is_forked()
117+
return False
118+
119+
def change_point(self, value: FieldType) -> "ChangePoint":
120+
"""`CHANGE_POINT` detects spikes, dips, and change points in a metric.
121+
122+
:param value: The column with the metric in which you want to detect a change point.
123+
124+
Examples::
125+
126+
(
127+
ESQL.row(key=list(range(1, 26)))
128+
.mv_expand("key")
129+
.eval(value="CASE(key<13, 0, 42)")
130+
.change_point("value").on("key")
131+
.where("type IS NOT NULL")
132+
)
133+
"""
134+
return ChangePoint(self, value)
115135

116136
def dissect(self, input: FieldType, pattern: str) -> "Dissect":
117137
"""``DISSECT`` enables you to extract structured data out of a string.
@@ -191,6 +211,38 @@ def eval(self, *columns: ExpressionType, **named_columns: ExpressionType) -> "Ev
191211
"""
192212
return Eval(self, *columns, **named_columns)
193213

214+
def fork(
215+
self,
216+
fork1: "ESQLBase",
217+
fork2: Optional["ESQLBase"] = None,
218+
fork3: Optional["ESQLBase"] = None,
219+
fork4: Optional["ESQLBase"] = None,
220+
fork5: Optional["ESQLBase"] = None,
221+
fork6: Optional["ESQLBase"] = None,
222+
fork7: Optional["ESQLBase"] = None,
223+
fork8: Optional["ESQLBase"] = None,
224+
) -> "Fork":
225+
"""The ``FORK`` processing command creates multiple execution branches to operate on the
226+
same input data and combines the results in a single output table.
227+
228+
:param fork<n>: Up to 8 execution branches, created with the ``ESQL.branch()`` method.
229+
230+
Examples::
231+
232+
(
233+
ESQL.from_("employees")
234+
.fork(
235+
ESQL.branch().where("emp_no == 10001"),
236+
ESQL.branch().where("emp_no == 10002"),
237+
)
238+
.keep("emp_no", "_fork")
239+
.sort("emp_no")
240+
)
241+
"""
242+
if self._is_forked():
243+
raise ValueError("a query can only have one fork")
244+
return Fork(self, fork1, fork2, fork3, fork4, fork5, fork6, fork7, fork8)
245+
194246
def grok(self, input: FieldType, pattern: str) -> "Grok":
195247
"""``GROK`` enables you to extract structured data out of a string.
196248
@@ -509,49 +561,62 @@ def _render_internal(self) -> str:
509561
return f"SHOW {self._item}"
510562

511563

512-
# class ChangePoint(ESQLBase):
513-
# """Implementation of the ``CHANGE POINT`` processing command.
514-
#
515-
# This class inherits from :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`,
516-
# to make it possible to chain all the commands that belong to an ES|QL query
517-
# in a single expression.
518-
# """
519-
# def __init__(self, parent: ESQLBase, value: FieldType):
520-
# super().__init__(parent)
521-
# self._value = value
522-
# self._key: Optional[FieldType] = None
523-
# self._type_name: Optional[str] = None
524-
# self._pvalue_name: Optional[str] = None
525-
#
526-
# def on(self, key: FieldType) -> "ChangePoint":
527-
# """Continuation of the `CHANGE_POINT` command.
528-
#
529-
# :param key: The column with the key to order the values by. If not specified,
530-
# `@timestamp` is used.
531-
# """
532-
# self._key = key
533-
# return self
534-
#
535-
# def as_(self, type_name: str, pvalue_name: str) -> "ChangePoint":
536-
# """Continuation of the `CHANGE_POINT` command.
537-
#
538-
# :param type_name: The name of the output column with the change point type.
539-
# If not specified, `type` is used.
540-
# :param pvalue_name: The name of the output column with the p-value that indicates
541-
# how extreme the change point is. If not specified, `pvalue` is used.
542-
# """
543-
# self._type_name = type_name
544-
# self._pvalue_name = pvalue_name
545-
# return self
546-
#
547-
# def _render_internal(self) -> str:
548-
# key = "" if not self._key else f" ON {self._key}"
549-
# names = (
550-
# ""
551-
# if not self._type_name and not self._pvalue_name
552-
# else f' AS {self._type_name or "type"}, {self._pvalue_name or "pvalue"}'
553-
# )
554-
# return f"CHANGE_POINT {self._value}{key}{names}"
564+
class Branch(ESQLBase):
565+
"""Implementation of a branch inside a ``FORK`` processing command.
566+
567+
This class inherits from :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`,
568+
which makes it possible to chain all the commands that belong to the branch
569+
in a single expression.
570+
"""
571+
572+
def _render_internal(self) -> str:
573+
return ""
574+
575+
576+
class ChangePoint(ESQLBase):
577+
"""Implementation of the ``CHANGE POINT`` processing command.
578+
579+
This class inherits from :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`,
580+
to make it possible to chain all the commands that belong to an ES|QL query
581+
in a single expression.
582+
"""
583+
584+
def __init__(self, parent: ESQLBase, value: FieldType):
585+
super().__init__(parent)
586+
self._value = value
587+
self._key: Optional[FieldType] = None
588+
self._type_name: Optional[str] = None
589+
self._pvalue_name: Optional[str] = None
590+
591+
def on(self, key: FieldType) -> "ChangePoint":
592+
"""Continuation of the `CHANGE_POINT` command.
593+
594+
:param key: The column with the key to order the values by. If not specified,
595+
`@timestamp` is used.
596+
"""
597+
self._key = key
598+
return self
599+
600+
def as_(self, type_name: str, pvalue_name: str) -> "ChangePoint":
601+
"""Continuation of the `CHANGE_POINT` command.
602+
603+
:param type_name: The name of the output column with the change point type.
604+
If not specified, `type` is used.
605+
:param pvalue_name: The name of the output column with the p-value that indicates
606+
how extreme the change point is. If not specified, `pvalue` is used.
607+
"""
608+
self._type_name = type_name
609+
self._pvalue_name = pvalue_name
610+
return self
611+
612+
def _render_internal(self) -> str:
613+
key = "" if not self._key else f" ON {self._key}"
614+
names = (
615+
""
616+
if not self._type_name and not self._pvalue_name
617+
else f' AS {self._type_name or "type"}, {self._pvalue_name or "pvalue"}'
618+
)
619+
return f"CHANGE_POINT {self._value}{key}{names}"
555620

556621

557622
class Dissect(ESQLBase):
@@ -688,6 +753,41 @@ def _render_internal(self) -> str:
688753
return f"EVAL {cols}"
689754

690755

756+
class Fork(ESQLBase):
757+
"""Implementation of the ``FORK`` processing command.
758+
759+
This class inherits from :class:`ESQLBase <elasticsearch.esql.esql.ESQLBase>`,
760+
to make it possible to chain all the commands that belong to an ES|QL query
761+
in a single expression.
762+
"""
763+
764+
def __init__(
765+
self,
766+
parent: ESQLBase,
767+
fork1: ESQLBase,
768+
fork2: Optional[ESQLBase] = None,
769+
fork3: Optional[ESQLBase] = None,
770+
fork4: Optional[ESQLBase] = None,
771+
fork5: Optional[ESQLBase] = None,
772+
fork6: Optional[ESQLBase] = None,
773+
fork7: Optional[ESQLBase] = None,
774+
fork8: Optional[ESQLBase] = None,
775+
):
776+
super().__init__(parent)
777+
self._branches = [fork1, fork2, fork3, fork4, fork5, fork6, fork7, fork8]
778+
779+
def _render_internal(self) -> str:
780+
cmds = ""
781+
for branch in self._branches:
782+
if branch:
783+
cmd = branch.render()[3:].replace("\n", " ")
784+
if cmds == "":
785+
cmds = f"( {cmd} )"
786+
else:
787+
cmds += f"\n ( {cmd} )"
788+
return f"FORK {cmds}"
789+
790+
691791
class Grok(ESQLBase):
692792
"""Implementation of the ``GROK`` processing command.
693793

0 commit comments

Comments
 (0)