Skip to content

Commit b6348d6

Browse files
authored
Add spark.dotnet.ignoreSparkPatchVersionCheck conf to ignore Spark's patch version in DotnetRunner (#862)
1 parent d5e95b4 commit b6348d6

File tree

16 files changed

+553
-40
lines changed

16 files changed

+553
-40
lines changed

src/scala/microsoft-spark-2-3/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.apache.spark
1919
import org.apache.spark.api.dotnet.DotnetBackend
2020
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
2121
import org.apache.spark.internal.Logging
22+
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
2223
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
2324
import org.apache.spark.util.{RedirectThread, Utils}
2425
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
@@ -34,6 +35,7 @@ import scala.util.Try
3435
*/
3536
object DotnetRunner extends Logging {
3637
private val DEBUG_PORT = 5567
38+
private val supportedSparkMajorMinorVersionPrefix = "2.3"
3739
private val supportedSparkVersions = Set[String]("2.3.0", "2.3.1", "2.3.2", "2.3.3", "2.3.4")
3840

3941
val SPARK_VERSION = DotnetUtils.normalizeSparkVersion(spark.SPARK_VERSION)
@@ -43,7 +45,16 @@ object DotnetRunner extends Logging {
4345
throw new IllegalArgumentException("At least one argument is expected.")
4446
}
4547

46-
validateSparkVersions
48+
DotnetUtils.validateSparkVersions(
49+
sys.props
50+
.getOrElse(
51+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK.key,
52+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK.defaultValue.get.toString)
53+
.toBoolean,
54+
spark.SPARK_VERSION,
55+
SPARK_VERSION,
56+
supportedSparkMajorMinorVersionPrefix,
57+
supportedSparkVersions)
4758

4859
val settings = initializeSettings(args)
4960

@@ -164,15 +175,6 @@ object DotnetRunner extends Logging {
164175
}
165176
}
166177

167-
private def validateSparkVersions: Unit = {
168-
if (!supportedSparkVersions(SPARK_VERSION)) {
169-
val supportedVersions = supportedSparkVersions.toSeq.sorted.mkString(", ")
170-
throw new IllegalArgumentException(
171-
s"Unsupported spark version used: ${spark.SPARK_VERSION}. Normalized spark version used: ${SPARK_VERSION}." +
172-
s" Supported versions: ${supportedVersions}")
173-
}
174-
}
175-
176178
// When the executable is downloaded as part of zip file, check if the file exists
177179
// after zip file is unzipped under the given dir. Once it is found, change the
178180
// permission to executable (only for Unix systems, since the zip file may have been

src/scala/microsoft-spark-2-3/src/main/scala/org/apache/spark/internal/config/dotnet/Dotnet.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ import org.apache.spark.internal.config.ConfigBuilder
1111
private[spark] object Dotnet {
  // Thread-pool size for the .NET backend (key name suggests request-serving
  // threads — confirm against DotnetBackend usage).
  val DOTNET_NUM_BACKEND_THREADS = ConfigBuilder("spark.dotnet.numDotnetBackendThreads").intConf
    .createWithDefault(10)

  // When enabled, DotnetRunner validates only the Spark major.minor prefix and
  // skips the exact patch-version membership check. Defaults to strict (false).
  val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
    ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
      .createWithDefault(false)
}

src/scala/microsoft-spark-2-3/src/main/scala/org/apache/spark/util/dotnet/Utils.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import java.util.{Timer, TimerTask}
1414

1515
import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream, ZipFile}
1616
import org.apache.commons.io.{FileUtils, IOUtils}
17+
import org.apache.spark.SparkConf
1718
import org.apache.spark.internal.Logging
19+
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
1820

1921
import scala.collection.JavaConverters._
2022
import scala.collection.Set
@@ -191,6 +193,43 @@ object Utils extends Logging {
191193
.mkString(".")
192194
}
193195

196+
/**
 * Validates the normalized spark version by verifying:
 * - Spark version starts with supportedSparkMajorMinorVersionPrefix.
 * - If ignoreSparkPatchVersion is
 *   - true: valid
 *   - false: check if the spark version is in supportedSparkVersions.
 *
 * @param ignoreSparkPatchVersion Ignore spark patch version.
 * @param sparkVersion The spark version.
 * @param normalizedSparkVersion The normalized spark version.
 * @param supportedSparkMajorMinorVersionPrefix The spark major and minor version to validate against.
 * @param supportedSparkVersions The set of supported spark versions.
 * @throws IllegalArgumentException if the version prefix does not match, or the
 *                                  exact version is unsupported while the patch
 *                                  check is enforced.
 */
