
Commit 3c96937

mgaido91 authored and mridulm committed
[SPARK-24948][SHS] Delegate check access permissions to the file system
## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, only basic permissions are considered when checking whether a user can access a file. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally, so it can wrongly report that a user cannot access a file even though they actually can. This PR delegates the check of whether a file is accessible to the file system itself, so that the correct result is returned. A caching layer is added for performance reasons.

## How was this patch tested?

Modified UTs.

Author: Marco Gaido <[email protected]>

Closes apache#21895 from mgaido91/SPARK-24948.
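In effect, the change drops the hand-rolled permission check and lets the file system raise `AccessControlException` when an event log is actually opened; paths that fail are blacklisted (cached) so that later scans skip them. The sketch below is illustrative only — `ReadAccessCache` and `canRead` are hypothetical names, not code from the patch:

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.AccessControlException

// Hypothetical helper illustrating the idea; the real change inlines this
// logic into FsHistoryProvider (see the diffs below).
class ReadAccessCache(fs: FileSystem) {
  // log file name -> time of the failed access, so entries can be expired later
  private val blacklist = new ConcurrentHashMap[String, Long]

  def canRead(path: Path): Boolean = {
    if (blacklist.containsKey(path.getName)) {
      false                                // cached failure: skip without touching the FS
    } else {
      try {
        fs.open(path).close()              // let the FS enforce permissions, ACLs, policies
        true
      } catch {
        case _: AccessControlException =>  // remember the failure for later scans
          blacklist.put(path.getName, System.currentTimeMillis())
          false
      }
    }
  }
}
```

In the patch itself this logic lives in `FsHistoryProvider`: the blacklist is keyed by the log file name, populated when replay fails with an `AccessControlException`, and expired on the cleaner interval.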
1 parent 278984d commit 3c96937

4 files changed: +89 −140 lines


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 0 additions & 23 deletions
@@ -31,7 +31,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -367,28 +366,6 @@ class SparkHadoopUtil extends Logging {
     buffer.toString
   }
 
-  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
-    val perm = status.getPermission
-    val ugi = UserGroupInformation.getCurrentUser
-
-    if (ugi.getShortUserName == status.getOwner) {
-      if (perm.getUserAction.implies(mode)) {
-        return true
-      }
-    } else if (ugi.getGroupNames.contains(status.getGroup)) {
-      if (perm.getGroupAction.implies(mode)) {
-        return true
-      }
-    } else if (perm.getOtherAction.implies(mode)) {
-      return true
-    }
-
-    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
-      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
-      s"${if (status.isDirectory) "d" else "-"}$perm")
-    false
-  }
-
   def serialize(creds: Credentials): Array[Byte] = {
     val byteStream = new ByteArrayOutputStream
     val dataStream = new DataOutputStream(byteStream)

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 49 additions & 18 deletions
@@ -21,20 +21,20 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -114,7 +114,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
 
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-  private val fs = new Path(logDir).getFileSystem(hadoopConf)
+  // Visible for testing
+  private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)
 
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
@@ -161,6 +162,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
 
   /**
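The `clearBlacklist` helper above relies on `asScala.retain`, which mutates the underlying `ConcurrentHashMap` in place (Scala 2.11/2.12 collections). A small self-contained sketch of the expiry behaviour, with made-up timestamps and roughly the one-day window exercised by the new test:

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object BlacklistExpirySketch {
  def main(args: Array[String]): Unit = {
    val blacklist = new ConcurrentHashMap[String, Long]
    blacklist.put("accessDenied", 1000L) // blacklisted at t = 1000 ms
    blacklist.put("oldEntry", 0L)        // blacklisted at t = 0 ms

    val now = 86400L * 1000 + 500        // pretend "now" is just past one day
    val expireTimeInSeconds = 86400L     // one-day expiry window
    val expiredThreshold = now - expireTimeInSeconds * 1000

    // retain(...) keeps only entries at or after the threshold, dropping the rest
    // from the underlying ConcurrentHashMap.
    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)

    println(blacklist.keySet()) // [accessDenied] -- "oldEntry" has expired
  }
}
```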
@@ -418,7 +438,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // reading a garbage file is safe, but we would log an error which can be scary to
         // the end-user.
         !entry.getPath().getName().startsWith(".") &&
-          SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+          !isBlacklisted(entry.getPath)
       }
       .filter { entry =>
         try {
@@ -461,32 +481,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
     }
 
-    val tasks = updated.map { entry =>
+    val tasks = updated.flatMap { entry =>
       try {
-        replayExecutor.submit(new Runnable {
+        val task: Future[Unit] = replayExecutor.submit(new Runnable {
           override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
-        })
+        }, Unit)
+        Some(task -> entry.getPath)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit (..) indicates the ExecutorService is unable
         // to take any more submissions at this time
         case e: Exception =>
           logError(s"Exception while submitting event log for replay", e)
-          null
+          None
       }
-    }.filter(_ != null)
+    }
 
     pendingReplayTasksCount.addAndGet(tasks.size)
 
     // Wait for all tasks to finish. This makes sure that checkForLogs
     // is not scheduled again while some tasks are already running in
     // the replayExecutor.
-    tasks.foreach { task =>
+    tasks.foreach { case (task, path) =>
       try {
         task.get()
       } catch {
         case e: InterruptedException =>
           throw e
+        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+          // We don't have read permissions on the log file
+          logWarning(s"Unable to read log $path", e.getCause)
+          blacklist(path)
         case e: Exception =>
           logError("Exception while merging application listings", e)
       } finally {
@@ -779,6 +804,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
   }
 
   /**
@@ -938,13 +965,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   private def deleteLog(log: Path): Unit = {
-    try {
-      fs.delete(log, true)
-    } catch {
-      case _: AccessControlException =>
-        logInfo(s"No permission to delete $log, ignoring.")
-      case ioe: IOException =>
-        logError(s"IOException in cleaning $log", ioe)
+    if (isBlacklisted(log)) {
+      logDebug(s"Skipping deleting $log as we don't have permissions on it.")
+    } else {
+      try {
+        fs.delete(log, true)
+      } catch {
+        case _: AccessControlException =>
+          logInfo(s"No permission to delete $log, ignoring.")
+        case ioe: IOException =>
+          logError(s"IOException in cleaning $log", ioe)
+      }
     }
   }
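One detail worth noting in the replay loop above: the merge work runs on `replayExecutor`, so an `AccessControlException` thrown inside `mergeApplicationListing` reaches the caller wrapped in an `ExecutionException`, which is why the new catch clause inspects `getCause`. A tiny standalone sketch of that wrapping behaviour (a hypothetical example, not patch code):

```scala
import java.util.concurrent.{ExecutionException, Executors}

object WrappedCauseSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    // Any exception thrown inside the task is captured by the returned Future...
    val task = pool.submit(new Runnable {
      override def run(): Unit = throw new RuntimeException("no permission")
    }, ())
    try {
      task.get() // ...and resurfaces here, wrapped in an ExecutionException
    } catch {
      case e: ExecutionException =>
        println(e.getCause) // java.lang.RuntimeException: no permission
    } finally {
      pool.shutdown()
    }
  }
}
```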

core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala

Lines changed: 0 additions & 97 deletions
This file was deleted.

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 40 additions & 2 deletions
@@ -29,9 +29,11 @@ import scala.language.postfixOps
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
-import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, spy, verify}
+import org.mockito.ArgumentMatcher
+import org.mockito.Matchers.{any, argThat}
+import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -818,6 +820,42 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("SPARK-24948: blacklist files we don't have read permission on") {
+    val clock = new ManualClock(1533132471)
+    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val accessDenied = newLogFile("accessDenied", None, inProgress = false)
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
+    val accessGranted = newLogFile("accessGranted", None, inProgress = false)
+    writeFile(accessGranted, true, None,
+      SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    val mockedFs = spy(provider.fs)
+    doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
+      argThat(new ArgumentMatcher[Path]() {
+        override def matches(path: Any): Boolean = {
+          path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
+        }
+      }))
+    val mockedProvider = spy(provider)
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    writeFile(accessDenied, true, None,
+      SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L))
+    // Doing 2 times in order to check the blacklist filter too
+    updateAndCheck(mockedProvider) { list =>
+      list.size should be(1)
+    }
+    val accessDeniedPath = new Path(accessDenied.getPath)
+    assert(mockedProvider.isBlacklisted(accessDeniedPath))
+    clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+    mockedProvider.cleanLogs()
+    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
