Skip to content

Commit 3d67200

Browse files
authored
[BEAM-9547] Roll forward #12858 (#12920)
* [BEAM-9547] Raise NotImplementedError and WontImplementError throughout DeferredDataframe (#12858) * Draft of NotImplementedErrors for DataFrame * Default to jira BEAM-9547 * pivot is NotImplementedError * remove mistaken change to frames * Don't use __ror__ to convert df to PCollection, it's already a DataFrame operator
1 parent 379ba0e commit 3d67200

File tree

4 files changed

+207
-84
lines changed

4 files changed

+207
-84
lines changed

sdks/python/apache_beam/dataframe/frame_base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,13 @@ def wrapper(self, *args, **kwargs):
251251
return wrapper
252252

253253

254+
def not_implemented_method(op, jira='BEAM-9547'):
255+
def wrapper(self, *args, **kwargs):
256+
raise NotImplementedError("'%s' is not yet supported (%s)" % (op, jira))
257+
258+
return wrapper
259+
260+
254261
def copy_and_mutate(func):
255262
def wrapper(self, *args, **kwargs):
256263
copy = self.copy()

sdks/python/apache_beam/dataframe/frames.py

Lines changed: 90 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ def fillna(self, value, method):
7070
preserves_partition_by=partitionings.Singleton(),
7171
requires_partition_by=partitionings.Nothing()))
7272

73+
reindex = frame_base.not_implemented_method('reindex')
74+
7375
to_numpy = to_string = frame_base.wont_implement_method('non-deferred value')
7476

7577
transform = frame_base._elementwise_method(
@@ -117,6 +119,11 @@ def aggregate(self, func, axis=0, *args, **kwargs):
117119

118120
head = tail = frame_base.wont_implement_method('order-sensitive')
119121

122+
memory_usage = frame_base.wont_implement_method('non-deferred value')
123+
124+
# In Series __contains__ checks the index
125+
__contains__ = frame_base.wont_implement_method('non-deferred value')
126+
120127
@frame_base.args_to_kwargs(pd.Series)
121128
@frame_base.populate_defaults(pd.Series)
122129
def nlargest(self, keep, **kwargs):
@@ -226,29 +233,6 @@ def str(self):
226233
return _DeferredStringMethods(expr)
227234

228235

229-
for base in ['add',
230-
'sub',
231-
'mul',
232-
'div',
233-
'truediv',
234-
'floordiv',
235-
'mod',
236-
'pow',
237-
'and',
238-
'or']:
239-
for p in ['%s', 'r%s', '__%s__', '__r%s__']:
240-
# TODO: non-trivial level?
241-
name = p % base
242-
setattr(
243-
DeferredSeries,
244-
name,
245-
frame_base._elementwise_method(name, restrictions={'level': None}))
246-
setattr(
247-
DeferredSeries,
248-
'__i%s__' % base,
249-
frame_base._elementwise_method('__i%s__' % base, inplace=True))
250-
for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
251-
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
252236
for name in ['apply', 'map', 'transform']:
253237
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
254238

@@ -259,6 +243,10 @@ class DeferredDataFrame(frame_base.DeferredFrame):
259243
def T(self):
260244
return self.transpose()
261245

246+
@property
247+
def columns(self):
248+
return self._expr.proxy().columns
249+
262250
def groupby(self, by):
263251
# TODO: what happens to the existing index?
264252
# We set the columns to index as we have a notion of being partitioned by
@@ -280,13 +268,24 @@ def __getattr__(self, name):
280268

281269
def __getitem__(self, key):
282270
# TODO: Replicate pd.DataFrame.__getitem__ logic
271+
if isinstance(key, frame_base.DeferredBase):
272+
# Fail early if key is a DeferredBase as it interacts surprisingly with
273+
# key in self._expr.proxy().columns
274+
raise NotImplementedError(
275+
"Indexing with a deferred frame is not yet supported. Consider "
276+
"using df.loc[...]")
277+
283278
if (isinstance(key, list) and
284279
all(key_column in self._expr.proxy().columns
285280
for key_column in key)) or key in self._expr.proxy().columns:
286281
return self._elementwise(lambda df: df[key], 'get_column')
287282
else:
288283
raise NotImplementedError(key)
289284

285+
def __contains__(self, key):
286+
# Checks if proxy has the given column
287+
return self._expr.proxy().__contains__(key)
288+
290289
def __setitem__(self, key, value):
291290
if isinstance(key, str):
292291
# yapf: disable
@@ -314,13 +313,37 @@ def set_index(self, keys, **kwargs):
314313
requires_partition_by=partitionings.Nothing(),
315314
preserves_partition_by=partitionings.Nothing()))
316315

