Skip to content

Commit 8629575

Browse files
committed
Moar operators
1 parent aad7066 commit 8629575

File tree

8 files changed

+760
-0
lines changed

8 files changed

+760
-0
lines changed

reactivex/observable/mixins/combination.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from collections.abc import Callable, Iterable
56
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
67

78
if TYPE_CHECKING:
@@ -11,6 +12,7 @@
1112

1213

1314
_T = TypeVar("_T", covariant=True)
15+
_T2 = TypeVar("_T2")
1416

1517

1618
class CombinationMixin(Generic[_T]):
@@ -311,3 +313,171 @@ def amb(self, right_source: Observable[_T]) -> Observable[_T]:
311313
from reactivex import operators as ops
312314

313315
return self._as_observable().pipe(ops.amb(right_source))
316+
317+
def merge_all(self) -> Observable[Any]:
318+
"""Merge all inner observables.
319+
320+
Merges an observable sequence of observable sequences into an observable
321+
sequence.
322+
323+
Examples:
324+
Fluent style:
325+
>>> result = source_of_sources.merge_all()
326+
327+
Equivalent pipe style:
328+
>>> from reactivex import operators as ops
329+
>>> result = source_of_sources.pipe(ops.merge_all())
330+
331+
Returns:
332+
An observable sequence that merges the elements of the inner sequences.
333+
334+
See Also:
335+
- :func:`merge_all <reactivex.operators.merge_all>`
336+
- :meth:`merge`
337+
- :meth:`concat_all`
338+
"""
339+
# Cast is safe: merge_all is meant to be called on Observable of Observables.
340+
# The fluent API allows chaining this on nested observable sequences where
341+
# _T is Observable[_T2]. We return Observable[Any] as the inner type cannot
342+
# be statically inferred from _T without higher-kinded types.
343+
from reactivex import operators as ops
344+
345+
op: Callable[[Observable[Any]], Observable[Any]] = cast(
346+
"Callable[[Observable[Any]], Observable[Any]]", ops.merge_all()
347+
)
348+
return self._as_observable().pipe(op)
349+
350+
def zip_with_iterable(
351+
self, second: Iterable[_T2]
352+
) -> Observable[tuple[_T, _T2]]:
353+
"""Zip with iterable.
354+
355+
Merges the specified observable sequence and iterable into one observable
356+
sequence by creating a tuple whenever both sequences have produced an element
357+
at a corresponding index.
358+
359+
Examples:
360+
Fluent style:
361+
>>> result = source.zip_with_iterable([1, 2, 3])
362+
363+
Equivalent pipe style:
364+
>>> from reactivex import operators as ops
365+
>>> result = source.pipe(ops.zip_with_iterable([1, 2, 3]))
366+
367+
Args:
368+
second: Iterable to zip with the source observable.
369+
370+
Returns:
371+
An observable sequence containing the result of combining elements of the
372+
sources as a tuple.
373+
374+
See Also:
375+
- :func:`zip_with_iterable <reactivex.operators.zip_with_iterable>`
376+
- :meth:`zip`
377+
"""
378+
from reactivex import operators as ops
379+
380+
return self._as_observable().pipe(ops.zip_with_iterable(second))
381+
382+
def join(
383+
self,
384+
right: Observable[_T2],
385+
left_duration_mapper: Callable[[_T], Observable[Any]],
386+
right_duration_mapper: Callable[[_T2], Observable[Any]],
387+
) -> Observable[tuple[_T, _T2]]:
388+
"""Join based on overlapping durations.
389+
390+
Correlates the elements of two sequences based on overlapping durations.
391+
392+
Examples:
393+
Fluent style:
394+
>>> result = left.join(
395+
... right,
396+
... lambda x: rx.timer(0.5),
397+
... lambda x: rx.timer(0.5)
398+
... )
399+
400+
Equivalent pipe style:
401+
>>> from reactivex import operators as ops
402+
>>> result = left.pipe(
403+
... ops.join(
404+
... right,
405+
... lambda x: rx.timer(0.5),
406+
... lambda x: rx.timer(0.5)
407+
... )
408+
... )
409+
410+
Args:
411+
right: The right observable sequence to join elements for.
412+
left_duration_mapper: A function to select the duration (expressed as an
413+
observable sequence) of each element of the left observable sequence,
414+
used to determine overlap.
415+
right_duration_mapper: A function to select the duration (expressed as an
416+
observable sequence) of each element of the right observable sequence,
417+
used to determine overlap.
418+
419+
Returns:
420+
An observable sequence that contains elements combined into a tuple from
421+
source elements that have an overlapping duration.
422+
423+
See Also:
424+
- :func:`join <reactivex.operators.join>`
425+
- :meth:`group_join`
426+
"""
427+
from reactivex import operators as ops
428+
429+
return self._as_observable().pipe(
430+
ops.join(right, left_duration_mapper, right_duration_mapper)
431+
)
432+
433+
def group_join(
434+
self,
435+
right: Observable[_T2],
436+
left_duration_mapper: Callable[[_T], Observable[Any]],
437+
right_duration_mapper: Callable[[_T2], Observable[Any]],
438+
) -> Observable[tuple[_T, Observable[_T2]]]:
439+
"""Group join based on overlapping durations.
440+
441+
Correlates the elements of two sequences based on overlapping durations, and
442+
groups the results.
443+
444+
Examples:
445+
Fluent style:
446+
>>> result = left.group_join(
447+
... right,
448+
... lambda x: rx.timer(0.5),
449+
... lambda x: rx.timer(0.5)
450+
... )
451+
452+
Equivalent pipe style:
453+
>>> from reactivex import operators as ops
454+
>>> result = left.pipe(
455+
... ops.group_join(
456+
... right,
457+
... lambda x: rx.timer(0.5),
458+
... lambda x: rx.timer(0.5)
459+
... )
460+
... )
461+
462+
Args:
463+
right: The right observable sequence to join elements for.
464+
left_duration_mapper: A function to select the duration (expressed as an
465+
observable sequence) of each element of the left observable sequence,
466+
used to determine overlap.
467+
right_duration_mapper: A function to select the duration (expressed as an
468+
observable sequence) of each element of the right observable sequence,
469+
used to determine overlap.
470+
471+
Returns:
472+
An observable sequence that contains elements combined into a tuple from
473+
source elements that have an overlapping duration.
474+
475+
See Also:
476+
- :func:`group_join <reactivex.operators.group_join>`
477+
- :meth:`join`
478+
"""
479+
from reactivex import operators as ops
480+
481+
return self._as_observable().pipe(
482+
ops.group_join(right, left_duration_mapper, right_duration_mapper)
483+
)

