Skip to content

Commit 5e6aedb

Browse files
committed
unify piping with fallback
1 parent 7c744a8 commit 5e6aedb

File tree

10 files changed

+189
-96
lines changed

10 files changed

+189
-96
lines changed

Examples/Arrow/Arrow.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,7 +1079,7 @@
10791079
"text": [
10801080
"[\n",
10811081
" 'd2':\n",
1082-
" [ ngroup, x, g, v ]\n",
1082+
" [ g, ngroup, x, v ]\n",
10831083
" ->\n",
10841084
" [ g, ngroup, row_number, shift_v, v, x ]\n",
10851085
"]\n",

Examples/Arrow/ArrowSetSystem.ipynb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@
8181
{
8282
"data": {
8383
"text/plain": [
84-
"(TableDescription(table_name=\"table_a\", column_names=[\"a\", \"b\"]).extend({\"c\": \"a + b\"}))"
84+
"(TableDescription(table_name=\"table_a\", column_names=[\"a\", \"b\"]).extend({\"c\": \"a + b\"}))\n"
8585
]
8686
},
8787
"execution_count": 2,
@@ -158,7 +158,7 @@
158158
" TableDescription(table_name=\"table_b\", column_names=[\"a\", \"b\", \"c\"]).rename_columns(\n",
159159
" {\"x\": \"a\"}\n",
160160
" )\n",
161-
")"
161+
")\n"
162162
]
163163
},
164164
"execution_count": 4,
@@ -233,7 +233,7 @@
233233
" TableDescription(table_name=\"table_a\", column_names=[\"a\", \"b\"])\n",
234234
" .extend({\"c\": \"a + b\"})\n",
235235
" .rename_columns({\"x\": \"a\"})\n",
236-
")"
236+
")\n"
237237
]
238238
},
239239
"execution_count": 6,
@@ -742,7 +742,7 @@
742742
" TableDescription(table_name=\"table_c\", column_names=[\"a\", \"b\", \"c\"])\n",
743743
" .extend({\"x\": \"a + b\"})\n",
744744
" .select_columns([\"x\"])\n",
745-
")"
745+
")\n"
746746
]
747747
},
748748
"execution_count": 14,
@@ -1036,7 +1036,7 @@
10361036
" TableDescription(\n",
10371037
" table_name=\"data_frame\", column_names=[\"a\", \"b\", \"d\", \"x\"]\n",
10381038
" ).drop_columns([\"d\", \"x\"])\n",
1039-
")"
1039+
")\n"
10401040
]
10411041
},
10421042
"execution_count": 19,
@@ -1073,7 +1073,7 @@
10731073
" .drop_columns([\"d\", \"x\"])\n",
10741074
" .extend({\"c\": \"a + b\"})\n",
10751075
" .rename_columns({\"x\": \"a\"})\n",
1076-
")"
1076+
")\n"
10771077
]
10781078
},
10791079
"execution_count": 20,

Examples/Arrow/CDesign.ipynb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@
350350
" TableDescription(table_name=\"data_frame\", column_names=[\"x\", \"y\"]).extend(\n",
351351
" {\"z\": \"x.mean()\"}, partition_by=[\"y\"]\n",
352352
" )\n",
353-
")"
353+
")\n"
354354
]
355355
},
356356
"execution_count": 2,
@@ -481,7 +481,7 @@
481481
" TableDescription(table_name=\"data_frame\", column_names=[\"x\", \"y\"])\n",
482482
" .extend({\"z\": \"x.mean()\"}, partition_by=[\"y\"])\n",
483483
" .extend({\"ratio\": \"y / x\"})\n",
484-
")"
484+
")\n"
485485
]
486486
},
487487
"execution_count": 4,
@@ -773,7 +773,7 @@
773773
" TableDescription(table_name=\"data_frame\", column_names=[\"x\", \"y\"])\n",
774774
" .extend({\"z\": \"x.mean()\"}, partition_by=[\"y\"])\n",
775775
" .extend({\"ratio\": \"y / x\"})\n",
776-
")"
776+
")\n"
777777
]
778778
},
779779
"execution_count": 11,
@@ -915,7 +915,7 @@
915915
"name": "stdout",
916916
"output_type": "stream",
917917
"text": [
918-
"Caught: extra incoming columns: {'z', 'ratio'}\n"
918+
"Caught: extra incoming columns: {'ratio', 'z'}\n"
919919
]
920920
}
921921
],
@@ -1065,7 +1065,7 @@
10651065
" TableDescription(table_name=\"data_frame\", column_names=[\"x\", \"y\"]).extend(\n",
10661066
" {\"x\": \"x + 1\", \"y\": \"9\"}\n",
10671067
" )\n",
1068-
")"
1068+
")\n"
10691069
]
10701070
},
10711071
"execution_count": 19,

data_algebra/arrow.py

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import data_algebra.data_ops
44
import data_algebra.flow_text
5+
from data_algebra.shift_pipe_action import ShiftPipeAction
56

67

