1
1
package spark .jobserver
2
2
3
- import akka .actor .{ActorSystem , ActorRef }
3
+ import akka .actor .{ActorRef , ActorSystem }
4
4
import akka .actor .Props
5
5
import akka .pattern .ask
6
- import com .typesafe .config .{ConfigValueFactory , Config , ConfigFactory }
7
-
6
+ import com .typesafe .config .{Config , ConfigFactory , ConfigValueFactory }
8
7
import java .io .File
9
- import spark .jobserver .io .{BinaryType , JobDAOActor , JobDAO , DataFileDAO }
8
+
9
+ import spark .jobserver .io .{BinaryType , DataFileDAO , JobDAO , JobDAOActor }
10
10
import org .slf4j .LoggerFactory
11
11
12
12
import scala .collection .JavaConverters ._
13
13
import scala .concurrent .{Await , ExecutionContext }
14
14
import scala .concurrent .duration ._
15
+ import scala .util .matching .Regex
15
16
16
17
/**
17
18
* The Spark Job Server is a web service that allows users to submit and run Spark jobs, check status,
@@ -31,7 +32,9 @@ import scala.concurrent.duration._
31
32
*/
32
33
object JobServer {
33
34
val logger = LoggerFactory .getLogger(getClass)
34
-
35
+ final val DEFAULT_CREDENTIAL_KEYS = " credentials,password,secret,token"
36
+ final val EMPTY_VALUE_PATTERN = " \"\" ,?" .r
37
+ final val CREDENTIAL_MASK = " \" xxx\" "
35
38
// Allow custom function to create ActorSystem. An example of why this is useful:
36
39
// we can have something that stores the ActorSystem so it could be shut down easily later.
37
40
def start (args : Array [String ], makeSystem : Config => ActorSystem ) {
@@ -46,8 +49,12 @@ object JobServer {
46
49
} else {
47
50
defaultConfig
48
51
}
49
- logger.info(" Starting JobServer with config {}" , config.getConfig(" spark" ).root.render())
50
- logger.info(" Spray config: {}" , config.getConfig(" spray.can.server" ).root.render())
52
+
53
+ val credentialPattern = credentialRegex(config)
54
+ val sparkConfig = config.getConfig(" spark" ).root
55
+ logger.info(" Starting JobServer with config {}" , maskCredentials(sparkConfig.render(), credentialPattern))
56
+ val sprayConfig = config.getConfig(" spray.can.server" ).root
57
+ logger.info(" Spray config: {}" , maskCredentials(sprayConfig.render(), credentialPattern))
51
58
val port = config.getInt(" spark.jobserver.port" )
52
59
53
60
// TODO: Hardcode for now to get going. Make it configurable later.
@@ -90,6 +97,36 @@ object JobServer {
90
97
}
91
98
}
92
99
100
+ private def maskCredentials (lines : String , credentialRegex : Regex ): String = {
101
+ lines
102
+ .split(" \n " )
103
+ .toSeq
104
+ .map {
105
+ line => line.split(" :" ) match {
106
+ // if key matches credential keys pattern and value is not empty, mask credentials
107
+ case Array (key, value) if (credentialRegex.findFirstIn(key).nonEmpty
108
+ && EMPTY_VALUE_PATTERN .findFirstIn(value.stripMargin).isEmpty) =>
109
+ Array (key, CREDENTIAL_MASK ).mkString(" :" )
110
+ case _ => line
111
+ }
112
+ }.mkString(" \n " )
113
+ }
114
+
115
+ private def credentialRegex (config : Config ): Regex = {
116
+ // Use default credential keys if spark.ui.confidentialKeys is not set
117
+ val credentialKeys = try {
118
+ config.getString(" spark.ui.confidentialKeys" )
119
+ } catch {
120
+ case _ : Exception =>
121
+ logger.info(s " spark.ui.confidentialKeys is not set, " +
122
+ s " use default credential keys $DEFAULT_CREDENTIAL_KEYS" )
123
+ DEFAULT_CREDENTIAL_KEYS
124
+ }
125
+
126
+ // Case insensitive
127
+ s """ (?i)( ${credentialKeys.split(" ," ).mkString(" |" )})" """ .r
128
+ }
129
+
93
130
private def parseInitialBinaryConfig (key : String , config : Config ): Map [String , String ] = {
94
131
if (config.hasPath(key)) {
95
132
val initialJarsConfig = config.getConfig(key).root
0 commit comments