def validateSparkVersions(
    ignoreSparkPatchVersion: Boolean,
    sparkVersion: String,
    normalizedSparkVersion: String,
    supportedSparkMajorMinorVersionPrefix: String,
    supportedSparkVersions: Set[String]): Unit = {
  // The major.minor prefix must match regardless of the patch-version flag.
  val matchesMajorMinor =
    normalizedSparkVersion.startsWith(s"$supportedSparkMajorMinorVersionPrefix.")

  if (!matchesMajorMinor) {
    throw new IllegalArgumentException(
      s"Unsupported spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Supported spark major.minor version: '$supportedSparkMajorMinorVersionPrefix'.")
  }

  if (ignoreSparkPatchVersion) {
    // Caller opted out of the patch-level check; record that in the logs so an
    // unsupported patch version is still discoverable after the fact.
    logWarning(
      s"Ignoring spark patch version. Spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Spark major.minor prefix used: '$supportedSparkMajorMinorVersionPrefix'.")
  } else if (!supportedSparkVersions(normalizedSparkVersion)) {
    val sortedVersions = supportedSparkVersions.toSeq.sorted.mkString(", ")
    throw new IllegalArgumentException(
      s"Unsupported spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Supported versions: '$sortedVersions'.")
  }
}
232+
194233
private[spark] def listZipFileEntries(file: File): Array[String] = {
195234
var zipFile: ZipFile = null
196235
try {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the .NET Foundation under one or more agreements.
3+
* The .NET Foundation licenses this file to you under the MIT license.
4+
* See the LICENSE file in the project root for more information.
5+
*/
6+
7+
package org.apache.spark.util.dotnet
8+
9+
import org.apache.spark.SparkConf
10+
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
11+
import org.junit.Assert.{assertEquals, assertThrows}
12+
import org.junit.Test
13+
import org.junit.function.ThrowingRunnable
14+
15+
/**
 * Tests for Utils.validateSparkVersions.
 *
 * Note: the class itself must not carry a JUnit @Test annotation — in JUnit 4
 * the @Test annotation targets methods only, so it is applied per test method.
 */
class UtilsTest {

  /** A patch version outside the supported set passes when the patch check is relaxed. */
  @Test
  def shouldIgnorePatchVersion(): Unit = {
    val sparkVersion = "2.3.5"
    val sparkMajorMinorVersionPrefix = "2.3"
    val supportedSparkVersions = Set[String]("2.3.0", "2.3.1", "2.3.2", "2.3.3", "2.3.4")

    // Must not throw: the "2.3" prefix matches and the patch check is ignored.
    Utils.validateSparkVersions(
      ignoreSparkPatchVersion = true,
      sparkVersion,
      Utils.normalizeSparkVersion(sparkVersion),
      sparkMajorMinorVersionPrefix,
      supportedSparkVersions)
  }

  /** An unsupported patch version is rejected when the patch check is enforced. */
  @Test
  def shouldThrowForUnsupportedVersion(): Unit = {
    val sparkVersion = "2.3.5"
    val normalizedSparkVersion = Utils.normalizeSparkVersion(sparkVersion)
    val sparkMajorMinorVersionPrefix = "2.3"
    val supportedSparkVersions = Set[String]("2.3.0", "2.3.1", "2.3.2", "2.3.3", "2.3.4")

    val exception = assertThrows(
      classOf[IllegalArgumentException],
      new ThrowingRunnable {
        override def run(): Unit = {
          Utils.validateSparkVersions(
            ignoreSparkPatchVersion = false,
            sparkVersion,
            normalizedSparkVersion,
            sparkMajorMinorVersionPrefix,
            supportedSparkVersions)
        }
      })

    // The message must list the supported versions in sorted order.
    assertEquals(
      s"Unsupported spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Supported versions: '${supportedSparkVersions.toSeq.sorted.mkString(", ")}'.",
      exception.getMessage)
  }

  /** A mismatched major.minor version is rejected even before the patch check. */
  @Test
  def shouldThrowForUnsupportedMajorMinorVersion(): Unit = {
    val sparkVersion = "2.4.4"
    val normalizedSparkVersion = Utils.normalizeSparkVersion(sparkVersion)
    val sparkMajorMinorVersionPrefix = "2.3"
    val supportedSparkVersions = Set[String]("2.3.0", "2.3.1", "2.3.2", "2.3.3", "2.3.4")

    val exception = assertThrows(
      classOf[IllegalArgumentException],
      new ThrowingRunnable {
        override def run(): Unit = {
          Utils.validateSparkVersions(
            ignoreSparkPatchVersion = false,
            sparkVersion,
            normalizedSparkVersion,
            sparkMajorMinorVersionPrefix,
            supportedSparkVersions)
        }
      })

    assertEquals(
      s"Unsupported spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Supported spark major.minor version: '$sparkMajorMinorVersionPrefix'.",
      exception.getMessage)
  }
}

src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/deploy/dotnet/DotnetRunner.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.apache.spark
1919
import org.apache.spark.api.dotnet.DotnetBackend
2020
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
2121
import org.apache.spark.internal.Logging
22+
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
2223
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
2324
import org.apache.spark.util.{RedirectThread, Utils}
2425
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
@@ -34,6 +35,7 @@ import scala.util.Try
3435
*/
3536
object DotnetRunner extends Logging {
3637
private val DEBUG_PORT = 5567
38+
private val supportedSparkMajorMinorVersionPrefix = "2.4"
3739
private val supportedSparkVersions =
3840
Set[String]("2.4.0", "2.4.1", "2.4.3", "2.4.4", "2.4.5", "2.4.6", "2.4.7")
3941

@@ -44,7 +46,16 @@ object DotnetRunner extends Logging {
4446
throw new IllegalArgumentException("At least one argument is expected.")
4547
}
4648

47-
validateSparkVersions
49+
DotnetUtils.validateSparkVersions(
50+
sys.props
51+
.getOrElse(
52+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK.key,
53+
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK.defaultValue.get.toString)
54+
.toBoolean,
55+
spark.SPARK_VERSION,
56+
SPARK_VERSION,
57+
supportedSparkMajorMinorVersionPrefix,
58+
supportedSparkVersions)
4859

4960
val settings = initializeSettings(args)
5061

@@ -165,15 +176,6 @@ object DotnetRunner extends Logging {
165176
}
166177
}
167178

168-
private def validateSparkVersions: Unit = {
169-
if (!supportedSparkVersions(SPARK_VERSION)) {
170-
val supportedVersions = supportedSparkVersions.toSeq.sorted.mkString(", ")
171-
throw new IllegalArgumentException(
172-
s"Unsupported spark version used: ${spark.SPARK_VERSION}. Normalized spark version used: ${SPARK_VERSION}." +
173-
s" Supported versions: ${supportedVersions}")
174-
}
175-
}
176-
177179
// When the executable is downloaded as part of zip file, check if the file exists
178180
// after zip file is unzipped under the given dir. Once it is found, change the
179181
// permission to executable (only for Unix systems, since the zip file may have been

src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/internal/config/dotnet/Dotnet.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,8 @@ import org.apache.spark.internal.config.ConfigBuilder
1111
private[spark] object Dotnet {
  // Thread-pool size for the .NET backend (key name suggests request-serving
  // threads — confirm against DotnetBackend usage).
  val DOTNET_NUM_BACKEND_THREADS = ConfigBuilder("spark.dotnet.numDotnetBackendThreads").intConf
    .createWithDefault(10)

  // When enabled, DotnetRunner validates only the Spark major.minor prefix and
  // skips the exact patch-version membership check. Defaults to strict (false).
  val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
    ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
      .createWithDefault(false)
}

src/scala/microsoft-spark-2-4/src/main/scala/org/apache/spark/util/dotnet/Utils.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import java.util.{Timer, TimerTask}
1414

1515
import org.apache.commons.compress.archivers.zip.{ZipArchiveEntry, ZipArchiveOutputStream, ZipFile}
1616
import org.apache.commons.io.{FileUtils, IOUtils}
17+
import org.apache.spark.SparkConf
1718
import org.apache.spark.internal.Logging
19+
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
1820

1921
import scala.collection.JavaConverters._
2022
import scala.collection.Set
@@ -190,6 +192,43 @@ object Utils extends Logging {
190192
.mkString(".")
191193
}
192194

195+
/**
 * Validates the normalized spark version by verifying:
 * - Spark version starts with supportedSparkMajorMinorVersionPrefix.
 * - If ignoreSparkPatchVersion is
 *   - true: valid
 *   - false: check if the spark version is in supportedSparkVersions.
 *
 * @param ignoreSparkPatchVersion Ignore spark patch version.
 * @param sparkVersion The spark version.
 * @param normalizedSparkVersion The normalized spark version.
 * @param supportedSparkMajorMinorVersionPrefix The spark major and minor version to validate against.
 * @param supportedSparkVersions The set of supported spark versions.
 * @throws IllegalArgumentException if the version prefix does not match, or the
 *                                  exact version is unsupported while the patch
 *                                  check is enforced.
 */
def validateSparkVersions(
    ignoreSparkPatchVersion: Boolean,
    sparkVersion: String,
    normalizedSparkVersion: String,
    supportedSparkMajorMinorVersionPrefix: String,
    supportedSparkVersions: Set[String]): Unit = {
  // The major.minor prefix must match regardless of the patch-version flag.
  val matchesMajorMinor =
    normalizedSparkVersion.startsWith(s"$supportedSparkMajorMinorVersionPrefix.")

  if (!matchesMajorMinor) {
    throw new IllegalArgumentException(
      s"Unsupported spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Supported spark major.minor version: '$supportedSparkMajorMinorVersionPrefix'.")
  }

  if (ignoreSparkPatchVersion) {
    // Caller opted out of the patch-level check; record that in the logs so an
    // unsupported patch version is still discoverable after the fact.
    logWarning(
      s"Ignoring spark patch version. Spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Spark major.minor prefix used: '$supportedSparkMajorMinorVersionPrefix'.")
  } else if (!supportedSparkVersions(normalizedSparkVersion)) {
    val sortedVersions = supportedSparkVersions.toSeq.sorted.mkString(", ")
    throw new IllegalArgumentException(
      s"Unsupported spark version used: '$sparkVersion'. " +
        s"Normalized spark version used: '$normalizedSparkVersion'. " +
        s"Supported versions: '$sortedVersions'.")
  }
}
231+
193232
private[spark] def listZipFileEntries(file: File): Array[String] = {
194233
var zipFile: ZipFile = null
195234
try {

0 commit comments

Comments
 (0)