Skip to content

Commit 3dd3a62

Browse files
nchammas authored and HyukjinKwon committed
[SPARK-27990][SPARK-29903][PYTHON] Add recursiveFileLookup option to Python DataFrameReader
### What changes were proposed in this pull request? As a follow-up to apache#24830, this PR adds the `recursiveFileLookup` option to the Python DataFrameReader API. ### Why are the changes needed? This PR maintains Python feature parity with Scala. ### Does this PR introduce any user-facing change? Yes. Before this PR, you'd only be able to use this option as follows: ```python spark.read.option("recursiveFileLookup", True).text("test-data").show() ``` With this PR, you can reference the option from within the format-specific method: ```python spark.read.text("test-data", recursiveFileLookup=True).show() ``` This option now also shows up in the Python API docs. ### How was this patch tested? I tested this manually by creating the following directories with dummy data: ``` test-data ├── 1.txt └── nested └── 2.txt test-parquet ├── nested │ ├── _SUCCESS │ ├── part-00000-...-.parquet ├── _SUCCESS ├── part-00000-...-.parquet ``` I then ran the following tests and confirmed the output looked good: ```python spark.read.parquet("test-parquet", recursiveFileLookup=True).show() spark.read.text("test-data", recursiveFileLookup=True).show() spark.read.csv("test-data", recursiveFileLookup=True).show() ``` `python/pyspark/sql/tests/test_readwriter.py` seems pretty sparse. I'm happy to add my tests there, though it seems we have been deferring testing like this to the Scala side of things. Closes apache#26718 from nchammas/SPARK-27990-recursiveFileLookup-python. Authored-by: Nicholas Chammas <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent f3abee3 commit 3dd3a62

File tree

2 files changed

+56
-16
lines changed

2 files changed

+56
-16
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
171171
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
172172
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
173173
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
174-
dropFieldIfAllNull=None, encoding=None, locale=None):
174+
dropFieldIfAllNull=None, encoding=None, locale=None, recursiveFileLookup=None):
175175
"""
176176
Loads JSON files and returns the results as a :class:`DataFrame`.
177177
@@ -247,6 +247,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
247247
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
248248
it uses the default value, ``en-US``. For instance, ``locale`` is used while
249249
parsing dates and timestamps.
250+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
251+
disables `partition discovery`_.
252+
253+
.. _partition discovery: /sql-data-sources-parquet.html#partition-discovery
250254
251255
>>> df1 = spark.read.json('python/test_support/sql/people.json')
252256
>>> df1.dtypes
@@ -266,7 +270,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
266270
timestampFormat=timestampFormat, multiLine=multiLine,
267271
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
268272
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
269-
locale=locale)
273+
locale=locale, recursiveFileLookup=recursiveFileLookup)
270274
if isinstance(path, basestring):
271275
path = [path]
272276
if type(path) == list:
@@ -300,9 +304,12 @@ def table(self, tableName):
300304
return self._df(self._jreader.table(tableName))
301305

302306
@since(1.4)
303-
def parquet(self, *paths):
307+
def parquet(self, *paths, **options):
304308
"""Loads Parquet files, returning the result as a :class:`DataFrame`.
305309
310+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
311+
disables `partition discovery`_.
312+
306313
You can set the following Parquet-specific option(s) for reading Parquet files:
307314
* ``mergeSchema``: sets whether we should merge schemas collected from all \
308315
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -312,11 +319,13 @@ def parquet(self, *paths):
312319
>>> df.dtypes
313320
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
314321
"""
322+
recursiveFileLookup = options.get('recursiveFileLookup', None)
323+
self._set_opts(recursiveFileLookup=recursiveFileLookup)
315324
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
316325

317326
@ignore_unicode_prefix
318327
@since(1.6)
319-
def text(self, paths, wholetext=False, lineSep=None):
328+
def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
320329
"""
321330
Loads text files and returns a :class:`DataFrame` whose schema starts with a
322331
string column named "value", and followed by partitioned columns if there
@@ -329,6 +338,8 @@ def text(self, paths, wholetext=False, lineSep=None):
329338
:param wholetext: if true, read each file from input path(s) as a single row.
330339
:param lineSep: defines the line separator that should be used for parsing. If None is
331340
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
341+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
342+
disables `partition discovery`_.
332343
333344
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
334345
>>> df.collect()
@@ -337,7 +348,8 @@ def text(self, paths, wholetext=False, lineSep=None):
337348
>>> df.collect()
338349
[Row(value=u'hello\\nthis')]
339350
"""
340-
self._set_opts(wholetext=wholetext, lineSep=lineSep)
351+
self._set_opts(
352+
wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
341353
if isinstance(paths, basestring):
342354
paths = [paths]
343355
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -349,7 +361,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
349361
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
350362
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
351363
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
352-
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
364+
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
365+
recursiveFileLookup=None):
353366
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
354367
355368
This function will go through the input once to determine the input schema if
@@ -457,6 +470,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
457470
:param lineSep: defines the line separator that should be used for parsing. If None is
458471
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
459472
Maximum length is 1 character.
473+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
474+
disables `partition discovery`_.
460475
461476
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
462477
>>> df.dtypes
@@ -476,7 +491,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
476491
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
477492
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
478493
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
479-
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
494+
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
495+
recursiveFileLookup=recursiveFileLookup)
480496
if isinstance(path, basestring):
481497
path = [path]
482498
if type(path) == list:
@@ -504,13 +520,17 @@ def func(iterator):
504520
raise TypeError("path can be only string, list or RDD")
505521

506522
@since(1.5)
507-
def orc(self, path):
523+
def orc(self, path, recursiveFileLookup=None):
508524
"""Loads ORC files, returning the result as a :class:`DataFrame`.
509525
526+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
527+
disables `partition discovery`_.
528+
510529
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
511530
>>> df.dtypes
512531
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
513532
"""
533+
self._set_opts(recursiveFileLookup=recursiveFileLookup)
514534
if isinstance(path, basestring):
515535
path = [path]
516536
return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))

python/pyspark/sql/streaming.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
411411
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
412412
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
413413
multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
414-
dropFieldIfAllNull=None, encoding=None):
414+
dropFieldIfAllNull=None, encoding=None, recursiveFileLookup=None):
415415
"""
416416
Loads a JSON file stream and returns the results as a :class:`DataFrame`.
417417
@@ -487,6 +487,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
487487
the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
488488
the encoding of input JSON will be detected automatically
489489
when the multiLine option is set to ``true``.
490+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
491+
disables `partition discovery`_.
492+
493+
.. _partition discovery: /sql-data-sources-parquet.html#partition-discovery
490494
491495
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
492496
>>> json_sdf.isStreaming
@@ -502,33 +506,41 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
502506
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
503507
timestampFormat=timestampFormat, multiLine=multiLine,
504508
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
505-
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
509+
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
510+
recursiveFileLookup=recursiveFileLookup)
506511
if isinstance(path, basestring):
507512
return self._df(self._jreader.json(path))
508513
else:
509514
raise TypeError("path can be only a single string")
510515

