
Commit b6a81f4

brkyvz authored and tdas committed
[SPARK-18888] partitionBy in DataStreamWriter in Python throws _to_seq not defined
## What changes were proposed in this pull request?

`_to_seq` wasn't imported.

## How was this patch tested?

Added partitionBy to existing write path unit test.

Author: Burak Yavuz <[email protected]>

Closes apache#16297 from brkyvz/SPARK-18888.
1 parent 900ce55 commit b6a81f4
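
For context on the failure: `DataStreamWriter.partitionBy` converts its Python column names into a JVM `Seq` via `_to_seq` before handing them to the underlying Java writer, and `streaming.py` referenced that helper without importing it. The following is a minimal paraphrased sketch of the call path, not a verbatim copy of the Spark source:

```python
# Paraphrased sketch of DataStreamWriter in pyspark/sql/streaming.py
# (Spark 2.1-era); not verbatim source.
from pyspark.sql.column import _to_seq  # the import this commit adds

class DataStreamWriter(object):
    def __init__(self, df):
        self._df = df
        self._spark = df.sql_ctx
        self._jwrite = df._jdf.writeStream()

    def partitionBy(self, *cols):
        """Partition the streaming output by the given columns."""
        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
            cols = cols[0]
        # Before this commit, _to_seq was never imported in streaming.py,
        # so this line raised: NameError: name '_to_seq' is not defined
        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
        return self
```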

File tree

2 files changed (+5, −3 lines)


python/pyspark/sql/streaming.py

Lines changed: 1 addition & 0 deletions

@@ -28,6 +28,7 @@
 from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.column import _to_seq
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
 from pyspark.sql.utils import StreamingQueryException
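
`_to_seq` itself is a small py4j helper in `pyspark.sql.column`: it turns a Python list of columns or column names into a JVM `Seq` that Scala-side methods like `partitionBy` accept. A simplified rendering, paraphrased rather than copied from the source:

```python
# Simplified rendering of pyspark.sql.column._to_seq; paraphrased, not verbatim.
def _to_seq(sc, cols, converter=None):
    """Convert a list of Columns (or column names) into a JVM Seq of Columns."""
    if converter:
        cols = [converter(c) for c in cols]
    # PythonUtils.toSeq is a JVM-side helper reached through the py4j gateway.
    return sc._jvm.PythonUtils.toSeq(cols)
```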

python/pyspark/sql/tests.py

Lines changed: 4 additions & 3 deletions

@@ -50,7 +50,7 @@
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
 from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
-from pyspark.sql.functions import UserDefinedFunction, sha2
+from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException

@@ -1056,7 +1056,8 @@ def test_stream_read_options_overwrite(self):
         self.assertEqual(df.schema.simpleString(), "struct<data:string>")

     def test_stream_save_options(self):
-        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') \
+            .withColumn('id', lit(1))
         for q in self.spark._wrapped.streams.active:
             q.stop()
         tmpPath = tempfile.mkdtemp()

@@ -1065,7 +1066,7 @@ def test_stream_save_options(self):
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
         q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
-            .format('parquet').outputMode('append').option('path', out).start()
+            .format('parquet').partitionBy('id').outputMode('append').option('path', out).start()
         try:
             self.assertEqual(q.name, 'this_query')
             self.assertTrue(q.isActive)
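
With the fix in place, the pattern exercised by the updated test works end to end: the query partitions its parquet output by the constant `id` column, so files land in one subdirectory per partition value. A self-contained usage sketch follows; the session setup and paths are illustrative, not taken from the patch:

```python
# Illustrative usage of the fixed partitionBy API; paths and session
# setup here are hypothetical, not part of the commit.
import os
import tempfile

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master('local[2]').appName('partitionby-demo').getOrCreate()

# Add a constant partition column, mirroring the updated unit test.
df = spark.readStream.format('text').load('python/test_support/sql/streaming') \
    .withColumn('id', lit(1))

out = tempfile.mkdtemp()
chk = tempfile.mkdtemp()

# Before this commit, partitionBy here raised NameError; now it builds normally.
q = df.writeStream.format('parquet').partitionBy('id') \
    .option('checkpointLocation', chk).option('path', out) \
    .outputMode('append').start()
q.processAllAvailable()
q.stop()

# Output is grouped by partition value, e.g. <out>/id=1/part-*.parquet
print(os.listdir(out))
```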
