@@ -21,13 +21,16 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
2121import java .net .InetAddress
2222import java .security .PrivilegedExceptionAction
2323import java .text .DateFormat
24- import java .util .{Date , Locale }
24+ import java .util .{Arrays , Date , Locale }
2525
2626import scala .collection .JavaConverters ._
27+ import scala .collection .immutable .Map
2728import scala .collection .mutable
2829import scala .collection .mutable .HashMap
2930import scala .language .existentials
31+ import scala .util .control .NonFatal
3032
33+ import com .google .common .primitives .Longs
3134import org .apache .hadoop .conf .Configuration
3235import org .apache .hadoop .fs ._
3336import org .apache .hadoop .hdfs .DistributedFileSystem .HdfsDataOutputStreamBuilder
@@ -225,6 +228,27 @@ private[spark] class SparkHadoopUtil extends Logging {
225228 if (baseStatus.isDirectory) recurse(baseStatus) else Seq (baseStatus)
226229 }
227230
/**
 * [LYFT-INTERNAL] Removed from OSS Spark: https://github.com/apache/spark/pull/40942/
 *
 * Path-based convenience overload: looks up the status of `basePath` on `fs` and hands it
 * to the `FileStatus` overload of `listLeafDirStatuses`.
 *
 * @param fs file system queried for the status of `basePath`
 * @param basePath path whose status is resolved before delegating
 * @return whatever the `FileStatus` overload returns for the resolved status
 */
def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
  val baseStatus = fs.getFileStatus(basePath)
  listLeafDirStatuses(fs, baseStatus)
}
237+
/**
 * [LYFT-INTERNAL] Removed from OSS Spark: https://github.com/apache/spark/pull/40942/
 *
 * Collects the leaf directories of the tree rooted at `baseStatus`, where a leaf is a
 * directory whose listing contains no sub-directory. Plain files are ignored entirely.
 *
 * @param fs file system used to list the contents of each visited directory
 * @param baseStatus status of the root of the walk; asserted to be a directory
 * @return the statuses of all leaf directories; `baseStatus` itself when it has no
 *         sub-directories
 */
def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
  def recurse(status: FileStatus): Seq[FileStatus] = {
    // Only sub-directories drive the walk, so the files in the listing are never bound.
    val directories = fs.listStatus(status.getPath).filter(_.isDirectory).toSeq
    if (directories.isEmpty) {
      Seq(status)
    } else {
      // Every entry here already passed `isDirectory`, so descend through the local helper
      // instead of re-entering the public overload and repeating its assertion.
      directories.flatMap(dir => recurse(dir))
    }
  }

  assert(baseStatus.isDirectory)
  recurse(baseStatus)
}
251+
/**
 * Reports whether the string form of `pattern` contains at least one character drawn from
 * the literal character set used below.
 *
 * NOTE(review): the blanks inside that literal may be an artifact of how this listing was
 * rendered rather than real members of the set -- confirm against the checked-in file.
 */
228252 def isGlobPath (pattern : Path ): Boolean = {
229253 pattern.toString.exists(" {}[]*?\\ " .toSet.contains)
230254 }
0 commit comments