511516
@since(2.3)
512-
def orc(self, path):
517+
def orc(self, path, recursiveFileLookup=None):
513518
"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.
514519
515520
.. note:: Evolving.
516521
522+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
523+
disables `partition discovery`_.
524+
517525
>>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
518526
>>> orc_sdf.isStreaming
519527
True
520528
>>> orc_sdf.schema == sdf_schema
521529
True
522530
"""
531+
self._set_opts(recursiveFileLookup=recursiveFileLookup)
523532
if isinstance(path, basestring):
524533
return self._df(self._jreader.orc(path))
525534
else:
526535
raise TypeError("path can be only a single string")
527536

528537
@since(2.0)
529-
def parquet(self, path):
538+
def parquet(self, path, recursiveFileLookup=None):
530539
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
531540
541+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
542+
disables `partition discovery`_.
543+
532544
You can set the following Parquet-specific option(s) for reading Parquet files:
533545
* ``mergeSchema``: sets whether we should merge schemas collected from all \
534546
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -542,14 +554,15 @@ def parquet(self, path):
542554
>>> parquet_sdf.schema == sdf_schema
543555
True
544556
"""
557+
self._set_opts(recursiveFileLookup=recursiveFileLookup)
545558
if isinstance(path, basestring):
546559
return self._df(self._jreader.parquet(path))
547560
else:
548561
raise TypeError("path can be only a single string")
549562

550563
@ignore_unicode_prefix
551564
@since(2.0)
552-
def text(self, path, wholetext=False, lineSep=None):
565+
def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
553566
"""
554567
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
555568
string column named "value", and followed by partitioned columns if there
@@ -564,14 +577,17 @@ def text(self, path, wholetext=False, lineSep=None):
564577
:param wholetext: if true, read each file from input path(s) as a single row.
565578
:param lineSep: defines the line separator that should be used for parsing. If None is
566579
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
580+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
581+
disables `partition discovery`_.
567582
568583
>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
569584
>>> text_sdf.isStreaming
570585
True
571586
>>> "value" in str(text_sdf.schema)
572587
True
573588
"""
574-
self._set_opts(wholetext=wholetext, lineSep=lineSep)
589+
self._set_opts(
590+
wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
575591
if isinstance(path, basestring):
576592
return self._df(self._jreader.text(path))
577593
else:
@@ -584,7 +600,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
584600
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
585601
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
586602
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
587-
enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
603+
enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
604+
recursiveFileLookup=None):
588605
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
589606
590607
This function will go through the input once to determine the input schema if
@@ -687,6 +704,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
687704
:param lineSep: defines the line separator that should be used for parsing. If None is
688705
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
689706
Maximum length is 1 character.
707+
:param recursiveFileLookup: recursively scan a directory for files. Using this option
708+
disables `partition discovery`_.
690709
691710
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
692711
>>> csv_sdf.isStreaming
@@ -704,7 +723,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
704723
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
705724
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
706725
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
707-
emptyValue=emptyValue, locale=locale, lineSep=lineSep)
726+
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
727+
recursiveFileLookup=recursiveFileLookup)
708728
if isinstance(path, basestring):
709729
return self._df(self._jreader.csv(path))
710730
else:

0 commit comments

Comments
 (0)