Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 581200a

Browse files
yaooqinn authored and cloud-fan committed
[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one
## What changes were proposed in this pull request? While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just point to a dummy metastore, which actually should be the real one. And the warehouse is determined later in SharedState; HiveClient should respect this configuration change in this case too. ## How was this patch tested? existing ut cc cloud-fan jiangxb1987 Author: Kent Yao <[email protected]> Closes apache#19068 from yaooqinn/SPARK-21428-FOLLOWUP.
1 parent 1bc17a6 commit 581200a

File tree

5 files changed

+29
-10
lines changed

5 files changed

+29
-10
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
3737
import org.apache.log4j.{Level, Logger}
3838
import org.apache.thrift.transport.TSocket
3939

40+
import org.apache.spark.SparkConf
41+
import org.apache.spark.deploy.SparkHadoopUtil
4042
import org.apache.spark.internal.Logging
4143
import org.apache.spark.sql.AnalysisException
4244
import org.apache.spark.sql.hive.HiveUtils
@@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
8183
System.exit(1)
8284
}
8385

86+
val sparkConf = new SparkConf(loadDefaults = true)
87+
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
88+
val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
89+
8490
val cliConf = new HiveConf(classOf[SessionState])
85-
// Override the location of the metastore since this is only used for local execution.
86-
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
87-
case (key, value) => cliConf.set(key, value)
91+
(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
92+
++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
93+
case (k, v) =>
94+
cliConf.set(k, v)
8895
}
96+
8997
val sessionState = new CliSessionState(cliConf)
9098

9199
sessionState.in = System.in

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
176176
}
177177

178178
/**
179-
* Configurations needed to create a [[HiveClient]].
179+
* Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
180180
*/
181-
private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
181+
private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
182182
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
183183
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
184184
// compatibility when users are trying to connecting to a Hive metastore of lower version,
@@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging {
280280
protected[hive] def newClientForMetadata(
281281
conf: SparkConf,
282282
hadoopConf: Configuration): HiveClient = {
283-
val configurations = hiveClientConfigurations(hadoopConf)
283+
val configurations = formatTimeVarsForHiveClient(hadoopConf)
284284
newClientForMetadata(conf, hadoopConf, configurations)
285285
}
286286

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
2828
import org.apache.hadoop.fs.Path
2929
import org.apache.hadoop.hive.common.StatsSetupConst
3030
import org.apache.hadoop.hive.conf.HiveConf
31+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
3132
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
3233
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
3334
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
@@ -132,14 +133,24 @@ private[hive] class HiveClientImpl(
132133
// in hive jars, which will turn off isolation, if SessionSate.detachSession is
133134
// called to remove the current state after that, hive client created later will initialize
134135
// its own state by newState()
135-
Option(SessionState.get).getOrElse(newState())
136+
val ret = SessionState.get
137+
if (ret != null) {
138+
// hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
139+
// instance constructed, we need to follow that change here.
140+
Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
141+
ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
142+
}
143+
ret
144+
} else {
145+
newState()
146+
}
136147
}
137148
}
138149

139150
// Log the default warehouse location.
140151
logInfo(
141152
s"Warehouse location for Hive client " +
142-
s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
153+
s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
143154

144155
private def newState(): SessionState = {
145156
val hiveConf = new HiveConf(classOf[SessionState])

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
3636
hadoopConf.set("hive.metastore.schema.verification", "false")
3737
}
3838
HiveClientBuilder
39-
.buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
39+
.buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
4040
}
4141

4242
override def suiteName: String = s"${super.suiteName}($version)"

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
127127
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
128128
hadoopConf.set("hive.metastore.schema.verification", "false")
129129
}
130-
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
130+
client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
131131
if (versionSpark != null) versionSpark.reset()
132132
versionSpark = TestHiveVersion(client)
133133
assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

0 commit comments

Comments (0)