reactivex/observable/mixins/conditional.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from collections.abc import Callable
56
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, overload
67

78
if TYPE_CHECKING:
@@ -66,3 +67,69 @@ def default_if_empty(self, default_value: Any = None) -> Observable[Any]:
6667
from reactivex import operators as ops
6768

6869
return self._as_observable().pipe(ops.default_if_empty(default_value))
70+
71+
def find(
72+
self, predicate: Callable[[_T, int, Observable[_T]], bool]
73+
) -> Observable[_T | None]:
74+
"""Find first element matching predicate.
75+
76+
Searches for an element that matches the conditions defined by the
77+
specified predicate, and returns the first occurrence within the entire
78+
Observable sequence.
79+
80+
Examples:
81+
Fluent style:
82+
>>> result = source.find(lambda x, i, obs: x > 3)
83+
84+
Equivalent pipe style:
85+
>>> from reactivex import operators as ops
86+
>>> result = source.pipe(ops.find(lambda x, i, obs: x > 3))
87+
88+
Args:
89+
predicate: The predicate that defines the conditions of the element to
90+
search for. Takes (value, index, source).
91+
92+
Returns:
93+
An observable sequence with the first element that matches the conditions
94+
defined by the specified predicate, if found; otherwise, None.
95+
96+
See Also:
97+
- :func:`find <reactivex.operators.find>`
98+
- :meth:`find_index`
99+
"""
100+
from reactivex import operators as ops
101+
102+
return self._as_observable().pipe(ops.find(predicate))
103+
104+
def find_index(
105+
self, predicate: Callable[[_T, int, Observable[_T]], bool]
106+
) -> Observable[int | None]:
107+
"""Find index of first element matching predicate.
108+
109+
Searches for an element that matches the conditions defined by the specified
110+
predicate, and returns an Observable sequence with the zero-based index of
111+
the first occurrence within the entire Observable sequence.
112+
113+
Examples:
114+
Fluent style:
115+
>>> result = source.find_index(lambda x, i, obs: x > 3)
116+
117+
Equivalent pipe style:
118+
>>> from reactivex import operators as ops
119+
>>> result = source.pipe(ops.find_index(lambda x, i, obs: x > 3))
120+
121+
Args:
122+
predicate: The predicate that defines the conditions of the element to
123+
search for. Takes (value, index, source).
124+
125+
Returns:
126+
An observable sequence with the zero-based index of the first element
127+
that matches the conditions; if not found, returns -1.
128+
129+
See Also:
130+
- :func:`find_index <reactivex.operators.find_index>`
131+
- :meth:`find`
132+
"""
133+
from reactivex import operators as ops
134+
135+
return self._as_observable().pipe(ops.find_index(predicate))