317-
def at(self, *args, **kwargs):
318-
raise NotImplementedError()
316+
at = frame_base.not_implemented_method('at')
319317

320318
@property
321319
def loc(self):
322320
return _DeferredLoc(self)
323321

322+
_get_index = _set_index = frame_base.not_implemented_method('index')
323+
index = property(_get_index, _set_index)
324+
325+
@property
326+
def axes(self):
327+
return (self.index, self.columns)
328+
329+
apply = frame_base.not_implemented_method('apply')
330+
explode = frame_base.not_implemented_method('explode')
331+
isin = frame_base.not_implemented_method('isin')
332+
assign = frame_base.not_implemented_method('assign')
333+
append = frame_base.not_implemented_method('append')
334+
combine = frame_base.not_implemented_method('combine')
335+
combine_first = frame_base.not_implemented_method('combine_first')
336+
cov = frame_base.not_implemented_method('cov')
337+
corr = frame_base.not_implemented_method('corr')
338+
count = frame_base.not_implemented_method('count')
339+
dot = frame_base.not_implemented_method('dot')
340+
drop = frame_base.not_implemented_method('drop')
341+
eval = frame_base.not_implemented_method('eval')
342+
reindex = frame_base.not_implemented_method('reindex')
343+
melt = frame_base.not_implemented_method('melt')
344+
pivot = frame_base.not_implemented_method('pivot')
345+
pivot_table = frame_base.not_implemented_method('pivot_table')
346+
324347
def aggregate(self, func, axis=0, *args, **kwargs):
325348
if axis is None:
326349
# Aggregate across all elements by first aggregating across columns,
@@ -383,6 +406,7 @@ def aggregate(self, func, axis=0, *args, **kwargs):
383406
applymap = frame_base._elementwise_method('applymap')
384407

385408
memory_usage = frame_base.wont_implement_method('non-deferred value')
409+
info = frame_base.wont_implement_method('non-deferred value')
386410

387411
all = frame_base._agg_method('all')
388412
any = frame_base._agg_method('any')
@@ -398,6 +422,8 @@ def aggregate(self, func, axis=0, *args, **kwargs):
398422

