
Commit c9269b9

msohnnoorul authored and committed
fix(job-server): JobSQLDAO: Move blobs to a separate table (spark-jobserver#876) (spark-jobserver#932)
* Fix postgres database migration for schema version v0_7_4: add missing quotes for table and column identifiers. Without these quotes the migration v0_7_4 fails on postgres. Also quote table and column identifiers for h2 and mysql.

* fix(job-server): JobSQLDAO: Move blobs to a separate table (spark-jobserver#876)

* Separate the BINARIES table into BINARIES and BINARIES_CONTENTS, so that only the BINARIES_CONTENTS table contains the "BINARY" (Blob) column. Both tables can be joined on the same BIN_ID.

* Optimize the JobDAO.getLastUploadTimeAndType(...) method: make it abstract and implement an optimized version in the JobFileDAO and JobSqlDAO.

This change is based on previous work done by Michał Januszewski in pull request spark-jobserver#886.
1 parent 1d1169b commit c9269b9
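
A hedged sketch of the schema change described in the commit message above: after the V0_7_5 migrations below, the BINARIES table keeps only metadata and a new BINARIES_CONTENTS table holds the Blob, joined on the shared BIN_ID. Column names other than BIN_ID and BINARY are illustrative assumptions, not taken from this commit.

-- BINARIES_CONTENTS as created by the V0_7_5 migrations in this commit: the Blob moves here.
CREATE TABLE "BINARIES_CONTENTS" (
  "BIN_ID" BIGINT NOT NULL PRIMARY KEY,  -- same key as BINARIES.BIN_ID
  "BINARY" BLOB
);

-- Metadata-only queries no longer read the Blob column; content is fetched via a join only when needed.
-- APP_NAME is an assumed metadata column, used purely for illustration.
SELECT b."BIN_ID", c."BINARY"
FROM "BINARIES" b
JOIN "BINARIES_CONTENTS" c ON c."BIN_ID" = b."BIN_ID"
WHERE b."APP_NAME" = 'my-app';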

14 files changed (+347 / -68 lines)

Lines changed: 2 additions & 2 deletions
@@ -1,2 +1,2 @@
-ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255);
-ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE CLOB;
+ALTER TABLE "JOBS" ADD COLUMN "ERROR_CLASS" VARCHAR(255);
+ALTER TABLE "JOBS" ADD COLUMN "ERROR_STACK_TRACE" CLOB;
Lines changed: 2 additions & 2 deletions
@@ -1,2 +1,2 @@
-ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255);
-ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE TEXT;
+ALTER TABLE `JOBS` ADD COLUMN `ERROR_CLASS` VARCHAR(255);
+ALTER TABLE `JOBS` ADD COLUMN `ERROR_STACK_TRACE` TEXT;
Lines changed: 2 additions & 2 deletions
@@ -1,2 +1,2 @@
-ALTER TABLE JOBS ADD COLUMN ERROR_CLASS VARCHAR(255);
-ALTER TABLE JOBS ADD COLUMN ERROR_STACK_TRACE TEXT;
+ALTER TABLE "JOBS" ADD COLUMN "ERROR_CLASS" VARCHAR(255);
+ALTER TABLE "JOBS" ADD COLUMN "ERROR_STACK_TRACE" TEXT;
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+package db.h2.migration.V0_7_5
+
+import java.sql.Blob
+import java.sql.Connection
+import javax.sql.rowset.serial.SerialBlob
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import db.migration.V0_7_5.Migration
+import org.flywaydb.core.api.migration.jdbc.JdbcMigration
+import org.slf4j.LoggerFactory
+import slick.dbio.DBIO
+import slick.dbio.Effect
+import slick.dbio.NoStream
+import slick.driver.H2Driver.api.actionBasedSQLInterpolation
+import slick.jdbc.GetResult
+import slick.jdbc.PositionedParameters
+import slick.jdbc.SetParameter
+import slick.profile.SqlAction
+import spark.jobserver.slick.unmanaged.UnmanagedDatabase
+
+class V0_7_5__Migrate_Blobs extends Migration {
+  val logger = LoggerFactory.getLogger(getClass)
+
+  protected def insertBlob(id: Int, blob: SerialBlob): SqlAction[Int, NoStream, Effect] = {
+    sqlu"""INSERT INTO "BINARIES_CONTENTS" ("BIN_ID", "BINARY") VALUES (${id}, ${blob})"""
+  }
+  val createContentsTable = sqlu"""CREATE TABLE "BINARIES_CONTENTS" (
+    "BIN_ID" BIGINT NOT NULL PRIMARY KEY,
+    "BINARY" BLOB
+  );"""
+  val getBinaryContents = sql"""SELECT "BIN_ID", "BINARY" FROM "BINARIES"""".as[BinaryContent]
+  val dropColumn = sqlu"""ALTER TABLE "BINARIES" DROP COLUMN "BINARY""""
+}
+
Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+package db.migration.V0_7_5
+
+import java.sql.Blob
+import java.sql.Connection
+import javax.sql.rowset.serial.SerialBlob
+
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.control.NonFatal
+
+import org.flywaydb.core.api.migration.jdbc.JdbcMigration
+import org.slf4j.Logger
+import slick.dbio.DBIO
+import slick.dbio.Effect
+import slick.dbio.NoStream
+import slick.jdbc.GetResult
+import slick.jdbc.PositionedParameters
+import slick.jdbc.SetParameter
+import slick.profile.SqlAction
+import spark.jobserver.slick.unmanaged.UnmanagedDatabase
+import slick.dbio.DBIOAction
+import slick.dbio.Streaming
+
+trait Migration extends JdbcMigration {
+  protected val Timeout = 10 minutes
+  protected val logger: Logger
+
+  protected case class BinaryContent(id: Int, binary: Blob)
+
+  protected implicit object SetSerialBlob extends SetParameter[SerialBlob] {
+    def apply(v: SerialBlob, pp: PositionedParameters) {
+      pp.setBlob(v)
+    }
+  }
+  protected def insertBlob(id: Int, blob: SerialBlob): SqlAction[Int, NoStream, Effect]
+
+  protected val createContentsTable: SqlAction[Int, NoStream, Effect]
+
+  protected implicit val getBinaryResult = GetResult[BinaryContent](
+    r => BinaryContent(r.nextInt(), r.nextBlob()))
+  protected val getBinaryContents: DBIOAction[Seq[(BinaryContent)], Streaming[BinaryContent], Effect]
+
+  protected val dropColumn: SqlAction[Int, NoStream, Effect]
+
+  protected def logErrors = PartialFunction[Throwable, Unit] {
+    case e: Throwable => logger.error(e.getMessage, e)
+  }
+
+  protected def insertBlob(db: UnmanagedDatabase, b: BinaryContent): Unit = {
+    val blob = new SerialBlob(b.binary.getBytes(1, b.binary.length().toInt))
+    Await.ready(db.run(insertBlob(b.id, blob)).recover{logErrors}, Timeout)
+  }
+
+  def migrate(c: Connection): Unit = {
+    val db = new UnmanagedDatabase(c)
+    c.setAutoCommit(false);
+    try {
+      Await.ready(
+        for {
+          _ <- db.run(createContentsTable)
+          _ <- db.stream(getBinaryContents).foreach(b => insertBlob(db, b))
+          _ <- db.run(dropColumn)
+        } yield Unit, Timeout
+      ).recover{logErrors}
+      c.commit()
+    } catch {
+      case NonFatal(e) => { c.rollback() }
+    }
+  }
+}
+
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+package db.mysql.migration.V0_7_5
+
+import java.sql.Blob
+import java.sql.Connection
+import javax.sql.rowset.serial.SerialBlob
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import db.migration.V0_7_5.Migration
+import org.flywaydb.core.api.migration.jdbc.JdbcMigration
+import org.slf4j.LoggerFactory
+import slick.dbio.DBIO
+import slick.dbio.Effect
+import slick.dbio.NoStream
+import slick.driver.MySQLDriver.api.actionBasedSQLInterpolation
+import slick.jdbc.GetResult
+import slick.jdbc.PositionedParameters
+import slick.jdbc.SetParameter
+import slick.profile.SqlAction
+import spark.jobserver.slick.unmanaged.UnmanagedDatabase
+
+class V0_7_5__Migrate_Blobs extends Migration {
+  val logger = LoggerFactory.getLogger(getClass)
+
+  protected def insertBlob(id: Int, blob: SerialBlob): SqlAction[Int, NoStream, Effect] = {
+    sqlu"""INSERT INTO `BINARIES_CONTENTS` (`BIN_ID`, `BINARY`) VALUES (${id}, ${blob})"""
+  }
+  val createContentsTable = sqlu"""CREATE TABLE `BINARIES_CONTENTS` (
+    `BIN_ID` SERIAL NOT NULL PRIMARY KEY,
+    `BINARY` LONGBLOB
+  );"""
+  val getBinaryContents = sql"""SELECT `BIN_ID`, `BINARY` FROM `BINARIES`""".as[BinaryContent]
+  val dropColumn = sqlu"""ALTER TABLE `BINARIES` DROP COLUMN `BINARY`"""
+}
+
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+package db.postgresql.migration.V0_7_5
+
+import java.sql.Blob
+import java.sql.Connection
+import javax.sql.rowset.serial.SerialBlob
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.util.control.NonFatal
+
+import db.migration.V0_7_5.Migration
+import org.flywaydb.core.api.migration.jdbc.JdbcMigration
+import org.slf4j.LoggerFactory
+import slick.dbio.DBIO
+import slick.dbio.Effect
+import slick.dbio.NoStream
+import slick.driver.PostgresDriver.api.actionBasedSQLInterpolation
+import slick.jdbc.GetResult
+import slick.jdbc.PositionedParameters
+import slick.jdbc.SetParameter
+import slick.profile.SqlAction
+import spark.jobserver.slick.unmanaged.UnmanagedDatabase
+
+class V0_7_5__Migrate_Blobs extends Migration {
+  val logger = LoggerFactory.getLogger(getClass)
+
+  protected def insertBlob(id: Int, blob: SerialBlob): SqlAction[Int, NoStream, Effect] = {
+    sqlu"""INSERT INTO "BINARIES_CONTENTS" ("BIN_ID", "BINARY") VALUES (${id}, ${blob})"""
+  }
+  val createContentsTable = sqlu"""CREATE TABLE "BINARIES_CONTENTS" (
+    "BIN_ID" SERIAL NOT NULL PRIMARY KEY,
+    "BINARY" OID
+  );"""
+  val getBinaryContents = sql"""SELECT "BIN_ID", "BINARY" FROM "BINARIES"""".as[BinaryContent]
+  val dropColumn = sqlu"""ALTER TABLE "BINARIES" DROP COLUMN "BINARY""""
+
+  override def migrate(c: Connection): Unit = {
+    val createTriggerBinariesContents = sqlu"""CREATE TRIGGER t_binary
+      BEFORE UPDATE OR DELETE ON "BINARIES_CONTENTS"
+      FOR EACH ROW EXECUTE PROCEDURE lo_manage("BINARY")"""
+    val dropTriggerBinaries = sqlu"""DROP TRIGGER t_binary ON "BINARIES""""
+    val db = new UnmanagedDatabase(c)
+    c.setAutoCommit(false);
+    try {
+      Await.ready(
+        for {
+          _ <- db.run(createContentsTable)
+          _ <- db.run(createTriggerBinariesContents)
+          _ <- db.stream(getBinaryContents).foreach(b => insertBlob(db, b))
+          _ <- db.run(dropColumn)
+          _ <- db.run(dropTriggerBinaries)
+        } yield Unit, Timeout
+      ).recover{logErrors}
+      c.commit()
+    } catch {
+      case NonFatal(e) => { c.rollback() }
+    }
+  }
+}
+

job-server/src/main/scala/spark/jobserver/io/JobCassandraDAO.scala

Lines changed: 14 additions & 12 deletions
@@ -6,23 +6,23 @@ import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util.UUID

-import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder => QB}
-import com.datastax.driver.core.querybuilder.QueryBuilder._
+import scala.collection.convert.WrapAsJava
+import scala.collection.convert.Wrappers.JListWrapper
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.DurationInt
+import scala.util.Try
+
 import com.datastax.driver.core._
-import com.datastax.driver.core.schemabuilder.SchemaBuilder.Direction
+import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder => QB }
+import com.datastax.driver.core.querybuilder.QueryBuilder._
 import com.datastax.driver.core.schemabuilder.{Create, SchemaBuilder}
+import com.datastax.driver.core.schemabuilder.SchemaBuilder.Direction
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.joda.time.DateTime
 import org.slf4j.LoggerFactory

-import scala.concurrent.duration._
-import scala.collection.convert.WrapAsJava
-import scala.collection.convert.Wrappers.JListWrapper
-import scala.concurrent.{Await, Future}
 import spark.jobserver.cassandra.Cassandra.Resultset.toFuture

-import scala.util.Try
-
 object Metadata {
   val BinariesTable = "binaries"
   val BinariesChronologicalTable = "binaries_chronological"
@@ -160,7 +160,6 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   }

   override def getJobInfos(limit: Int, status: Option[String] = None): Future[Seq[JobInfo]] = {
-    import Metadata._
     val query = QB.select(
       JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime,
       Error, ErrorClass, ErrorStackTrace
@@ -181,7 +180,6 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   }

   override def getRunningJobInfosForContextName(contextName: String): Future[Seq[JobInfo]] = {
-    import Metadata._
     val query = QB.select(
       JobId, ContextName, AppName, BType, UploadTime, Classpath, StartTime, EndTime,
       Error, ErrorClass, ErrorStackTrace
@@ -307,6 +305,11 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   }
   }

+  override def getLastUploadTimeAndType(appName: String): Option[(DateTime, BinaryType)] = {
+    // Copied from the base JobDAO, feel free to optimize this (having in mind this specific storage type)
+    Await.result(getApps, 60 seconds).get(appName).map(t => (t._2, t._1))
+  }
+
   private def setup(config: Config): Session = {
     val cassandraConfig = config.getConfig("spark.jobserver.cassandra")
     val hosts = JListWrapper(cassandraConfig.getStringList("hosts"))
@@ -337,7 +340,6 @@ class JobCassandraDAO(config: Config) extends JobDAO with FileCacher {
   }

   private def setupSchema() = {
-    import Metadata._

     val binariesTable: Create = SchemaBuilder.createTable(BinariesTable).ifNotExists.
       addPartitionKey(AppName, DataType.text).

job-server/src/main/scala/spark/jobserver/io/JobDAO.scala

Lines changed: 2 additions & 2 deletions
@@ -104,6 +104,7 @@ trait JobDAO {
    * @param appName
    */
   def deleteBinary(appName: String)
+
   /**
    * Return all applications name and their last upload times.
    *
@@ -182,8 +183,7 @@ trait JobDAO {
    * Returns the last upload time for a given app name.
    * @return Some(lastUploadedTime) if the app exists and the list of times is nonempty, None otherwise
    */
-  def getLastUploadTimeAndType(appName: String): Option[(DateTime, BinaryType)] =
-    Await.result(getApps, 60 seconds).get(appName).map(t => (t._2, t._1))
+  def getLastUploadTimeAndType(appName: String): Option[(DateTime, BinaryType)]

  /**
   * Fetch submited jar or egg content for remote driver and JobManagerActor to cache in local

job-server/src/main/scala/spark/jobserver/io/JobFileDAO.scala

Lines changed: 6 additions & 2 deletions
@@ -1,15 +1,15 @@
 package spark.jobserver.io

-import com.typesafe.config._
 import java.io._
 import java.nio.file.{Files, Paths}

+import com.typesafe.config._
 import org.joda.time.DateTime
 import org.slf4j.LoggerFactory

 import scala.collection.mutable
-import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future

 /**
  * NB This class does NOT support persisting binary types
@@ -224,6 +224,10 @@ class JobFileDAO(config: Config) extends JobDAO {
     configs.get(jobId)
   }

+  override def getLastUploadTimeAndType(appName: String): Option[(DateTime, BinaryType)] = {
+    apps(appName).headOption.map(uploadTime => (uploadTime, BinaryType.Jar))
+  }
+
   private def writeJobConfig(out: DataOutputStream, jobId: String, jobConfig: Config) {
     out.writeUTF(jobId)
     out.writeUTF(jobConfig.root().render(ConfigRenderOptions.concise()))
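
The JobFileDAO override above answers getLastUploadTimeAndType from its in-memory apps map. For the SQL-backed DAO (its changes are not shown in this excerpt), the optimized version mentioned in the commit message would typically reduce to a single metadata query against BINARIES instead of loading all apps. A hedged sketch, with APP_NAME, BINARY_TYPE and UPLOAD_TIME as assumed column names:

-- Latest upload time and type for one app; only the metadata table is read, never the Blob.
SELECT "UPLOAD_TIME", "BINARY_TYPE"
FROM "BINARIES"
WHERE "APP_NAME" = ?
ORDER BY "UPLOAD_TIME" DESC
LIMIT 1;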
