|
37 | 37 | from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
|
38 | 38 | from pyspark.sql.readwriter import DataFrameWriter
|
39 | 39 | from pyspark.sql.streaming import DataStreamWriter
|
| 40 | +from pyspark.sql.types import IntegralType |
40 | 41 | from pyspark.sql.types import *
|
41 | 42 |
|
42 | 43 | __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
|
@@ -1891,14 +1892,20 @@ def toPandas(self):
|
1891 | 1892 | "if using spark.sql.execution.arrow.enable=true"
|
1892 | 1893 | raise ImportError("%s\n%s" % (e.message, msg))
|
1893 | 1894 | else:
|
| 1895 | + pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) |
| 1896 | + |
1894 | 1897 | dtype = {}
|
1895 | 1898 | for field in self.schema:
|
1896 | 1899 | pandas_type = _to_corrected_pandas_type(field.dataType)
|
1897 |
| - if pandas_type is not None: |
| 1900 | + # SPARK-21766: if an integer field is nullable and has null values, it can be |
| 1901 | + # inferred by pandas as a float column. Once we convert the column with NaN back |
| 1902 | + # to an integer type, e.g., np.int16, we will hit an exception. So we use the inferred |
| 1903 | + # float type, not the corrected type from the schema, in this case. |
| 1904 | + if pandas_type is not None and \ |
| 1905 | + not(isinstance(field.dataType, IntegralType) and field.nullable and |
| 1906 | + pdf[field.name].isnull().any()): |
1898 | 1907 | dtype[field.name] = pandas_type
|
1899 | 1908 |
|
1900 |
| - pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) |
1901 |
| - |
1902 | 1909 | for f, t in dtype.items():
|
1903 | 1910 | pdf[f] = pdf[f].astype(t, copy=False)
|
1904 | 1911 | return pdf
|
|
0 commit comments