Skip to content

Commit 4193d2f

Browse files
srowen authored and dongjoon-hyun committed
[SPARK-30012][CORE][SQL] Change classes extending scala collection classes to work with 2.13
### What changes were proposed in this pull request? Move some classes extending Scala collections into parallel source trees, to support 2.13; other minor collection-related modifications. Modify some classes extending Scala collections to work with 2.13 as well as 2.12. In many cases, this means introducing parallel source trees, as the type hierarchy changed in ways that one class can't support both. ### Why are the changes needed? To support building for Scala 2.13 in the future. ### Does this PR introduce any user-facing change? There should be no behavior change. ### How was this patch tested? Existing tests. Note that the 2.13 changes are not tested by the PR builder, of course. They compile in 2.13 but can't even be tested locally. Later, once the project can be compiled for 2.13, thus tested, it's possible the 2.13 implementations will need updates. Closes apache#26728 from srowen/SPARK-30012. Authored-by: Sean Owen <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a3394e4 commit 4193d2f

File tree

19 files changed

+510
-10
lines changed

19 files changed

+510
-10
lines changed

core/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
<properties>
3434
<sbt.project.name>core</sbt.project.name>
35-
<extra.source.dir>src/main/scala-${scala.binary.version}</extra.source.dir>
3635
</properties>
3736

3837
<dependencies>
@@ -530,7 +529,7 @@
530529
</goals>
531530
<configuration>
532531
<sources>
533-
<source>${extra.source.dir}</source>
532+
<source>src/main/scala-${scala.binary.version}</source>
534533
</sources>
535534
</configuration>
536535
</execution>

core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala renamed to core/src/main/scala-2.12/org/apache/spark/util/BoundedPriorityQueue.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import scala.collection.generic.Growable
3131
private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
3232
extends Iterable[A] with Growable[A] with Serializable {
3333

34+
// Note: this class supports Scala 2.12. A parallel source tree has a 2.13 implementation.
35+
3436
private val underlying = new JPriorityQueue[A](maxSize, ord)
3537

3638
override def iterator: Iterator[A] = underlying.iterator.asScala

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala renamed to core/src/main/scala-2.12/org/apache/spark/util/TimeStampedHashMap.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ private[spark] case class TimeStampedValue[V](value: V, timestamp: Long)
4040
private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
4141
extends mutable.Map[A, B]() with Logging {
4242

43+
// Note: this class supports Scala 2.12. A parallel source tree has a 2.13 implementation.
44+
4345
private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
4446

4547
def get(key: A): Option[B] = {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
import java.io.Serializable
21+
import java.util.{PriorityQueue => JPriorityQueue}
22+
23+
import scala.collection.JavaConverters._
24+
import scala.collection.mutable.Growable
25+
26+
/**
27+
* Bounded priority queue. This class wraps the original PriorityQueue
28+
* class and modifies it such that only the top K elements are retained.
29+
* The top K elements are defined by an implicit Ordering[A].
30+
*/
31+
private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
32+
extends Iterable[A] with Growable[A] with Serializable {
33+
34+
// Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation.
35+
36+
private val underlying = new JPriorityQueue[A](maxSize, ord)
37+
38+
override def iterator: Iterator[A] = underlying.iterator.asScala
39+
40+
override def size: Int = underlying.size
41+
42+
override def knownSize: Int = size
43+
44+
override def addAll(xs: IterableOnce[A]): this.type = {
45+
xs.foreach { this += _ }
46+
this
47+
}
48+
49+
override def addOne(elem: A): this.type = {
50+
if (size < maxSize) {
51+
underlying.offer(elem)
52+
} else {
53+
maybeReplaceLowest(elem)
54+
}
55+
this
56+
}
57+
58+
def poll(): A = {
59+
underlying.poll()
60+
}
61+
62+
override def clear(): Unit = { underlying.clear() }
63+
64+
private def maybeReplaceLowest(a: A): Boolean = {
65+
val head = underlying.peek()
66+
if (head != null && ord.gt(a, head)) {
67+
underlying.poll()
68+
underlying.offer(a)
69+
} else {
70+
false
71+
}
72+
}
73+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
import java.util.Map.Entry
21+
import java.util.Set
22+
import java.util.concurrent.ConcurrentHashMap
23+
24+
import scala.collection.JavaConverters._
25+
import scala.collection.mutable
26+
27+
import org.apache.spark.internal.Logging
28+
29+
private[spark] case class TimeStampedValue[V](value: V, timestamp: Long)
30+
31+
/**
32+
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
33+
* timestamp along with each key-value pair. If specified, the timestamp of each pair can be
34+
* updated every time it is accessed. Key-value pairs whose timestamp are older than a particular
35+
* threshold time can then be removed using the clearOldValues method. This is intended to
36+
* be a drop-in replacement of scala.collection.mutable.HashMap.
37+
*
38+
* @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed
39+
*/
40+
private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
41+
extends mutable.Map[A, B]() with Logging {
42+
43+
// Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation.
44+
45+
private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
46+
47+
def get(key: A): Option[B] = {
48+
val value = internalMap.get(key)
49+
if (value != null && updateTimeStampOnGet) {
50+
internalMap.replace(key, value, TimeStampedValue(value.value, currentTime))
51+
}
52+
Option(value).map(_.value)
53+
}
54+
55+
def iterator: Iterator[(A, B)] = {
56+
getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value))
57+
}
58+
59+
def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet
60+
61+
override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = {
62+
val newMap = new TimeStampedHashMap[A, B1]
63+
val oldInternalMap = this.internalMap.asInstanceOf[ConcurrentHashMap[A, TimeStampedValue[B1]]]
64+
newMap.internalMap.putAll(oldInternalMap)
65+
kv match { case (a, b) => newMap.internalMap.put(a, TimeStampedValue(b, currentTime)) }
66+
newMap
67+
}
68+
69+
override def addOne(kv: (A, B)): this.type = {
70+
kv match { case (a, b) => internalMap.put(a, TimeStampedValue(b, currentTime)) }
71+
this
72+
}
73+
74+
override def subtractOne(key: A): this.type = {
75+
internalMap.remove(key)
76+
this
77+
}
78+
79+
override def update(key: A, value: B): Unit = {
80+
this += ((key, value))
81+
}
82+
83+
override def apply(key: A): B = {
84+
get(key).getOrElse { throw new NoSuchElementException() }
85+
}
86+
87+
override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = {
88+
internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p)
89+
}
90+
91+
override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]()
92+
93+
override def size: Int = internalMap.size
94+
95+
override def foreach[U](f: ((A, B)) => U): Unit = {
96+
val it = getEntrySet.iterator
97+
while(it.hasNext) {
98+
val entry = it.next()
99+
val kv = (entry.getKey, entry.getValue.value)
100+
f(kv)
101+
}
102+
}
103+
104+
def putIfAbsent(key: A, value: B): Option[B] = {
105+
val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime))
106+
Option(prev).map(_.value)
107+
}
108+
109+
def putAll(map: Map[A, B]): Unit = {
110+
map.foreach { case (k, v) => update(k, v) }
111+
}
112+
113+
def toMap: Map[A, B] = iterator.toMap
114+
115+
def clearOldValues(threshTime: Long, f: (A, B) => Unit): Unit = {
116+
val it = getEntrySet.iterator
117+
while (it.hasNext) {
118+
val entry = it.next()
119+
if (entry.getValue.timestamp < threshTime) {
120+
f(entry.getKey, entry.getValue.value)
121+
logDebug("Removing key " + entry.getKey)
122+
it.remove()
123+
}
124+
}
125+
}
126+
127+
/** Removes old key-value pairs that have timestamp earlier than `threshTime`. */
128+
def clearOldValues(threshTime: Long): Unit = {
129+
clearOldValues(threshTime, (_, _) => ())
130+
}
131+
132+
private def currentTime: Long = System.currentTimeMillis
133+
134+
// For testing
135+
136+
def getTimeStampedValue(key: A): Option[TimeStampedValue[B]] = {
137+
Option(internalMap.get(key))
138+
}
139+
140+
def getTimestamp(key: A): Option[Long] = {
141+
getTimeStampedValue(key).map(_.timestamp)
142+
}
143+
}