reactivex/observable/mixins/filtering.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,3 +593,121 @@ def skip_while_indexed(
593593
from reactivex import operators as ops
594594

595595
return self._as_observable().pipe(ops.skip_while_indexed(predicate_indexed))
596+
597+
def single(self, predicate: typing.Predicate[_T] | None = None) -> Observable[_T]:
598+
"""Return single element matching predicate.
599+
600+
Returns the only element of an observable sequence that satisfies the condition
601+
in the optional predicate, and reports an exception if there is not exactly one
602+
element in the observable sequence.
603+
604+
Examples:
605+
Fluent style:
606+
>>> result = source.single()
607+
>>> result = source.single(lambda x: x == 42)
608+
609+
Equivalent pipe style:
610+
>>> from reactivex import operators as ops
611+
>>> result = source.pipe(ops.single())
612+
>>> result = source.pipe(ops.single(lambda x: x == 42))
613+
614+
Args:
615+
predicate: A predicate function to evaluate for elements in the source
616+
sequence.
617+
618+
Returns:
619+
An observable sequence containing the single element in the observable
620+
sequence that satisfies the condition in the predicate.
621+
622+
Raises:
623+
Exception: If there is not exactly one element matching the predicate.
624+
625+
See Also:
626+
- :func:`single <reactivex.operators.single>`
627+
- :meth:`single_or_default`
628+
- :meth:`first`
629+
- :meth:`last`
630+
"""
631+
from reactivex import operators as ops
632+
633+
return self._as_observable().pipe(ops.single(predicate))
634+
635+
def single_or_default(
636+
self, predicate: typing.Predicate[_T] | None = None, default_value: Any = None
637+
) -> Observable[_T]:
638+
"""Return single element or default.
639+
640+
Returns the only element of an observable sequence that matches the predicate,
641+
or a default value if no such element exists.
642+
643+
Examples:
644+
Fluent style:
645+
>>> result = source.single_or_default()
646+
>>> result = source.single_or_default(lambda x: x == 42, 0)
647+
648+
Equivalent pipe style:
649+
>>> from reactivex import operators as ops
650+
>>> result = source.pipe(ops.single_or_default())
651+
>>> result = source.pipe(ops.single_or_default(lambda x: x == 42, 0))
652+
653+
Args:
654+
predicate: A predicate function to evaluate for elements in the source
655+
sequence.
656+
default_value: The default value if no element matches or sequence is empty.
657+
658+
Returns:
659+
An observable sequence containing the single element in the observable
660+
sequence that satisfies the condition in the predicate, or the default
661+
value if no such element exists.
662+
663+
See Also:
664+
- :func:`single_or_default <reactivex.operators.single_or_default>`
665+
- :meth:`single`
666+
- :meth:`first_or_default`
667+
- :meth:`last_or_default`
668+
"""
669+
from reactivex import operators as ops
670+
671+
return self._as_observable().pipe(
672+
ops.single_or_default(predicate, default_value)
673+
)
674+
675+
def element_at_or_default(
676+
self, index: int, default_value: _T | None = None
677+
) -> Observable[_T]:
678+
"""Get element at index or default.
679+
680+
Returns the element at a specified index in a sequence or a default value if
681+
the index is out of range.
682+
683+
Examples:
684+
Fluent style:
685+
>>> result = source.element_at_or_default(5)
686+
>>> result = source.element_at_or_default(5, 0)
687+
688+
Equivalent pipe style:
689+
>>> from reactivex import operators as ops
690+
>>> result = source.pipe(ops.element_at_or_default(5))
691+
>>> result = source.pipe(ops.element_at_or_default(5, 0))
692+
693+
Args:
694+
index: The zero-based index of the element to retrieve.
695+
default_value: The default value if the index is outside the bounds of
696+
the source sequence.
697+
698+
Returns:
699+
An observable sequence that produces the element at the specified position
700+
in the source sequence, or a default value if the index is outside the
701+
bounds of the source sequence.
702+
703+
See Also:
704+
- :func:`element_at_or_default <reactivex.operators.element_at_or_default>`
705+
- :meth:`element_at`
706+
- :meth:`first_or_default`
707+
- :meth:`last_or_default`
708+
"""
709+
from reactivex import operators as ops
710+
711+
return self._as_observable().pipe(
712+
ops.element_at_or_default(index, default_value)
713+
)

0 commit comments

Comments
 (0)