399423
def mode(self, axis=0, *args, **kwargs):
400424
if axis == 1 or axis == 'columns':
425+
# Number of columns is max(number mode values for each row), so we can't
426+
# determine how many there will be before looking at the data.
401427
raise frame_base.WontImplementError('non-deferred column values')
402428
return frame_base.DeferredFrame.wrap(
403429
expressions.ComputedExpression(
@@ -766,8 +792,7 @@ def sort_values(self, axis, **kwargs):
766792
transform = frame_base._elementwise_method(
767793
'transform', restrictions={'axis': 0})
768794

769-
def transpose(self, *args, **kwargs):
770-
raise frame_base.WontImplementError('non-deferred column values')
795+
transpose = frame_base.wont_implement_method('non-deferred column values')
771796

772797
def unstack(self, *args, **kwargs):
773798
if self._expr.proxy().index.nlevels == 1:
@@ -799,7 +824,10 @@ def unstack(self, *args, **kwargs):
799824
class DeferredGroupBy(frame_base.DeferredFrame):
800825
def agg(self, fn):
801826
if not callable(fn):
802-
raise NotImplementedError(fn)
827+
# TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
828+
# running doctests for pandas.core.groupby.generic
829+
raise NotImplementedError('GroupBy.agg currently only supports callable '
830+
'arguments')
803831
return DeferredDataFrame(
804832
expressions.ComputedExpression(
805833
'agg',
@@ -963,3 +991,37 @@ class _DeferredStringMethods(frame_base.DeferredBase):
963991
setattr(_DeferredStringMethods,
964992
method,
965993
frame_base._elementwise_method(method))
994+
995+
for base in ['add',
996+
'sub',
997+
'mul',
998+
'div',
999+
'truediv',
1000+
'floordiv',
1001+
'mod',
1002+
'pow',
1003+
'and',
1004+
'or']:
1005+
for p in ['%s', 'r%s', '__%s__', '__r%s__']:
1006+
# TODO: non-trivial level?
1007+
name = p % base
1008+
setattr(
1009+
DeferredSeries,
1010+
name,
1011+
frame_base._elementwise_method(name, restrictions={'level': None}))
1012+
setattr(
1013+
DeferredDataFrame,
1014+
name,
1015+
frame_base._elementwise_method(name, restrictions={'level': None}))
1016+
setattr(
1017+
DeferredSeries,
1018+
'__i%s__' % base,
1019+
frame_base._elementwise_method('__i%s__' % base, inplace=True))
1020+
setattr(
1021+
DeferredDataFrame,
1022+
'__i%s__' % base,
1023+
frame_base._elementwise_method('__i%s__' % base, inplace=True))
1024+
1025+
for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
1026+
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
1027+
setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))

sdks/python/apache_beam/dataframe/io.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,14 @@ def read_csv(path, *args, **kwargs):
4040
return _ReadFromPandas(pd.read_csv, path, args, kwargs, incremental=True)
4141

4242

43+
def _as_pc(df):
44+
from apache_beam.dataframe import convert # avoid circular import
45+
# TODO(roberwb): Amortize the computation for multiple writes?
46+
return convert.to_pcollection(df, yield_elements='pandas')
47+
48+
4349
def to_csv(df, path, *args, **kwargs):
44-
return df | _WriteToPandas(
50+
return _as_pc(df) | _WriteToPandas(
4551
pd.DataFrame.to_csv, path, args, kwargs, incremental=True, binary=False)
4652

4753

@@ -68,7 +74,7 @@ def to_json(df, path, orient=None, *args, **kwargs):
6874
else:
6975
raise frame_base.WontImplementError('not dataframes or series')
7076
kwargs['orient'] = orient
71-
return df | _WriteToPandas(
77+
return _as_pc(df) | _WriteToPandas(
7278
pd.DataFrame.to_json,
7379
path,
7480
args,
@@ -87,7 +93,7 @@ def read_html(path, *args, **kwargs):
8793

8894

8995
def to_html(df, path, *args, **kwargs):
90-
return df | _WriteToPandas(
96+
return _as_pc(df) | _WriteToPandas(
9197
pd.DataFrame.to_html,
9298
path,
9399
args,
@@ -109,7 +115,7 @@ def _binary_writer(format):
109115
lambda df,
110116
path,
111117
*args,
112-
**kwargs: df | _WriteToPandas(func, path, args, kwargs))
118+
**kwargs: _as_pc(df) | _WriteToPandas(func, path, args, kwargs))
113119

114120

115121
for format in ('excel', 'feather', 'parquet', 'stata'):
@@ -214,13 +220,6 @@ def __init__(
214220
self.incremental = incremental
215221
self.binary = binary
216222

217-
def __ror__(self, other, label=None):
218-
if isinstance(other, frame_base.DeferredBase):
219-
from apache_beam.dataframe import convert # avoid circular import
220-
# TODO(roberwb): Amortize the computation for multiple writes?
221-
other = convert.to_pcollection(other, yield_elements='pandas')
222-
return super(_WriteToPandas, self).__ror__(other, label)
223-
224223
def expand(self, pcoll):
225224
dir, name = io.filesystems.FileSystems.split(self.path)
226225
return pcoll | fileio.WriteToFiles(

0 commit comments

Comments
 (0)