Skip to content

Commit 583c90c

Browse files
committed
work on replacement test
rebuild and retest
1 parent 7e4822b commit 583c90c

File tree

13 files changed

+6838
-6906
lines changed

13 files changed

+6838
-6906
lines changed

build/lib/data_algebra/arrow.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,7 @@ def apply_to(self, b):
137137
raise ValueError("forbidden incoming columns: " + str(excess))
138138
if self.strict:
139139
raise ValueError("extra incoming columns: " + str(excess))
140-
new_pipeline = self.pipeline.apply_to(
141-
b.pipeline, target_table_key=self.free_table_key
142-
)
140+
new_pipeline = self.pipeline.replace_leaves({self.free_table_key: b.pipeline})
143141
new_pipeline.get_tables() # check tables are compatible
144142
res = DataOpArrow(
145143
pipeline=new_pipeline,

build/lib/data_algebra/data_ops.py

Lines changed: 78 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,17 +1013,19 @@ def forbidden_columns(
10131013
forbidden = set()
10141014
return {self.key: set(forbidden)}
10151015

1016-
def apply_to(self, a, *, target_table_key=None):
1016+
def replace_leaves(self, replacement_map: Dict[str, Any]):
10171017
"""
1018-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
1018+
Replace leaves of DAG
10191019
10201020
:param a: operators to apply to
1021-
:param target_table_key: table key to replace with self, None counts as "match all"
1021+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
10221022
:return: new operator DAG
10231023
"""
1024-
if (target_table_key is None) or (target_table_key == self.key):
1025-
# replace table with a
1026-
return a
1024+
assert isinstance(replacement_map, dict)
1025+
try:
1026+
return replacement_map[self.key]
1027+
except KeyError:
1028+
pass
10271029
# copy self
10281030
r = TableDescription(
10291031
table_name=self.table_name,
@@ -1374,16 +1376,17 @@ def __init__(
13741376
self, column_names=column_names, sources=[source], node_name="ExtendNode"
13751377
)
13761378

1377-
def apply_to(self, a, *, target_table_key=None):
1379+
def replace_leaves(self, replacement_map: Dict[str, Any]):
13781380
"""
1379-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
1381+
Replace leaves of DAG
13801382
13811383
:param a: operators to apply to
1382-
:param target_table_key: table key to replace with self, None counts as "match all"
1384+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
13831385
:return: new operator DAG
13841386
"""
1387+
assert isinstance(replacement_map, dict)
13851388
new_sources = [
1386-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
1389+
s.replace_leaves(replacement_map) for s in self.sources
13871390
]
13881391
return new_sources[0].extend_parsed_(
13891392
parsed_ops=self.ops,
@@ -1635,16 +1638,17 @@ def get_method_uses_(self, methods_seen: Set[MethodUse]) -> None:
16351638
MethodUse(k, is_project=True, is_windowed=False, is_ordered=False)
16361639
)
16371640

1638-
def apply_to(self, a, *, target_table_key=None):
1641+
def replace_leaves(self, replacement_map: Dict[str, Any]):
16391642
"""
1640-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
1643+
Replace leaves of DAG
16411644
16421645
:param a: operators to apply to
1643-
:param target_table_key: table key to replace with self, None counts as "match all"
1646+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
16441647
:return: new operator DAG
16451648
"""
1649+
assert isinstance(replacement_map, dict)
16461650
new_sources = [
1647-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
1651+
s.replace_leaves(replacement_map) for s in self.sources
16481652
]
16491653
return new_sources[0].project_parsed_(
16501654
parsed_ops=self.ops, group_by=self.group_by
@@ -1773,16 +1777,17 @@ def get_method_uses_(self, methods_seen: Set[MethodUse]) -> None:
17731777
MethodUse(k, is_project=False, is_windowed=False, is_ordered=False)
17741778
)
17751779

1776-
def apply_to(self, a, *, target_table_key=None):
1780+
def replace_leaves(self, replacement_map: Dict[str, Any]):
17771781
"""
1778-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
1782+
Replace leaves of DAG
17791783
17801784
:param a: operators to apply to
1781-
:param target_table_key: table key to replace with self, None counts as "match all"
1785+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
17821786
:return: new operator DAG
17831787
"""
1788+
assert isinstance(replacement_map, dict)
17841789
new_sources = [
1785-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
1790+
s.replace_leaves(replacement_map) for s in self.sources
17861791
]
17871792
return new_sources[0].select_rows_parsed_(parsed_ops=self.ops)
17881793

@@ -1888,16 +1893,17 @@ def forbidden_columns(
18881893
forbidden = set(forbidden).intersection(self.column_selection)
18891894
return self.sources[0].forbidden_columns(forbidden=forbidden)
18901895

1891-
def apply_to(self, a, *, target_table_key=None):
1896+
def replace_leaves(self, replacement_map: Dict[str, Any]):
18921897
"""
1893-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
1898+
Replace leaves of DAG
18941899
18951900
:param a: operators to apply to
1896-
:param target_table_key: table key to replace with self, None counts as "match all"
1901+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
18971902
:return: new operator DAG
18981903
"""
1904+
assert isinstance(replacement_map, dict)
18991905
new_sources = [
1900-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
1906+
s.replace_leaves(replacement_map) for s in self.sources
19011907
]
19021908
return new_sources[0].select_columns(columns=self.column_selection)
19031909

@@ -2000,16 +2006,17 @@ def forbidden_columns(
20002006
forbidden = set(forbidden) - set(self.column_deletions)
20012007
return self.sources[0].forbidden_columns(forbidden=forbidden)
20022008

2003-
def apply_to(self, a, *, target_table_key=None):
2009+
def replace_leaves(self, replacement_map: Dict[str, Any]):
20042010
"""
2005-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2011+
Replace leaves of DAG
20062012
20072013
:param a: operators to apply to
2008-
:param target_table_key: table key to replace with self, None counts as "match all"
2014+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
20092015
:return: new operator DAG
20102016
"""
2017+
assert isinstance(replacement_map, dict)
20112018
new_sources = [
2012-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2019+
s.replace_leaves(replacement_map) for s in self.sources
20132020
]
20142021
return new_sources[0].drop_columns(column_deletions=self.column_deletions)
20152022

@@ -2101,16 +2108,17 @@ def __init__(self, source, columns, *, reverse=None, limit=None):
21012108
node_name="OrderRowsNode",
21022109
)
21032110

2104-
def apply_to(self, a, *, target_table_key=None):
2111+
def replace_leaves(self, replacement_map: Dict[str, Any]):
21052112
"""
2106-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2113+
Replace leaves of DAG
21072114
21082115
:param a: operators to apply to
2109-
:param target_table_key: table key to replace with self, None counts as "match all"
2116+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
21102117
:return: new operator DAG
21112118
"""
2119+
assert isinstance(replacement_map, dict)
21122120
new_sources = [
2113-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2121+
s.replace_leaves(replacement_map) for s in self.sources
21142122
]
21152123
return new_sources[0].order_rows(
21162124
columns=self.order_columns, reverse=self.reverse, limit=self.limit
@@ -2246,16 +2254,17 @@ def forbidden_columns(
22462254
new_forbidden.update(self.new_columns)
22472255
return self.sources[0].forbidden_columns(forbidden=new_forbidden)
22482256

2249-
def apply_to(self, a, *, target_table_key=None):
2257+
def replace_leaves(self, replacement_map: Dict[str, Any]):
22502258
"""
2251-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2259+
Replace leaves of DAG
22522260
22532261
:param a: operators to apply to
2254-
:param target_table_key: table key to replace with self, None counts as "match all"
2262+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
22552263
:return: new operator DAG
22562264
"""
2265+
assert isinstance(replacement_map, dict)
22572266
new_sources = [
2258-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2267+
s.replace_leaves(replacement_map) for s in self.sources
22592268
]
22602269
return new_sources[0].map_columns(column_remapping=self.column_remapping)
22612270

@@ -2383,16 +2392,17 @@ def forbidden_columns(
23832392
new_forbidden.update(self.new_columns)
23842393
return self.sources[0].forbidden_columns(forbidden=new_forbidden)
23852394

2386-
def apply_to(self, a, *, target_table_key=None):
2395+
def replace_leaves(self, replacement_map: Dict[str, Any]):
23872396
"""
2388-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2397+
Replace leaves of DAG
23892398
23902399
:param a: operators to apply to
2391-
:param target_table_key: table key to replace with self, None counts as "match all"
2400+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
23922401
:return: new operator DAG
23932402
"""
2403+
assert isinstance(replacement_map, dict)
23942404
new_sources = [
2395-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2405+
s.replace_leaves(replacement_map) for s in self.sources
23962406
]
23972407
return new_sources[0].rename_columns(column_remapping=self.column_remapping)
23982408

@@ -2536,16 +2546,17 @@ def __init__(
25362546
if (self.jointype == "CROSS") and (len(self.on_a) != 0):
25372547
raise ValueError("CROSS joins must have an empty 'on' list")
25382548

2539-
def apply_to(self, a, *, target_table_key=None):
2549+
def replace_leaves(self, replacement_map: Dict[str, Any]):
25402550
"""
2541-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2551+
Replace leaves of DAG
25422552
25432553
:param a: operators to apply to
2544-
:param target_table_key: table key to replace with self, None counts as "match all"
2554+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
25452555
:return: new operator DAG
25462556
"""
2557+
assert isinstance(replacement_map, dict)
25472558
new_sources = [
2548-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2559+
s.replace_leaves(replacement_map) for s in self.sources
25492560
]
25502561
return new_sources[0].natural_join(
25512562
b=new_sources[1], on=[(va, vb) for (va, vb) in zip(self.on_a, self.on_b)], jointype=self.jointype
@@ -2663,16 +2674,17 @@ def __init__(self, a, b, *, id_column="table_name", a_name="a", b_name="b"):
26632674
self.a_name = a_name
26642675
self.b_name = b_name
26652676

2666-
def apply_to(self, a, *, target_table_key=None):
2677+
def replace_leaves(self, replacement_map: Dict[str, Any]):
26672678
"""
2668-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2679+
Replace leaves of DAG
26692680
26702681
:param a: operators to apply to
2671-
:param target_table_key: table key to replace with self, None counts as "match all"
2682+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
26722683
:return: new operator DAG
26732684
"""
2685+
assert isinstance(replacement_map, dict)
26742686
new_sources = [
2675-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2687+
s.replace_leaves(replacement_map) for s in self.sources
26762688
]
26772689
return new_sources[0].concat_rows(
26782690
b=new_sources[1],
@@ -2778,16 +2790,17 @@ def __init__(self, *, source, record_map):
27782790
node_name="ConvertRecordsNode",
27792791
)
27802792

2781-
def apply_to(self, a, *, target_table_key=None):
2793+
def replace_leaves(self, replacement_map: Dict[str, Any]):
27822794
"""
2783-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2795+
Replace leaves of DAG
27842796
27852797
:param a: operators to apply to
2786-
:param target_table_key: table key to replace with self, None counts as "match all"
2798+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
27872799
:return: new operator DAG
27882800
"""
2801+
assert isinstance(replacement_map, dict)
27892802
new_sources = [
2790-
s.apply_to(a, target_table_key=target_table_key) for s in self.sources
2803+
s.replace_leaves(replacement_map) for s in self.sources
27912804
]
27922805
return new_sources[0].convert_records(record_map=self.record_map)
27932806

@@ -2910,15 +2923,26 @@ def __init__(
29102923
node_name="SQLNode",
29112924
)
29122925

2913-
def apply_to(self, a, *, target_table_key=None):
2926+
def replace_leaves(self, replacement_map: Dict[str, Any]):
29142927
"""
2915-
Apply self to operator DAG a. Basic OperatorPlatform, composable API.
2928+
Replace leaves of DAG
29162929
29172930
:param a: operators to apply to
2918-
:param target_table_key: table key to replace with self, None counts as "match all"
2931+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
29192932
:return: new operator DAG
29202933
"""
2921-
return self
2934+
assert isinstance(replacement_map, dict)
2935+
try:
2936+
return replacement_map[self.view_name]
2937+
except KeyError:
2938+
pass
2939+
# copy self
2940+
r = SQLNode(
2941+
sql=self.sql.copy(),
2942+
column_names=self.column_names.copy(),
2943+
view_name=self.view_name
2944+
)
2945+
return r
29222946

29232947
def _equiv_nodes(self, other):
29242948
if not isinstance(other, SQLNode):

build/lib/data_algebra/data_ops_types.py

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -79,50 +79,15 @@ def act_on(self, X, *, data_model=None):
7979
)
8080

8181
@abc.abstractmethod
82-
def apply_to(self, a, *, target_table_key=None):
82+
def replace_leaves(self, replacement_map: Dict[str, Any]):
8383
"""
84-
apply self to operator DAG a
84+
Replace leaves of DAG
8585
8686
:param a: operators to apply to
87-
:param target_table_key: table key to replace with self, None counts as "match all"
87+
:param replacement_map, table/sqlkeys mapped to replacement Operator platforms
8888
:return: new operator DAG
8989
"""
9090

91-
def __rrshift__(self, other): # override other >> self
92-
"""
93-
override other >> self
94-
self.apply_to/act_on(other)
95-
96-
:param other:
97-
:return:
98-
"""
99-
if isinstance(other, OperatorPlatform):
100-
return self.apply_to(other)
101-
return self.act_on(other)
102-
103-
def __rshift__(self, other): # override self >> other
104-
"""
105-
override self >> other
106-
other.apply_to(self)
107-
108-
:param other:
109-
:return:
110-
"""
111-
# can't use type >> type if only __rrshift__ is defined (must have __rshift__ in this case)
112-
if isinstance(other, OperatorPlatform):
113-
return other.apply_to(self)
114-
raise TypeError("unexpected type: " + str(type(other)))
115-
116-
# composition
117-
def add(self, other):
118-
"""
119-
other.apply_to(self)
120-
121-
:param other:
122-
:return:
123-
"""
124-
return other.apply_to(self)
125-
12691
# imitate a method
12792
def use(self, user_function, *args, **kwargs):
12893
"""
@@ -383,7 +348,7 @@ def get_feature_names(self, input_features=None):
383348
cp = cp + [f for f in input_features if f not in cp_set]
384349
return cp
385350

386-
# noinspection PyUnusedLocal,PyMethodMayBeStatic
351+
# noinspection PyMethodMayBeStatic
387352
def get_params(self, deep=False):
388353
"""sklearn interface, noop"""
389354
return dict()

build/lib/data_algebra/op_container.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ def act_on(self, X, *, data_model=None):
9292
self.set(self.ops.act_on(X=X, data_model=data_model))
9393
return self
9494

95-
def apply_to(self, a, *, target_table_key=None):
96-
self.set(self.ops.apply_to(a=a, target_table_key=target_table_key))
95+
def replace_leaves(self, replacement_map: Dict[str, Any]):
96+
self.set(self.ops.replace_leaves(replacement_map))
9797
return self
9898

9999
def __rrshift__(self, other): # override other >> self
@@ -122,9 +122,9 @@ def columns_produced(self):
122122

123123
# query generation
124124

125-
def to_near_sql_implementation_(self, db_model, *, using, temp_id_source):
125+
def to_near_sql_implementation_(self, db_model, *, using, temp_id_source, sql_format_options=None):
126126
return self.ops.to_near_sql_implementation_(
127-
db_model=db_model, using=using, temp_id_source=temp_id_source
127+
db_model=db_model, using=using, temp_id_source=temp_id_source, sql_format_options=sql_format_options
128128
)
129129

130130
# define builders for all non-initial ops types on base class

0 commit comments

Comments
 (0)