1 change: 0 additions & 1 deletion sdks/python/apache_beam/dataframe/io.py
@@ -793,7 +793,6 @@ def __init__(
    if format == 'csv':
      kwargs['filename_column'] = filename_column
    self._reader = globals()['read_%s' % format](*args, **kwargs)
    self._reader = globals()['read_%s' % format](*args, **kwargs)
    self._include_indexes = include_indexes
    self._objects_as_strings = objects_as_strings
    self._filename_column = filename_column
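This change drops one of two identical adjacent calls (the file header above confirms "0 additions & 1 deletion"): `globals()['read_%s' % format]` looks up a module-level reader function by name (for example `read_csv`) and calls it, so the duplicate line constructed the reader twice. A minimal sketch of the same name-based dispatch pattern, using hypothetical stand-in readers rather than Beam's real ones:

# Minimal sketch of the globals()-based dispatch used in io.py above.
# read_csv / read_json here are hypothetical stand-ins, not Beam's readers.
def read_csv(path, **kwargs):
  return ('csv-reader', path, kwargs)

def read_json(path, **kwargs):
  return ('json-reader', path, kwargs)

def make_reader(format, *args, **kwargs):
  # Look up the module-level function named 'read_<format>' and call it once.
  return globals()['read_%s' % format](*args, **kwargs)

print(make_reader('csv', '/tmp/input.csv', filename_column='file'))
# ('csv-reader', '/tmp/input.csv', {'filename_column': 'file'})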
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -34,6 +34,8 @@
except ImportError:
  jsonschema = None

_LOGGER = logging.getLogger(__name__)


class CreateTimestamped(beam.PTransform):
  _yaml_requires_inputs = False
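`_LOGGER = logging.getLogger(__name__)` is the standard module-level logger; its `.debug(...)` calls only produce output when DEBUG level is enabled for the run. A small self-contained sketch of how it is used (the sample `lines` value is illustrative only):

import logging

_LOGGER = logging.getLogger(__name__)

# Debug records are dropped unless DEBUG level is enabled, e.g.:
logging.basicConfig(level=logging.DEBUG)

lines = ['a,b\n', '1,2\n']  # illustrative stand-in for f.readlines()
# %-style arguments are formatted lazily, only if the record is emitted.
_LOGGER.debug("input.csv has these lines: %s", lines)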
@@ -244,6 +246,10 @@ def test_csv_to_json(self):
      input = os.path.join(tmpdir, 'input.csv')
      output = os.path.join(tmpdir, 'output.json')
      data.to_csv(input, index=False)
      with open(input, 'r') as f:
        lines = f.readlines()
      _LOGGER.debug("input.csv has these lines: %s", lines)
      self.assertEqual(len(lines), len(data) + 1)  # +1 for header

      with beam.Pipeline() as p:
        result = p | YamlTransform(
@@ -256,9 +262,11 @@
              - type: WriteToJson
                config:
                  path: %s
                  num_shards: 1
                  num_shards: 1
              - type: LogForTesting
            ''' % (repr(input), repr(output)))

      all_output = list(glob.glob(output + "*"))
      self.assertEqual(len(all_output), 1)
      output_shard = list(glob.glob(output + "*"))[0]
      result = pd.read_json(
          output_shard, orient='records',
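For context, the hunk above shows only the tail of the YAML pipeline string handed to `YamlTransform`, plus the post-pipeline checks; setting `num_shards: 1` on `WriteToJson` is what makes the single-output-file assertion (`len(all_output) == 1`) deterministic. A hedged sketch of a complete pipeline of this shape, assuming a `chain` pipeline whose leading `ReadFromCsv` step is inferred from the test name rather than visible in the hunk:

# Sketch only: paths are hypothetical, and the leading ReadFromCsv step is an
# assumption based on the test name (test_csv_to_json), not on this hunk.
import apache_beam as beam
from apache_beam.yaml.yaml_transform import YamlTransform

input_path = '/tmp/input.csv'
output_path = '/tmp/output.json'

with beam.Pipeline() as p:
  _ = p | YamlTransform(
      '''
      type: chain
      transforms:
        - type: ReadFromCsv
          config:
            path: %s
        - type: WriteToJson
          config:
            path: %s
            num_shards: 1
        - type: LogForTesting
      ''' % (repr(input_path), repr(output_path)))

`LogForTesting` simply logs the elements that flow through it, complementing the `_LOGGER.debug` output added earlier in the test.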