Skip to content

Commit 982795a

Browse files
committed
work on composability
1 parent c0295cd commit 982795a

File tree

6 files changed

+274
-26
lines changed

6 files changed

+274
-26
lines changed

Examples/LogisticExample/Logistic2.ipynb

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,31 +58,100 @@
5858
{
5959
"name": "stdout",
6060
"text": [
61-
"TableDescription(table_name='d', column_names=['subjectID', 'surveyCategory', 'assessmentTotal', 'irrelevantCol1', 'irrelevantCol2']) .\\\n extend({'probability': '(assessmentTotal * 0.237).exp()'}) .\\\n extend({'total': 'probability.sum()'}, partition_by=['subjectID']) .\\\n extend({'probability': 'probability / total'}) .\\\n extend({'sort_key': '-probability'}) .\\\n extend({'row_number': '_row_number()'}, partition_by=['subjectID'], order_by=['sort_key']) .\\\n select_rows('row_number == 1') .\\\n select_columns(['subjectID', 'surveyCategory', 'probability']) .\\\n rename_columns({'diagnosis': 'surveyCategory'})\n"
61+
"[\n Extend({'probability': '(assessmentTotal * 0.237).exp()'}, partition_by=None, order_by=None, reverse=None),\n Extend({'total': 'probability.sum()'}, partition_by='subjectID', order_by=None, reverse=None),\n Extend({'probability': 'probability/total'}, partition_by=None, order_by=None, reverse=None),\n]\n"
6262
],
6363
"output_type": "stream"
6464
}
6565
],
6666
"source": [
67-
"prob_caclulation = Locum(). \\\n",
67+
"prob_calculation = Locum(). \\\n",
6868
" extend({'probability': '(assessmentTotal * 0.237).exp()'}). \\\n",
6969
" extend({'total': 'probability.sum()'},\n",
7070
" partition_by='subjectID'). \\\n",
7171
" extend({'probability': 'probability/total'})\n",
7272
"\n",
73+
"print(prob_calculation)"
74+
],
75+
"metadata": {
76+
"collapsed": false,
77+
"pycharm": {
78+
"name": "#%%\n",
79+
"is_executing": false
80+
}
81+
}
82+
},
83+
{
84+
"cell_type": "code",
85+
"execution_count": 3,
86+
"outputs": [
87+
{
88+
"name": "stdout",
89+
"text": [
90+
"[\n Extend({'sort_key': '-probability'}, partition_by=None, order_by=None, reverse=None),\n Extend({'row_number': '_row_number()'}, partition_by=['subjectID'], order_by=['sort_key'], reverse=None),\n SelectRows('row_number == 1'),\n]\n"
91+
],
92+
"output_type": "stream"
93+
}
94+
],
95+
"source": [
7396
"top_rank = Locum(). \\\n",
7497
" extend({'sort_key': '-probability'}). \\\n",
7598
" extend({'row_number': '_row_number()'},\n",
7699
" partition_by=['subjectID'],\n",
77100
" order_by=['sort_key']). \\\n",
78101
" select_rows('row_number == 1')\n",
79102
"\n",
103+
"print(top_rank)"
104+
],
105+
"metadata": {
106+
"collapsed": false,
107+
"pycharm": {
108+
"name": "#%%\n",
109+
"is_executing": false
110+
}
111+
}
112+
},
113+
{
114+
"cell_type": "code",
115+
"execution_count": 4,
116+
"outputs": [
117+
{
118+
"name": "stdout",
119+
"text": [
120+
"[\n SelectColumns(['subjectID', 'surveyCategory', 'probability']),\n RenameColumns({'diagnosis': 'surveyCategory'}),\n]\n"
121+
],
122+
"output_type": "stream"
123+
}
124+
],
125+
"source": [
80126
"clean_up_columns = Locum(). \\\n",
81127
" select_columns(['subjectID', 'surveyCategory', 'probability']). \\\n",
82128
" rename_columns({'diagnosis': 'surveyCategory'})\n",
83129
"\n",
130+
"print(clean_up_columns)"
131+
],
132+
"metadata": {
133+
"collapsed": false,
134+
"pycharm": {
135+
"name": "#%%\n",
136+
"is_executing": false
137+
}
138+
}
139+
},
140+
{
141+
"cell_type": "code",
142+
"execution_count": 5,
143+
"outputs": [
144+
{
145+
"name": "stdout",
146+
"text": [
147+
"TableDescription(table_name='d', column_names=['subjectID', 'surveyCategory', 'assessmentTotal', 'irrelevantCol1', 'irrelevantCol2']) .\\\n extend({'probability': '(assessmentTotal * 0.237).exp()'}) .\\\n extend({'total': 'probability.sum()'}, partition_by=['subjectID']) .\\\n extend({'probability': 'probability / total'}) .\\\n extend({'sort_key': '-probability'}) .\\\n extend({'row_number': '_row_number()'}, partition_by=['subjectID'], order_by=['sort_key']) .\\\n select_rows('row_number == 1') .\\\n select_columns(['subjectID', 'surveyCategory', 'probability']) .\\\n rename_columns({'diagnosis': 'surveyCategory'})\n"
148+
],
149+
"output_type": "stream"
150+
}
151+
],
152+
"source": [
84153
"ops = data_algebra.data_ops.describe_table(d_local, 'd') +\\\n",
85-
" prob_caclulation +\\\n",
154+
" prob_calculation +\\\n",
86155
" top_rank +\\\n",
87156
" clean_up_columns\n",
88157
"\n",
@@ -98,7 +167,7 @@
98167
},
99168
{
100169
"cell_type": "code",
101-
"execution_count": 3,
170+
"execution_count": 6,
102171
"outputs": [
103172
{
104173
"data": {
@@ -107,7 +176,7 @@
107176
},
108177
"metadata": {},
109178
"output_type": "execute_result",
110-
"execution_count": 3
179+
"execution_count": 6
111180
}
112181
],
113182
"source": [

build/lib/data_algebra/data_pipe.py

Lines changed: 111 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,17 @@ def apply(self, other, **kwargs):
5050
parse_env=parse_env,
5151
)
5252

53+
def __repr__(self):
54+
return ("Extend(" + self._ops.__repr__()
55+
+ ", partition_by=" + self.partition_by.__repr__()
56+
+ ", order_by=" + self.order_by.__repr__()
57+
+ ", reverse=" + self.reverse.__repr__()
58+
+ ")"
59+
)
60+
61+
def __str__(self):
62+
return self.__repr__()
63+
5364

5465
class Project(data_algebra.pipe.PipeStep):
5566
"""Class to specify aggregating or summarizing columns."""
@@ -69,6 +80,15 @@ def apply(self, other, **kwargs):
6980
parse_env = kwargs.get("parse_env", None)
7081
return other.project(ops=self._ops, group_by=self.group_by, parse_env=parse_env)
7182

83+
def __repr__(self):
84+
return ("Project(" + self._ops.__repr__()
85+
+ ", group_by=" + self.group_by.__repr__()
86+
+ ")"
87+
)
88+
89+
def __str__(self):
90+
return self.__repr__()
91+
7292

7393
class SelectRows(data_algebra.pipe.PipeStep):
7494
"""Class to specify a choice of rows.
@@ -88,6 +108,14 @@ def apply(self, other, **kwargs):
88108
parse_env = kwargs.get("parse_env", None)
89109
return other.select_rows(expr=self.expr, parse_env=parse_env)
90110

111+
def __repr__(self):
112+
return ("SelectRows(" + self.expr.__repr__()
113+
+ ")"
114+
)
115+
116+
def __str__(self):
117+
return self.__repr__()
118+
91119

92120
class SelectColumns(data_algebra.pipe.PipeStep):
93121
"""Class to specify a choice of columns.
@@ -107,6 +135,14 @@ def apply(self, other, **kwargs):
107135
)
108136
return other.select_columns(self.column_selection)
109137

138+
def __repr__(self):
139+
return ("SelectColumns(" + self.column_selection.__repr__()
140+
+ ")"
141+
)
142+
143+
def __str__(self):
144+
return self.__repr__()
145+
110146

111147
class DropColumns(data_algebra.pipe.PipeStep):
112148
"""Class to specify removal of columns.
@@ -126,6 +162,14 @@ def apply(self, other, **kwargs):
126162
)
127163
return other.drop_columns(self.column_deletions)
128164

165+
def __repr__(self):
166+
return ("DropColumns(" + self.column_deletions.__repr__()
167+
+ ")"
168+
)
169+
170+
def __str__(self):
171+
return self.__repr__()
172+
129173

130174
class OrderRows(data_algebra.pipe.PipeStep):
131175
"""Class to specify a columns to determine row order.
@@ -151,6 +195,15 @@ def apply(self, other, **kwargs):
151195
columns=self.order_columns, reverse=self.reverse, limit=self.limit
152196
)
153197

