Commit 296d0f9

[SPARK-53004][CORE] Support abbreviate in SparkStringUtils

### What changes were proposed in this pull request?

This PR aims to support `abbreviate` in `SparkStringUtils`. In addition, this PR adds a new Scalastyle rule to ban `StringUtils.abbreviate` in favor of the built-in implementation.

### Why are the changes needed?

To improve Spark's string utilities and reduce third-party library dependencies.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51711 from dongjoon-hyun/SPARK-53004.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 95abf2f commit 296d0f9

File tree: 11 files changed (+31, −23 lines)


common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala (12 additions, 0 deletions)

@@ -38,6 +38,18 @@ private[spark] trait SparkStringUtils {
     SPACE_DELIMITED_UPPERCASE_HEX.parseHex(hex.stripPrefix("[").stripSuffix("]"))
   }
 
+  def abbreviate(str: String, abbrevMarker: String, len: Int): String = {
+    if (str == null || abbrevMarker == null) {
+      null
+    } else if (str.length() <= len || str.length() <= abbrevMarker.length()) {
+      str
+    } else {
+      str.substring(0, len - abbrevMarker.length()) + abbrevMarker
+    }
+  }
+
+  def abbreviate(str: String, len: Int): String = abbreviate(str, "...", len)
+
   def sideBySide(left: String, right: String): Seq[String] = {
     sideBySide(left.split("\n").toImmutableArraySeq, right.split("\n").toImmutableArraySeq)
   }
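
For reference, a minimal self-contained sketch of the new helper's semantics. The trait is `private[spark]` and the call sites below reach it through `Utils`, so the object name here is hypothetical:

```scala
// Hypothetical standalone mirror of the abbreviate logic added above,
// for illustration only; in Spark the method is reached via Utils.
object AbbreviateSketch {
  def abbreviate(str: String, abbrevMarker: String, len: Int): String = {
    if (str == null || abbrevMarker == null) {
      null // nulls propagate, matching commons-lang3 behavior
    } else if (str.length() <= len || str.length() <= abbrevMarker.length()) {
      str // already short enough (or shorter than the marker): unchanged
    } else {
      // Keep the first (len - marker length) chars, then append the marker,
      // so the result is exactly `len` characters long.
      str.substring(0, len - abbrevMarker.length()) + abbrevMarker
    }
  }

  def abbreviate(str: String, len: Int): String = abbreviate(str, "...", len)

  def main(args: Array[String]): Unit = {
    assert(abbreviate("spark", 10) == "spark")         // no truncation needed
    assert(abbreviate("hello world", 8) == "hello...") // truncated to 8 chars
    assert(abbreviate(null, 8) == null)                // null in, null out
  }
}
```

Note the guard `str.length() <= abbrevMarker.length()`: a string no longer than the marker is returned unchanged rather than replaced by dots. Unlike commons-lang3, which rejects widths below 4 with an exception, this implementation relies on callers passing a `len` greater than the marker length.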

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala (1 addition, 2 deletions)

@@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s
 import java.util.{Locale, UUID}
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
@@ -314,7 +313,7 @@ private[spark] object KubernetesConf {
     // must be 63 characters or less to follow the DNS label standard, so take the 63 characters
     // of the appName name as the label. In addition, label value must start and end with
     // an alphanumeric character.
-    StringUtils.abbreviate(
+    Utils.abbreviate(
      s"$appName"
        .trim
        .toLowerCase(Locale.ROOT)
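
To make the effect at this call site concrete, a hedged sketch: the cap value and input below are illustrative, and the real code also sanitizes the result to satisfy the DNS label rules cited in the comment, which is outside this hunk.

```scala
// Illustrative only: abbreviating a long lowercase app name toward the
// 63-char DNS label limit mentioned above. Runs only inside Spark's own
// codebase, since org.apache.spark.util.Utils is private[spark].
val appName = "a" * 70                     // hypothetical 70-char name
val label = Utils.abbreviate(appName, 63)  // at most 63 chars
assert(label.length == 63 && label.endsWith("..."))
```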

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala (1 addition, 2 deletions)

@@ -28,7 +28,6 @@ import scala.concurrent.Promise
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.{StringUtils => ComStrUtils}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
@@ -389,7 +388,7 @@ private[spark] class ApplicationMaster(
       logInfo(log"Final app status: ${MDC(LogKeys.APP_STATE, finalStatus)}, " +
         log"exitCode: ${MDC(LogKeys.EXIT_CODE, exitCode)}" +
         Option(msg).map(msg => log", (reason: ${MDC(LogKeys.REASON, msg)})").getOrElse(log""))
-      finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
+      finalMsg = Utils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
       finished = true
       if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
         logDebug("shutting down reporter thread")

scalastyle-config.xml (5 additions, 0 deletions)

@@ -329,6 +329,11 @@ This file is divided into 3 sections:
     <customMessage>Use org.apache.spark.StringSubstitutor instead</customMessage>
   </check>
 
+  <check customId="commonslang3abbreviate" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">StringUtils\.abbreviate\(</parameter></parameters>
+    <customMessage>Use Utils.abbreviate method instead</customMessage>
+  </check>
+
   <check customId="uribuilder" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">UriBuilder\.fromUri</parameter></parameters>
     <customMessage>Use Utils.getUriBuilder instead.</customMessage>
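
A sketch of what the new rule does: any line in a Scala source file matching `StringUtils\.abbreviate\(` now fails style checking with the message above. The snippet is illustrative, not from the patch, and `message` is a placeholder String:

```scala
// Would now fail scalastyle with "Use Utils.abbreviate method instead":
//   val short = StringUtils.abbreviate(message, 128)

// Expected replacement, using the built-in helper added in this commit
// (Utils is org.apache.spark.util.Utils):
val short = Utils.abbreviate(message, 128)
```

As with Spark's other RegexChecker rules, a legitimate use could presumably be fenced with `// scalastyle:off commonslang3abbreviate` and `// scalastyle:on commonslang3abbreviate`, though this patch needs no such escape.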

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala (2 additions, 2 deletions)

@@ -27,7 +27,6 @@ import scala.util.control.NonFatal
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
-import org.apache.commons.lang3.StringUtils
 import org.json4s.JsonAST.{JArray, JBool, JDecimal, JDouble, JInt, JLong, JNull, JObject, JString, JValue}
 import org.json4s.jackson.JsonMethods._
 
@@ -50,6 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.Utils
 
 /**
  * Interface providing util to convert JValue to String representation of catalog entities.
@@ -1150,7 +1150,7 @@ case class HiveTableRelation(
     val metadataEntries = metadata.toSeq.map {
       case (key, value) if key == "CatalogTable" => value
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(value, SQLConf.get.maxMetadataStringLength)
+        key + ": " + Utils.abbreviate(value, SQLConf.get.maxMetadataStringLength)
     }
 
     val metadataStr = truncatedString(metadataEntries, "[", ", ", "]", maxFields)

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala (3 additions, 6 deletions)

@@ -23,7 +23,6 @@ import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.google.protobuf.Message
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
@@ -209,16 +208,14 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
             tag))
       }
       session.sparkContext.setJobDescription(
-        s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+        s"Spark Connect - ${Utils.abbreviate(debugString, 128)}")
       session.sparkContext.setInterruptOnCancel(true)
 
       // Add debug information to the query execution so that the jobs are traceable.
       session.sparkContext.setLocalProperty(
         "callSite.short",
-        s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
-      session.sparkContext.setLocalProperty(
-        "callSite.long",
-        StringUtils.abbreviate(debugString, 2048))
+        s"Spark Connect - ${Utils.abbreviate(debugString, 128)}")
+      session.sparkContext.setLocalProperty("callSite.long", Utils.abbreviate(debugString, 2048))
 
       executeHolder.request.getPlan.getOpTypeCase match {
         case proto.Plan.OpTypeCase.COMMAND => handleCommand(executeHolder.request)

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala (1 addition, 2 deletions)

@@ -29,7 +29,6 @@ import io.grpc.netty.NettyServerBuilder
 import io.grpc.protobuf.ProtoUtils
 import io.grpc.protobuf.services.ProtoReflectionService
 import io.grpc.stub.StreamObserver
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.connect.proto
@@ -504,7 +503,7 @@ object SparkConnectService extends Logging {
   }
 
   def extractErrorMessage(st: Throwable): String = {
-    val message = StringUtils.abbreviate(st.getMessage, 2048)
+    val message = Utils.abbreviate(st.getMessage, 2048)
     convertNullString(message)
   }

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala (3 additions, 3 deletions)

@@ -30,7 +30,6 @@ import com.google.rpc.{Code => RPCCode, ErrorInfo, Status => RPCStatus}
 import io.grpc.Status
 import io.grpc.protobuf.StatusProto
 import io.grpc.stub.StreamObserver
-import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods
@@ -44,6 +43,7 @@ import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.service.{ExecuteEventsManager, SessionHolder, SessionKey, SparkConnectService}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.Utils
 
 private[connect] object ErrorUtils extends Logging {
 
@@ -225,7 +225,7 @@ private[connect] object ErrorUtils extends Logging {
         val maxSize = Math.min(
           SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
           maxMetadataSize)
-        errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize.toInt))
+        errorInfo.putMetadata("stackTrace", Utils.abbreviate(stackTrace.get, maxSize.toInt))
       } else {
         errorInfo
       }
@@ -297,7 +297,7 @@ private[connect] object ErrorUtils extends Logging {
         e,
         Status.UNKNOWN
           .withCause(e)
-          .withDescription(StringUtils.abbreviate(e.getMessage, 2048))
+          .withDescription(Utils.abbreviate(e.getMessage, 2048))
           .asRuntimeException())
     }
     partial

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala (1 addition, 2 deletions)

@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit._
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.LogKeys.{COUNT, MAX_SPLIT_BYTES, OPEN_COST_IN_BYTES}
@@ -65,7 +64,7 @@ trait DataSourceScanExec extends LeafExecNode with StreamSourceAwareSparkPlan {
   override def simpleString(maxFields: Int): String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
+        key + ": " + Utils.abbreviate(redact(value), maxMetadataValueLength)
     }
     val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
     redact(

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala (1 addition, 2 deletions)

@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.columnar
 
 import com.esotericsoftware.kryo.{DefaultSerializer, Kryo, Serializer => KryoSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.network.util.JavaUtils
@@ -267,7 +266,7 @@ case class CachedRDDBuilder(
   private val materializedPartitions = cachedPlan.session.sparkContext.longAccumulator
 
   val cachedName = tableName.map(n => s"In-memory table $n")
-    .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))
+    .getOrElse(Utils.abbreviate(cachedPlan.toString, 1024))
 
   val supportsColumnarInput: Boolean = {
     cachedPlan.supportsColumnar &&
