Commit 44e04a3

merge in master again to get jackson fix
2 parents: 20c66ef + ab1650d

19 files changed: +331 / -165 lines

core/src/main/scala/org/apache/spark/status/KVUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ private[spark] object KVUtils extends Logging {
   private[spark] class KVStoreScalaSerializer extends KVStoreSerializer {

     mapper.registerModule(DefaultScalaModule)
-    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+    mapper.setSerializationInclusion(JsonInclude.Include.NON_ABSENT)

   }
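This is the Jackson fix the merge pulls in: under jackson-module-scala 2.9.x, `NON_NULL` inclusion no longer reliably hides `None` fields — they can be written as a literal `null` — while `NON_ABSENT` also suppresses "absent" referential values such as Scala's `None` and Java's `Optional.empty`. A minimal standalone sketch of the difference, assuming jackson-module-scala 2.9.x on the classpath (`Record` and `InclusionDemo` are illustrative, not Spark code):

// Sketch only: contrasts NON_NULL with NON_ABSENT for an Option field.
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class Record(name: String, score: Option[Int])

object InclusionDemo extends App {
  private def mapperWith(inclusion: JsonInclude.Include): ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule)
    m.setSerializationInclusion(inclusion)
    m
  }

  // NON_NULL drops only literal nulls; a None field may still be
  // written as "score":null by the 2.9.x Scala module.
  println(mapperWith(JsonInclude.Include.NON_NULL).writeValueAsString(Record("a", None)))

  // NON_ABSENT also treats "absent" referential values (None,
  // Optional.empty) as missing, so the field is omitted entirely.
  println(mapperWith(JsonInclude.Include.NON_ABSENT).writeValueAsString(Record("a", None)))
}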

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 3 additions & 0 deletions
@@ -139,6 +139,9 @@ private[spark] class ExecutorMetricsJsonSerializer
       jsonGenerator.writeObject(metricsMap)
     }
   }
+
+  override def isEmpty(provider: SerializerProvider, value: Option[ExecutorMetrics]): Boolean =
+    value.isEmpty
 }

 class JobData private[spark](
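When a field's type has a custom `JsonSerializer`, Jackson delegates the emptiness question to that serializer: mapper-level inclusion such as the `NON_ABSENT` setting above can only omit the property if `isEmpty` returns true. The override here lets a `None` value for executor metrics be dropped from the JSON rather than rendered as `null`. A minimal sketch of the same pattern, assuming jackson-databind 2.9.x (`OptionIntSerializer` is hypothetical, not the Spark class):

// Sketch only: a custom serializer that reports emptiness so that
// mapper-level inclusion (e.g. NON_ABSENT) can skip None fields.
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}

class OptionIntSerializer extends JsonSerializer[Option[Int]] {
  override def serialize(
      value: Option[Int],
      gen: JsonGenerator,
      provider: SerializerProvider): Unit = {
    value.foreach(v => gen.writeNumber(v))
  }

  // Jackson consults this when deciding whether the property counts
  // as empty/absent; without it, None could be serialized as null.
  override def isEmpty(provider: SerializerProvider, value: Option[Int]): Boolean =
    value.isEmpty
}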

pom.xml

Lines changed: 6 additions & 2 deletions
@@ -166,8 +166,12 @@
     <scala.binary.version>2.11</scala.binary.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <fasterxml.jackson.version>2.9.6</fasterxml.jackson.version>
+<<<<<<< HEAD
     <fasterxml.jackson.databind.version>2.9.6</fasterxml.jackson.databind.version>
     <snappy.version>1.1.7.2</snappy.version>
+=======
+    <snappy.version>1.1.7.1</snappy.version>
+>>>>>>> ab1650d2938db4901b8c28df945d6a0691a19d31
     <netlib.java.version>1.1.2</netlib.java.version>
     <calcite.version>1.2.0-incubating</calcite.version>
     <commons-codec.version>1.11</commons-codec.version>

@@ -743,7 +747,7 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
-      <version>${fasterxml.jackson.databind.version}</version>
+      <version>${fasterxml.jackson.version}</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>

@@ -755,7 +759,7 @@
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-      <version>${fasterxml.jackson.databind.version}</version>
+      <version>${fasterxml.jackson.version}</version>
       <exclusions>
         <exclusion>
           <groupId>com.google.guava</groupId>
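With both dependency entries now resolving from the single `${fasterxml.jackson.version}` property, jackson-databind and jackson-module-scala are pinned to the same 2.9.6 release as jackson-core and can no longer drift apart. Note that the first hunk commits literal Git conflict markers (`<<<<<<< HEAD` / `=======` / `>>>>>>>`) around `snappy.version`, so that conflict was left unresolved in this merge commit.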

python/pyspark/sql/tests.py

Lines changed: 18 additions & 2 deletions
@@ -5646,8 +5646,9 @@ def test_register_grouped_map_udf(self):

         foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(ValueError, 'f must be either SQL_BATCHED_UDF or '
-                                                     'SQL_SCALAR_PANDAS_UDF'):
+            with self.assertRaisesRegexp(
+                    ValueError,
+                    'f.*SQL_BATCHED_UDF.*SQL_SCALAR_PANDAS_UDF.*SQL_GROUPED_AGG_PANDAS_UDF.*'):
                 self.spark.catalog.registerFunction("foo_udf", foo_udf)

     def test_decorator(self):

@@ -6463,6 +6464,21 @@ def test_invalid_args(self):
                     'mixture.*aggregate function.*group aggregate pandas UDF'):
                 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()

+    def test_register_vectorized_udf_basic(self):
+        from pyspark.sql.functions import pandas_udf
+        from pyspark.rdd import PythonEvalType
+
+        sum_pandas_udf = pandas_udf(
+            lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+
+        self.assertEqual(sum_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+        group_agg_pandas_udf = self.spark.udf.register("sum_pandas_udf", sum_pandas_udf)
+        self.assertEqual(group_agg_pandas_udf.evalType, PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+        q = "SELECT sum_pandas_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
+        actual = sorted(map(lambda r: r[0], self.spark.sql(q).collect()))
+        expected = [1, 5]
+        self.assertEqual(actual, expected)
+

 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,

python/pyspark/sql/udf.py

Lines changed: 13 additions & 2 deletions
@@ -299,6 +299,15 @@ def register(self, name, f, returnType=None):
         >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # doctest: +SKIP
         [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]

+        >>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def sum_udf(v):
+        ...     return v.sum()
+        ...
+        >>> _ = spark.udf.register("sum_udf", sum_udf)  # doctest: +SKIP
+        >>> q = "SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
+        >>> spark.sql(q).collect()  # doctest: +SKIP
+        [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]
+
         .. note:: Registration for a user-defined function (case 2.) was added from
             Spark 2.3.0.
         """

@@ -311,9 +320,11 @@ def register(self, name, f, returnType=None):
                     "Invalid returnType: data type can not be specified when f is"
                     "a user-defined function, but got %s." % returnType)
             if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
-                                  PythonEvalType.SQL_SCALAR_PANDAS_UDF]:
+                                  PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                                  PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
                 raise ValueError(
-                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF")
+                    "Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF or "
+                    "SQL_GROUPED_AGG_PANDAS_UDF")
             register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
                                                evalType=f.evalType,
                                                deterministic=f.deterministic)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Date, Timestamp}
 import java.util.{Calendar, Locale, TimeZone}

+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow

@@ -110,7 +112,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
   }

   test("cast string to timestamp") {
-    for (tz <- ALL_TIMEZONES) {
+    for (tz <- Random.shuffle(ALL_TIMEZONES).take(50)) {
       def checkCastStringToTimestamp(str: String, expected: Timestamp): Unit = {
         checkEvaluation(cast(Literal(str), TimestampType, Option(tz.getID)), expected)
       }
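Iterating every available JVM time zone made this test slow; shuffling `ALL_TIMEZONES` and taking 50 keeps randomized cross-zone coverage on every run while bounding runtime. The sampling pattern in isolation (a sketch, not Spark code; 50 is the coverage-versus-runtime knob):

// Sketch only: bound a test's input space by random sampling.
import java.util.TimeZone
import scala.util.Random

object SampleTimeZones extends App {
  val allTimeZones: Seq[TimeZone] =
    TimeZone.getAvailableIDs.toSeq.map(id => TimeZone.getTimeZone(id))

  // Each run exercises a different random 50-zone subset.
  for (tz <- Random.shuffle(allTimeZones).take(50)) {
    println(tz.getID)
  }
}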

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -346,6 +346,16 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     projection(row)
   }

+  test("SPARK-22226: splitExpressions should not generate codes beyond 64KB") {
+    val colNumber = 10000
+    val attrs = (1 to colNumber).map(colIndex => AttributeReference(s"_$colIndex", IntegerType)())
+    val lit = Literal(1000)
+    val exprs = attrs.flatMap { a =>
+      Seq(If(lit < a, lit, a), sqrt(a))
+    }
+    UnsafeProjection.create(exprs, attrs)
+  }
+
   test("SPARK-22543: split large predicates into blocks due to JVM code size limit") {
     val length = 600
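Background for the test above: the JVM caps a single method's bytecode at 64KB, so generated code that evaluates thousands of expressions must be split across helper methods; `splitExpressions` does that inside Spark's codegen, and the test asserts `UnsafeProjection.create` compiles 20,000 expressions over 10,000 columns without hitting the limit. A conceptual sketch of the splitting idea (plain string templating, not Spark's actual `splitExpressions`):

// Conceptual sketch: pack generated statements into fixed-size helper
// methods so no single method body approaches the JVM's 64KB
// bytecode-per-method limit.
object SplitDemo extends App {
  def splitIntoMethods(stmts: Seq[String], perMethod: Int): String = {
    val helpers = stmts.grouped(perMethod).zipWithIndex.map { case (block, i) =>
      s"private void apply_$i(InternalRow i) {\n  ${block.mkString("\n  ")}\n}"
    }.toSeq
    // The entry point just dispatches to the helpers in order.
    val calls = helpers.indices.map(i => s"apply_$i(i);").mkString("\n  ")
    s"public void apply(InternalRow i) {\n  $calls\n}\n\n${helpers.mkString("\n\n")}"
  }

  val stmts = (0 until 20).map(n => s"values[$n] = i.getInt($n);")
  println(splitIntoMethods(stmts, perMethod = 5))
}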

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -273,9 +273,9 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
       val timeZoneId = Option(tz.getID)
       c.setTimeZone(tz)
-      (0 to 24).foreach { h =>
-        (0 to 60 by 15).foreach { m =>
-          (0 to 60 by 15).foreach { s =>
+      (0 to 24 by 6).foreach { h =>
+        (0 to 60 by 30).foreach { m =>
+          (0 to 60 by 30).foreach { s =>
             c.set(2015, 18, 3, h, m, s)
             checkEvaluation(
               Hour(Literal(new Timestamp(c.getTimeInMillis)), timeZoneId),

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -244,6 +244,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
       "1234")
   }

+  test("some big value") {
+    val value = "x" * 3000
+    checkEvaluation(
+      GetJsonObject(NonFoldableLiteral((s"""{"big": "$value"}""")),
+        NonFoldableLiteral("$.big")), value)
+  }
+
   val jsonTupleQuery = Literal("f1") ::
     Literal("f2") ::
     Literal("f3") ::
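The 3,000-character value pushes `GetJsonObject` past typical internal buffer sizes, a plausible regression point after the Jackson upgrade this merge pulls in. A hypothetical DataFrame-level equivalent of the expression under test, assuming a live `SparkSession` named `spark` (e.g. in spark-shell):

// Hypothetical usage sketch; assumes a live SparkSession `spark`.
import org.apache.spark.sql.functions.{get_json_object, lit}

val big = "x" * 3000
val df = spark.range(1).select(lit(s"""{"big": "$big"}""").as("js"))
// get_json_object extracts a JSONPath from a JSON string column.
df.select(get_json_object(df("js"), "$.big")).show(truncate = false)
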
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+================================================================================================
+Dataset Benchmark
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+back-to-back map long:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+RDD                                          11800 / 12042          8.5         118.0       1.0X
+DataFrame                                     1927 /  2189         51.9          19.3       6.1X
+Dataset                                       2483 /  2605         40.3          24.8       4.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+RDD                                          16286 / 16301          6.1         162.9       1.0X
+DataFrame                                     8101 /  8104         12.3          81.0       2.0X
+Dataset                                      17445 / 17811          5.7         174.4       0.9X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+back-to-back filter Long:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+RDD                                           2971 /  3184         33.7          29.7       1.0X
+DataFrame                                     1243 /  1296         80.5          12.4       2.4X
+Dataset                                       3062 /  3091         32.7          30.6       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+back-to-back filter:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+RDD                                           5253 /  5269         19.0          52.5       1.0X
+DataFrame                                      211 /   234        473.4           2.1      24.9X
+Dataset                                       9550 /  9552         10.5          95.5       0.6X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+aggregate:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+RDD sum                                       5086 /  5108         19.7          50.9       1.0X
+DataFrame sum                                   65 /    73       1548.9           0.6      78.8X
+Dataset sum using Aggregator                  9024 /  9320         11.1          90.2       0.6X
+Dataset complex Aggregator                   15079 / 15171          6.6         150.8       0.3X
+
+
