Skip to content

Commit ae86323

Browse files
committed
work on piped actions
1 parent 5e6aedb commit ae86323

File tree

5 files changed

+33
-22
lines changed

5 files changed

+33
-22
lines changed

data_algebra/arrow.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,14 @@ def get_feature_names(self):
6868
cp = self.outgoing_columns.copy()
6969
return cp
7070

71-
def act_on(self, b):
72-
"""replace self input table with b"""
71+
def act_on(self, b, *, correct_ordered_first_call: bool = False):
72+
"""
73+
Apply self onto b.
74+
75+
:param b: item to act on, or item that has been sent to self.
76+
:param correct_ordered_first_call: if True indicates this call is from __rshift__ or __rrshift__ and not the fallback paths.
77+
"""
78+
assert isinstance(correct_ordered_first_call, bool)
7379
if isinstance(b, data_algebra.data_ops.ViewRepresentation):
7480
b = DataOpArrow(b)
7581
if isinstance(b, DataOpArrow):
@@ -87,8 +93,8 @@ def act_on(self, b):
8793
free_table_key=b.free_table_key,
8894
)
8995
return res
90-
if isinstance(b, ShiftPipeAction):
91-
return b.act_on(self) # fall back
96+
if correct_ordered_first_call and isinstance(b, ShiftPipeAction):
97+
return b.act_on(self, correct_ordered_first_call=False) # fall back
9298
# assume a pandas.DataFrame compatible object
9399
# noinspection PyUnresolvedReferences
94100
cols = set(b.columns)

data_algebra/cdata.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -533,13 +533,14 @@ def inverse(self):
533533
assert self.strict
534534
return RecordMap(blocks_in=self.blocks_out, blocks_out=self.blocks_in, strict=True)
535535

536-
def act_on(self, b):
536+
def act_on(self, b, *, correct_ordered_first_call: bool = False):
537+
assert isinstance(correct_ordered_first_call, bool)
537538
if isinstance(b, RecordMap):
538539
self.compose(b)
539540
if isinstance(b, data_algebra.data_ops.ViewRepresentation):
540541
return b.convert_records(self)
541-
if isinstance(b, ShiftPipeAction):
542-
return b.act_on(self) # fall back to peer's action
542+
if correct_ordered_first_call and isinstance(b, ShiftPipeAction):
543+
return b.act_on(self, correct_ordered_first_call=False) # fall back to peer's action
543544
# assume table like
544545
return self.transform(b)
545546

data_algebra/data_ops.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,14 +412,16 @@ def __ne__(self, other):
412412

413413
# composition
414414

415-
def act_on(self, b):
415+
def act_on(self, b, *, correct_ordered_first_call: bool = False):
416416
"""
417417
apply self to b, must associate with composition
418418
Operator is strict about column names.
419419
420420
:param b: input data frame
421+
:param correct_ordered_first_call: indicate not on fallback path
421422
:return: transformed or composed result
422423
"""
424+
assert isinstance(correct_ordered_first_call, bool)
423425
tables = self.get_tables()
424426
if isinstance(b, ViewRepresentation):
425427
# insert to only table or if more than one, table with matching key
@@ -432,8 +434,8 @@ def act_on(self, b):
432434
assert set(b.column_names) == set(old.column_names) # this is defending associativity of composition against table narrowing
433435
return self.replace_leaves({key: b})
434436
# see if b is ShiftPipeAction, so it can handle the mapping (using fact data is not a ShiftPipeAction instance)
435-
if isinstance(b, ShiftPipeAction):
436-
return b.act_on(self)
437+
if correct_ordered_first_call and isinstance(b, ShiftPipeAction):
438+
return b.act_on(self, correct_ordered_first_call=False)
437439
# assume a table
438440
assert len(tables) == 1
439441
key = list(tables.keys())[0]

data_algebra/shift_pipe_action.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ def __init__(self) -> None:
1111
pass
1212

1313
@abc.abstractmethod
14-
def act_on(self, b):
14+
def act_on(self, b, *, correct_ordered_first_call: bool = False):
1515
"""
1616
Apply self onto b.
17+
18+
:param b: item to act on, or item that has been sent to self.
19+
:param correct_ordered_first_call: if True indicates this call is from __rshift__ or __rrshift__ and not the fallback paths.
1720
"""
1821

1922
def __rshift__(self, b): # override self >> b
@@ -23,13 +26,13 @@ def __rshift__(self, b): # override self >> b
2326
"""
2427
if isinstance(b, ShiftPipeAction):
2528
# this is the expected path
26-
return b.act_on(self)
29+
return b.act_on(self, correct_ordered_first_call=True)
2730
# fall back to our action
28-
return self.act_on(b)
31+
return self.act_on(b, correct_ordered_first_call=False)
2932

3033
def __rrshift__(self, b): # override b >> self
3134
"""
3235
Delegate b >> self to self.act_on(b).
3336
This is read as sending b to self.
3437
"""
35-
return self.act_on(b)
38+
return self.act_on(b, correct_ordered_first_call=True)

tests/test_shift_pipe_action.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from data_algebra.shift_pipe_action import ShiftPipeAction
55
from data_algebra.cdata import pivot_rowrecs_to_blocks
66
from data_algebra.data_ops import descr, ConvertRecordsNode, ViewRepresentation
7-
import pytest
87

98

109
def test_shift_pipe_action_1():
@@ -16,21 +15,21 @@ def __init__(self, nm: str) -> None:
1615
ShiftPipeAction.__init__(self)
1716
self.nm = nm
1817

19-
def act_on(self, b):
20-
return f"{self}.act_on({b})"
18+
def act_on(self, b, *, correct_ordered_first_call: bool = False):
19+
return f"{self}.act_on({b}, correct_ordered_first_call={correct_ordered_first_call})"
2120

2221
def __str__(self) -> str:
2322
return self.nm
2423

2524
a = T1("a")
2625
b = T1("b")
27-
assert (a >> b) == "b.act_on(a)"
28-
assert (b >> a) == "a.act_on(b)"
29-
assert (7 >> a) == "a.act_on(7)"
30-
assert (a >> 7) == "a.act_on(7)"
26+
assert (a >> b) == "b.act_on(a, correct_ordered_first_call=True)"
27+
assert (b >> a) == "a.act_on(b, correct_ordered_first_call=True)"
28+
assert (7 >> a) == "a.act_on(7, correct_ordered_first_call=True)"
29+
assert (a >> 7) == "a.act_on(7, correct_ordered_first_call=False)"
3130

3231

33-
def test_shift_pipe_action_rm_data():
32+
def test_shift_pipe_action_mp_ops_data():
3433
mp = pivot_rowrecs_to_blocks(
3534
attribute_key_column="curve",
3635
attribute_value_column="effect_size",

0 commit comments

Comments
 (0)