
Commit c97fdcc

VVBondarenko and rozza committed
Add support for spark.sql.datetime.java8API.enabled
Motivation: when the connector is used with the Thrift server, any query on data containing date/time types crashes, because the Thrift server enables the `spark.sql.datetime.java8API.enabled` flag for all of its sessions. This change supports the JSR-310 types that Spark SQL already supports:

- LocalDate
- LocalDateTime
- Instant

Other JSR-310 types cause the Encoder to fail, so they would not work even if taken into account. As `spark.sql.datetime.java8API.enabled` is disabled by default, this shouldn't cause compatibility issues.

Also adds support for the Spark 3.4 TimestampNTZType.

SPARK-453
Original PR: #125

---------

Co-authored-by: Ross Lawley <[email protected]>
1 parent 8aacd6c commit c97fdcc

5 files changed: +119 −19 lines changed
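For context before the per-file diffs: `spark.sql.datetime.java8API.enabled` only changes the external Java classes Spark hands back for DATE and TIMESTAMP values. A minimal standalone sketch of the behavior (not part of this commit; the local-mode session is illustrative):

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class Java8ApiFlagDemo {
  public static void main(final String[] args) {
    // With the flag on, DateType values surface as java.time.LocalDate and
    // TimestampType values as java.time.Instant, instead of the legacy
    // java.sql.Date / java.sql.Timestamp classes. The Thrift server turns
    // this flag on for its sessions, which is what triggered this change.
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .config("spark.sql.datetime.java8API.enabled", "true")
        .getOrCreate();

    Row row = spark
        .sql("SELECT DATE'2000-01-01' AS d, TIMESTAMP'2000-01-01 00:00:00' AS ts")
        .first();
    System.out.println(row.get(0).getClass()); // class java.time.LocalDate
    System.out.println(row.get(1).getClass()); // class java.time.Instant

    spark.stop();
  }
}
```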

src/integrationTest/java/com/mongodb/spark/sql/connector/RoundTripTest.java

Lines changed: 15 additions & 6 deletions

@@ -18,10 +18,13 @@
 package com.mongodb.spark.sql.connector;
 
 import static com.mongodb.spark.sql.connector.mongodb.MongoSparkConnectorHelper.CATALOG;
+import static com.mongodb.spark.sql.connector.schema.ConverterHelper.TIMESTAMP_NTZ_TYPE;
+import static java.time.ZoneOffset.UTC;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import com.mongodb.spark.sql.connector.beans.BoxedBean;
 import com.mongodb.spark.sql.connector.beans.ComplexBean;
@@ -34,7 +37,7 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
-import java.time.ZoneOffset;
+import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +51,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class RoundTripTest extends MongoSparkConnectorTestCase {
 
@@ -105,24 +109,29 @@ void testBoxedBean(final TruncateMode mode) {
     assertIterableEquals(dataSetOriginal, dataSetMongo);
   }
 
-  @Test
-  void testDateTimeBean() {
+  @ParameterizedTest()
+  @ValueSource(strings = {"true", "false"})
+  void testDateTimeBean(final String java8DateTimeAPI) {
+    assumeTrue(TIMESTAMP_NTZ_TYPE != null);
     TimeZone original = TimeZone.getDefault();
     try {
-      TimeZone.setDefault(TimeZone.getTimeZone(ZoneOffset.UTC));
+      TimeZone.setDefault(TimeZone.getTimeZone(UTC));
 
       // Given
      long oneHour = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
      long oneDay = oneHour * 24;
 
+      Instant epoch = Instant.EPOCH;
      List<DateTimeBean> dataSetOriginal = singletonList(new DateTimeBean(
          new Date(oneDay * 365),
          new Timestamp(oneDay + oneHour),
          LocalDate.of(2000, 1, 1),
-          Instant.EPOCH));
+          epoch,
+          LocalDateTime.ofInstant(epoch, UTC)));
 
      // when
-      SparkSession spark = getOrCreateSparkSession();
+      SparkSession spark = getOrCreateSparkSession(
+          getSparkConf().set("spark.sql.datetime.java8API.enabled", java8DateTimeAPI));
      Encoder<DateTimeBean> encoder = Encoders.bean(DateTimeBean.class);
 
      Dataset<DateTimeBean> dataset = spark.createDataset(dataSetOriginal, encoder);
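The test now runs once per flag value and pins the JVM default time zone to UTC, since the java.sql to java.time bridge methods it exercises are zone-sensitive. A small sketch of the equivalences the round trip relies on (illustrative, not part of the test):

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;

public final class RoundTripEquivalences {
  public static void main(final String[] args) {
    TimeZone.setDefault(TimeZone.getTimeZone("UTC")); // as the test does

    long oneHour = 3_600_000L;
    long oneDay = oneHour * 24;

    // The legacy and java.time representations carry the same instant;
    // only the external class selected by the flag differs.
    Timestamp legacy = new Timestamp(oneDay + oneHour);
    Instant java8 = legacy.toInstant();
    System.out.println(legacy.getTime() == java8.toEpochMilli()); // true

    // LocalDateTime (the TimestampNTZ representation) is wall-clock time in
    // the default zone, which is why the test pins UTC for determinism.
    LocalDateTime ntz = legacy.toLocalDateTime();
    System.out.println(ntz); // 1970-01-02T01:00
  }
}
```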

src/integrationTest/java/com/mongodb/spark/sql/connector/beans/DateTimeBean.java

Lines changed: 27 additions & 7 deletions

@@ -21,24 +21,28 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.Objects;
 
 public class DateTimeBean implements Serializable {
   private java.sql.Date sqlDate;
   private java.sql.Timestamp sqlTimestamp;
   private java.time.LocalDate localDate;
   private java.time.Instant instant;
+  private java.time.LocalDateTime localDateTime;
 
   public DateTimeBean() {}
 
   public DateTimeBean(
       final Date sqlDate,
       final Timestamp sqlTimestamp,
       final LocalDate localDate,
-      final Instant instant) {
+      final Instant instant,
+      final LocalDateTime localDateTime) {
     this.sqlDate = sqlDate;
     this.sqlTimestamp = sqlTimestamp;
     this.localDate = localDate;
+    this.localDateTime = localDateTime;
     this.instant = instant;
   }
 
@@ -66,6 +70,14 @@ public void setLocalDate(final LocalDate localDate) {
     this.localDate = localDate;
   }
 
+  public LocalDateTime getLocalDateTime() {
+    return localDateTime;
+  }
+
+  public void setLocalDateTime(final LocalDateTime localDateTime) {
+    this.localDateTime = localDateTime;
+  }
+
   public Instant getInstant() {
     return instant;
   }
@@ -86,20 +98,28 @@ public boolean equals(final Object o) {
     return Objects.equals(sqlDate, that.sqlDate)
         && Objects.equals(sqlTimestamp, that.sqlTimestamp)
         && Objects.equals(localDate, that.localDate)
+        && Objects.equals(localDateTime, that.localDateTime)
         && Objects.equals(instant, that.instant);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(sqlDate, sqlTimestamp, localDate, instant);
+    return Objects.hash(sqlDate, sqlTimestamp, localDate, localDateTime, instant);
   }
 
   @Override
   public String toString() {
-    return "DateTimeBean{" + "sqlDate="
-        + sqlDate + ", sqlTimestamp="
-        + sqlTimestamp + ", localDate="
-        + localDate + ", instant="
-        + instant + '}';
+    return "DateTimeBean{"
+        + "sqlDate="
+        + sqlDate
+        + ", sqlTimestamp="
+        + sqlTimestamp
+        + ", localDate="
+        + localDate
+        + ", localDateTime="
+        + localDateTime
+        + ", instant="
+        + instant
+        + '}';
   }
 }
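Spark's bean encoder derives the schema from getter/setter pairs, which is why the new `localDateTime` field needs both accessors. A sketch of inspecting the inferred schema (assumes Spark 3.4+ and the test-source `DateTimeBean` on the classpath; on 3.4+ a `LocalDateTime` property is expected to map to `TimestampNTZType`):

```java
import com.mongodb.spark.sql.connector.beans.DateTimeBean;
import org.apache.spark.sql.Encoders;

public final class BeanSchemaDemo {
  public static void main(final String[] args) {
    // Prints one field per bean property; on Spark 3.4+ the localDateTime
    // property should appear as timestamp_ntz, instant as timestamp, and
    // localDate/sqlDate as date.
    System.out.println(Encoders.bean(DateTimeBean.class).schema().treeString());
  }
}
```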

src/main/java/com/mongodb/spark/sql/connector/schema/BsonDocumentToRowConverter.java

Lines changed: 23 additions & 2 deletions

@@ -19,6 +19,7 @@
 
 import static com.mongodb.spark.sql.connector.schema.ConverterHelper.BSON_VALUE_CODEC;
 import static com.mongodb.spark.sql.connector.schema.ConverterHelper.getJsonWriterSettings;
+import static com.mongodb.spark.sql.connector.schema.ConverterHelper.isTimestampNTZ;
 import static com.mongodb.spark.sql.connector.schema.ConverterHelper.toJson;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -44,6 +45,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.BooleanType;
@@ -76,6 +78,8 @@
 import org.bson.types.Decimal128;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The helper for conversion of BsonDocuments to GenericRowWithSchema instances.
@@ -89,13 +93,15 @@
 @NotNull
 public final class BsonDocumentToRowConverter implements Serializable {
   private static final long serialVersionUID = 1L;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BsonDocumentToRowConverter.class);
   private final Function<Row, InternalRow> rowToInternalRowFunction;
   private final StructType schema;
   private final boolean outputExtendedJson;
   private final boolean isPermissive;
   private final boolean dropMalformed;
   private final String columnNameOfCorruptRecord;
   private final boolean schemaContainsCorruptRecordColumn;
+  private final boolean dataTimeJava8APIEnabled;
 
   private boolean corruptedRecord;
 
@@ -114,6 +120,7 @@ public BsonDocumentToRowConverter(final StructType originalSchema, final ReadCon
     this.columnNameOfCorruptRecord = readConfig.getColumnNameOfCorruptRecord();
     this.schemaContainsCorruptRecordColumn = !columnNameOfCorruptRecord.isEmpty()
         && Arrays.asList(schema.fieldNames()).contains(columnNameOfCorruptRecord);
+    this.dataTimeJava8APIEnabled = SQLConf.get().datetimeJava8ApiEnabled();
   }
 
   /** @return the schema for the converter */
@@ -165,6 +172,7 @@ GenericRowWithSchema toRow(final BsonDocument bsonDocument) {
   @VisibleForTesting
   Object convertBsonValue(
       final String fieldName, final DataType dataType, final BsonValue bsonValue) {
+    LOGGER.info("converting bson to value: {} {} {}", fieldName, dataType, bsonValue);
     try {
       if (bsonValue.isNull()) {
         return null;
@@ -179,9 +187,22 @@ Object convertBsonValue(
       } else if (dataType instanceof BooleanType) {
         return convertToBoolean(fieldName, dataType, bsonValue);
       } else if (dataType instanceof DateType) {
-        return convertToDate(fieldName, dataType, bsonValue);
+        Date date = convertToDate(fieldName, dataType, bsonValue);
+        if (dataTimeJava8APIEnabled) {
+          return date.toLocalDate();
+        } else {
+          return date;
+        }
       } else if (dataType instanceof TimestampType) {
-        return convertToTimestamp(fieldName, dataType, bsonValue);
+        Timestamp timestamp = convertToTimestamp(fieldName, dataType, bsonValue);
+        if (dataTimeJava8APIEnabled) {
+          return timestamp.toInstant();
+        } else {
+          return timestamp;
+        }
+      } else if (isTimestampNTZ(dataType)) {
+        Timestamp timestamp = convertToTimestamp(fieldName, dataType, bsonValue);
+        return timestamp.toLocalDateTime();
       } else if (dataType instanceof FloatType) {
         return convertToFloat(fieldName, dataType, bsonValue);
       } else if (dataType instanceof IntegerType) {
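On the read path every BSON datetime still decodes through the legacy `java.sql` types; the flag only controls the final bridge call. Note that `toLocalDate()` and `toLocalDateTime()` interpret the millis in the JVM default zone, while `toInstant()` is zone-independent. A self-contained sketch of the three branches above (illustrative values, not connector code):

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.TimeZone;

public final class ReadPathBridgeDemo {
  public static void main(final String[] args) {
    TimeZone.setDefault(TimeZone.getTimeZone("UTC")); // the bridges below are zone-sensitive

    long millis = 946_684_800_000L; // 2000-01-01T00:00:00Z

    // TimestampType with the flag on: Timestamp -> Instant (zone-independent).
    Instant instant = new Timestamp(millis).toInstant();

    // DateType with the flag on: Date -> LocalDate (uses the default zone).
    LocalDate localDate = new Date(millis).toLocalDate();

    // TimestampNTZType (flag-independent): Timestamp -> LocalDateTime (default zone).
    LocalDateTime localDateTime = new Timestamp(millis).toLocalDateTime();

    System.out.println(instant);       // 2000-01-01T00:00:00Z
    System.out.println(localDate);     // 2000-01-01
    System.out.println(localDateTime); // 2000-01-01T00:00
  }
}
```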

src/main/java/com/mongodb/spark/sql/connector/schema/ConverterHelper.java

Lines changed: 25 additions & 0 deletions

@@ -22,6 +22,8 @@
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Base64;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
 import org.bson.BsonDocument;
 import org.bson.BsonValue;
 import org.bson.codecs.BsonValueCodec;
@@ -39,6 +41,29 @@ static JsonWriterSettings getJsonWriterSettings(final boolean outputExtendedJson
     return outputExtendedJson ? EXTENDED_JSON_WRITER_SETTINGS : RELAXED_JSON_WRITER_SETTINGS;
   }
 
+  /**
+   * The {{TimestampNTZType}} if available or null
+   *
+   * <p>Only available in Spark 3.4+
+   * <p>TODO: SPARK-450 remove code for Spark 4.0
+   */
+  public static final DataType TIMESTAMP_NTZ_TYPE;
+
+  static {
+    DataType timestampNTZType;
+    try {
+      timestampNTZType =
+          (DataType) DataTypes.class.getDeclaredField("TimestampNTZType").get(DataType.class);
+    } catch (IllegalAccessException | NoSuchFieldException e) {
+      timestampNTZType = null;
+    }
+    TIMESTAMP_NTZ_TYPE = timestampNTZType;
+  }
+
+  static boolean isTimestampNTZ(final DataType dataType) {
+    return TIMESTAMP_NTZ_TYPE != null && TIMESTAMP_NTZ_TYPE.acceptsType(dataType);
+  }
+
   private static final JsonWriterSettings RELAXED_JSON_WRITER_SETTINGS =
       JsonWriterSettings.builder()
           .outputMode(JsonMode.RELAXED)
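`TIMESTAMP_NTZ_TYPE` is resolved reflectively so the same jar runs against Spark versions with and without the type; for a static field, the receiver passed to `Field.get(...)` is ignored, which is why passing `DataType.class` works. The same probe pattern in isolation (a generic sketch, not connector code):

```java
import java.lang.reflect.Field;

public final class StaticFieldProbe {
  // Read a public static field if it exists on this classpath, else null.
  static Object probe(final Class<?> holder, final String fieldName) {
    try {
      Field field = holder.getDeclaredField(fieldName);
      return field.get(null); // receiver is ignored for static fields
    } catch (ReflectiveOperationException e) {
      return null; // field absent on older versions: degrade gracefully
    }
  }

  public static void main(final String[] args) {
    System.out.println(probe(Integer.class, "MAX_VALUE"));     // 2147483647
    System.out.println(probe(Integer.class, "NO_SUCH_FIELD")); // null
  }
}
```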

src/main/java/com/mongodb/spark/sql/connector/schema/RowToBsonDocumentConverter.java

Lines changed: 29 additions & 4 deletions

@@ -17,6 +17,7 @@
 
 package com.mongodb.spark.sql.connector.schema;
 
+import static com.mongodb.spark.sql.connector.schema.ConverterHelper.isTimestampNTZ;
 import static java.lang.String.format;
 import static java.util.Arrays.asList;
 
@@ -26,6 +27,10 @@
 import com.mongodb.spark.sql.connector.interop.JavaScala;
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -137,9 +142,11 @@ public static ObjectToBsonValue createObjectToBsonValue(
       try {
         return cachedObjectToBsonValue.apply(data);
       } catch (Exception e) {
-        throw new DataException(format(
-            "Cannot cast %s into a BsonValue. %s has no matching BsonValue. Error: %s",
-            data, dataType, e.getMessage()));
+        throw new DataException(
+            format(
+                "Cannot cast %s into a BsonValue. %s has no matching BsonValue. Error: %s",
+                data, dataType, e.getMessage()),
+            e);
       }
     };
   }
@@ -177,12 +184,30 @@ private static ObjectToBsonValue objectToBsonValue(
     } else if (DataTypes.StringType.acceptsType(dataType)) {
       return (data) -> processString((String) data, convertJson);
     } else if (DataTypes.DateType.acceptsType(dataType)
-        || DataTypes.TimestampType.acceptsType(dataType)) {
+        || DataTypes.TimestampType.acceptsType(dataType)
+        || isTimestampNTZ(dataType)) {
       return (data) -> {
         if (data instanceof Date) {
           // Covers java.util.Date, java.sql.Date, java.sql.Timestamp
           return new BsonDateTime(((Date) data).getTime());
         }
+        if (data instanceof Instant) {
+          return new BsonDateTime(((Instant) data).toEpochMilli());
+        }
+        if (data instanceof LocalDateTime) {
+          LocalDateTime dateTime = (LocalDateTime) data;
+          return new BsonDateTime(Timestamp.valueOf(dateTime).getTime());
+        }
+        if (data instanceof LocalDate) {
+          long epochSeconds = ((LocalDate) data).toEpochDay() * 24L * 3600L;
+          return new BsonDateTime(epochSeconds * 1000L);
+        }
+
+        /*
+         NOTE 1: ZonedDateTime, OffsetDateTime, OffsetTime are not explicitly supported by Spark and cause the Encoder resolver to fail
+         due to cyclic dependency in the ZoneOffset. Subject for review after it changes (if ever).
+         NOTE 2: LocalTime type is not represented neither in Bson nor in Spark
+        */
         throw new MongoSparkException(
             "Unsupported date type: " + data.getClass().getSimpleName());
       };
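On the write path, the `LocalDate` arithmetic (epoch day × 24 × 3600 × 1000) is midnight UTC of that day, while the `LocalDateTime` branch goes through `Timestamp.valueOf(...)`, which interprets the wall-clock value in the JVM default zone. A sketch checking both equivalences (illustrative, not connector code):

```java
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.TimeZone;

public final class WritePathBridgeDemo {
  public static void main(final String[] args) {
    // LocalDate branch: epochDay * 24 * 3600 * 1000 == midnight UTC that day.
    LocalDate date = LocalDate.of(2000, 1, 1);
    long fromEpochDay = date.toEpochDay() * 24L * 3600L * 1000L;
    long utcMidnight = date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    System.out.println(fromEpochDay == utcMidnight); // true

    // LocalDateTime branch: Timestamp.valueOf(...) is default-zone-sensitive,
    // so the stored millis depend on the JVM time zone.
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    LocalDateTime dateTime = LocalDateTime.of(2000, 1, 1, 0, 0);
    System.out.println(Timestamp.valueOf(dateTime).getTime()); // 946684800000 under UTC
  }
}
```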