7-
class Arrow(abc.ABC):
8+
class Arrow(ShiftPipeAction):
89
"""
910
Arrow from category theory: see Steve Awody,
1011
"Category Theory, 2nd Edition", Oxford Univ. Press, 2010 pg. 4.
@@ -13,7 +14,7 @@ class Arrow(abc.ABC):
1314
"""
1415

1516
def __init__(self):
16-
pass
17+
ShiftPipeAction.__init__(self)
1718

1819
@abc.abstractmethod
1920
def dom(self):
@@ -23,28 +24,16 @@ def dom(self):
2324
def cod(self):
2425
"""return co-domain, object at head of arrow"""
2526

26-
@abc.abstractmethod
27-
def apply_to(self, b):
28-
"""apply_to b, compose arrows (right to left)"""
29-
3027
# noinspection PyPep8Naming
3128
@abc.abstractmethod
32-
def act_on(self, X):
33-
"""act on X, must associate with composition"""
29+
def act_on(self, b):
30+
"""act on b, must associate with composition"""
3431

3532
# noinspection PyPep8Naming
3633
def transform(self, X):
3734
"""transform X, may or may not associate with composition"""
3835
return self.act_on(X)
3936

40-
def __rshift__(self, other): # override self >> other
41-
return other.apply_to(self)
42-
43-
def __rrshift__(self, other): # override other >> self
44-
if isinstance(other, Arrow):
45-
return self.apply_to(other)
46-
return self.act_on(other)
47-
4837

4938
class DataOpArrow(Arrow):
5039
"""
@@ -79,39 +68,38 @@ def get_feature_names(self):
7968
cp = self.outgoing_columns.copy()
8069
return cp
8170

82-
def apply_to(self, b):
71+
def act_on(self, b):
8372
"""replace self input table with b"""
8473
if isinstance(b, data_algebra.data_ops.ViewRepresentation):
8574
b = DataOpArrow(b)
86-
assert isinstance(b, DataOpArrow)
87-
# check categorical arrow composition conditions
88-
missing = set(self.incoming_columns) - set(b.outgoing_columns)
89-
if len(missing) > 0:
90-
raise ValueError("missing required columns: " + str(missing))
91-
excess = set(b.outgoing_columns) - set(self.incoming_columns)
92-
if len(excess) > 0:
93-
raise ValueError("extra incoming columns: " + str(excess))
94-
new_pipeline = self.pipeline.replace_leaves({self.free_table_key: b.pipeline})
95-
new_pipeline.get_tables() # check tables are compatible
96-
res = DataOpArrow(
97-
pipeline=new_pipeline,
98-
free_table_key=b.free_table_key,
99-
)
100-
return res
101-
102-
# noinspection PyPep8Naming
103-
def act_on(self, X):
75+
if isinstance(b, DataOpArrow):
76+
# check categorical arrow composition conditions
77+
missing = set(self.incoming_columns) - set(b.outgoing_columns)
78+
if len(missing) > 0:
79+
raise ValueError("missing required columns: " + str(missing))
80+
excess = set(b.outgoing_columns) - set(self.incoming_columns)
81+
if len(excess) > 0:
82+
raise ValueError("extra incoming columns: " + str(excess))
83+
new_pipeline = self.pipeline.replace_leaves({self.free_table_key: b.pipeline})
84+
new_pipeline.get_tables() # check tables are compatible
85+
res = DataOpArrow(
86+
pipeline=new_pipeline,
87+
free_table_key=b.free_table_key,
88+
)
89+
return res
90+
if isinstance(b, ShiftPipeAction):
91+
return b.act_on(self) # fall back
10492
# assume a pandas.DataFrame compatible object
10593
# noinspection PyUnresolvedReferences
106-
cols = set(X.columns)
94+
cols = set(b.columns)
10795
missing = set(self.incoming_columns) - cols
10896
if len(missing) > 0:
10997
raise ValueError("missing required columns: " + str(missing))
11098
excess = cols - set(self.incoming_columns)
11199
assert len(excess) == 0
112100
if len(excess) > 0:
113-
X = X[self.incoming_columns]
114-
return self.pipeline.act_on(X)
101+
b = b[self.incoming_columns]
102+
return self.pipeline.act_on(b)
115103

116104
def dom(self):
117105
return DataOpArrow(

data_algebra/cdata.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import data_algebra.data_model
1111
import data_algebra.util
1212
import data_algebra.data_ops
13+
from data_algebra.shift_pipe_action import ShiftPipeAction
1314

1415

1516
def _str_list_to_html(lst: Iterable[str]) -> str:
@@ -257,7 +258,7 @@ def map_from_rows(self):
257258
return RecordMap(blocks_out=self, strict=self.strict)
258259

259260

260-
class RecordMap:
261+
class RecordMap(ShiftPipeAction):
261262
"""
262263
Class for specifying general record to record transforms.
263264
"""
@@ -280,6 +281,7 @@ def __init__(
280281
:param blocks_out: outgoing record specification, None for row-records.
281282
:param strict: if True insist block be strict, and in and out blocks agree on row-form columns.
282283
"""
284+
ShiftPipeAction.__init__(self)
283285
assert isinstance(strict, bool)
284286
self.strict = strict
285287
if blocks_in is not None:
@@ -426,10 +428,6 @@ def transform(
426428
blocks_out=self.blocks_out,
427429
)
428430
return X
429-
430-
def act_on(self, X):
431-
assert isinstance(X, data_algebra.data_ops.ViewRepresentation)
432-
return X.convert_records(self)
433431

