@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
21
21
import java .net .{InetAddress , UnknownHostException , URI }
22
22
import java .nio .ByteBuffer
23
23
import java .nio .charset .StandardCharsets
24
+ import java .security .PrivilegedExceptionAction
24
25
import java .util .{Locale , Properties , UUID }
25
26
import java .util .zip .{ZipEntry , ZipOutputStream }
26
27
/**
 * Cleanup application staging directory.
 *
 * If `PRESERVE_STAGING_FILES` is set in the Spark configuration, the staging directory is
 * kept (useful for debugging) and nothing is deleted. Deletion failures are logged and
 * swallowed — cleanup is best-effort and must not fail the caller.
 *
 * In cluster mode with a principal and keytab configured, the delete is performed inside a
 * `doAs` of a freshly logged-in UGI so it still succeeds when the submitter's original
 * credentials/tokens may no longer be valid at cleanup time.
 *
 * @param appId the YARN application whose staging directory should be removed
 */
private def cleanupStagingDir(appId: ApplicationId): Unit = {
  // Guard clause instead of an early `return`: preserving staging files was requested,
  // so skip all cleanup work.
  if (!sparkConf.get(PRESERVE_STAGING_FILES)) {
    // Deletes the per-application staging dir under the current UGI.
    def cleanupStagingDirInternal(): Unit = {
      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
      try {
        val fs = stagingDirPath.getFileSystem(hadoopConf)
        if (fs.delete(stagingDirPath, true)) {
          logInfo(s"Deleted staging directory $stagingDirPath")
        }
      } catch {
        case ioe: IOException =>
          // Best-effort: log and continue rather than propagate.
          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
      }
    }

    if (isClusterMode && principal != null && keytab != null) {
      // Re-login from the keytab so the filesystem call runs with fresh credentials.
      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = cleanupStagingDirInternal()
      })
    } else {
      cleanupStagingDirInternal()
    }
  }
}
207
224
0 commit comments