
Commit dcd5aae

Merge pull request apache-spark-on-k8s#393 from palantir/dv/upstream
2 parents: d68b3af + ab0e4e7

File tree: 16 files changed (+288, -87 lines)


dev/deps/spark-deps-hadoop-palantir

Lines changed: 3 additions & 3 deletions
@@ -13,9 +13,9 @@ arrow-format-0.8.0.jar
 arrow-memory-0.8.0.jar
 arrow-vector-0.8.0.jar
 automaton-1.11-8.jar
-avro-1.8.1.jar
-avro-ipc-1.8.1.jar
-avro-mapred-1.8.1-hadoop2.jar
+avro-1.8.2.jar
+avro-ipc-1.8.2.jar
+avro-mapred-1.8.2-hadoop2.jar
 aws-java-sdk-core-1.11.45.jar
 aws-java-sdk-kms-1.11.45.jar
 aws-java-sdk-s3-1.11.45.jar

docs/sql-programming-guide.md

Lines changed: 2 additions & 2 deletions
@@ -1345,8 +1345,8 @@ the following case-insensitive options:
 These options must all be specified if any of them is specified. In addition,
 <code>numPartitions</code> must be specified. They describe how to partition the table when
 reading in parallel from multiple workers.
-<code>partitionColumn</code> must be a numeric column from the table in question. Notice
-that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
+<code>partitionColumn</code> must be a numeric, date, or timestamp column from the table in question.
+Notice that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
 partition stride, not for filtering the rows in table. So all rows in the table will be
 partitioned and returned. This option applies only to reading.
 </td>
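
For reference, the date/timestamp support described above is exercised by reads like the following. This is a minimal sketch mirroring the Oracle integration test added later in this commit; the JDBC URL is a placeholder.

// Partitioned JDBC read over a DATE column (SPARK-22814). The URL is a placeholder;
// the table name and options mirror OracleIntegrationSuite below.
val df = spark.read.format("jdbc")
  .option("url", "jdbc:oracle:thin:@//db-host:1521/svc")  // hypothetical connection string
  .option("dbtable", "datetimePartitionTest")
  .option("partitionColumn", "d")          // DATE column
  .option("lowerBound", "2018-07-06")      // bounds only set the partition stride,
  .option("upperBound", "2018-07-20")      // they do not filter rows
  .option("numPartitions", 3)
  .load()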

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 8 additions & 10 deletions
@@ -19,7 +19,6 @@ package org.apache.spark.sql.avro
 
 import java.io._
 import java.net.URI
-import java.util.zip.Deflater
 
 import scala.util.control.NonFatal
 
@@ -31,18 +30,18 @@ import org.apache.avro.mapreduce.AvroJob
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
-import org.slf4j.LoggerFactory
 
 import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
-  private val log = LoggerFactory.getLogger(getClass)
+private[avro] class AvroFileFormat extends FileFormat
+    with DataSourceRegister with Logging with Serializable {
 
   override def equals(other: Any): Boolean = other match {
     case _: AvroFileFormat => true
@@ -121,23 +120,23 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     parsedOptions.compression match {
       case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
+        logInfo("writing uncompressed Avro records")
         job.getConfiguration.setBoolean(COMPRESS_KEY, false)
 
       case "snappy" =>
-        log.info("compressing Avro output using Snappy")
+        logInfo("compressing Avro output using Snappy")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
 
       case "deflate" =>
        val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+        logInfo(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
 
       case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        logError(s"unsupported compression codec $unknown")
     }
 
     new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
@@ -157,7 +156,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
     val parsedOptions = new AvroOptions(options, hadoopConf)
 
     (file: PartitionedFile) => {
-      val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
       val conf = broadcastedConf.value.value
       val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
 
@@ -176,7 +174,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
           DataFileReader.openReader(in, datumReader)
         } catch {
           case NonFatal(e) =>
-            log.error("Exception while opening DataFileReader", e)
+            logError("Exception while opening DataFileReader", e)
             in.close()
             throw e
         }
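
The logging change above swaps per-class SLF4J loggers for Spark's org.apache.spark.internal.Logging mix-in. A minimal sketch of that pattern follows; the class is hypothetical, not part of the diff, and it assumes the Logging trait is visible from the calling package (as it is within Spark's own modules).

import org.apache.spark.internal.Logging

// Hypothetical illustration: mixing in Logging provides logInfo / logError backed by a
// lazily created per-class logger, so the class no longer needs to hold an explicit
// LoggerFactory field and stays cheap to serialize into tasks.
class CompressionReporter extends Logging with Serializable {
  def report(codec: String): Unit = codec match {
    case "uncompressed" => logInfo("writing uncompressed Avro records")
    case "snappy"       => logInfo("compressing Avro output using Snappy")
    case other          => logError(s"unsupported compression codec $other")
  }
}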

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala

Lines changed: 80 additions & 6 deletions
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.{Properties, TimeZone}
-import java.math.BigDecimal
 
-import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -86,7 +88,8 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     conn.prepareStatement(
       "CREATE TABLE tableWithCustomSchema (id NUMBER, n1 NUMBER(1), n2 NUMBER(1))").executeUpdate()
     conn.prepareStatement(
-      "INSERT INTO tableWithCustomSchema values(12312321321321312312312312123, 1, 0)").executeUpdate()
+      "INSERT INTO tableWithCustomSchema values(12312321321321312312312312123, 1, 0)")
+      .executeUpdate()
     conn.commit()
 
     sql(
@@ -108,15 +111,36 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
       """.stripMargin.replaceAll("\n", " "))
 
 
-    conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))").executeUpdate()
+    conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))")
+      .executeUpdate()
     conn.prepareStatement(
       "INSERT INTO numerics VALUES (4, 1.23, 9999999999)").executeUpdate()
     conn.commit()
 
-    conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)").executeUpdate()
+    conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)")
+      .executeUpdate()
     conn.commit()
-  }
 
+    conn.prepareStatement("CREATE TABLE datetimePartitionTest (id NUMBER(10), d DATE, t TIMESTAMP)")
+      .executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(1, {d '2018-07-06'}, {ts '2018-07-06 05:50:00'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(2, {d '2018-07-06'}, {ts '2018-07-06 08:10:08'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(3, {d '2018-07-08'}, {ts '2018-07-08 13:32:01'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(4, {d '2018-07-12'}, {ts '2018-07-12 09:51:15'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.commit()
+  }
 
   test("SPARK-16625 : Importing Oracle numeric types") {
     val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties)
@@ -399,4 +423,54 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(values.getDouble(0) === 1.1)
     assert(values.getFloat(1) === 2.2f)
   }
+
+  test("SPARK-22814 support date/timestamp types in partitionColumn") {
+    val expectedResult = Set(
+      (1, "2018-07-06", "2018-07-06 05:50:00"),
+      (2, "2018-07-06", "2018-07-06 08:10:08"),
+      (3, "2018-07-08", "2018-07-08 13:32:01"),
+      (4, "2018-07-12", "2018-07-12 09:51:15")
+    ).map { case (id, date, timestamp) =>
+      Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
+    }
+
+    // DateType partition column
+    val df1 = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "datetimePartitionTest")
+      .option("partitionColumn", "d")
+      .option("lowerBound", "2018-07-06")
+      .option("upperBound", "2018-07-20")
+      .option("numPartitions", 3)
+      .load()
+
+    df1.logicalPlan match {
+      case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
+        val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
+        assert(whereClauses === Set(
+          """"D" < '2018-07-10' or "D" is null""",
+          """"D" >= '2018-07-10' AND "D" < '2018-07-14'""",
+          """"D" >= '2018-07-14'"""))
+    }
+    assert(df1.collect.toSet === expectedResult)
+
+    // TimestampType partition column
+    val df2 = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "datetimePartitionTest")
+      .option("partitionColumn", "t")
+      .option("lowerBound", "2018-07-04 03:30:00.0")
+      .option("upperBound", "2018-07-27 14:11:05.0")
+      .option("numPartitions", 2)
+      .load()
+
+    df2.logicalPlan match {
+      case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
+        val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
+        assert(whereClauses === Set(
+          """"T" < '2018-07-15 20:50:32.5' or "T" is null""",
+          """"T" >= '2018-07-15 20:50:32.5'"""))
+    }
+    assert(df2.collect.toSet === expectedResult)
+  }
 }
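
A note on the asserted WHERE clauses: lowerBound and upperBound only determine the partition stride. For the DATE column, the 14-day range from 2018-07-06 to 2018-07-20 split into 3 partitions gives a stride of about 4 days, hence the boundaries at 2018-07-10 and 2018-07-14; for the TIMESTAMP column with 2 partitions, the single boundary 2018-07-15 20:50:32.5 is (up to rounding) the midpoint of the two bounds. The exact rounding is an implementation detail of JDBCRelation's partitioning; these figures are simply read off the assertions above.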

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -148,7 +148,7 @@
     <codahale.metrics.version>3.2.5</codahale.metrics.version>
     <dropwizard.version>1.0.0</dropwizard.version>
     <spark-influx-sink.version>0.4.0</spark-influx-sink.version>
-    <avro.version>1.8.1</avro.version>
+    <avro.version>1.8.2</avro.version>
     <avro.mapred.classifier>hadoop2</avro.mapred.classifier>
     <jets3t.version>0.9.4</jets3t.version>
     <aws.kinesis.client.version>1.7.3</aws.kinesis.client.version>

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -61,7 +61,6 @@ trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
       .withLabels(labels.asJava)
       .endMetadata()
       .withNewSpec()
-      .withServiceAccountName("default")
       .addNewContainer()
       .withName("spark-example")
       .withImage(image)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ object DecimalPrecision extends TypeCoercionRule {
   }
 
   /** Decimal precision promotion for +, -, *, /, %, pmod, and binary comparison. */
-  private val decimalAndDecimal: PartialFunction[Expression, Expression] = {
+  private[catalyst] val decimalAndDecimal: PartialFunction[Expression, Expression] = {
     // Skip nodes whose children have not been resolved yet
     case e if !e.childrenResolved => e
 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala

Lines changed: 4 additions & 5 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.{DecimalPrecision, TypeCheckResult}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.TypeUtils
@@ -57,10 +57,9 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate {
 
   // If all input are nulls, count will be 0 and we will get null after the division.
   override lazy val evaluateExpression = child.dataType match {
-    case DecimalType.Fixed(p, s) =>
-      // increase the precision and scale to prevent precision loss
-      val dt = DecimalType.bounded(p + 14, s + 4)
-      Cast(Cast(sum, dt) / Cast(count, DecimalType.bounded(DecimalType.MAX_PRECISION, 0)),
+    case _: DecimalType =>
+      Cast(
+        DecimalPrecision.decimalAndDecimal(sum / Cast(count, DecimalType.LongDecimal)),
         resultType)
     case _ =>
       Cast(sum, resultType) / Cast(count, resultType)
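
The rewrite above delegates the decimal division's result type to DecimalPrecision.decimalAndDecimal (whose visibility is widened to private[catalyst] elsewhere in this commit) instead of hand-rolling a precision bump. For orientation, here is a sketch of the division promotion rule documented in DecimalPrecision for e1 / e2; the helper is illustrative only, not Spark code, and it omits the final adjustment that caps results at 38 digits.

// Illustrative only: raw result type of Decimal(p1, s1) / Decimal(p2, s2), before Spark
// trims precision and scale to fit within 38 digits.
def divisionResultType(p1: Int, s1: Int, p2: Int, s2: Int): (Int, Int) = {
  val scale = math.max(6, s1 + p2 + 1)
  val precision = p1 - s1 + s2 + scale
  (precision, scale)
}

// Example: averaging a DECIMAL(10, 2) column. The sum buffer is promoted to DECIMAL(20, 2)
// and count is cast to DecimalType.LongDecimal, i.e. DECIMAL(20, 0), so the raw quotient is
// divisionResultType(20, 2, 20, 0) = (41, 23), which is then adjusted to fit 38 digits
// before the outer Cast to the average's result type.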

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 7 additions & 3 deletions
@@ -96,9 +96,9 @@ object DateTimeUtils {
     }
   }
 
-  def getThreadLocalDateFormat(): DateFormat = {
+  def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
     val sdf = threadLocalDateFormat.get()
-    sdf.setTimeZone(defaultTimeZone())
+    sdf.setTimeZone(timeZone)
     sdf
   }
 
@@ -144,7 +144,11 @@
   }
 
   def dateToString(days: SQLDate): String =
-    getThreadLocalDateFormat.format(toJavaDate(days))
+    getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))
+
+  def dateToString(days: SQLDate, timeZone: TimeZone): String = {
+    getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
+  }
 
   // Converts Timestamp to string according to Hive TimestampWritable convention.
   def timestampToString(us: SQLTimestamp): String = {
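
The overload added above lets callers choose the time zone per call while keeping the thread-local formatter. Below is a minimal sketch of that underlying pattern, with illustrative names rather than Spark's internals: SimpleDateFormat is not thread-safe, so each thread keeps its own instance and the time zone is applied just before formatting.

import java.text.{DateFormat, SimpleDateFormat}
import java.util.{Date, Locale, TimeZone}

object DateFormatting {
  // One SimpleDateFormat per thread, since the class is not thread-safe.
  private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
    override def initialValue(): DateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
  }

  // Apply the requested time zone to this thread's instance, then format.
  def format(date: Date, timeZone: TimeZone): String = {
    val sdf = threadLocalDateFormat.get()
    sdf.setTimeZone(timeZone)
    sdf.format(date)
  }
}

// Usage: DateFormatting.format(new Date(), TimeZone.getTimeZone("UTC"))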

sql/core/pom.xml

Lines changed: 0 additions & 13 deletions
@@ -174,19 +174,6 @@
       <artifactId>parquet-avro</artifactId>
       <scope>test</scope>
     </dependency>
-    <!--
-      This version of avro test-dep is different from the one defined
-      in the parent pom. The parent pom has avro 1.7.7 test-dep for Hadoop.
-      Here, ParquetAvroCompatibilitySuite uses parquet-avro's AvroParquetWriter
-      which uses avro 1.8.0+ specific API. In Maven 3, we need to have
-      this here to have different versions for the same artifact.
-    -->
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-      <version>1.8.1</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
