Commit e766a32

nchammas authored and HyukjinKwon committed
[SPARK-30091][SQL][PYTHON] Document mergeSchema option directly in the PySpark Parquet APIs
### What changes were proposed in this pull request?

This change properly documents the `mergeSchema` option directly in the Python APIs for reading Parquet data.

### Why are the changes needed?

The docstring for `DataFrameReader.parquet()` mentions `mergeSchema` but doesn't show it in the API. It seems like a simple oversight.

Before this PR, you'd have to do this to use `mergeSchema`:

```python
spark.read.option('mergeSchema', True).parquet('test-parquet').show()
```

After this PR, you can use the option as (I believe) it was intended to be used:

```python
spark.read.parquet('test-parquet', mergeSchema=True).show()
```

### Does this PR introduce any user-facing change?

Yes, this PR changes the signatures of `DataFrameReader.parquet()` and `DataStreamReader.parquet()` to match their docstrings.

### How was this patch tested?

Testing the `mergeSchema` option directly seems to be left to the Scala side of the codebase. I tested my change manually to confirm the API works. I also confirmed that setting `spark.sql.parquet.mergeSchema` at the session level does not get overridden by leaving `mergeSchema` at its default when calling `parquet()`:

```
>>> spark.conf.set('spark.sql.parquet.mergeSchema', True)
>>> spark.range(3).write.parquet('test-parquet/id')
>>> spark.range(3).withColumnRenamed('id', 'name').write.parquet('test-parquet/name')
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet').show()
+----+----+
|  id|name|
+----+----+
|null|   1|
|null|   2|
|null|   0|
|   1|null|
|   2|null|
|   0|null|
+----+----+

>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet', mergeSchema=False).show()
+----+
|  id|
+----+
|null|
|null|
|null|
|   1|
|   2|
|   0|
+----+
```

Closes apache#26730 from nchammas/parquet-merge-schema.

Authored-by: Nicholas Chammas <[email protected]>

Signed-off-by: HyukjinKwon <[email protected]>
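For readers wondering why leaving `mergeSchema` at its default does not clobber the session configuration: options set to `None` are simply not forwarded to the underlying JVM reader. Below is a minimal sketch of that filtering, modeled on PySpark's internal `_set_opts` helper; the actual implementation lives in `python/pyspark/sql/readwriter.py` and may differ in detail.

```python
def _set_opts(reader, **options):
    """Forward only the options that were explicitly set.

    Options left at None are skipped entirely, so session-level settings
    such as spark.sql.parquet.mergeSchema stay in effect unless the caller
    passes an explicit value.
    """
    for key, value in options.items():
        if value is not None:
            reader.option(key, value)
```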
1 parent 708cf16 commit e766a32

2 files changed: +16 −17 lines

python/pyspark/sql/readwriter.py

Lines changed: 7 additions & 7 deletions
```diff
@@ -305,22 +305,22 @@ def table(self, tableName):
 
     @since(1.4)
     def parquet(self, *paths, **options):
-        """Loads Parquet files, returning the result as a :class:`DataFrame`.
+        """
+        Loads Parquet files, returning the result as a :class:`DataFrame`.
 
+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
         :param recursiveFileLookup: recursively scan a directory for files. Using this option
             disables `partition discovery`_.
 
-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-              Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-              The default value is specified in ``spark.sql.parquet.mergeSchema``.
-
         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
+        mergeSchema = options.get('mergeSchema', None)
         recursiveFileLookup = options.get('recursiveFileLookup', None)
-        self._set_opts(recursiveFileLookup=recursiveFileLookup)
+        self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
 
     @ignore_unicode_prefix
```
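For illustration, a hedged end-to-end sketch of the new batch API, based on the manual test in the commit message; the `/tmp/test-parquet` paths are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Write two Parquet directories whose schemas differ by one column
# (hypothetical paths, mirroring the test in the commit message).
spark.range(3).write.parquet('/tmp/test-parquet/id')
spark.range(3).withColumnRenamed('id', 'name').write.parquet('/tmp/test-parquet/name')

# With this change, mergeSchema is a direct keyword argument; passing it
# overrides spark.sql.parquet.mergeSchema for this read only.
df = spark.read.option('recursiveFileLookup', True) \
    .parquet('/tmp/test-parquet', mergeSchema=True)
df.show()  # expected: both 'id' and 'name' columns, with nulls where absent
```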

python/pyspark/sql/streaming.py

Lines changed: 9 additions & 10 deletions
```diff
@@ -535,26 +535,25 @@ def orc(self, path, recursiveFileLookup=None):
             raise TypeError("path can be only a single string")
 
     @since(2.0)
-    def parquet(self, path, recursiveFileLookup=None):
-        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+    def parquet(self, path, mergeSchema=None, recursiveFileLookup=None):
+        """
+        Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+
+        .. note:: Evolving.
 
+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
         :param recursiveFileLookup: recursively scan a directory for files. Using this option
             disables `partition discovery`_.
 
-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-              Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-              The default value is specified in ``spark.sql.parquet.mergeSchema``.
-
-        .. note:: Evolving.
-
         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
         True
         """
-        self._set_opts(recursiveFileLookup=recursiveFileLookup)
+        self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.parquet(path))
         else:
```
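And a similar hedged sketch for the streaming side; the schema and input directory below are illustrative (streaming reads require an explicit schema).

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema and input directory for the streaming reader.
schema = StructType([StructField('id', LongType(), True)])

# mergeSchema is now a named parameter on DataStreamReader.parquet() too.
sdf = spark.readStream.schema(schema).parquet('/tmp/test-parquet/id', mergeSchema=True)
assert sdf.isStreaming
```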
