
Commit f93b254

Author: Robert Kruszewski
Merge branch 'master' into rk/resync
2 parents: c980916 + 3e6a714

4 files changed: +49 additions, -4 deletions

python/pyspark/sql/dataframe.py
Lines changed: 10 additions & 3 deletions

@@ -37,6 +37,7 @@
 from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
 from pyspark.sql.readwriter import DataFrameWriter
 from pyspark.sql.streaming import DataStreamWriter
+from pyspark.sql.types import IntegralType
 from pyspark.sql.types import *
 
 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
@@ -1891,14 +1892,20 @@ def toPandas(self):
                     "if using spark.sql.execution.arrow.enable=true"
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
+            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
+
             dtype = {}
             for field in self.schema:
                 pandas_type = _to_corrected_pandas_type(field.dataType)
-                if pandas_type is not None:
+                # SPARK-21766: if an integer field is nullable and has null values, it can be
+                # inferred by pandas as float column. Once we convert the column with NaN back
+                # to integer type e.g., np.int16, we will hit exception. So we use the inferred
+                # float type, not the corrected type from the schema in this case.
+                if pandas_type is not None and \
+                        not(isinstance(field.dataType, IntegralType) and field.nullable and
+                            pdf[field.name].isnull().any()):
                     dtype[field.name] = pandas_type
 
-            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
-
             for f, t in dtype.items():
                 pdf[f] = pdf[f].astype(t, copy=False)
             return pdf
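
Note on why the collect is moved before the dtype loop: pandas has no NaN representation for NumPy integer dtypes, so a nullable integer column comes back from collect() as float64 with NaN in place of nulls, and casting that column to the schema's "corrected" integer type raises ValueError. A minimal sketch of the failure in plain pandas/NumPy, restating the SPARK-21766 comment above in runnable form (no Spark required; column name "a" is illustrative):

import numpy as np
import pandas as pd

# A nullable int column as pandas infers it: None becomes NaN, dtype float64.
pdf = pd.DataFrame.from_records([(1,), (None,)], columns=["a"])
assert pdf["a"].dtype == np.float64

# Forcing the schema-corrected integer dtype onto a NaN-bearing column fails.
try:
    pdf["a"].astype(np.int32, copy=False)
except ValueError as e:
    print(e)  # e.g. "Cannot convert non-finite values (NA or inf) to integer"

# Hence the new guard: keep the inferred float dtype whenever the field is an
# IntegralType, is nullable, and the collected column actually contains nulls.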

python/pyspark/sql/tests.py
Lines changed: 12 additions & 0 deletions

@@ -2564,6 +2564,18 @@ def test_to_pandas(self):
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_to_pandas_avoid_astype(self):
+        import numpy as np
+        schema = StructType().add("a", IntegerType()).add("b", StringType())\
+                             .add("c", IntegerType())
+        data = [(1, "foo", 16777220), (None, "bar", None)]
+        df = self.spark.createDataFrame(data, schema)
+        types = df.toPandas().dtypes
+        self.assertEquals(types[0], np.float64)  # doesn't convert to np.int32 due to NaN value.
+        self.assertEquals(types[1], np.object)
+        self.assertEquals(types[2], np.float64)
+
     def test_create_dataframe_from_array_of_long(self):
         import array
         data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
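
The dtypes the new test expects mirror plain pandas inference on the same records; a quick standalone check (pandas only, no Spark session needed):

import pandas as pd

# Same records as the test above: pandas infers float64 for the nullable int
# columns "a" and "c" (None -> NaN) and object for the string column "b".
pdf = pd.DataFrame.from_records([(1, "foo", 16777220), (None, "bar", None)],
                                columns=["a", "b", "c"])
print(pdf.dtypes)  # a: float64, b: object, c: float64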

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
Lines changed: 1 addition & 1 deletion

@@ -515,7 +515,7 @@ public void loadBytes(ColumnVector.Array array) {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-    int oldCapacity = (this.data == 0L) ? 0 : capacity;
+    int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
       this.lengthData =
           Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
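
A plausible reading of this one-line fix, consistent with the regression tests added below: array- and struct-typed OffHeapColumnVectors never allocate the long `data` buffer, so `this.data` stays 0 and `oldCapacity` always evaluated to 0 for them; the `Platform.reallocateMemory` calls then copied zero bytes of the existing length/offset and null buffers, silently discarding previously written values. The `nulls` buffer is allocated for vectors of every type, which makes it a reliable indicator that the vector has been initialized.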

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
Lines changed: 26 additions & 0 deletions

@@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
     assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
   }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
+    val arrayType = ArrayType(IntegerType, true)
+    testVector = new OffHeapColumnVector(8, arrayType)
+
+    val data = testVector.arrayData()
+    (0 until 8).foreach(i => data.putInt(i, i))
+    (0 until 8).foreach(i => testVector.putArray(i, i, 1))
+
+    // Increase vector's capacity and reallocate the data to new bigger buffers.
+    testVector.reserve(16)
+
+    // Check that none of the values got lost/overwritten.
+    val array = new ColumnVector.Array(testVector)
+    (0 until 8).foreach { i =>
+      assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
+    }
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
+    val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
+    testVector = new OffHeapColumnVector(8, structType)
+    (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
+    testVector.reserve(16)
+    (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+  }
 }
