Commit e4639fa

smurakozi authored and Marcelo Vanzin committed
[SPARK-21672][CORE] Remove SHS-specific application / attempt data structures

## What changes were proposed in this pull request?

In general, the SHS pages now use the public API types to represent applications. Some internal code paths still used their own view of what applications and attempts look like (`ApplicationHistoryInfo` and `ApplicationAttemptInfo`), declared in ApplicationHistoryProvider.scala. This pull request removes these classes and updates the rest of the code to use `status.api.v1.ApplicationInfo` and `status.api.v1.ApplicationAttemptInfo` instead.

Furthermore, `status.api.v1.ApplicationInfo` and `status.api.v1.ApplicationAttemptInfo` were changed to case classes to:
- facilitate copying instances
- allow equality checking in test code
- provide a nicer toString()

To simplify the code a bit, the `v1.` prefix was also removed from occurrences of `v1.ApplicationInfo` and `v1.ApplicationAttemptInfo`, as there is no longer any ambiguity between the classes in `history` and `status.api.v1`.

## How was this patch tested?

By running existing automated tests.

Author: Sandor Murakozi <[email protected]>

Closes #19920 from smurakozi/SPARK-21672.
1 parent 26e6645 · commit e4639fa
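The three motivations listed in the commit message map directly onto what the Scala compiler generates for a case class. A minimal, self-contained sketch with a simplified, hypothetical stand-in type (not the real `status.api.v1` class):

```scala
// Hypothetical, simplified stand-in for status.api.v1.ApplicationAttemptInfo,
// just to illustrate what the case-class conversion buys.
case class AttemptSketch(
    attemptId: Option[String],
    sparkUser: String,
    completed: Boolean = false)

object CaseClassBenefits extends App {
  val a = AttemptSketch(Some("attempt-1"), "alice")

  // 1. Copying instances: derive a modified copy without touching other fields.
  val done = a.copy(completed = true)

  // 2. Structural equality: handy for assertions in test code.
  assert(done == AttemptSketch(Some("attempt-1"), "alice", completed = true))

  // 3. Readable toString, e.g. AttemptSketch(Some(attempt-1),alice,true)
  println(done)
}
```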

9 files changed: +63 additions, −111 deletions

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 4 additions & 26 deletions
```diff
@@ -23,31 +23,9 @@ import java.util.zip.ZipOutputStream
 import scala.xml.Node
 
 import org.apache.spark.SparkException
+import org.apache.spark.status.api.v1.ApplicationInfo
 import org.apache.spark.ui.SparkUI
 
-private[spark] case class ApplicationAttemptInfo(
-    attemptId: Option[String],
-    startTime: Long,
-    endTime: Long,
-    lastUpdated: Long,
-    sparkUser: String,
-    completed: Boolean = false,
-    appSparkVersion: String)
-
-private[spark] case class ApplicationHistoryInfo(
-    id: String,
-    name: String,
-    attempts: List[ApplicationAttemptInfo]) {
-
-  /**
-   * Has this application completed?
-   * @return true if the most recent attempt has completed
-   */
-  def completed: Boolean = {
-    attempts.nonEmpty && attempts.head.completed
-  }
-}
-
 /**
  * A loaded UI for a Spark application.
  *
@@ -119,7 +97,7 @@ private[history] abstract class ApplicationHistoryProvider {
    *
    * @return List of all know applications.
    */
-  def getListing(): Iterator[ApplicationHistoryInfo]
+  def getListing(): Iterator[ApplicationInfo]
 
   /**
    * Returns the Spark UI for a specific application.
@@ -152,9 +130,9 @@ private[history] abstract class ApplicationHistoryProvider {
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
 
   /**
-   * @return the [[ApplicationInfo]] for the appId if it exists.
+   * @return the [[ApplicationInfo]] for the appId if it exists.
   */
-  def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo]
+  def getApplicationInfo(appId: String): Option[ApplicationInfo]
 
  /**
   * @return html text to display when the application list is empty
```
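Note that the `completed` check that lived on the removed `ApplicationHistoryInfo` has no counterpart on the public `ApplicationInfo`; the same rule reappears as a private helper in HistoryPage.scala further down. A minimal standalone sketch of that rule, using hypothetical stand-in types rather than the real API classes:

```scala
// Hypothetical stand-ins for the public API types; only the fields the
// completeness check needs.
case class AttemptStub(completed: Boolean)
case class AppStub(attempts: Seq[AttemptStub])

object CompletedCheck extends App {
  // Same rule as the removed ApplicationHistoryInfo.completed and the new
  // HistoryPage.isApplicationCompleted: an app is complete if and only if
  // its most recent attempt (the head of the list) has completed.
  def isCompleted(app: AppStub): Boolean =
    app.attempts.nonEmpty && app.attempts.head.completed

  assert(!isCompleted(AppStub(Nil)))                     // no attempts yet
  assert(isCompleted(AppStub(Seq(AttemptStub(true)))))   // finished attempt
  assert(!isCompleted(AppStub(Seq(AttemptStub(false))))) // still running
}
```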

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 11 additions & 21 deletions
```diff
@@ -43,7 +43,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
-import org.apache.spark.status.api.v1
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -252,19 +252,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getListing(): Iterator[ApplicationHistoryInfo] = {
+  override def getListing(): Iterator[ApplicationInfo] = {
     // Return the listing in end time descending order.
     listing.view(classOf[ApplicationInfoWrapper])
       .index("endTime")
       .reverse()
       .iterator()
       .asScala
-      .map(_.toAppHistoryInfo())
+      .map(_.toApplicationInfo())
   }
 
-  override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = {
+  override def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
     try {
-      Some(load(appId).toAppHistoryInfo())
+      Some(load(appId).toApplicationInfo())
     } catch {
       case e: NoSuchElementException =>
         None
@@ -795,24 +795,16 @@ private[history] case class LogInfo(
     fileSize: Long)
 
 private[history] class AttemptInfoWrapper(
-    val info: v1.ApplicationAttemptInfo,
+    val info: ApplicationAttemptInfo,
     val logPath: String,
     val fileSize: Long,
     val adminAcls: Option[String],
     val viewAcls: Option[String],
     val adminAclsGroups: Option[String],
-    val viewAclsGroups: Option[String]) {
-
-  def toAppAttemptInfo(): ApplicationAttemptInfo = {
-    ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
-      info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
-      info.completed, info.appSparkVersion)
-  }
-
-}
+    val viewAclsGroups: Option[String])
 
 private[history] class ApplicationInfoWrapper(
-    val info: v1.ApplicationInfo,
+    val info: ApplicationInfo,
     val attempts: List[AttemptInfoWrapper]) {
 
   @JsonIgnore @KVIndexParam
@@ -824,9 +816,7 @@ private[history] class ApplicationInfoWrapper(
   @JsonIgnore @KVIndexParam("oldestAttempt")
   def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min
 
-  def toAppHistoryInfo(): ApplicationHistoryInfo = {
-    ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo()))
-  }
+  def toApplicationInfo(): ApplicationInfo = info.copy(attempts = attempts.map(_.info))
 
 }
 
@@ -883,7 +873,7 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
   var memoryPerExecutorMB: Option[Int] = None
 
   def toView(): ApplicationInfoWrapper = {
-    val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor,
+    val apiInfo = ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor,
       memoryPerExecutorMB, Nil)
     new ApplicationInfoWrapper(apiInfo, List(attempt.toView()))
   }
@@ -906,7 +896,7 @@
   var viewAclsGroups: Option[String] = None
 
   def toView(): AttemptInfoWrapper = {
-    val apiInfo = new v1.ApplicationAttemptInfo(
+    val apiInfo = ApplicationAttemptInfo(
       attemptId,
       startTime,
       endTime,
```
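The new `toApplicationInfo()` leans on the case-class `copy` introduced in api.scala below: the stored wrapper keeps per-attempt bookkeeping (log path, file size, ACLs), and the public view is rebuilt by swapping in just the public attempt records. A minimal sketch of that pattern, with hypothetical stand-in types rather than the Spark classes:

```scala
// Stand-ins: AttemptView/AppView play the role of the v1 API types,
// the wrappers play the role of AttemptInfoWrapper/ApplicationInfoWrapper.
case class AttemptView(attemptId: Option[String], completed: Boolean)
case class AppView(id: String, name: String, attempts: Seq[AttemptView])

class AttemptWrapper(val info: AttemptView, val logPath: String)
class AppWrapper(val info: AppView, val attempts: List[AttemptWrapper]) {
  // Same shape as ApplicationInfoWrapper.toApplicationInfo() in the diff:
  // keep everything on `info`, replace only the attempts list.
  def toView(): AppView = info.copy(attempts = attempts.map(_.info))
}

object WrapperDemo extends App {
  val wrapped = new AppWrapper(
    AppView("app-1", "demo", Nil),
    List(new AttemptWrapper(AttemptView(Some("1"), completed = true), "/logs/app-1")))
  println(wrapped.toView()) // AppView(app-1,demo,List(AttemptView(Some(1),true)))
}
```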

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -21,6 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
+import org.apache.spark.status.api.v1.ApplicationInfo
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
@@ -30,7 +31,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedIncomplete =
       Option(UIUtils.stripXSS(request.getParameter("showIncomplete"))).getOrElse("false").toBoolean
 
-    val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
+    val allAppsSize = parent.getApplicationList()
+      .count(isApplicationCompleted(_) != requestedIncomplete)
     val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
     val lastUpdatedTime = parent.getLastUpdatedTime()
     val providerConfig = parent.getProviderConfig()
@@ -88,4 +90,8 @@
   private def makePageLink(showIncomplete: Boolean): String = {
     UIUtils.prependBaseUri("/?" + "showIncomplete=" + showIncomplete)
   }
+
+  private def isApplicationCompleted(appInfo: ApplicationInfo): Boolean = {
+    appInfo.attempts.nonEmpty && appInfo.attempts.head.completed
+  }
 }
```

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils}
@@ -175,7 +175,7 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList(): Iterator[ApplicationHistoryInfo] = {
+  def getApplicationList(): Iterator[ApplicationInfo] = {
     provider.getListing()
   }
 
@@ -188,11 +188,11 @@
   }
 
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
-    getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+    getApplicationList()
   }
 
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
-    provider.getApplicationInfo(appId).map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+    provider.getApplicationInfo(appId)
   }
 
   override def writeEventLogs(
```

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -77,7 +77,7 @@ private[spark] class AppStatusListener(
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     assert(event.appId.isDefined, "Application without IDs are not supported.")
 
-    val attempt = new v1.ApplicationAttemptInfo(
+    val attempt = v1.ApplicationAttemptInfo(
       event.appAttemptId,
       new Date(event.time),
       new Date(-1),
@@ -87,7 +87,7 @@
       false,
       sparkVersion)
 
-    appInfo = new v1.ApplicationInfo(
+    appInfo = v1.ApplicationInfo(
       event.appId.get,
       event.appName,
       None,
@@ -122,7 +122,7 @@
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
     val old = appInfo.attempts.head
-    val attempt = new v1.ApplicationAttemptInfo(
+    val attempt = v1.ApplicationAttemptInfo(
       old.attemptId,
       old.startTime,
       new Date(event.time),
@@ -132,7 +132,7 @@
       true,
       old.appSparkVersion)
 
-    appInfo = new v1.ApplicationInfo(
+    appInfo = v1.ApplicationInfo(
       appInfo.id,
       appInfo.name,
       None,
```
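These hunks only drop `new`: once `ApplicationInfo` and `ApplicationAttemptInfo` are case classes, the compiler generates a companion `apply`, so both spellings construct the same value. A quick, self-contained illustration with a toy case class (not the real API type):

```scala
// Toy case class standing in for the v1 API types.
case class Point(x: Int, y: Int)

object ApplyDemo extends App {
  val a = Point(1, 2)      // companion apply(), no `new` required
  val b = new Point(1, 2)  // calling the constructor directly still works
  assert(a == b)           // case classes compare structurally
  println(a)               // prints: Point(1,2)
}
```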

core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala

Lines changed: 0 additions & 32 deletions
```diff
@@ -20,8 +20,6 @@ import java.util.{Date, List => JList}
 import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
-import org.apache.spark.deploy.history.ApplicationHistoryInfo
-
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class ApplicationListResource extends ApiRequestContext {
 
@@ -67,33 +65,3 @@ private[v1] class ApplicationListResource extends ApiRequestContext {
     startTimeOk && endTimeOk
   }
 }
-
-private[spark] object ApplicationsListResource {
-  def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
-    new ApplicationInfo(
-      id = app.id,
-      name = app.name,
-      coresGranted = None,
-      maxCores = None,
-      coresPerExecutor = None,
-      memoryPerExecutorMB = None,
-      attempts = app.attempts.map { internalAttemptInfo =>
-        new ApplicationAttemptInfo(
-          attemptId = internalAttemptInfo.attemptId,
-          startTime = new Date(internalAttemptInfo.startTime),
-          endTime = new Date(internalAttemptInfo.endTime),
-          duration =
-            if (internalAttemptInfo.endTime > 0) {
-              internalAttemptInfo.endTime - internalAttemptInfo.startTime
-            } else {
-              0
-            },
-          lastUpdated = new Date(internalAttemptInfo.lastUpdated),
-          sparkUser = internalAttemptInfo.sparkUser,
-          completed = internalAttemptInfo.completed,
-          appSparkVersion = internalAttemptInfo.appSparkVersion
-        )
-      }
-    )
-  }
-}
```
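With the converter gone, its one piece of real logic — deriving `duration` from the attempt's timestamps — has to live wherever the public objects are built (see the updated test helper in FsHistoryProviderSuite below). Sketched as a hypothetical standalone helper:

```scala
object DurationRule extends App {
  // Hypothetical helper mirroring the duration rule from the deleted
  // appHistoryInfoToPublicAppInfo: an attempt that has not finished
  // (endTime <= 0) reports a duration of 0.
  def attemptDuration(startMs: Long, endMs: Long): Long =
    if (endMs > 0) endMs - startMs else 0

  assert(attemptDuration(1000L, 5000L) == 4000L) // completed attempt
  assert(attemptDuration(1000L, -1L) == 0L)      // attempt still in flight
}
```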

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 17 additions & 17 deletions
```diff
@@ -24,27 +24,27 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 
-class ApplicationInfo private[spark](
-    val id: String,
-    val name: String,
-    val coresGranted: Option[Int],
-    val maxCores: Option[Int],
-    val coresPerExecutor: Option[Int],
-    val memoryPerExecutorMB: Option[Int],
-    val attempts: Seq[ApplicationAttemptInfo])
+case class ApplicationInfo private[spark](
+    id: String,
+    name: String,
+    coresGranted: Option[Int],
+    maxCores: Option[Int],
+    coresPerExecutor: Option[Int],
+    memoryPerExecutorMB: Option[Int],
+    attempts: Seq[ApplicationAttemptInfo])
 
 @JsonIgnoreProperties(
   value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"),
   allowGetters = true)
-class ApplicationAttemptInfo private[spark](
-    val attemptId: Option[String],
-    val startTime: Date,
-    val endTime: Date,
-    val lastUpdated: Date,
-    val duration: Long,
-    val sparkUser: String,
-    val completed: Boolean = false,
-    val appSparkVersion: String) {
+case class ApplicationAttemptInfo private[spark](
+    attemptId: Option[String],
+    startTime: Date,
+    endTime: Date,
+    lastUpdated: Date,
+    duration: Long,
+    sparkUser: String,
+    completed: Boolean = false,
+    appSparkVersion: String) {
 
   def getStartTimeEpoch: Long = startTime.getTime
 
```
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.Date
 import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
@@ -42,6 +43,7 @@ import org.apache.spark.io._
 import org.apache.spark.scheduler._
 import org.apache.spark.security.GroupMappingServiceProvider
 import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
@@ -114,9 +116,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
       end: Long,
       lastMod: Long,
       user: String,
-      completed: Boolean): ApplicationHistoryInfo = {
-    ApplicationHistoryInfo(id, name,
-      List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed, "")))
+      completed: Boolean): ApplicationInfo = {
+
+    val duration = if (end > 0) end - start else 0
+    new ApplicationInfo(id, name, None, None, None, None,
+      List(ApplicationAttemptInfo(None, new Date(start),
+        new Date(end), new Date(lastMod), duration, user, completed, "")))
   }
 
   // For completed files, lastUpdated would be lastModified time.
@@ -667,7 +672,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
    * }
    */
   private def updateAndCheck(provider: FsHistoryProvider)
-      (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
+      (checkFn: Seq[ApplicationInfo] => Unit): Unit = {
     provider.checkForLogs()
     provider.cleanLogs()
     checkFn(provider.getListing().toSeq)
```
