Skip to content

Commit 6be0a20

Browse files
ecandreev authored and noorul committed
fix(job-server): JobSQLDAO: Prevent saving the same binary multiple times (spark-jobserver#960)
Save the hash of the binary in an extra column in BINARIES and use it as the primary key in the BINARIES_CONTENTS table instead of the binary id.
1 parent fd3d160 commit 6be0a20

File tree

5 files changed

+51
-16
lines changed

5 files changed

+51
-16
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- H2 migration: add a 32-byte hash column to BINARIES, backfill it from the
-- existing integer ids, and re-key BINARIES_CONTENTS on the hash instead of
-- the binary id.
ALTER TABLE "BINARIES" ADD COLUMN "BIN_HASH" BINARY(32);
-- Backfill: seed BIN_HASH from the existing id, cast to the hash column type.
UPDATE "BINARIES" SET "BIN_HASH" = CAST("BIN_ID" AS BINARY(32));
ALTER TABLE "BINARIES" ALTER COLUMN "BIN_HASH" SET NOT NULL;
-- Rename the contents key and give it the hash column type.
ALTER TABLE "BINARIES_CONTENTS" ALTER COLUMN "BIN_ID" RENAME TO "BIN_HASH";
ALTER TABLE "BINARIES_CONTENTS" ALTER COLUMN "BIN_HASH" BINARY(32) NOT NULL;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- MySQL migration: add a 32-byte hash column to BINARIES, backfill it from the
-- existing integer ids, and rename BINARIES_CONTENTS.BIN_ID to BIN_HASH
-- (CHANGE COLUMN keeps the existing primary-key constraint on the column).
ALTER TABLE `BINARIES` ADD COLUMN `BIN_HASH` VARBINARY(32) NOT NULL;
UPDATE `BINARIES` SET `BIN_HASH` = CAST(`BIN_ID` AS BINARY(32));
ALTER TABLE `BINARIES_CONTENTS` CHANGE COLUMN `BIN_ID` `BIN_HASH` VARBINARY(32) NOT NULL;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- PostgreSQL migration: add a hash column to BINARIES and BINARIES_CONTENTS,
-- backfill both from the existing integer ids, then make the hash the primary
-- key of BINARIES_CONTENTS and drop the old id column.
ALTER TABLE "BINARIES" ADD COLUMN "BIN_HASH" BYTEA;
-- Backfill via text so the integer id becomes a byte representation.
UPDATE "BINARIES" SET "BIN_HASH" = "BIN_ID"::VARCHAR(32)::BYTEA;
ALTER TABLE "BINARIES" ALTER COLUMN "BIN_HASH" SET NOT NULL;
ALTER TABLE "BINARIES_CONTENTS" ADD COLUMN "BIN_HASH" BYTEA;
UPDATE "BINARIES_CONTENTS" SET "BIN_HASH" = "BIN_ID"::VARCHAR(32)::BYTEA;
ALTER TABLE "BINARIES_CONTENTS" ALTER COLUMN "BIN_HASH" SET NOT NULL;
ALTER TABLE "BINARIES_CONTENTS" DROP COLUMN "BIN_ID";
ALTER TABLE "BINARIES_CONTENTS" ADD PRIMARY KEY ("BIN_HASH");

job-server/src/main/scala/spark/jobserver/io/JobSqlDAO.scala

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,19 @@ class JobSqlDAO(config: Config) extends JobDAO with FileCacher {
4141

4242
// Definition of the tables
4343
//scalastyle:off
44-
// Slick mapping for the BINARIES table: one row of metadata per uploaded binary.
// BIN_HASH holds the digest of the binary's bytes (see calculateBinaryHash) and
// links a metadata row to its stored bytes in BINARIES_CONTENTS.
class Binaries(tag: Tag) extends Table[(Int, String, String, Timestamp, Array[Byte])](tag, "BINARIES") {
  def binId = column[Int]("BIN_ID", O.PrimaryKey, O.AutoInc)
  def appName = column[String]("APP_NAME")
  def binaryType = column[String]("BINARY_TYPE")
  def uploadTime = column[Timestamp]("UPLOAD_TIME")
  def binHash = column[Array[Byte]]("BIN_HASH")
  def * = (binId, appName, binaryType, uploadTime, binHash)
}
5152

52-
// Slick mapping for the BINARIES_CONTENTS table: the actual bytes of a binary,
// keyed by its content hash so identical uploads share a single contents row.
class BinariesContents(tag: Tag) extends Table[(Array[Byte], Blob)](tag, "BINARIES_CONTENTS") {
  def binHash = column[Array[Byte]]("BIN_HASH", O.PrimaryKey)
  def binary = column[Blob]("BINARY")
  def * = (binHash, binary)
}
5758

5859
val binaries = TableQuery[Binaries]
@@ -183,12 +184,24 @@ class JobSqlDAO(config: Config) extends JobDAO with FileCacher {
183184
Await.result(db.run(query), 60 seconds)
184185
}
185186

187+
/**
 * Computes the SHA-256 digest of the given binary's bytes.
 *
 * @param binBytes raw bytes of the binary
 * @return the 32-byte SHA-256 digest
 */
private def calculateBinaryHash(binBytes: Array[Byte]): Array[Byte] = {
  import java.security.MessageDigest
  // A fresh instance per call: MessageDigest is stateful and not thread-safe.
  MessageDigest.getInstance("SHA-256").digest(binBytes)
}
192+
186193
/**
 * Inserts the binary's metadata into BINARIES and — only if no row with the
 * same content hash exists yet — its bytes into BINARIES_CONTENTS, so the
 * same binary is never stored twice. Both writes run in one transaction.
 *
 * @param binInfo  metadata (app name, binary type, upload time) for the binary
 * @param binBytes raw bytes of the binary
 * @return the auto-generated BIN_ID of the new BINARIES row
 */
private def insertBinaryInfo(binInfo: BinaryInfo, binBytes: Array[Byte]): Future[Int] = {
  val hash = calculateBinaryHash(binBytes)
  val dbAction = (for {
    // -1 is a placeholder: BIN_ID is auto-incremented by the database.
    binId <- binaries.returning(binaries.map(_.binId)) +=
      (-1, binInfo.appName, binInfo.binaryType.name, convertDateJodaToSql(binInfo.uploadTime), hash)
    // Deduplicate contents: store the bytes only when this hash is unseen.
    _ <- binariesContents.filter(_.binHash === hash).result.headOption.flatMap {
      case None =>
        binariesContents.map(bc => bc.*) += (hash, new SerialBlob(binBytes))
      case Some(_) =>
        DBIO.successful(None) // content already stored under this hash — no-op
    }
  } yield binId).transactionally
  db.run(dbAction)
}
@@ -200,9 +213,15 @@ class JobSqlDAO(config: Config) extends JobDAO with FileCacher {
200213

201214
/**
 * Deletes all BINARIES rows for the given app, and the shared
 * BINARIES_CONTENTS row only when no binary belonging to a different app
 * still references the same content hash. Runs in one transaction.
 *
 * @param appName app whose binaries should be removed
 * @return number of BINARIES rows deleted
 */
private def deleteBinaryInfo(appName: String): Future[Int] = {
  val deleteBinary = binaries.filter(_.appName === appName)
  // Binaries of OTHER apps that share a content hash with the rows being deleted.
  val hashUsed = binaries.filter(_.binHash in deleteBinary.map(_.binHash)).filter(_.appName =!= appName)
  val deleteBinariesContents = binariesContents.filter(_.binHash in deleteBinary.map(_.binHash))
  val dbAction = (for {
    // Only drop the contents when nobody else still points at the same hash.
    _ <- hashUsed.result.headOption.flatMap {
      case None =>
        deleteBinariesContents.delete
      case Some(_) =>
        DBIO.successful(None) // hash still referenced elsewhere — keep contents
    }
    b <- deleteBinary.delete
  } yield b).transactionally
  db.run(dbAction).recover(logDeleteErrors)
}
@@ -231,7 +250,7 @@ class JobSqlDAO(config: Config) extends JobDAO with FileCacher {
231250
b <- binaries.filter { bin =>
232251
bin.appName === appName && bin.uploadTime === dateTime && bin.binaryType === binaryType.name
233252
}
234-
bc <- binariesContents if b.binId === bc.binId
253+
bc <- binariesContents if b.binHash === bc.binHash
235254
} yield bc.binary
236255
val dbAction = query.result
237256
db.run(dbAction.head.map { b => b.getBytes(1, b.length.toInt) }.transactionally)

job-server/src/test/scala/spark/jobserver/io/FlywayMigrationSpec.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ class FlywayMigrationSpec extends FunSpec with Matchers {
8080
val descBinariesIt = ResultSetIterator(descBinaries){ r: ResultSet =>
8181
r.getString("FIELD")
8282
}
83-
descBinariesIt.toList should be (List("BIN_ID", "APP_NAME", "UPLOAD_TIME", "BINARY_TYPE"))
83+
descBinariesIt.toList should be (List("BIN_ID", "APP_NAME", "UPLOAD_TIME", "BINARY_TYPE", "BIN_HASH"))
8484

8585
val descBinariesContents = sqlConn.createStatement().executeQuery("SHOW COLUMNS FROM BINARIES_CONTENTS")
8686
val descBinariesContentsIt = ResultSetIterator(descBinariesContents){ r: ResultSet =>
8787
r.getString("FIELD")
8888
}
89-
descBinariesContentsIt.toList should be (List("BIN_ID", "BINARY"))
89+
descBinariesContentsIt.toList should be (List("BIN_HASH", "BINARY"))
9090

9191
val descJobs = sqlConn.createStatement().executeQuery("SHOW COLUMNS FROM JOBS")
9292
val descJobsIt = ResultSetIterator(descJobs){ r: ResultSet =>
@@ -115,14 +115,14 @@ class FlywayMigrationSpec extends FunSpec with Matchers {
115115
))
116116

117117
val migratedBinariesContents =
118-
sqlConn.createStatement().executeQuery("SELECT BIN_ID, BINARY FROM BINARIES_CONTENTS")
118+
sqlConn.createStatement().executeQuery("SELECT BIN_HASH, BINARY FROM BINARIES_CONTENTS")
119119
val migratedBinariesContentsIt = ResultSetIterator(migratedBinariesContents){ r: ResultSet =>
120-
(r.getInt("BIN_ID"),
120+
(r.getString("BIN_HASH"),
121121
r.getString("BINARY"))
122122
}
123123
migratedBinariesContentsIt.toList should be (List(
124-
(1, "deadbeef"),
125-
(2, "beadface")
124+
("0000000000000001", "deadbeef"),
125+
("0000000000000002", "beadface")
126126
))
127127
}
128128
}

0 commit comments

Comments (0)