
Commit de1c77b

Revert "[SPARK-50291][PYTHON] Standardize verifySchema parameter of createDataFrame in Spark Classic"
This reverts commit aea9e87.
1 parent: 8d7e3d4

File tree: 5 files changed, +29 -99 lines changed


python/pyspark/sql/pandas/conversion.py

Lines changed: 10 additions & 27 deletions
@@ -27,7 +27,6 @@
 )
 from warnings import warn

-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.errors.exceptions.captured import unwrap_spark_exception
 from pyspark.loose_version import LooseVersion
 from pyspark.util import _load_from_socket
@@ -353,7 +352,7 @@ def createDataFrame(
         self,
         data: "PandasDataFrameLike",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> "DataFrame":
         ...
@@ -362,7 +361,7 @@ def createDataFrame(
         self,
         data: "pa.Table",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> "DataFrame":
         ...
@@ -371,7 +370,7 @@ def createDataFrame(  # type: ignore[misc]
         data: Union["PandasDataFrameLike", "pa.Table"],
         schema: Optional[Union[StructType, List[str]]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: bool = True,
     ) -> "DataFrame":
         from pyspark.sql import SparkSession

@@ -393,7 +392,7 @@ def createDataFrame(  # type: ignore[misc]
             if schema is None:
                 schema = data.schema.names

-            return self._create_from_arrow_table(data, schema, timezone, verifySchema)
+            return self._create_from_arrow_table(data, schema, timezone)

         # `data` is a PandasDataFrameLike object
         from pyspark.sql.pandas.utils import require_minimum_pandas_version
@@ -406,7 +405,7 @@ def createDataFrame(  # type: ignore[misc]

         if self._jconf.arrowPySparkEnabled() and len(data) > 0:
             try:
-                return self._create_from_pandas_with_arrow(data, schema, timezone, verifySchema)
+                return self._create_from_pandas_with_arrow(data, schema, timezone)
             except Exception as e:
                 if self._jconf.arrowPySparkFallbackEnabled():
                     msg = (
@@ -625,11 +624,7 @@ def _get_numpy_record_dtype(self, rec: "np.recarray") -> Optional["np.dtype"]:
         return np.dtype(record_type_list) if has_rec_fix else None

     def _create_from_pandas_with_arrow(
-        self,
-        pdf: "PandasDataFrameLike",
-        schema: Union[StructType, List[str]],
-        timezone: str,
-        verifySchema: Union[_NoValueType, bool],
+        self, pdf: "PandasDataFrameLike", schema: Union[StructType, List[str]], timezone: str
     ) -> "DataFrame":
         """
         Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
@@ -662,10 +657,6 @@ def _create_from_pandas_with_arrow(
         )
         import pyarrow as pa

-        if verifySchema is _NoValue:
-            # (With Arrow optimization) createDataFrame with `pandas.DataFrame`
-            verifySchema = self._jconf.arrowSafeTypeConversion()
-
         infer_pandas_dict_as_map = (
             str(self.conf.get("spark.sql.execution.pandas.inferPandasDictAsMap")).lower() == "true"
         )
@@ -734,7 +725,8 @@ def _create_from_pandas_with_arrow(

         jsparkSession = self._jsparkSession

-        ser = ArrowStreamPandasSerializer(timezone, verifySchema)
+        safecheck = self._jconf.arrowSafeTypeConversion()
+        ser = ArrowStreamPandasSerializer(timezone, safecheck)

         @no_type_check
         def reader_func(temp_filename):
@@ -753,11 +745,7 @@ def create_iter_server():
         return df

     def _create_from_arrow_table(
-        self,
-        table: "pa.Table",
-        schema: Union[StructType, List[str]],
-        timezone: str,
-        verifySchema: Union[_NoValueType, bool],
+        self, table: "pa.Table", schema: Union[StructType, List[str]], timezone: str
     ) -> "DataFrame":
         """
         Create a DataFrame from a given pyarrow.Table by slicing it into partitions then
@@ -779,10 +767,6 @@ def _create_from_arrow_table(

         require_minimum_pyarrow_version()

-        if verifySchema is _NoValue:
-            # createDataFrame with `pyarrow.Table`
-            verifySchema = False
-
         prefer_timestamp_ntz = is_timestamp_ntz_preferred()

         # Create the Spark schema from list of names passed in with Arrow types
@@ -802,8 +786,7 @@ def _create_from_arrow_table(
         schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz)

         table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast(
-            to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True),
-            safe=verifySchema,
+            to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True)
         )

         # Chunk the Arrow Table into RecordBatches
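With the revert applied, the Arrow-optimized pandas path takes its safe-cast flag from the `spark.sql.execution.pandas.convertToArrowArraySafely` conf again, rather than from a `verifySchema` argument. A minimal sketch of the restored behavior, assuming a running Spark Classic session (the column values and the wrapped int32 result are taken from the regression test removed later in this commit):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
pdf = pd.DataFrame({"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]})
schema = StructType(
    [StructField("id", IntegerType(), True), StructField("value", IntegerType(), True)]
)

# Default: safe conversion is off, so int64 values silently wrap to int32,
# e.g. 100000000000 becomes 1215752192.
df = spark.createDataFrame(pdf, schema=schema)

# With the conf enabled, the same call raises an Arrow cast error instead.
spark.conf.set("spark.sql.execution.pandas.convertToArrowArraySafely", "true")
# spark.createDataFrame(pdf, schema=schema)  # would raise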

python/pyspark/sql/session.py

Lines changed: 13 additions & 22 deletions
@@ -38,7 +38,6 @@
     TYPE_CHECKING,
 )

-from pyspark._globals import _NoValue, _NoValueType
 from pyspark.conf import SparkConf
 from pyspark.util import is_remote_only
 from pyspark.sql.conf import RuntimeConfig
@@ -1272,7 +1271,7 @@ def createDataFrame(
         data: Iterable["RowLike"],
         schema: Union[StructType, str],
         *,
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
@@ -1282,7 +1281,7 @@ def createDataFrame(
         data: "RDD[RowLike]",
         schema: Union[StructType, str],
         *,
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
@@ -1291,7 +1290,7 @@ def createDataFrame(
         self,
         data: "RDD[AtomicValue]",
         schema: Union[AtomicType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
@@ -1300,7 +1299,7 @@ def createDataFrame(
         self,
         data: Iterable["AtomicValue"],
         schema: Union[AtomicType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
@@ -1319,7 +1318,7 @@ def createDataFrame(
         self,
         data: "PandasDataFrameLike",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
@@ -1328,7 +1327,7 @@ def createDataFrame(
         self,
         data: "pa.Table",
         schema: Union[StructType, str],
-        verifySchema: Union[_NoValueType, bool] = ...,
+        verifySchema: bool = ...,
     ) -> DataFrame:
         ...
@@ -1337,7 +1336,7 @@ def createDataFrame(  # type: ignore[misc]
         data: Union["RDD[Any]", Iterable[Any], "PandasDataFrameLike", "ArrayLike", "pa.Table"],
         schema: Optional[Union[AtomicType, StructType, str]] = None,
         samplingRatio: Optional[float] = None,
-        verifySchema: Union[_NoValueType, bool] = _NoValue,
+        verifySchema: bool = True,
     ) -> DataFrame:
         """
         Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`,
@@ -1381,14 +1380,11 @@ def createDataFrame(  # type: ignore[misc]
             if ``samplingRatio`` is ``None``. This option is effective only when the input is
             :class:`RDD`.
         verifySchema : bool, optional
-            verify data types of every row against schema.
-            If not provided, createDataFrame with
-            - pyarrow.Table, verifySchema=False
-            - pandas.DataFrame with Arrow optimization, verifySchema defaults to
-              `spark.sql.execution.pandas.convertToArrowArraySafely`
-            - pandas.DataFrame without Arrow optimization, verifySchema=True
-            - regular Python instances, verifySchema=True
-            Arrow optimization is enabled/disabled via `spark.sql.execution.arrow.pyspark.enabled`.
+            verify data types of every row against schema. Enabled by default.
+            When the input is :class:`pyarrow.Table` or when the input class is
+            :class:`pandas.DataFrame` and `spark.sql.execution.arrow.pyspark.enabled` is enabled,
+            this option is not effective. It follows Arrow type coercion. This option is not
+            supported with Spark Connect.

         .. versionadded:: 2.1.0
@@ -1588,13 +1584,8 @@ def _create_dataframe(
         data: Union["RDD[Any]", Iterable[Any]],
         schema: Optional[Union[DataType, List[str]]],
         samplingRatio: Optional[float],
-        verifySchema: Union[_NoValueType, bool],
+        verifySchema: bool,
     ) -> DataFrame:
-        if verifySchema is _NoValue:
-            # createDataFrame with regular Python instances
-            # or (without Arrow optimization) createDataFrame with Pandas DataFrame
-            verifySchema = True
-
         if isinstance(schema, StructType):
             verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
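For context, a minimal sketch of the restored default on the non-Arrow path, assuming a Spark Classic session: verifySchema now simply defaults to True for regular Python objects, so each row is checked eagerly via the _make_type_verifier call shown above (the exact exception class may vary across PySpark versions):

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
schema = StructType(
    [StructField("id", IntegerType(), True), StructField("name", StringType(), True)]
)

spark.createDataFrame([(1, "a"), (2, "b")], schema=schema)  # passes verification

try:
    spark.createDataFrame([("oops", "a")], schema=schema)  # verifySchema=True by default
except Exception as e:
    print(type(e).__name__)  # per-row verification rejects the string "oops"

# Verification can still be disabled explicitly; type errors then surface
# later, if at all, instead of at createDataFrame time.
spark.createDataFrame([(1, "a")], schema=schema, verifySchema=False)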

python/pyspark/sql/tests/connect/test_parity_arrow.py

Lines changed: 0 additions & 4 deletions
@@ -137,10 +137,6 @@ def test_toPandas_udt(self):
     def test_create_dataframe_namedtuples(self):
         self.check_create_dataframe_namedtuples(True)

-    @unittest.skip("Spark Connect does not support verifySchema.")
-    def test_createDataFrame_verifySchema(self):
-        super().test_createDataFrame_verifySchema()
-

 if __name__ == "__main__":
     from pyspark.sql.tests.connect.test_parity_arrow import *  # noqa: F401

python/pyspark/sql/tests/test_arrow.py

Lines changed: 0 additions & 39 deletions
@@ -532,45 +532,6 @@ def test_createDataFrame_arrow_pandas(self):
         df_pandas = self.spark.createDataFrame(pdf)
         self.assertEqual(df_arrow.collect(), df_pandas.collect())

-    def test_createDataFrame_verifySchema(self):
-        data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]}
-        # data.value should fail schema validation when verifySchema is True
-        schema = StructType(
-            [StructField("id", IntegerType(), True), StructField("value", IntegerType(), True)]
-        )
-        expected = [
-            Row(id=1, value=1215752192),
-            Row(id=2, value=-1863462912),
-            Row(id=3, value=-647710720),
-        ]
-        # Arrow table
-        table = pa.table(data)
-        df = self.spark.createDataFrame(table, schema=schema)
-        self.assertEqual(df.collect(), expected)
-
-        with self.assertRaises(Exception):
-            self.spark.createDataFrame(table, schema=schema, verifySchema=True)
-
-        # pandas DataFrame with Arrow optimization
-        pdf = pd.DataFrame(data)
-        df = self.spark.createDataFrame(pdf, schema=schema)
-        # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`,
-        # which is false by default
-        self.assertEqual(df.collect(), expected)
-        with self.assertRaises(Exception):
-            with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
-                df = self.spark.createDataFrame(pdf, schema=schema)
-        with self.assertRaises(Exception):
-            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
-
-        # pandas DataFrame without Arrow optimization
-        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
-            pdf = pd.DataFrame(data)
-            with self.assertRaises(Exception):
-                df = self.spark.createDataFrame(pdf, schema=schema)  # verifySchema defaults to True
-            df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False)
-            self.assertEqual(df.collect(), expected)
-
     def _createDataFrame_toggle(self, data, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}):
             df_no_arrow = self.spark.createDataFrame(data, schema=schema)
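The deleted test's expected rows document what the pyarrow.Table path now does unconditionally: _create_from_arrow_table casts to the target Arrow schema without `safe=`, so out-of-range values wrap C-style rather than raising. A standalone sketch of that cast using only pyarrow (the wrapped values match the Row constants asserted in the removed test):

import pyarrow as pa

table = pa.table({"value": [100000000000, 200000000000, 300000000000]})
# Unsafe int64 -> int32 cast keeps the low 32 bits instead of raising.
wrapped = table.cast(pa.schema([("value", pa.int32())]), safe=False)
print(wrapped.column("value").to_pylist())  # [1215752192, -1863462912, -647710720]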

python/pyspark/sql/tests/typing/test_session.yml

Lines changed: 6 additions & 7 deletions
@@ -17,7 +17,6 @@

 - case: createDataFrameStructsValid
   main: |
-    from pyspark._globals import _NoValueType
     from pyspark.sql import SparkSession
     from pyspark.sql.types import StructType, StructField, StringType, IntegerType

@@ -79,14 +78,14 @@
     main:18: note: Possible overload variants:
     main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[list[str], tuple[str, ...]] = ..., samplingRatio: Optional[float] = ...) -> DataFrame
     main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[list[str], tuple[str, ...]] = ..., samplingRatio: Optional[float] = ...) -> DataFrame
-    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[StructType, str], *, verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[StructType, str], *, verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: RDD[AtomicValue], schema: Union[AtomicType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: Iterable[AtomicValue], schema: Union[AtomicType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
+    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[StructType, str], *, verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[StructType, str], *, verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: RDD[AtomicValue], schema: Union[AtomicType, str], verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: Iterable[AtomicValue], schema: Union[AtomicType, str], verifySchema: bool = ...) -> DataFrame
     main:18: note:     def createDataFrame(self, data: DataFrame, samplingRatio: Optional[float] = ...) -> DataFrame
     main:18: note:     def createDataFrame(self, data: Any, samplingRatio: Optional[float] = ...) -> DataFrame
-    main:18: note:     def createDataFrame(self, data: DataFrame, schema: Union[StructType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
-    main:18: note:     def createDataFrame(self, data: Any, schema: Union[StructType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame
+    main:18: note:     def createDataFrame(self, data: DataFrame, schema: Union[StructType, str], verifySchema: bool = ...) -> DataFrame
+    main:18: note:     def createDataFrame(self, data: Any, schema: Union[StructType, str], verifySchema: bool = ...) -> DataFrame

 - case: createDataFrameFromEmptyRdd
   main: |
