
Commit 8d7e3d4

Revert "[SPARK-50298][PYTHON][CONNECT] Implement verifySchema parameter of createDataFrame in Spark Connect"
This reverts commit e1477a3.
1 parent f5bb11c commit 8d7e3d4

File tree

4 files changed: +31 -54 lines changed

python/pyspark/sql/connect/conversion.py
python/pyspark/sql/connect/session.py
python/pyspark/sql/tests/connect/test_parity_arrow.py
python/pyspark/sql/tests/test_arrow.py

python/pyspark/sql/connect/conversion.py

Lines changed: 3 additions & 3 deletions
@@ -322,7 +322,7 @@ def convert_other(value: Any) -> Any:
             return lambda value: value

     @staticmethod
-    def convert(data: Sequence[Any], schema: StructType, verifySchema: bool = False) -> "pa.Table":
+    def convert(data: Sequence[Any], schema: StructType) -> "pa.Table":
         assert isinstance(data, list) and len(data) > 0

         assert schema is not None and isinstance(schema, StructType)
@@ -372,8 +372,8 @@ def convert(data: Sequence[Any], schema: StructType, verifySchema: bool = False)
                 ]
             )
         )
-        table = pa.Table.from_arrays(pylist, schema=pa_schema)
-        return table.cast(pa_schema, safe=verifySchema)
+
+        return pa.Table.from_arrays(pylist, schema=pa_schema)


 class ArrowTableToRowsConversion:
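
Note: the reverted line pair relied on PyArrow's safe-cast flag, which is what verifySchema toggled here. Table.cast(schema, safe=True) raises on lossy conversions such as integer overflow, while safe=False lets them through. A minimal standalone sketch of that distinction (plain PyArrow, independent of this codebase):

    import pyarrow as pa

    # An int64 column holding a value that does not fit in 32 bits.
    table = pa.table({"value": [100000000000]})
    target = pa.schema([("value", pa.int32())])

    table.cast(target, safe=False)  # succeeds; the overflowing value wraps silently
    table.cast(target, safe=True)   # raises pyarrow.lib.ArrowInvalid on the overflow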

python/pyspark/sql/connect/session.py

Lines changed: 7 additions & 23 deletions
@@ -50,7 +50,6 @@
 )
 import urllib

-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.sql.connect.dataframe import DataFrame
 from pyspark.sql.dataframe import DataFrame as ParentDataFrame
 from pyspark.sql.connect.logging import logger
@@ -450,7 +449,7 @@ def createDataFrame(
         data: Union["pd.DataFrame", "np.ndarray", "pa.Table", Iterable[Any]],
         schema: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str, ...]]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: Optional[bool] = None,
     ) -> "ParentDataFrame":
         assert data is not None
         if isinstance(data, DataFrame):
@@ -462,6 +461,9 @@ def createDataFrame(
         if samplingRatio is not None:
             warnings.warn("'samplingRatio' is ignored. It is not supported with Spark Connect.")

+        if verifySchema is not None:
+            warnings.warn("'verifySchema' is ignored. It is not supported with Spark Connect.")
+
         _schema: Optional[Union[AtomicType, StructType]] = None
         _cols: Optional[List[str]] = None
         _num_cols: Optional[int] = None
@@ -574,10 +576,7 @@ def createDataFrame(
             "spark.sql.session.timeZone", "spark.sql.execution.pandas.convertToArrowArraySafely"
         )

-        if verifySchema is _NoValue:
-            verifySchema = safecheck == "true"
-
-        ser = ArrowStreamPandasSerializer(cast(str, timezone), verifySchema)
+        ser = ArrowStreamPandasSerializer(cast(str, timezone), safecheck == "true")

         _table = pa.Table.from_batches(
             [
@@ -597,9 +596,6 @@ def createDataFrame(
             ).cast(arrow_schema)

         elif isinstance(data, pa.Table):
-            if verifySchema is _NoValue:
-                verifySchema = False
-
             prefer_timestamp_ntz = is_timestamp_ntz_preferred()

             (timezone,) = self._client.get_configs("spark.sql.session.timeZone")
@@ -617,10 +613,7 @@ def createDataFrame(

             _table = (
                 _check_arrow_table_timestamps_localize(data, schema, True, timezone)
-                .cast(
-                    to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True),
-                    safe=verifySchema,
-                )
+                .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True))
                 .rename_columns(schema.names)
             )

@@ -659,12 +652,6 @@ def createDataFrame(
             # The _table should already have the proper column names.
             _cols = None

-            if verifySchema is not _NoValue:
-                warnings.warn(
-                    "'verifySchema' is ignored. It is not supported"
-                    " with np.ndarray input on Spark Connect."
-                )
-
         else:
             _data = list(data)

@@ -696,15 +683,12 @@ def createDataFrame(
                     errorClass="CANNOT_DETERMINE_TYPE", messageParameters={}
                 )

-            if verifySchema is _NoValue:
-                verifySchema = True
-
             from pyspark.sql.connect.conversion import LocalDataToArrowConversion

             # Spark Connect will try its best to build the Arrow table with the
             # inferred schema in the client side, and then rename the columns and
             # cast the datatypes in the server side.
-            _table = LocalDataToArrowConversion.convert(_data, _schema, cast(bool, verifySchema))
+            _table = LocalDataToArrowConversion.convert(_data, _schema)

             # TODO: Beside the validation on number of columns, we should also check
             # whether the Arrow Schema is compatible with the user provided Schema.
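
Note: after this revert, verifySchema survives in the Spark Connect createDataFrame signature only for API compatibility; any non-None value just triggers the warning added above, and no client-side safe cast is performed. A usage sketch, assuming spark is a Spark Connect SparkSession:

    import warnings

    import pyarrow as pa

    table = pa.table({"id": [1, 2, 3]})
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        df = spark.createDataFrame(table, schema="id long", verifySchema=True)
    # caught now holds: "'verifySchema' is ignored. It is not supported with Spark Connect."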

python/pyspark/sql/tests/connect/test_parity_arrow.py

Lines changed: 2 additions & 1 deletion
@@ -137,8 +137,9 @@ def test_toPandas_udt(self):
     def test_create_dataframe_namedtuples(self):
         self.check_create_dataframe_namedtuples(True)

+    @unittest.skip("Spark Connect does not support verifySchema.")
     def test_createDataFrame_verifySchema(self):
-        self.check_createDataFrame_verifySchema(True)
+        super().test_createDataFrame_verifySchema()


 if __name__ == "__main__":
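
Note: the override is kept and skipped rather than deleted, so the parity suite still documents the gap while delegating the body to the parent test. The same pattern in isolation (hypothetical class names, for illustration only):

    import unittest

    class BaseSuite(unittest.TestCase):
        def test_feature(self):
            self.assertTrue(True)

    class ParitySuite(BaseSuite):
        @unittest.skip("Feature not supported in this environment.")
        def test_feature(self):
            super().test_feature()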

python/pyspark/sql/tests/test_arrow.py

Lines changed: 19 additions & 27 deletions
@@ -533,11 +533,6 @@ def test_createDataFrame_arrow_pandas(self):
         self.assertEqual(df_arrow.collect(), df_pandas.collect())

     def test_createDataFrame_verifySchema(self):
-        for arrow_enabled in [True, False]:
-            with self.subTest(arrow_enabled=arrow_enabled):
-                self.check_createDataFrame_verifySchema(arrow_enabled)
-
-    def check_createDataFrame_verifySchema(self, arrow_enabled):
         data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]}
         # data.value should fail schema validation when verifySchema is True
         schema = StructType(
@@ -552,32 +547,29 @@ def check_createDataFrame_verifySchema(self, arrow_enabled):
         table = pa.table(data)
         df = self.spark.createDataFrame(table, schema=schema)
         self.assertEqual(df.collect(), expected)
+
         with self.assertRaises(Exception):
             self.spark.createDataFrame(table, schema=schema, verifySchema=True)

-        if arrow_enabled:
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": True}):
-                # pandas DataFrame with Arrow optimization
-                pdf = pd.DataFrame(data)
+        # pandas DataFrame with Arrow optimization
+        pdf = pd.DataFrame(data)
+        df = self.spark.createDataFrame(pdf, schema=schema)
+        # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
+        # which is false by default
+        self.assertEqual(df.collect(), expected)
+        with self.assertRaises(Exception):
+            with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
                 df = self.spark.createDataFrame(pdf, schema=schema)
-                # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
-                # which is false by default
-                self.assertEqual(df.collect(), expected)
-                with self.assertRaises(Exception):
-                    with self.sql_conf(
-                        {"spark.sql.execution.pandas.convertToArrowArraySafely": True}
-                    ):
-                        df = self.spark.createDataFrame(pdf, schema=schema)
-                with self.assertRaises(Exception):
-                    df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
-        else:
-            # pandas DataFrame without Arrow optimization
-            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
-                pdf = pd.DataFrame(data)
-                with self.assertRaises(Exception):
-                    self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
-                df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
-                self.assertEqual(df.collect(), expected)
+        with self.assertRaises(Exception):
+            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
+
+        # pandas DataFrame without Arrow optimization
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
+            pdf = pd.DataFrame(data)
+            with self.assertRaises(Exception):
+                df = self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
+            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
+            self.assertEqual(df.collect(), expected)

     def _createDataFrame_toggle(self, data, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
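
Note: the test data is deliberately chosen so that the "value" column overflows a 32-bit integer. The StructType body is elided by the hunk context, but a sketch of the failure mode the test relies on looks like this (the field types below are an assumption, not the elided schema; spark is a classic, non-Connect session):

    import pandas as pd
    from pyspark.sql.types import IntegerType, LongType, StructField, StructType

    data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]}
    schema = StructType(
        [
            StructField("id", LongType(), True),
            StructField("value", IntegerType(), True),  # too narrow for the data above
        ]
    )
    pdf = pd.DataFrame(data)
    df = spark.createDataFrame(pdf, schema=schema, verifySchema=False)  # overflow slips through
    # spark.createDataFrame(pdf, schema=schema, verifySchema=True)  # would raise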