198+
def __repr__(self):
199+
return ("OrderRows(" + self.order_columns.__repr__()
200+
+ ", reverse=" + self.reverse.__repr__()
201+
+ ")"
202+
)
203+
204+
def __str__(self):
205+
return self.__repr__()
206+
154207

155208
class RenameColumns(data_algebra.pipe.PipeStep):
156209
"""Class to rename columns.
@@ -169,6 +222,14 @@ def apply(self, other, **kwargs):
169222
)
170223
return other.rename_columns(column_remapping=self.column_remapping)
171224

225+
def __repr__(self):
226+
return ("RenameColumns(" + self.column_remapping.__repr__()
227+
+ ")"
228+
)
229+
230+
def __str__(self):
231+
return self.__repr__()
232+
172233

173234
class NaturalJoin(data_algebra.pipe.PipeStep):
174235
_by: List[str]
@@ -190,6 +251,17 @@ def apply(self, other, **kwargs):
190251
)
191252
return other.natural_join(b=self._b, by=self._by, jointype=self._jointype)
192253

254+
def __repr__(self):
255+
return ("NaturalJoin("
256+
+ ", b=" + self._b.__repr__()
257+
+ ", by=" + self._by.__repr__()
258+
+ ", jointype=" + self._jointype.__repr__()
259+
+ ")"
260+
)
261+
262+
def __str__(self):
263+
return self.__repr__()
264+
193265

194266
class ConvertRecords(data_algebra.pipe.PipeStep):
195267
def __init__(self, record_map, *, blocks_out_table=None):
@@ -205,6 +277,16 @@ def apply(self, other, **kwargs):
205277
return other.convert_records(record_map=self.record_map,
206278
blocks_out_table=self.blocks_out_table)
207279

280+
def __repr__(self):
281+
return ("ConvertRecords(" + self.record_map.__repr__()
282+
+ ", record_map=" + self.record_map.__repr__()
283+
+ ", blocks_out_table=" + self.blocks_out_table.__repr__()
284+
+ ")"
285+
)
286+
287+
def __str__(self):
288+
return self.__repr__()
289+
208290

209291
class Locum(data_algebra.data_ops.OperatorPlatform):
210292
"""Class to represent future opertions."""
@@ -213,32 +295,47 @@ def __init__(self):
213295
data_algebra.data_ops.OperatorPlatform.__init__(self)
214296
self.ops = []
215297

216-
# noinspection PyPep8Naming
217-
def realize(self, X):
218-
pipeline = data_algebra.data_ops.describe_table(X, table_name="X")
298+
def apply_to(self, pipeline):
299+
if not isinstance(pipeline, data_algebra.data_ops.OperatorPlatform):
300+
raise TypeError("Expected othter to be a data_algebra.data_ops.OperatorPlatform")
219301
for s in self.ops:
220302
# pipeline = pipeline >> s
221303
pipeline = s.apply(pipeline)
222304
return pipeline
223305

306+
def append(self, other):
307+
if isinstance(other, Locum):
308+
for o in other.ops:
309+
self.ops.append(o)
310+
elif isinstance(other, data_algebra.pipe.PipeStep):
311+
self.ops.append(other)
312+
else:
313+
raise TypeError("unexpeted type for Locum + " + str(type(other)))
314+
return self
315+
316+
def realize(self, x):
317+
pipeline = data_algebra.data_ops.describe_table(x, table_name="x")
318+
return self.apply_to(pipeline)
319+
224320
# noinspection PyPep8Naming
225321
def transform(self, X):
322+
if isinstance(X, data_algebra.data_ops.OperatorPlatform):
323+
return self.apply_to(X)
226324
pipeline = self.realize(X)
227325
return pipeline.transform(X)
228326

229327
def __rrshift__(self, other): # override other >> self
230328
return self.transform(other)
231329

232-
def __add__(self, other):
233-
if not isinstance(other, Locum):
234-
raise TypeError("Expected other to be of type data_algebra.data_pipe.Locum")
330+
def __add__(self, other): # override self + other
235331
res = Locum()
236-
for o in self.ops:
237-
res.ops.append(o)
238-
for o in other.ops:
239-
res.ops.append(o)
332+
res.append(self)
333+
res.append(other)
240334
return res
241335

336+
def __radd__(self, other): # override other + self
337+
return self.apply_to(other)
338+
242339
# print
243340

244341
def __repr__(self):
@@ -248,13 +345,13 @@ def __repr__(self):
248345

249346
def __str__(self):
250347
return '[\n ' + \
251-
'\n '.join([str(o) + ',' for o in self.ops]) + \
252-
'\n]'
348+
'\n '.join([str(o) + ',' for o in self.ops]) + \
349+
'\n]'
253350

254351
# implement method chaining collection of pending operations
255352

256353
def extend(
257-
self, ops, *, partition_by=None, order_by=None, reverse=None, parse_env=None
354+
self, ops, *, partition_by=None, order_by=None, reverse=None, parse_env=None
258355
):
259356
if parse_env is not None:
260357
raise ValueError("Expected parse_env to be None")
@@ -306,7 +403,7 @@ def order_rows(self, columns, *, reverse=None, limit=None):
306403
return self
307404

308405
def convert_records(
309-
self, record_map, *, blocks_out_table=None
406+
self, record_map, *, blocks_out_table=None
310407
):
311408
op = ConvertRecords(record_map=record_map, blocks_out_table=blocks_out_table)
312409
self.ops.append(op)

coverage.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ data_algebra/cdata_impl.py 152 60 61%
4040
data_algebra/dask_model.py 121 23 81%
4141
data_algebra/data_model.py 41 15 63%
4242
data_algebra/data_ops.py 815 173 79%
43-
data_algebra/data_pipe.py 183 41 78%
43+
data_algebra/data_pipe.py 231 64 72%
4444
data_algebra/data_types.py 39 19 51%
4545
data_algebra/datatable_model.py 131 81 38%
4646
data_algebra/db_model.py 364 83 77%
@@ -54,7 +54,7 @@ data_algebra/pipe.py 65 19 71%
5454
data_algebra/util.py 84 7 92%
5555
data_algebra/yaml.py 120 15 88%
5656
-----------------------------------------------------
57-
TOTAL 2945 802 73%
57+
TOTAL 2993 825 72%
5858

5959

60-
========================== 31 passed in 7.62 seconds ===========================
60+
========================== 31 passed in 7.03 seconds ===========================

0 commit comments

Comments
 (0)