
Commit 2b0d036

bsikander authored and noorul committed
refactor(jobserver): Remove redundant caching code (spark-jobserver#949)
The Mesos cluster change [1] introduced this redundant caching, which can now be safely removed; FileCacher is now the only class responsible for caching jars. The discussion in which stakeholders agreed to the removal is at [2]. This also removes every instance of GetBinaryContent, since it was used only by the redundant code.

[1] https://github.com/spark-jobserver/spark-jobserver/pull/681/files
[2] spark-jobserver@9084053#commitcomment-25033986
1 parent f9be1a4 commit 2b0d036
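
For context, the check-local-then-fetch-and-cache pattern that this commit consolidates into FileCacher looks roughly like the sketch below. It is pieced together from the removed DAO implementations; the createBinaryName, fetchBinary, and cacheBinary signatures are assumptions inferred from the deleted call sites, and getOrCache is a hypothetical method name rather than FileCacher's actual API.

import java.io.File
import java.nio.file.{Files, Paths}

import org.joda.time.DateTime

import spark.jobserver.io.BinaryType

// Hypothetical sketch only: mirrors the fetch-and-cache logic deleted from
// the SQL and Cassandra DAOs in this commit, now owned solely by FileCacher.
trait FileCacherSketch {
  def rootDir: String
  def createBinaryName(appName: String, binaryType: BinaryType, uploadTime: DateTime): String
  def fetchBinary(appName: String, binaryType: BinaryType, uploadTime: DateTime): Array[Byte]
  def cacheBinary(appName: String, binaryType: BinaryType, uploadTime: DateTime, bytes: Array[Byte]): Unit

  // Serve the binary from the local cache dir, fetching and caching it on a miss.
  def getOrCache(appName: String, binaryType: BinaryType, uploadTime: DateTime): Array[Byte] = {
    val cached = new File(rootDir, createBinaryName(appName, binaryType, uploadTime))
    if (cached.exists()) {
      Files.readAllBytes(Paths.get(cached.getAbsolutePath))
    } else {
      val bytes = fetchBinary(appName, binaryType, uploadTime)
      cacheBinary(appName, binaryType, uploadTime, bytes)
      bytes
    }
  }
}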

File tree

10 files changed: +3 −122 lines


job-server/src/main/scala/spark/jobserver/JobCache.scala

Lines changed: 2 additions & 21 deletions
@@ -1,8 +1,7 @@
 package spark.jobserver
 
-import java.io.{File, IOException}
+import java.io.File
 import java.net.URL
-import java.nio.file.{Files, Paths}
 
 import akka.actor.ActorRef
 import akka.util.Timeout
@@ -48,25 +47,7 @@ class JobCacheImpl(maxEntries: Int,
     val jarPathReq =
       (dao ? JobDAOActor.GetBinaryPath(appName, BinaryType.Jar, uploadTime)).mapTo[JobDAOActor.BinaryPath]
     val jarPath = Await.result(jarPathReq, daoAskTimeout.duration).binPath
-    logger.info("End of get jar path for app {}, uploadTime {}, jarPath {}", appName, uploadTime, jarPath)
-    val jarFile = Paths.get(jarPath)
-    if (!Files.exists(jarFile)) {
-      logger.info("Local jar path {} not exist, fetch binary content from remote actor", jarPath)
-      val jarBinaryReq = (dao ? JobDAOActor.GetBinaryContent(appName, BinaryType.Jar, uploadTime))
-        .mapTo[JobDAOActor.BinaryContent]
-      val binaryJar = Await.result(jarBinaryReq, daoAskTimeout.duration)
-      logger.info("Writing {} bytes to file {}", binaryJar.content.size, jarFile.toAbsolutePath.toString)
-      try {
-        if (!Files.exists(jarFile.getParent)) {
-          logger.info("Creating cache dir {}", jarFile.getParent.toAbsolutePath.toString)
-          Files.createDirectories(jarFile.getParent)
-        }
-        Files.write(jarFile, binaryJar.content)
-      } catch {
-        case e: IOException => logger.error("Write to path {} error {}", jarPath: Any, e)
-      }
-    }
-    val jarFilePath = jarFile.toAbsolutePath.toString
+    val jarFilePath = new File(jarPath).getAbsolutePath()
     sparkContext.addJar(jarFilePath) // Adds jar for remote executors
     loader.addURL(new URL("file:" + jarFilePath)) // Now jar added for local loader
     val constructor = JarUtils.loadClassOrObject[spark.jobserver.api.SparkJobBase](classPath, loader)

job-server/src/main/scala/spark/jobserver/io/JobCassandraDAO.scala

Lines changed: 0 additions & 13 deletions
@@ -3,7 +3,6 @@ package spark.jobserver.io
 import java.io.File
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
-import java.nio.file.{Files, Paths}
 import java.util.UUID
 
 import scala.collection.convert.WrapAsJava
@@ -410,16 +409,4 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
 
     session.execute(runningJobsView)
   }
-
-  override def getBinaryContent(appName: String, binaryType: BinaryType,
-                                uploadTime: DateTime): Array[Byte] = {
-    val jarFile = new File(rootDir, createBinaryName(appName, binaryType, uploadTime))
-    if (!jarFile.exists()) {
-      val binBytes = fetchBinary(appName, binaryType, uploadTime)
-      cacheBinary(appName, binaryType, uploadTime, binBytes)
-      binBytes
-    } else {
-      Files.readAllBytes(Paths.get(jarFile.getAbsolutePath))
-    }
-  }
 }

job-server/src/main/scala/spark/jobserver/io/JobDAO.scala

Lines changed: 0 additions & 10 deletions
@@ -184,14 +184,4 @@ trait JobDAO {
    * @return Some(lastUploadedTime) if the app exists and the list of times is nonempty, None otherwise
    */
   def getLastUploadTimeAndType(appName: String): Option[(DateTime, BinaryType)]
-
-  /**
-   * Fetch submited jar or egg content for remote driver and JobManagerActor to cache in local
-   * @param appName
-   * @param uploadTime
-   * @return
-   */
-  def getBinaryContent(appName: String,
-                       binaryType: BinaryType,
-                       uploadTime: DateTime): Array[Byte]
 }

job-server/src/main/scala/spark/jobserver/io/JobDAOActor.scala

Lines changed: 0 additions & 7 deletions
@@ -28,9 +28,6 @@ object JobDAOActor {
   case class GetBinaryPath(appName: String,
                            binaryType: BinaryType,
                            uploadTime: DateTime) extends JobDAORequest
-  case class GetBinaryContent(appName: String,
-                              binaryType: BinaryType,
-                              uploadTime: DateTime) extends JobDAORequest
 
   case class SaveJobInfo(jobInfo: JobInfo) extends JobDAORequest
   case class GetJobInfos(limit: Int) extends JobDAORequest
@@ -45,7 +42,6 @@ object JobDAOActor {
   sealed trait JobDAOResponse
   case class Apps(apps: Map[String, (BinaryType, DateTime)]) extends JobDAOResponse
   case class BinaryPath(binPath: String) extends JobDAOResponse
-  case class BinaryContent(content: Array[Byte]) extends JobDAOResponse
   case class JobInfos(jobInfos: Seq[JobInfo]) extends JobDAOResponse
   case class JobConfig(jobConfig: Option[Config]) extends JobDAOResponse
   case class LastUploadTimeAndType(uploadTimeAndType: Option[(DateTime, BinaryType)]) extends JobDAOResponse
@@ -92,9 +88,6 @@ class JobDAOActor(dao: JobDAO) extends InstrumentedActor {
     case GetLastUploadTimeAndType(appName) =>
       sender() ! LastUploadTimeAndType(dao.getLastUploadTimeAndType(appName))
 
-    case GetBinaryContent(appName, binaryType, uploadTime) =>
-      sender() ! BinaryContent(dao.getBinaryContent(appName, binaryType, uploadTime))
-
     case CleanContextJobInfos(contextName, endTime) =>
       dao.cleanRunningJobInfosForContext(contextName, endTime)
 }
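
With GetBinaryContent and BinaryContent removed, GetBinaryPath/BinaryPath is the only binary-related exchange left in this protocol. A minimal sketch of that round-trip, modeled on the JobCache.scala hunk above (the app name is illustrative, and dao, uploadTime, and the implicit Timeout are assumed to be in scope as they are in JobCacheImpl):

import scala.concurrent.Await
import akka.pattern.ask

// Ask the DAO actor where the jar lives on local disk.
val pathReq = (dao ? JobDAOActor.GetBinaryPath("myApp", BinaryType.Jar, uploadTime))
  .mapTo[JobDAOActor.BinaryPath]
val localJarPath = Await.result(pathReq, timeout.duration).binPath
// FileCacher is responsible for materializing the file at this path,
// so no separate GetBinaryContent round-trip is needed.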

job-server/src/main/scala/spark/jobserver/io/JobFileDAO.scala

Lines changed: 0 additions & 6 deletions
@@ -1,7 +1,6 @@
 package spark.jobserver.io
 
 import java.io._
-import java.nio.file.{Files, Paths}
 
 import com.typesafe.config._
 import org.joda.time.DateTime
@@ -238,11 +237,6 @@ class JobFileDAO(config: Config) extends JobDAO {
       ConfigFactory.parseString(in.readUTF)
     )
 
-  override def getBinaryContent(appName: String, binaryType: BinaryType,
-                                uploadTime: DateTime): Array[Byte] = {
-    Files.readAllBytes(Paths.get(retrieveBinaryFile(appName, binaryType, uploadTime)))
-  }
-
   /**
    * Delete a jar.
    *

job-server/src/main/scala/spark/jobserver/io/JobSqlDAO.scala

Lines changed: 0 additions & 20 deletions
@@ -1,7 +1,6 @@
 package spark.jobserver.io
 
 import java.io.File
-import java.nio.file.{Files, Paths}
 import java.sql.{Blob, Timestamp}
 import javax.sql.DataSource
 import javax.sql.rowset.serial.SerialBlob
@@ -374,23 +373,4 @@ class JobSqlDAO(config: Config) extends JobDAO with FileCacher {
       r.map(jobInfoFromRow).headOption
     }
   }
-
-  /**
-   * Fetch submited jar or egg content for remote driver and JobManagerActor to cache in local
-   *
-   * @param appName
-   * @param uploadTime
-   * @return
-   */
-  override def getBinaryContent(appName: String, binaryType: BinaryType,
-                                uploadTime: DateTime): Array[Byte] = {
-    val jarFile = new File(rootDir, createBinaryName(appName, binaryType, uploadTime))
-    if (!jarFile.exists()) {
-      val binBytes = Await.result(fetchBinary(appName, binaryType, uploadTime), 60.seconds)
-      cacheBinary(appName, binaryType, uploadTime, binBytes)
-      binBytes
-    } else {
-      Files.readAllBytes(Paths.get(jarFile.getAbsolutePath))
-    }
-  }
 }

job-server/src/test/scala/spark/jobserver/InMemoryDAO.scala

Lines changed: 1 addition & 6 deletions
@@ -86,12 +86,7 @@ class InMemoryDAO extends JobDAO {
     Await.result(getApps, 60 seconds).get(appName).map(t => (t._2, t._1))
   }
 
-  override def getBinaryContent(appName: String, binaryType: BinaryType,
-                                uploadTime: DateTime): Array[Byte] = {
-    binaries((appName, binaryType, uploadTime))
-  }
-
-  override def deleteBinary(appName: String): Unit = {
+  override def deleteBinary(appName: String): Unit = {
     binaries = binaries.filter { case ((name, _, _), _) => appName != name }
   }
 }

job-server/src/test/scala/spark/jobserver/io/JobCassandraDAOSpec.scala

Lines changed: 0 additions & 13 deletions
@@ -137,19 +137,6 @@ class JobCassandraDAOSpec extends TestJarFinder with FunSpecLike with Matchers w
       jarFile.length() should equal (retrieved.length())
       Files.toByteArray(jarFile) should equal(Files.toByteArray(retrieved))
     }
-
-    it("should retrieve the jar binary content for remote job manager") {
-      // chack the pre-condition
-      jarFile.exists() should equal (false)
-
-      // retrieve the jar content
-      val jarBinaryContent: Array[Byte] = dao.getBinaryContent(jarInfo.appName, jarInfo.binaryType, jarInfo.uploadTime)
-
-      // test
-      jarFile.exists() should equal (true)
-      jarBinaryContent.length should equal (jarBytes.length)
-      jarBinaryContent should equal(jarBytes)
-    }
   }
 
   describe("saveJobConfig() tests") {

job-server/src/test/scala/spark/jobserver/io/JobDAOActorSpec.scala

Lines changed: 0 additions & 14 deletions
@@ -22,7 +22,6 @@ object JobDAOActorSpec {
   val cleanupProbe = TestProbe()(system)
 
   object DummyDao extends JobDAO{
-    val jarContent = Array.empty[Byte]
 
     override def saveBinary(appName: String, binaryType: BinaryType,
                             uploadTime: DateTime, binaryBytes: Array[Byte]): Unit = {
@@ -38,14 +37,6 @@ object JobDAOActorSpec {
       "app2" -> (BinaryType.Egg, dtplus1)
     ))
 
-    override def getBinaryContent(appName: String, binaryType: BinaryType,
-                                  uploadTime: DateTime): Array[Byte] = {
-      appName match {
-        case "failOnThis" => throw new Exception("get binary content failure")
-        case _ => jarContent
-      }
-    }
-
     override def retrieveBinaryFile(appName: String,
                                     binaryType: BinaryType, uploadTime: DateTime): String = ???
 
@@ -128,11 +119,6 @@ class JobDAOActorSpec extends TestKit(JobDAOActorSpec.system) with ImplicitSende
       expectMsg(JobInfos(Seq()))
     }
 
-    it("should get binary content") {
-      daoActor ! GetBinaryContent("succeed", BinaryType.Jar, DateTime.now)
-      expectMsg(BinaryContent(DummyDao.jarContent))
-    }
-
     it("should request jobs cleanup") {
       daoActor ! CleanContextJobInfos("context", DateTime.now())
       cleanupProbe.expectMsg("context")

job-server/src/test/scala/spark/jobserver/io/JobSqlDAOSpec.scala

Lines changed: 0 additions & 12 deletions
@@ -133,18 +133,6 @@ class JobSqlDAOSpec extends JobSqlDAOSpecBase with TestJarFinder with FunSpecLik
       jarFile.exists() should equal (true)
       jarFilePath should equal (jarFile.getAbsolutePath)
     }
-
-    it("should retrieve the jar binary content for remote job manager") {
-      // chack the pre-condition
-      jarFile.exists() should equal (false)
-
-      // retrieve the jar content
-      val jarBinaryContent: Array[Byte] = dao.getBinaryContent(jarInfo.appName, BinaryType.Jar, jarInfo.uploadTime)
-
-      // test
-      jarFile.exists() should equal (true)
-      jarBinaryContent should equal (jarBytes)
-    }
   }
 
   describe("save and get Python eggs") {
