Commit 60bcc71

sririshindra authored and attilapiros committed
[SPARK-51095][CORE][SQL] Include caller context for hdfs audit logs for calls from driver

### What changes were proposed in this pull request?

Add the caller context for calls from the DRIVER to HDFS.

### Why are the changes needed?

HDFS audit logs support an optional "caller context". Spark already leverages this to record the YARN application id, job id, task id, etc., but only on executors; the caller context is left empty on the Spark driver. With the introduction of Iceberg we have seen multiple scenarios in which files in HDFS are accessed from the driver, and because the caller context is empty there, our ability to forensically analyse any issues has diminished. This PR sets the caller context from the driver as well.

### Does this PR introduce _any_ user-facing change?

Yes, HDFS audit logs will now include a caller context for calls from the driver.

### How was this patch tested?

This patch was tested manually. After this change the HDFS audit logs contain a caller context for driver-side calls:

```
2025-02-14 02:26:23,249 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE) ip=/192.168.97.4 cmd=getfileinfo src=/warehouse/sample dst=null perm=null proto=rpc callerContext=SPARK_DRIVER_application_1739496632907_0005
2025-02-14 02:26:23,265 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE) ip=/192.168.97.4 cmd=listStatus src=/warehouse/sample dst=null perm=null proto=rpc callerContext=SPARK_DRIVER_application_1739496632907_0005
2025-02-14 02:26:25,519 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE) ip=/192.168.97.5 cmd=open src=/warehouse/sample/part-00000-dd473344-76b1-4179-91ae-d15a8da4a888-c000 dst=null perm=null proto=rpc callerContext=SPARK_TASK_application_1739496632907_0005_JId_0_SId_0_0_TId_0_0
2025-02-14 02:26:26,345 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE) ip=/192.168.97.5 cmd=open src=/warehouse/sample/part-00000-dd473344-76b1-4179-91ae-d15a8da4a888-c000 dst=null perm=null proto=rpc callerContext=SPARK_TASK_application_1739496632907_0005_JId_1_SId_1_0_TId_1_0
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#49814 from sririshindra/master-SPARK-51095.

Lead-authored-by: Rishi <[email protected]>
Co-authored-by: Rishi <[email protected]>
Signed-off-by: attilapiros <[email protected]>
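As a quick illustration of what this change enables, driver-originated operations can now be isolated from an audit log with a plain `grep` on the `callerContext` field. The snippet below is a minimal sketch: `hdfs-audit.log` is a hypothetical file name, seeded with two of the sample entries shown above.

```shell
# Seed a sample audit log (hypothetical file name) with entries like the ones above.
cat > hdfs-audit.log <<'EOF'
2025-02-14 02:26:23,249 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE) ip=/192.168.97.4 cmd=getfileinfo src=/warehouse/sample dst=null perm=null proto=rpc callerContext=SPARK_DRIVER_application_1739496632907_0005
2025-02-14 02:26:25,519 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE) ip=/192.168.97.5 cmd=open src=/warehouse/sample/part-00000-dd473344-76b1-4179-91ae-d15a8da4a888-c000 dst=null perm=null proto=rpc callerContext=SPARK_TASK_application_1739496632907_0005_JId_0_SId_0_0_TId_0_0
EOF

# Driver-side operations carry the SPARK_DRIVER prefix; executor tasks use SPARK_TASK.
grep 'callerContext=SPARK_DRIVER' hdfs-audit.log
```

Before this patch the driver's `getfileinfo` call would have had no `callerContext` at all, so no such filter could attribute it to a Spark application.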
1 parent 48fc0fb commit 60bcc71

File tree

2 files changed

+13
-0
lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 0 deletions

@@ -722,6 +722,9 @@ class SparkContext(config: SparkConf) extends Logging {
       }
       appStatusSource.foreach(_env.metricsSystem.registerSource(_))
       _plugins.foreach(_.registerMetrics(applicationId))
+
+      new CallerContext("DRIVER", config.get(APP_CALLER_CONTEXT),
+        Some(applicationId), applicationAttemptId).setCurrentContext()
     } catch {
       case NonFatal(e) =>
         logError("Error initializing SparkContext.", e)

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 10 additions & 0 deletions

@@ -29,6 +29,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
+import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.apache.logging.log4j.{Level, LogManager}
@@ -1460,6 +1461,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
     sc.stop()
   }
+
+  test("SPARK-51095: Test caller context initialization") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    sc = new SparkContext(conf)
+    val hadoopCallerContext = HadoopCallerContext.getCurrent()
+    assert(hadoopCallerContext.getContext().startsWith("SPARK_DRIVER"))
+    sc.stop()
+  }
+
 }

 object SparkContextSuite {
