
Commit 64817c4

ueshin authored and cloud-fan committed
[SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp values for Pandas to respect session timezone
## What changes were proposed in this pull request?

When converting a Pandas DataFrame/Series from/to a Spark DataFrame using `toPandas()` or pandas UDFs, timestamp values are adjusted to the Python system timezone instead of the session timezone.

For example, let's say we use `"America/Los_Angeles"` as the session timezone and have a timestamp value `"1970-01-01 00:00:01"` in that timezone. Btw, I'm in Japan, so the Python system timezone would be `"Asia/Tokyo"`. The timestamp value from the current `toPandas()` is the following:

```
>>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>>> df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
>>> df.show()
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

>>> df.toPandas()
                   ts
0 1970-01-01 17:00:01
```

As you can see, the value becomes `"1970-01-01 17:00:01"` because it respects the Python system timezone. As we discussed in #18664, we consider this behavior a bug, and the value should be `"1970-01-01 00:00:01"`.

## How was this patch tested?

Added tests and existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #19607 from ueshin/issues/SPARK-22395.
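For reference, a minimal sketch of the intended post-fix behavior, assuming an active `SparkSession` bound to `spark` (the expected output follows from the session timezone, as described above):

```python
# Sketch of the intended behavior after this fix: toPandas() renders
# the timestamp in the session timezone, not the Python system timezone.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
df = spark.createDataFrame([28801], "long").selectExpr("timestamp(value) as ts")
print(df.toPandas())
# Expected:
#                    ts
# 0 1970-01-01 00:00:01
```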
1 parent 33d43bf commit 64817c4

File tree

12 files changed: +371 −53 lines changed


docs/sql-programming-guide.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -1716,6 +1716,8 @@ options.
 </table>
 
 Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
+  - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
+  - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
 
 ## Upgrading From Spark SQL 2.1 to 2.2
 
```
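A minimal sketch of the escape hatch described in the second migration note above, assuming an active `SparkSession` bound to `spark` (per the `toPandas()` diff below, the config is read on each conversion, so it can be toggled at runtime):

```python
# Sketch: restore the pre-fix behavior, where Pandas conversions use
# the Python system timezone instead of the session timezone.
spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")

# Subsequent toPandas() calls (and pandas UDFs) fall back to the old
# system-timezone interpretation of timestamp values.
pdf = spark.createDataFrame([28801], "long") \
    .selectExpr("timestamp(value) as ts").toPandas()
```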

python/pyspark/serializers.py

Lines changed: 9 additions & 4 deletions
```diff
@@ -206,11 +206,12 @@ def __repr__(self):
         return "ArrowSerializer"
 
 
-def _create_batch(series):
+def _create_batch(series, timezone):
     """
     Create an Arrow record batch from the given pandas.Series or list of Series, with optional type.
 
     :param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
+    :param timezone: A timezone to respect when handling timestamp values
     :return: Arrow RecordBatch
     """
 
@@ -227,7 +228,7 @@ def _create_batch(series):
     def cast_series(s, t):
         if type(t) == pa.TimestampType:
             # NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680
-            return _check_series_convert_timestamps_internal(s.fillna(0))\
+            return _check_series_convert_timestamps_internal(s.fillna(0), timezone)\
                 .values.astype('datetime64[us]', copy=False)
         # NOTE: can not compare None with pyarrow.DataType(), fixed with Arrow >= 0.7.1
         elif t is not None and t == pa.date32():
@@ -253,6 +254,10 @@ class ArrowStreamPandasSerializer(Serializer):
     Serializes Pandas.Series as Arrow data with Arrow streaming format.
     """
 
+    def __init__(self, timezone):
+        super(ArrowStreamPandasSerializer, self).__init__()
+        self._timezone = timezone
+
     def dump_stream(self, iterator, stream):
         """
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
@@ -262,7 +267,7 @@ def dump_stream(self, iterator, stream):
         writer = None
         try:
             for series in iterator:
-                batch = _create_batch(series)
+                batch = _create_batch(series, self._timezone)
                 if writer is None:
                     write_int(SpecialLengths.START_ARROW_STREAM, stream)
                     writer = pa.RecordBatchStreamWriter(stream, batch.schema)
@@ -280,7 +285,7 @@ def load_stream(self, stream):
        reader = pa.open_stream(stream)
        for batch in reader:
            # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
-           pdf = _check_dataframe_localize_timestamps(batch.to_pandas())
+           pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
            yield [c for _, c in pdf.iteritems()]
 
     def __repr__(self):
```
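`_check_series_convert_timestamps_internal` lives in `pyspark.sql.types` and its body is not part of this commit's hunks; roughly, it treats a tz-naive series as wall-clock time in the given timezone so Arrow can store UTC-normalized values. A standalone pandas illustration of that idea (the helper name below is hypothetical, not the Spark function itself):

```python
import pandas as pd

def localize_for_arrow(s, timezone):
    # Hypothetical illustration: interpret tz-naive values as wall-clock
    # times in `timezone` and normalize to UTC, the form Arrow stores.
    if s.dt.tz is None:
        s = s.dt.tz_localize(timezone)
    return s.dt.tz_convert('UTC')

s = pd.Series(pd.to_datetime(['1970-01-01 00:00:01']))
print(localize_for_arrow(s, 'America/Los_Angeles'))
# 0   1970-01-01 08:00:01+00:00
# dtype: datetime64[ns, UTC]
```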

python/pyspark/sql/dataframe.py

Lines changed: 21 additions & 3 deletions
```diff
@@ -39,6 +39,7 @@
 from pyspark.sql.streaming import DataStreamWriter
 from pyspark.sql.types import IntegralType
 from pyspark.sql.types import *
+from pyspark.util import _exception_message
 
 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
 
@@ -1881,6 +1882,13 @@ def toPandas(self):
         1   5    Bob
         """
         import pandas as pd
+
+        if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
+                == "true":
+            timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
+        else:
+            timezone = None
+
         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
             try:
                 from pyspark.sql.types import _check_dataframe_localize_timestamps
@@ -1889,13 +1897,13 @@ def toPandas(self):
                 if tables:
                     table = pyarrow.concat_tables(tables)
                     pdf = table.to_pandas()
-                    return _check_dataframe_localize_timestamps(pdf)
+                    return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
             except ImportError as e:
                 msg = "note: pyarrow must be installed and available on calling Python process " \
                       "if using spark.sql.execution.arrow.enabled=true"
-                raise ImportError("%s\n%s" % (e.message, msg))
+                raise ImportError("%s\n%s" % (_exception_message(e), msg))
         else:
             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
 
@@ -1913,7 +1921,17 @@ def toPandas(self):
 
         for f, t in dtype.items():
             pdf[f] = pdf[f].astype(t, copy=False)
-        return pdf
+
+        if timezone is None:
+            return pdf
+        else:
+            from pyspark.sql.types import _check_series_convert_timestamps_local_tz
+            for field in self.schema:
+                # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                if isinstance(field.dataType, TimestampType):
+                    pdf[field.name] = \
+                        _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
+            return pdf
 
     def _collectAsArrow(self):
         """
```

python/pyspark/sql/session.py

Lines changed: 43 additions & 9 deletions
```diff
@@ -34,8 +34,9 @@
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.readwriter import DataFrameReader
 from pyspark.sql.streaming import DataStreamReader
-from pyspark.sql.types import Row, DataType, StringType, StructType, _make_type_verifier, \
-    _infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
+from pyspark.sql.types import Row, DataType, StringType, StructType, TimestampType, \
+    _make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter, \
+    _parse_datatype_string
 from pyspark.sql.utils import install_exception_handler
 
 __all__ = ["SparkSession"]
@@ -444,11 +445,34 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
         Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
         :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            # Copy once if the series is modified to prevent the original
+                            # Pandas DataFrame from being updated
+                            pdf = pdf.copy()
+                            copied = True
+                        pdf[field.name] = s
+            else:
+                for column, series in pdf.iteritems():
+                    s = _check_series_convert_timestamps_tz_local(pdf[column], timezone)
+                    if not copied and s is not pdf[column]:
+                        # Copy once if the series is modified to prevent the original
+                        # Pandas DataFrame from being updated
+                        pdf = pdf.copy()
+                        copied = True
+                    pdf[column] = s
 
         # Convert pandas.DataFrame to list of numpy records
         np_records = pdf.to_records(index=False)
@@ -462,15 +486,19 @@ def _convert_from_pandas(self, pdf):
         # Convert list of numpy records to python lists
         return [r.tolist() for r in np_records]
 
-    def _create_from_pandas_with_arrow(self, pdf, schema):
+    def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         """
         Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
         from pyspark.serializers import ArrowSerializer, _create_batch
-        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
-        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+        from pyspark.sql.types import from_arrow_schema, to_arrow_type, \
+            _old_pandas_exception_message, TimestampType
+        try:
+            from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+        except ImportError as e:
+            raise ImportError(_old_pandas_exception_message(e))
 
         # Determine arrow types to coerce data when creating batches
         if isinstance(schema, StructType):
@@ -488,7 +516,8 @@ def _create_from_pandas_with_arrow(self, pdf, schema):
         pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
 
         # Create Arrow record batches
-        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
+        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
+                                 timezone)
                    for pdf_slice in pdf_slices]
 
         # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
@@ -606,6 +635,11 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
         except Exception:
             has_pandas = False
         if has_pandas and isinstance(data, pandas.DataFrame):
+            if self.conf.get("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
+                    == "true":
+                timezone = self.conf.get("spark.sql.session.timeZone")
+            else:
+                timezone = None
 
             # If no schema supplied by user then get the names of columns only
             if schema is None:
@@ -614,11 +648,11 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
             if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \
                     and len(data) > 0:
                 try:
-                    return self._create_from_pandas_with_arrow(data, schema)
+                    return self._create_from_pandas_with_arrow(data, schema, timezone)
                 except Exception as e:
                     warnings.warn("Arrow will not be used in createDataFrame: %s" % str(e))
                     # Fallback to create DataFrame without arrow if raise some exception
-            data = self._convert_from_pandas(data)
+            data = self._convert_from_pandas(data, schema, timezone)
 
         if isinstance(schema, StructType):
             verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
```