core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
112112

113113
override def length: Int = curSize
114114

115-
override def size: Int = curSize
116-
117115
override def iterator: Iterator[T] = new Iterator[T] {
118116
private var pos = 0
119117
override def hasNext: Boolean = pos < curSize

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ class ExternalAppendOnlyMap[K, V, C](
367367
private def removeFromBuffer[T](buffer: ArrayBuffer[T], index: Int): T = {
368368
val elem = buffer(index)
369369
buffer(index) = buffer(buffer.size - 1) // This also works if index == buffer.size - 1
370-
buffer.reduceToSize(buffer.size - 1)
370+
buffer.trimEnd(1)
371371
elem
372372
}
373373

dev/change-scala-version.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
set -e
2121

22-
VALID_VERSIONS=( 2.12 )
22+
VALID_VERSIONS=( 2.12 2.13 )
2323

2424
usage() {
2525
echo "Usage: $(basename $0) [-h|--help] <version>

repl/pom.xml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232

3333
<properties>
3434
<sbt.project.name>repl</sbt.project.name>
35-
<extra.source.dir>src/main/scala-${scala.binary.version}</extra.source.dir>
36-
<extra.testsource.dir>src/test/scala-${scala.binary.version}</extra.testsource.dir>
3735
</properties>
3836

3937
<dependencies>
@@ -146,7 +144,7 @@
146144
</goals>
147145
<configuration>
148146
<sources>
149-
<source>${extra.source.dir}</source>
147+
<source>src/main/scala-${scala.binary.version}</source>
150148
</sources>
151149
</configuration>
152150
</execution>
@@ -158,7 +156,7 @@
158156
</goals>
159157
<configuration>
160158
<sources>
161-
<source>${extra.testsource.dir}</source>
159+
<source>src/test/scala-${scala.binary.version}</source>
162160
</sources>
163161
</configuration>
164162
</execution>

sql/catalyst/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,24 @@
167167
<treatWarningsAsErrors>true</treatWarningsAsErrors>
168168
</configuration>
169169
</plugin>
170+
<plugin>
171+
<groupId>org.codehaus.mojo</groupId>
172+
<artifactId>build-helper-maven-plugin</artifactId>
173+
<executions>
174+
<execution>
175+
<id>add-sources</id>
176+
<phase>generate-sources</phase>
177+
<goals>
178+
<goal>add-source</goal>
179+
</goals>
180+
<configuration>
181+
<sources>
182+
<source>src/main/scala-${scala.binary.version}</source>
183+
</sources>
184+
</configuration>
185+
</execution>
186+
</executions>
187+
</plugin>
170188
</plugins>
171189
</build>
172190

0 commit comments

Comments (0)