
Commit b83b502

yaooqinn authored and cloud-fan committed
[SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState
## What changes were proposed in this pull request?

Set `isolated` to false while using builtin Hive jars and `SessionState.get` returns a `CliSessionState` instance.

## How was this patch tested?

1. Unit tests.
2. Manually verified: `hive.exec.scratchdir` was only created once because the `CliSessionState` is reused.

```
➜  spark git:(SPARK-21428) ✗ bin/spark-sql --conf spark.sql.hive.metastore.jars=builtin
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/07/16 23:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/16 23:59:27 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/07/16 23:59:27 INFO ObjectStore: ObjectStore, initialize called
17/07/16 23:59:28 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/07/16 23:59:28 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/07/16 23:59:29 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/07/16 23:59:31 INFO ObjectStore: Initialized ObjectStore
17/07/16 23:59:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/07/16 23:59:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/07/16 23:59:32 INFO HiveMetaStore: Added admin role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: Added public role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: No user is added in admin role, since config is empty
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_all_databases
17/07/16 23:59:32 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_all_databases
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_functions: db=default pat=*
17/07/16 23:59:32 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/07/16 23:59:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/beea7261-221a-4711-89e8-8b12a9d37370_resources
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370/_tmp_space.db
17/07/16 23:59:32 INFO SparkContext: Running Spark version 2.3.0-SNAPSHOT
17/07/16 23:59:32 INFO SparkContext: Submitted application: SparkSQL::10.0.0.8
17/07/16 23:59:32 INFO SecurityManager: Changing view acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing view acls groups to:
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls groups to:
17/07/16 23:59:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Kent); groups with view permissions: Set(); users with modify permissions: Set(Kent); groups with modify permissions: Set()
17/07/16 23:59:33 INFO Utils: Successfully started service 'sparkDriver' on port 51889.
17/07/16 23:59:33 INFO SparkEnv: Registering MapOutputTracker
17/07/16 23:59:33 INFO SparkEnv: Registering BlockManagerMaster
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/07/16 23:59:33 INFO DiskBlockManager: Created local directory at /private/var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/blockmgr-9cfae28a-01e9-4c73-a1f1-f76fa52fc7a5
17/07/16 23:59:33 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/07/16 23:59:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/07/16 23:59:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/07/16 23:59:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.0.8:4040
17/07/16 23:59:33 INFO Executor: Starting executor ID driver on host localhost
17/07/16 23:59:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51890.
17/07/16 23:59:33 INFO NettyBlockTransferService: Server created on 10.0.0.8:51890
17/07/16 23:59:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/07/16 23:59:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.0.8:51890 with 366.3 MB RAM, BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/Kent/Documents/spark/spark-warehouse').
17/07/16 23:59:34 INFO SharedState: Warehouse path is 'file:/Users/Kent/Documents/spark/spark-warehouse'.
17/07/16 23:59:34 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: default
17/07/16 23:59:34 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_database: default
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: global_temp
17/07/16 23:59:34 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_database: global_temp
17/07/16 23:59:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
spark-sql>
```

cc cloud-fan gatorsmile

Author: Kent Yao <[email protected]>
Author: hzyaoqin <[email protected]>

Closes apache#18648 from yaooqinn/SPARK-21428.
1 parent d695a52
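Before the diffs, the core idea in one place: decide whether the Hive metastore client needs an isolated classloader by checking, in a classloader-safe way, whether the current thread already runs a `CliSessionState`. The sketch below mirrors the `isCliSessionState` helper added to `HiveUtils` in this commit; the standalone wrapper is only for illustration.

```scala
import org.apache.hadoop.hive.ql.session.SessionState

// Compare fully-qualified class names up the inheritance chain instead of
// using isInstanceOf, so the check also works when the state object was
// created by a different classloader or is a subclass of CliSessionState.
def isCliSessionState(): Boolean = {
  var clazz: Class[_] = Option(SessionState.get).map(_.getClass).orNull
  while (clazz != null) {
    if (clazz.getName == "org.apache.hadoop.hive.cli.CliSessionState") {
      return true
    }
    clazz = clazz.getSuperclass
  }
  false
}

// The IsolatedClientLoader is then created with isolationOn = !isCliSessionState(),
// so builtin Hive jars plus an active CLI session turn isolation off.
```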

File tree

6 files changed: +170 −108 lines


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 9 additions & 3 deletions
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -155,8 +155,14 @@ class SparkHadoopUtil extends Logging {
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
-  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
-    UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+  def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
+    if (!new File(keytabFilename).exists()) {
+      throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
+    } else {
+      logInfo("Attempting to login to Kerberos" +
+        s" using principal: ${principalName} and keytab: ${keytabFilename}")
+      UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+    }
   }
 
   /**
```
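With the check folded into `SparkHadoopUtil`, every caller gets the same fail-fast behavior. A minimal illustration of the new contract (the principal and path here are made up, and `SparkHadoopUtil` is Spark-internal, so this only compiles inside Spark's own packages):

```scala
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil

try {
  // Fails immediately with a SparkException when the keytab is missing,
  // instead of surfacing a harder-to-read Kerberos login error later.
  SparkHadoopUtil.get.loginUserFromKeytab("user@EXAMPLE.COM", "/no/such.keytab")
} catch {
  case e: SparkException => println(e.getMessage) // Keytab file: /no/such.keytab does not exist
}
```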

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 12 deletions
```diff
@@ -559,18 +559,13 @@ object SparkSubmit extends CommandLineUtils {
     if (clusterManager == YARN || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
-        if (!new File(args.keytab).exists()) {
-          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
-        } else {
-          // Add keytab and principal configurations in sysProps to make them available
-          // for later use; e.g. in spark sql, the isolated class loader used to talk
-          // to HiveMetastore will use these settings. They will be set as Java system
-          // properties and then loaded by SparkConf
-          sysProps.put("spark.yarn.keytab", args.keytab)
-          sysProps.put("spark.yarn.principal", args.principal)
-
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
+        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
+        // Add keytab and principal configurations in sysProps to make them available
+        // for later use; e.g. in spark sql, the isolated class loader used to talk
+        // to HiveMetastore will use these settings. They will be set as Java system
+        // properties and then loaded by SparkConf
+        sysProps.put("spark.yarn.keytab", args.keytab)
+        sysProps.put("spark.yarn.principal", args.principal)
       }
     }
```
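The two `sysProps` entries become Java system properties, get picked up by `SparkConf`, and are read back on the consuming side; the refactored `HiveClientImpl` (diff further below) re-logins with essentially this pattern when isolation is on:

```scala
// Consuming side, as it appears in the HiveClientImpl diff below:
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
  SparkHadoopUtil.get.loginUserFromKeytab(
    sparkConf.get("spark.yarn.principal"),
    sparkConf.get("spark.yarn.keytab"))
}
```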

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala

Lines changed: 64 additions & 0 deletions

```diff
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hadoop.hive.cli.CliSessionState
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.hive.HiveUtils
+
+class HiveCliSessionStateSuite extends SparkFunSuite {
+
+  def withSessionClear(f: () => Unit): Unit = {
+    try f() finally SessionState.detachSession()
+  }
+
+  test("CliSessionState will be reused") {
+    withSessionClear { () =>
+      val hiveConf = new HiveConf(classOf[SessionState])
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hiveConf.set(key, value)
+      }
+      val sessionState: SessionState = new CliSessionState(hiveConf)
+      SessionState.start(sessionState)
+      val s1 = SessionState.get
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
+      assert(s1 === s2)
+      assert(s2.isInstanceOf[CliSessionState])
+    }
+  }
+
+  test("SessionState will not be reused") {
+    withSessionClear { () =>
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hadoopConf.set(key, value)
+      }
+      val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
+      val s1 = hiveClient.getState
+      val s2 = hiveClient.newSession().getState
+      assert(s1 !== s2)
+    }
+  }
+}
```
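Assuming the suite lives in the `hive-thriftserver` module (inferred from its package), it should be runnable locally with something like `build/sbt "hive-thriftserver/testOnly *HiveCliSessionStateSuite"`. Note how `withSessionClear` detaches the thread-local `SessionState` after each test, so the reused `CliSessionState` from the first test cannot leak into the second.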

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 18 additions & 1 deletion
```diff
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
@@ -230,6 +231,22 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }
 
+  /**
+   * Check current Thread's SessionState type
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it gets non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -313,7 +330,7 @@ private[spark] object HiveUtils extends Logging {
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
         config = configurations,
-        isolationOn = true,
+        isolationOn = !isCliSessionState(),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
```
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -38,6 +38,12 @@ private[hive] trait HiveClient {
   /** Returns the configuration for the given key in the current session. */
   def getConf(key: String, defaultValue: String): String
 
+  /**
+   * Return the associated Hive SessionState of this [[HiveClientImpl]]
+   * @return [[Any]] not SessionState to avoid linkage error
+   */
+  def getState: Any
+
   /**
    * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
    * result in one string.
```
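`getState` is declared as `Any` because, under isolation, the `SessionState` inside the client belongs to a different classloader than the caller's, so a `SessionState`-typed signature could trigger linkage errors. A hedged caller-side sketch (same Spark-internal caveat as the test suite above):

```scala
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveUtils

val sparkConf = new SparkConf()
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val client = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)

// Pattern match rather than cast blindly: with isolation on, a CliSessionState
// from the isolated classloader would (deliberately) fail to match here.
client.getState match {
  case s: CliSessionState => println(s"reusing CLI session state: $s")
  case other => println(s"client-owned session state: ${other.getClass.getName}")
}
```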

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 66 additions & 92 deletions
```diff
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
@@ -105,107 +105,78 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    // Switch to the initClassLoader.
-    Thread.currentThread().setContextClassLoader(initClassLoader)
-
-    // Set up kerberos credentials for UserGroupInformation.loginUser within
-    // current class loader
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      val principalName = sparkConf.get("spark.yarn.principal")
-      val keytabFileName = sparkConf.get("spark.yarn.keytab")
-      if (!new File(keytabFileName).exists()) {
-        throw new SparkException(s"Keytab file: ${keytabFileName}" +
-          " specified in spark.yarn.keytab does not exist")
-      } else {
-        logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
-        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
-      }
-    }
-
-    def isCliSessionState(state: SessionState): Boolean = {
-      var temp: Class[_] = if (state != null) state.getClass else null
-      var found = false
-      while (temp != null && !found) {
-        found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-        temp = temp.getSuperclass
+    if (clientLoader.isolationOn) {
+      // Switch to the initClassLoader.
+      Thread.currentThread().setContextClassLoader(initClassLoader)
+      // Set up kerberos credentials for UserGroupInformation.loginUser within current class loader
+      if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+        val principal = sparkConf.get("spark.yarn.principal")
+        val keytab = sparkConf.get("spark.yarn.keytab")
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
      }
-      found
-    }
-
-    val ret = try {
-      // originState will be created if not exists, will never be null
-      val originalState = SessionState.get()
-      if (isCliSessionState(originalState)) {
-        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
-        // which contains information like configurations from command line. Later
-        // we call `SparkSQLEnv.init()` there, which would run into this part again.
-        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
-        originalState
-      } else {
-        val hiveConf = new HiveConf(classOf[SessionState])
-        // 1: we set all confs in the hadoopConf to this hiveConf.
-        // This hadoopConf contains user settings in Hadoop's core-site.xml file
-        // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-        // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-        // to load user settings. Otherwise, HiveConf's initialize method will override
-        // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-        // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-        // has hive-site.xml. So, HiveConf will use that to override its default values.
-        hadoopConf.iterator().asScala.foreach { entry =>
-          val key = entry.getKey
-          val value = entry.getValue
-          if (key.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
-          } else {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
-          }
-          hiveConf.set(key, value)
-        }
-        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-        // the initial value will be the current thread's context class loader
-        // (i.e. initClassLoader at here).
-        // We call initialConf.setClassLoader(initClassLoader) at here to make
-        // this action explicit.
-        hiveConf.setClassLoader(initClassLoader)
-        // 2: we set all spark confs to this hiveConf.
-        sparkConf.getAll.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
-          } else {
-            logDebug(s"Applying Spark config to Hive Conf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        // 3: we set all entries in config to this hiveConf.
-        extraConfig.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying extra config to HiveConf: $k=xxx")
-          } else {
-            logDebug(s"Applying extra config to HiveConf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        val state = new SessionState(hiveConf)
-        if (clientLoader.cachedHive != null) {
-          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
-        }
-        SessionState.start(state)
-        state.out = new PrintStream(outputBuffer, true, "UTF-8")
-        state.err = new PrintStream(outputBuffer, true, "UTF-8")
-        state
+      try {
+        newState()
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
       }
-    } finally {
-      Thread.currentThread().setContextClassLoader(original)
+    } else {
+      // Isolation off means we detect a CliSessionState instance in current thread.
+      // 1: Inside the spark project, we have already started a CliSessionState in
+      // `SparkSQLCLIDriver`, which contains configurations from command lines. Later, we call
+      // `SparkSQLEnv.init()` there, which would new a hive client again. so we should keep those
+      // configurations and reuse the existing instance of `CliSessionState`. In this case,
+      // SessionState.get will always return a CliSessionState.
+      // 2: In another case, a user app may start a CliSessionState outside spark project with built
+      // in hive jars, which will turn off isolation, if SessionState.detachSession is
+      // called to remove the current state after that, hive client created later will initialize
+      // its own state by newState()
+      Option(SessionState.get).getOrElse(newState())
     }
-    ret
   }
 
   // Log the default warehouse location.
   logInfo(
     s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
 
+  private def newState(): SessionState = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    hiveConf.setClassLoader(initClassLoader)
+
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf.
+    // 3: we set all entries in config to this hiveConf.
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+      logDebug(
+        s"""
+           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+           |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+         """.stripMargin)
+      hiveConf.set(k, v)
+    }
+    val state = new SessionState(hiveConf)
+    if (clientLoader.cachedHive != null) {
+      Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+    }
+    SessionState.start(state)
+    state.out = new PrintStream(outputBuffer, true, "UTF-8")
+    state.err = new PrintStream(outputBuffer, true, "UTF-8")
+    state
+  }
+
   /** Returns the configuration for the current session. */
   def conf: HiveConf = state.getConf
 
@@ -269,6 +240,9 @@ private[hive] class HiveClientImpl(
     }
   }
 
+  /** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]] */
+  override def getState: SessionState = withHiveState(state)
+
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
    */
```
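One behavioral consequence of the new `else` branch (case 2 in the comment above) deserves a note: with isolation off, the client only reuses a state that is actually attached to the current thread. A REPL-style sketch:

```scala
import org.apache.hadoop.hive.ql.session.SessionState

// After detaching, SessionState.get is null, so a HiveClientImpl constructed
// with isolationOn == false falls through to newState() and owns a fresh state.
SessionState.detachSession()
assert(SessionState.get == null)
```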
