
Commit 12411b5

zsxwing authored and gatorsmile committed
[SPARK-21732][SQL] Lazily init hive metastore client
## What changes were proposed in this pull request?

This PR changes the code to lazily initialize the Hive metastore client, so that a SparkSession can be created without talking to the Hive metastore server. This is helpful when a Hive metastore server is configured but currently down: you can still start the Spark shell to debug.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <[email protected]>

Closes apache#18944 from zsxwing/hive-lazy-init.
1 parent 0422ce0 · commit 12411b5
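The mechanics of the change are small: collaborators that transitively require a Hive client are no longer passed to `SessionState` as constructed values but as zero-argument builder functions, which `SessionState` materializes with `lazy val`. A minimal sketch of the pattern, with hypothetical names rather than Spark's real classes:

```scala
// Illustrative only: `Session` and `MetastoreClient` are stand-ins,
// not Spark's actual types.
class MetastoreClient {
  // The real client would open a Thrift connection here.
  println("connecting to metastore...")
}

class Session(clientBuilder: () => MetastoreClient) {
  // Not created until first use; cached after that.
  lazy val client: MetastoreClient = clientBuilder()
}

object LazyInitDemo extends App {
  val session = new Session(() => new MetastoreClient) // no connection attempted
  println("session created")
  session.client // "connecting to metastore..." prints only here
}
```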

File tree

3 files changed: +81 −12 lines


sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 4 additions & 4 deletions
@@ -287,14 +287,14 @@ abstract class BaseSessionStateBuilder(
       experimentalMethods,
       functionRegistry,
       udfRegistration,
-      catalog,
+      () => catalog,
       sqlParser,
-      analyzer,
-      optimizer,
+      () => analyzer,
+      () => optimizer,
       planner,
       streamingQueryManager,
       listenerManager,
-      resourceLoader,
+      () => resourceLoader,
       createQueryExecution,
       createClone)
   }
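On the builder side, the only change is wrapping each argument in `() => ...`. Passing the bare `catalog` would evaluate the builder's field immediately; the thunk captures the call unevaluated, so nothing runs until `SessionState` first asks for it. A small sketch under the same hypothetical naming:

```scala
object ThunkDemo extends App {
  def buildCatalog(): String = { println("building catalog"); "catalog" }

  // `() => buildCatalog()` plays the role of `() => catalog` above:
  // it packages the call without performing it.
  val catalogBuilder: () => String = () => buildCatalog()
  println("SessionState constructed")
  catalogBuilder() // "building catalog" prints only now
}
```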

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 20 additions & 8 deletions
@@ -42,14 +42,17 @@ import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener}
  * @param experimentalMethods Interface to add custom planning strategies and optimizers.
  * @param functionRegistry Internal catalog for managing functions registered by the user.
  * @param udfRegistration Interface exposed to the user for registering user-defined functions.
- * @param catalog Internal catalog for managing table and database states.
+ * @param catalogBuilder a function to create an internal catalog for managing table and database
+ *                       states.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param optimizer Logical query plan optimizer.
+ * @param analyzerBuilder A function to create the logical query plan analyzer for resolving
+ *                        unresolved attributes and relations.
+ * @param optimizerBuilder a function to create the logical query plan optimizer.
  * @param planner Planner that converts optimized logical plans to physical plans.
  * @param streamingQueryManager Interface to start and stop streaming queries.
  * @param listenerManager Interface to register custom [[QueryExecutionListener]]s.
- * @param resourceLoader Session shared resource loader to load JARs, files, etc.
+ * @param resourceLoaderBuilder a function to create a session shared resource loader to load JARs,
+ *                              files, etc.
  * @param createQueryExecution Function used to create QueryExecution objects.
  * @param createClone Function used to create clones of the session state.
  */
@@ -59,17 +62,26 @@ private[sql] class SessionState(
     val experimentalMethods: ExperimentalMethods,
     val functionRegistry: FunctionRegistry,
     val udfRegistration: UDFRegistration,
-    val catalog: SessionCatalog,
+    catalogBuilder: () => SessionCatalog,
     val sqlParser: ParserInterface,
-    val analyzer: Analyzer,
-    val optimizer: Optimizer,
+    analyzerBuilder: () => Analyzer,
+    optimizerBuilder: () => Optimizer,
     val planner: SparkPlanner,
     val streamingQueryManager: StreamingQueryManager,
     val listenerManager: ExecutionListenerManager,
-    val resourceLoader: SessionResourceLoader,
+    resourceLoaderBuilder: () => SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState) {
 
+  // The following fields are lazy to avoid creating the Hive client when creating SessionState.
+  lazy val catalog: SessionCatalog = catalogBuilder()
+
+  lazy val analyzer: Analyzer = analyzerBuilder()
+
+  lazy val optimizer: Optimizer = optimizerBuilder()
+
+  lazy val resourceLoader: SessionResourceLoader = resourceLoaderBuilder()
+
   def newHadoopConf(): Configuration = SessionState.newHadoopConf(
     sharedState.sparkContext.hadoopConfiguration,
     conf)
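A detail worth noting about the `lazy val` fields above: in Scala 2, lazy-val initialization is synchronized on the enclosing instance, so even concurrent first accesses run the builder exactly once. A runnable sketch (hypothetical counter, not Spark code):

```scala
object LazyOnceDemo extends App {
  var built = 0
  lazy val catalog: String = { built += 1; "catalog" }

  println(built)           // 0: nothing built yet
  val first = catalog      // first access runs the initializer
  val second = catalog     // second access reuses the cached value
  println(built)           // 1: the initializer ran exactly once
  println(first == second) // true
}
```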
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.util.Utils
+
+class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
+
+  test("lazily initialize Hive client") {
+    val spark = SparkSession.builder()
+      .appName("HiveMetastoreLazyInitializationSuite")
+      .master("local[2]")
+      .enableHiveSupport()
+      .config("spark.hadoop.hive.metastore.uris", "thrift://127.0.0.1:11111")
+      .getOrCreate()
+    val originalLevel = org.apache.log4j.Logger.getRootLogger().getLevel
+    try {
+      // Avoid outputting a lot of expected warning logs
+      spark.sparkContext.setLogLevel("error")
+
+      // We should be able to run Spark jobs without Hive client.
+      assert(spark.sparkContext.range(0, 1).count() === 1)
+
+      // Make sure that we are not using the local derby metastore.
+      val exceptionString = Utils.exceptionString(intercept[AnalysisException] {
+        spark.sql("show tables")
+      })
+      for (msg <- Seq(
+          "show tables",
+          "Could not connect to meta store",
+          "org.apache.thrift.transport.TTransportException",
+          "Connection refused")) {
+        assert(exceptionString.contains(msg))
+      }
+    } finally {
+      spark.sparkContext.setLogLevel(originalLevel.toString)
+      spark.stop()
+    }
+  }
+}
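The test exercises both halves of the change: `getOrCreate()` and the `range(0, 1).count()` job succeed even though the configured metastore URI (`thrift://127.0.0.1:11111`) points at a local port where nothing listens, showing that no Hive client is created up front, while the intercepted `AnalysisException` from `show tables` confirms the connection attempt really is deferred to the first catalog access rather than silently falling back to a local Derby metastore.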