434432
def compose(self, other):
435433
"""
@@ -534,12 +532,16 @@ def inverse(self):
534532
"""
535533
assert self.strict
536534
return RecordMap(blocks_in=self.blocks_out, blocks_out=self.blocks_in, strict=True)
537-
538-
def __rshift__(self, other): # override self >> other
539-
return self.act_on(other)
540-
541-
def __rrshift__(self, other): # override other >> self
542-
return self.transform(other)
535+
536+
def act_on(self, b):
537+
if isinstance(b, RecordMap):
538+
self.compose(b)
539+
if isinstance(b, data_algebra.data_ops.ViewRepresentation):
540+
return b.convert_records(self)
541+
if isinstance(b, ShiftPipeAction):
542+
return b.act_on(self) # fall back to peer's action
543+
# assume table like
544+
return self.transform(b)
543545

544546
def fmt(self) -> str:
545547
"""Format for informal presentation."""

data_algebra/data_ops.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
ordered_union,
2525
)
2626
import data_algebra.util
27+
from data_algebra.shift_pipe_action import ShiftPipeAction
2728

2829

2930
_have_black = False
@@ -411,36 +412,36 @@ def __ne__(self, other):
411412

412413
# composition
413414

414-
# noinspection PyPep8Naming
415-
def act_on(self, X, *, data_model=None):
415+
def act_on(self, b):
416416
"""
417-
apply self to X, must associate with composition
417+
apply self to b, must associate with composition
418418
Operator is strict about column names.
419419
420-
:param X: input data frame
421-
:param data_model implementation to use
422-
:return: transformed result
420+
:param b: input data frame
421+
:return: transformed or composed result
423422
"""
424423
tables = self.get_tables()
425-
if isinstance(X, ViewRepresentation):
424+
if isinstance(b, ViewRepresentation):
426425
# insert to only table or if more than one, table with matching key
427426
if len(tables) == 1:
428427
key = list(tables.keys())[0]
429428
else:
430-
key = X.key
429+
key = b.key
431430
assert isinstance(key, str)
432431
old = tables[key]
433-
assert set(X.column_names) == set(old.column_names) # this is defending associativity of composition against table narrowing
434-
return self.replace_leaves({key: X})
432+
assert set(b.column_names) == set(old.column_names) # this is defending associativity of composition against table narrowing
433+
return self.replace_leaves({key: b})
434+
# see if b is ShiftPipeAction, so it can handle the mapping (using fact data is not a ShiftPipeAction instance)
435+
if isinstance(b, ShiftPipeAction):
436+
return b.act_on(self)
435437
# assume a table
436438
assert len(tables) == 1
437439
key = list(tables.keys())[0]
438440
assert isinstance(key, str)
439441
old = tables[key]
440-
assert set(X.columns) == set(old.column_names)
442+
assert set(b.columns) == set(old.column_names)
441443
return self.transform(
442-
X=X,
443-
data_model=data_model,
444+
b,
444445
strict=True,
445446
)
446447

data_algebra/data_ops_types.py

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import data_algebra.expr_rep
99
import data_algebra.cdata
1010
import data_algebra.OrderedSet
11+
from data_algebra.shift_pipe_action import ShiftPipeAction
1112

1213

1314
class MethodUse(NamedTuple):
@@ -19,12 +20,13 @@ class MethodUse(NamedTuple):
1920
is_ordered: bool = False
2021

2122

22-
class OperatorPlatform(abc.ABC):
23+
class OperatorPlatform(ShiftPipeAction):
2324
"""Abstract class representing ability to apply data_algebra operations."""
2425

2526
node_name: str
2627

2728
def __init__(self, *, node_name: str):
29+
ShiftPipeAction.__init__(self)
2830
assert isinstance(node_name, str)
2931
self.node_name = node_name
3032

@@ -73,24 +75,6 @@ def replace_leaves(self, replacement_map: Dict[str, Any]):
7375
:return: new operator DAG
7476
"""
7577

76-
# noinspection PyPep8Naming
77-
@abc.abstractmethod
78-
def act_on(self, X, *, data_model=None):
79-
"""
80-
apply self to X, must associate with composition
81-
Operator is strict about column names.
82-
83-
:param X: input data frame
84-
:param data_model implementation to use
85-
:return: transformed result
86-
"""
87-
88-
def __rshift__(self, other): # override self >> other
89-
return other.act_on(self)
90-
91-
def __rrshift__(self, other): # override other >> self
92-
return self.act_on(other)
93-
9478
# imitate a method
9579
def use(self, user_function, *args, **kwargs):
9680
"""

0 commit comments

Comments
 (0)