diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index d16191bc4ccf..02423f517eea 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -793,7 +793,6 @@ def __init__(
     if format == 'csv':
       kwargs['filename_column'] = filename_column
     self._reader = globals()['read_%s' % format](*args, **kwargs)
-    self._reader = globals()['read_%s' % format](*args, **kwargs)
     self._include_indexes = include_indexes
     self._objects_as_strings = objects_as_strings
     self._filename_column = filename_column
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 89e4dc8b951c..5cf2fa00f15d 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -34,6 +34,8 @@
 except ImportError:
   jsonschema = None
 
+_LOGGER = logging.getLogger(__name__)
+
 
 class CreateTimestamped(beam.PTransform):
   _yaml_requires_inputs = False
@@ -244,6 +246,10 @@ def test_csv_to_json(self):
       input = os.path.join(tmpdir, 'input.csv')
       output = os.path.join(tmpdir, 'output.json')
       data.to_csv(input, index=False)
+      with open(input, 'r') as f:
+        lines = f.readlines()
+      _LOGGER.debug("input.csv has these %s lines.", lines)
+      self.assertEqual(len(lines), len(data) + 1)  # +1 for header
       with beam.Pipeline() as p:
         result = p | YamlTransform(
           '''
@@ -256,9 +262,11 @@ def test_csv_to_json(self):
             - type: WriteToJson
               config:
                 path: %s
-              num_shards: 1
+                num_shards: 1
+            - type: LogForTesting
           ''' % (repr(input), repr(output)))
-
+      all_output = list(glob.glob(output + "*"))
+      self.assertEqual(len(all_output), 1)
       output_shard = list(glob.glob(output + "*"))[0]
       result = pd.read_json(
           output_shard, orient='records',