
Commit 0350370

krczvelvia authored and committed
fix(job-server) store error class and stack trace in database (spark-jobserver#923)
* Implement storing and fetching error class and stack trace in DAOs
* Create migrations adding columns in SQL databases
* Modify tests to take care of new columns

BREAKING CHANGES:
* Schema change: new columns in SQL databases and Cassandra
* File storage format incompatible
* Stack trace returned as string instead of list of strings
1 parent 451f361 commit 0350370

16 files changed (+148, -121 lines)

README.md

Lines changed: 6 additions & 14 deletions
@@ -353,27 +353,19 @@ It is much more type safe, separates context configuration, job ID, named object
 Let's try running our sample job with an invalid configuration:
 
     curl -i -d "bad.input=abc" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample"
-
 HTTP/1.1 400 Bad Request
-Server: spray-can/1.2.0
-Date: Tue, 10 Jun 2014 22:07:18 GMT
+Server: spray-can/1.3.4
+Date: Thu, 14 Sep 2017 12:01:37 GMT
+Access-Control-Allow-Origin: *
 Content-Type: application/json; charset=UTF-8
-Content-Length: 929
+Content-Length: 738
 
 {
   "status": "VALIDATION FAILED",
   "result": {
-    "message": "No input.string config param",
+    "message": "One(SparkJobInvalid(No input.string config param))",
     "errorClass": "java.lang.Throwable",
-    "stack": ["spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:212)",
-              "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)",
-              "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)",
-              "akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)",
-              "akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)",
-              "scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)",
-              "scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)",
-              "scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)",
-              "scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)"]
+    "stack": "java.lang.Throwable: One(SparkJobInvalid(No input.string config param))\n\tat spark.jobserver.JobManagerActor$$anonfun$getJobFuture$4.apply(JobManagerActor.scala:327)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:748)\n"
   }
 }

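The breaking change called out in the commit message means `stack` is now one pre-formatted string rather than a JSON array of frames. A client that consumed the old list form can recover per-frame entries by splitting the string; a minimal sketch, assuming the standard `Throwable.printStackTrace` layout shown in the response above:

```scala
// Sketch: turn the new single-string "stack" back into the old list-of-frames
// shape by splitting on line breaks and stripping the "\tat " prefixes.
def stackFrames(stack: String): Seq[String] =
  stack.split("\n").toSeq
    .map(_.trim)
    .filter(_.startsWith("at "))
    .map(_.stripPrefix("at "))
```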
Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255);
+ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE CLOB;

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255);
+ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE TEXT;

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255);
+ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE TEXT;

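The three new migration files add the same two columns to each supported SQL backend; the CLOB variant is presumably the H2 flavor, with TEXT used for the others. For reference, the same change applied by hand over JDBC, as a minimal sketch (the H2 URL is illustrative, not taken from this commit):

```scala
import java.sql.DriverManager

// Apply the migration manually. Assumptions: an H2 database at an illustrative
// path; on backends without a CLOB type (e.g. MySQL, PostgreSQL) use TEXT instead.
val conn = DriverManager.getConnection("jdbc:h2:file:/tmp/spark-jobserver/sqldao/data/h2-db")
try {
  val stmt = conn.createStatement()
  stmt.execute("ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255)")
  stmt.execute("ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE CLOB")
} finally {
  conn.close()
}
```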
job-server/src/main/scala/spark/jobserver/JobStatusActor.scala

Lines changed: 8 additions & 7 deletions
@@ -2,14 +2,13 @@ package spark.jobserver
 
 import scala.collection.mutable
 import scala.util.Try
-
 import akka.actor.{ActorRef, Props}
 import com.yammer.metrics.core.Meter
 import org.joda.time.DateTime
 import spark.jobserver.JobManagerActor.JobKilledException
 import spark.jobserver.common.akka.InstrumentedActor
 import spark.jobserver.common.akka.metrics.YammerMetrics
-import spark.jobserver.io.{JobDAOActor, JobInfo}
+import spark.jobserver.io.{ErrorData, JobDAOActor, JobInfo}
 
 object JobStatusActor {
   case class JobInit(jobInfo: JobInfo)
@@ -40,8 +39,9 @@ class JobStatusActor(jobDao: ActorRef) extends InstrumentedActor with YammerMetr
   override def postStop(): Unit = {
     val stopTime = DateTime.now()
     val stoppedInfos = infos.values.map { info =>
-      info.copy(endTime = Some(stopTime),
-        error = Some(new Exception(s"Context (${info.contextName}) for this job was terminated"))) }
+      val errorData = ErrorData(s"Context (${info.contextName}) for this job was terminated", "", "")
+      info.copy(endTime = Some(stopTime), error = Some(errorData))
+    }
     stoppedInfos.foreach({info => jobDao ! JobDAOActor.SaveJobInfo(info)})
   }
 
@@ -90,19 +90,20 @@ class JobStatusActor(jobDao: ActorRef) extends InstrumentedActor with YammerMetr
     case msg: JobValidationFailed =>
       processStatus(msg, "validation failed", remove = true) {
         case (info, msg: JobValidationFailed) =>
-          info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
+          info.copy(endTime = Some(msg.endTime), error = Some(ErrorData(msg.err)))
       }
 
     case msg: JobErroredOut =>
      processStatus(msg, "finished with an error", remove = true) {
         case (info, msg: JobErroredOut) =>
-          info.copy(endTime = Some(msg.endTime), error = Some(msg.err))
+          info.copy(endTime = Some(msg.endTime), error = Some(ErrorData(msg.err)))
       }
 
     case msg: JobKilled =>
       processStatus(msg, "killed", remove = true) {
         case (info, msg: JobKilled) =>
-          info.copy(endTime = Some(msg.endTime), error = Some(JobKilledException(info.jobId)))
+          info.copy(endTime = Some(msg.endTime),
+            error = Some(ErrorData(JobKilledException(info.jobId))))
       }
   }

job-server/src/main/scala/spark/jobserver/WebApi.scala

Lines changed: 17 additions & 17 deletions
@@ -16,7 +16,7 @@ import org.joda.time.DateTime
 import org.slf4j.LoggerFactory
 import spark.jobserver.JobManagerActor.JobKilledException
 import spark.jobserver.auth._
-import spark.jobserver.io.{BinaryType, JobInfo, JobStatus}
+import spark.jobserver.io.{BinaryType, ErrorData, JobInfo, JobStatus}
 import spark.jobserver.routes.DataRoutes
 import spark.jobserver.util.{SSLContextFactory, SparkJobUtils}
 import spray.http.HttpHeaders.`Content-Type`
@@ -26,6 +26,7 @@ import spray.io.ServerSSLEngineProvider
 import spray.json.DefaultJsonProtocol._
 import spray.routing.directives.AuthMagnet
 import spray.routing.{HttpService, RequestContext, Route}
+
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Try
 
@@ -74,27 +75,26 @@ object WebApi {
   }
 
   def formatException(t: Throwable): Any =
-    if (t.getCause != null) {
-      Map("message" -> t.getMessage,
-        "errorClass" -> t.getClass.getName,
-        "cause" -> t.getCause.getMessage,
-        "causingClass" -> t.getCause.getClass.getName,
-        "stack" -> t.getCause.getStackTrace.map(_.toString).toSeq)
-    } else {
-      Map("message" -> t.getMessage,
-        "errorClass" -> t.getClass.getName,
-        "stack" -> t.getStackTrace.map(_.toString).toSeq)
-    }
+    Map("message" -> t.getMessage,
+      "errorClass" -> t.getClass.getName,
+      "stack" -> ErrorData.getStackTrace(t))
+
+  def formatException(t: ErrorData): Any = {
+    Map("message" -> t.message,
+      "errorClass" -> t.errorClass,
+      "stack" -> t.stackTrace
+    )
+  }
 
   def getJobReport(jobInfo: JobInfo, jobStarted: Boolean = false): Map[String, Any] = {
 
     val statusMap = if (jobStarted) Map(StatusKey -> JobStatus.Started) else jobInfo match {
       case JobInfo(_, _, _, _, _, None, _) => Map(StatusKey -> JobStatus.Running)
-      case JobInfo(_, _, _, _, _, _, Some(ex)) =>
-        ex match {
-          case e: JobKilledException => Map(StatusKey -> JobStatus.Killed, ResultKey -> formatException(ex))
-          case _ => Map(StatusKey -> JobStatus.Error, ResultKey -> formatException(ex))
-        }
+      case JobInfo(_, _, _, _, _, _, Some(err))
+        if err.errorClass == classOf[JobKilledException].getName =>
+        Map(StatusKey -> JobStatus.Killed, ResultKey -> formatException(err))
+      case JobInfo(_, _, _, _, _, _, Some(err)) =>
+        Map(StatusKey -> JobStatus.Error, ResultKey -> formatException(err))
       case JobInfo(_, _, _, _, _, Some(e), None) => Map(StatusKey -> "FINISHED")
     }
     Map("jobId" -> jobInfo.jobId,

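With errors persisted as plain ErrorData rather than live Throwable instances, getJobReport can no longer detect a killed job with a type test; it compares the stored class name instead, as the diff above shows. A standalone equivalent of that check, as a minimal sketch:

```scala
import spark.jobserver.JobManagerActor.JobKilledException
import spark.jobserver.io.JobInfo

// Sketch: the old `case e: JobKilledException` type test becomes a string
// comparison against the class name persisted in ErrorData.errorClass.
def wasKilled(info: JobInfo): Boolean =
  info.error.exists(_.errorClass == classOf[JobKilledException].getName)
```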
job-server/src/main/scala/spark/jobserver/io/JobCassandraDAO.scala

Lines changed: 29 additions & 11 deletions
@@ -44,6 +44,8 @@ object Metadata {
   val StartDate = "start_date"
   val EndTime = "end_time"
   val Error = "error"
+  val ErrorClass = "error_class"
+  val ErrorStackTrace = "error_stack_trace"
 
 }
 
@@ -160,7 +162,8 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   override def getJobInfos(limit: Int, status: Option[String] = None): Future[Seq[JobInfo]] = {
     import Metadata._
     val query = QB.select(
-      JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime, Error
+      JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime,
+      Error, ErrorClass, ErrorStackTrace
     ).from(JobsChronologicalTable).where(QB.eq(StartDate, today())).limit(limit)
 
     session.executeAsync(query).map { rs =>
@@ -180,7 +183,8 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   override def getRunningJobInfosForContextName(contextName: String): Future[Seq[JobInfo]] = {
     import Metadata._
     val query = QB.select(
-      JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime, Error
+      JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime,
+      Error, ErrorClass, ErrorStackTrace
     ).from(RunningJobsTable).where(QB.eq(ContextName, contextName))
 
     session.executeAsync(query).map { rs =>
@@ -222,6 +226,11 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   }
 
   private def rowToJobInfo(row: Row): JobInfo = {
+    val errorData = Option(row.getString(Error)).map { error =>
+      val errorClass = if (row.isNull(ErrorClass)) "" else row.getString(ErrorClass)
+      val stackTrace = if (row.isNull(ErrorStackTrace)) "" else row.getString(ErrorStackTrace)
+      ErrorData(error, errorClass, stackTrace)
+    }
     JobInfo(
       row.getUUID(Metadata.JobId).toString,
       row.getString(ContextName),
@@ -233,13 +242,14 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
       row.getString(Classpath),
       new DateTime(row.getTimestamp(StartTime)),
       Option(row.getTimestamp(EndTime)).map(new DateTime(_)),
-      Option(row.getString(Error)).map(new Throwable(_))
+      errorData
     )
   }
 
   override def getJobInfo(jobId: String): Future[Option[JobInfo]] = {
     val query = QB.select(
-      JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime, Error
+      JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime,
+      Error, ErrorClass, ErrorStackTrace
     ).from(JobsTable).
       where(QB.eq(JobId, UUID.fromString(jobId))).
       limit(1)
@@ -252,9 +262,6 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
 
   override def saveJobInfo(jobInfo: JobInfo): Unit = {
     val JobInfo(jobId, contextName, binaryInfo, classPath, startTime, endTime, error) = jobInfo
-    val (_, endOpt, errOpt) = (startTime,
-      endTime.map(e => e),
-      error.map(_.getMessage))
 
     val localDate: LocalDate = LocalDate.fromMillisSinceEpoch(jobInfo.startTime.getMillis)
 
@@ -268,8 +275,13 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
       value(Classpath, classPath).
       value(StartTime, startTime.getMillis).
       value(StartDate, localDate)
-    endOpt.foreach{e => insert.value(EndTime, e.getMillis)}
-    errOpt.foreach(insert.value(Error, _))
+
+    endTime.foreach{e => insert.value(EndTime, e.getMillis)}
+    error.foreach { err =>
+      insert.value(Error, err.message)
+      insert.value(ErrorClass, err.errorClass)
+      insert.value(ErrorStackTrace, err.stackTrace)
+    }
     insert
   }
 
@@ -365,7 +377,9 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
       addColumn(StartTime, DataType.timestamp).
       addColumn(StartDate, DataType.date).
       addColumn(EndTime, DataType.timestamp).
-      addColumn(Error, DataType.text)
+      addColumn(Error, DataType.text).
+      addColumn(ErrorClass, DataType.text).
+      addColumn(ErrorStackTrace, DataType.text)
 
     session.execute(jobsTableStatement)
 
@@ -381,6 +395,8 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
       addColumn(Classpath, DataType.text).
       addColumn(EndTime, DataType.timestamp).
       addColumn(Error, DataType.text).
+      addColumn(ErrorClass, DataType.text).
+      addColumn(ErrorStackTrace, DataType.text).
       withOptions().clusteringOrder(StartTime, Direction.DESC)
 
     session.execute(jobsChronologicalView)
@@ -396,7 +412,9 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
       addColumn(StartTime, DataType.timestamp).
       addColumn(StartDate, DataType.date).
       addColumn(EndTime, DataType.timestamp).
-      addColumn(Error, DataType.text)
+      addColumn(Error, DataType.text).
+      addColumn(ErrorClass, DataType.text).
+      addColumn(ErrorStackTrace, DataType.text)
 
     session.execute(runningJobsView)
   }

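The addColumn calls above shape the table-creation statements, which appear to cover fresh installs only; an existing Cassandra deployment would need the equivalent manual ALTER. A sketch under assumed names (the contact point, keyspace, and table name are illustrative; the column names match the Metadata constants above):

```scala
import com.datastax.driver.core.Cluster

// Sketch only: "127.0.0.1", "spark_jobserver", and "jobs" are assumptions;
// the two columns mirror Metadata.ErrorClass and Metadata.ErrorStackTrace.
val session = Cluster.builder().addContactPoint("127.0.0.1").build().connect("spark_jobserver")
session.execute("ALTER TABLE jobs ADD error_class text")
session.execute("ALTER TABLE jobs ADD error_stack_trace text")
session.close()
```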
job-server/src/main/scala/spark/jobserver/io/JobDAO.scala

Lines changed: 17 additions & 2 deletions
@@ -1,5 +1,7 @@
 package spark.jobserver.io
 
+import java.io.{PrintWriter, StringWriter}
+
 import com.typesafe.config._
 import org.joda.time.{DateTime, Duration}
 import org.slf4j.LoggerFactory
@@ -46,13 +48,26 @@ object BinaryType {
 // Uniquely identifies the binary used to run a job
 case class BinaryInfo(appName: String, binaryType: BinaryType, uploadTime: DateTime)
 
+case class ErrorData(message: String, errorClass: String, stackTrace: String)
+
+object ErrorData {
+  def apply(ex: Throwable): ErrorData = {
+    ErrorData(ex.getMessage, ex.getClass.getName, getStackTrace(ex))
+  }
+
+  def getStackTrace(ex: Throwable): String = {
+    val stackTrace = new StringWriter()
+    ex.printStackTrace(new PrintWriter(stackTrace))
+    stackTrace.toString
+  }
+}
 
 // Both a response and used to track job progress
 // NOTE: if endTime is not None, then the job has finished.
 case class JobInfo(jobId: String, contextName: String,
                    binaryInfo: BinaryInfo, classPath: String,
                    startTime: DateTime, endTime: Option[DateTime],
-                   error: Option[Throwable]) {
+                   error: Option[ErrorData]) {
   def jobLengthMillis: Option[Long] = endTime.map { end => new Duration(startTime, end).getMillis }
 
   def isRunning: Boolean = endTime.isEmpty
@@ -143,7 +158,7 @@ trait JobDAO {
     for (info <- infos) {
       val updatedInfo = info.copy(
         endTime = Some(endTime),
-        error = Some(JobKilledException(info.jobId)))
+        error = Some(ErrorData(JobKilledException(info.jobId))))
       saveJobInfo(jobInfo = updatedInfo)
     }
   }

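ErrorData is the new common currency for job failures across all DAOs. A minimal sketch of the two construction paths this commit uses (the values in comments are illustrative):

```scala
import spark.jobserver.io.ErrorData

// From a live exception: captures the message, the class name, and the full
// printStackTrace output as a single string.
val fromException = ErrorData(new IllegalArgumentException("bad input"))
// fromException.errorClass == "java.lang.IllegalArgumentException"
// fromException.stackTrace starts with "java.lang.IllegalArgumentException: bad input\n\tat ..."

// Field by field, when no Throwable exists (e.g. a context terminated while
// jobs were still running): empty strings stand in for class and stack trace.
val fromMessage = ErrorData("Context (my-context) for this job was terminated", "", "")
```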
job-server/src/main/scala/spark/jobserver/io/JobFileDAO.scala

Lines changed: 7 additions & 4 deletions
@@ -171,13 +171,16 @@ class JobFileDAO(config: Config) extends JobDAO {
     out.writeLong(jobInfo.startTime.getMillis)
     val time = if (jobInfo.endTime.isEmpty) jobInfo.startTime.getMillis else jobInfo.endTime.get.getMillis
     out.writeLong(time)
-    val errorStr = if (jobInfo.error.isEmpty) "" else jobInfo.error.get.toString
-    out.writeUTF(errorStr)
+    out.writeUTF(jobInfo.error.map(_.message).getOrElse(""))
+    out.writeUTF(jobInfo.error.map(_.errorClass).getOrElse(""))
+    out.writeUTF(jobInfo.error.map(_.stackTrace).getOrElse(""))
   }
 
   private def readError(in: DataInputStream) = {
-    val error = in.readUTF()
-    if (error == "") None else Some(new Throwable(error))
+    val error = Some(in.readUTF()).filter(!_.isEmpty)
+    val errorClass = in.readUTF()
+    val errorStackTrace = in.readUTF()
+    error.map(ErrorData(_, errorClass, errorStackTrace))
   }
 
   private def readJobInfo(in: DataInputStream) = JobInfo(

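Each job record on disk now carries three UTF strings for the error (message, class, stack trace) where the old format had one, which is why the commit flags the file storage format as incompatible. A minimal round-trip sketch mirroring the private writeError/readError logic, with illustrative values:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import spark.jobserver.io.ErrorData

// Write the new three-field layout: message, errorClass, stackTrace.
val bytes = new ByteArrayOutputStream()
val out = new DataOutputStream(bytes)
val err = ErrorData(new RuntimeException("boom"))
out.writeUTF(err.message)
out.writeUTF(err.errorClass)
out.writeUTF(err.stackTrace)

// Read it back; an empty message means "no error was stored".
val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
val (message, errorClass, stackTrace) = (in.readUTF(), in.readUTF(), in.readUTF())
val restored = Some(message).filter(_.nonEmpty).map(ErrorData(_, errorClass, stackTrace))
// restored: Some(ErrorData("boom", "java.lang.RuntimeException", <full stack trace>))
```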