Commit 870f972

wangyum authored and gatorsmile committed
[SPARK-28104][SQL] Implement Spark's own GetColumnsOperation
## What changes were proposed in this pull request?

[SPARK-24196](https://issues.apache.org/jira/browse/SPARK-24196) and [SPARK-24570](https://issues.apache.org/jira/browse/SPARK-24570) implemented Spark's own `GetSchemasOperation` and `GetTablesOperation`. This PR implements Spark's own `GetColumnsOperation`.

## How was this patch tested?

Unit tests and manual tests:

![image](https://user-images.githubusercontent.com/5399861/59745367-3a7d6180-92a7-11e9-862d-96bc494c5f00.png)

Closes apache#24906 from wangyum/SPARK-28104.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
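
For orientation, this is the client-side call that `GetColumnsOperation` ultimately serves: JDBC's `DatabaseMetaData.getColumns`. A minimal sketch against a running Thrift server, assuming the default port 10000 and the `default` database; the URL, credentials, and patterns are placeholders, not values from this commit:

```scala
import java.sql.DriverManager

object GetColumnsClientSketch {
  def main(args: Array[String]): Unit = {
    // The Hive JDBC driver; hive-jdbc must be on the classpath.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
    try {
      // Arguments: catalog, schemaPattern, tableNamePattern, columnNamePattern.
      val rs = conn.getMetaData.getColumns(null, "default", "%", "%")
      while (rs.next()) {
        println(s"${rs.getString("TABLE_NAME")}.${rs.getString("COLUMN_NAME")}: " +
          s"${rs.getString("TYPE_NAME")} (java.sql.Types code ${rs.getInt("DATA_TYPE")})")
      }
    } finally {
      conn.close()
    }
  }
}
```

Each returned row follows the standard JDBC `getColumns` layout (TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE, TYPE_NAME, ...), which is the row shape `SparkGetColumnsOperation` assembles below.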
1 parent 5ad1053 commit 870f972

File tree

8 files changed, +270 -10 lines changed

- sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
- sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
- sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
- sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
- sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
- sql/hive-thriftserver/v1.2.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala
- sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
- sql/hive-thriftserver/v2.3.5/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala

Lines changed: 142 additions & 0 deletions (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID
import java.util.regex.Pattern

import scala.collection.JavaConverters.seqAsJavaListConverter

import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject}
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetColumnsOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils.toJavaSQLType

/**
 * Spark's own SparkGetColumnsOperation
 *
 * @param sqlContext SQLContext to use
 * @param parentSession a HiveSession from SessionManager
 * @param catalogName catalog name. NULL if not applicable.
 * @param schemaName database name, NULL or a concrete database name
 * @param tableName table name
 * @param columnName column name
 */
private[hive] class SparkGetColumnsOperation(
    sqlContext: SQLContext,
    parentSession: HiveSession,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String)
  extends GetColumnsOperation(parentSession, catalogName, schemaName, tableName, columnName)
  with Logging {

  val catalog: SessionCatalog = sqlContext.sessionState.catalog

  private var statementId: String = _

  override def runInternal(): Unit = {
    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" +
      s", columnName : $columnName"
    logInfo(s"GetColumnsOperation: $cmdStr")

    setState(OperationState.RUNNING)
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    val schemaPattern = convertSchemaPattern(schemaName)
    val tablePattern = convertIdentifierPattern(tableName, true)

    var columnPattern: Pattern = null
    if (columnName != null) {
      columnPattern = Pattern.compile(convertIdentifierPattern(columnName, false))
    }

    val db2Tabs = catalog.listDatabases(schemaPattern).map { dbName =>
      (dbName, catalog.listTables(dbName, tablePattern))
    }.toMap

    if (isAuthV2Enabled) {
      val privObjs = seqAsJavaListConverter(getPrivObjs(db2Tabs)).asJava
      val cmdStr =
        s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
      authorizeMetaGets(HiveOperationType.GET_COLUMNS, privObjs, cmdStr)
    }

    try {
      db2Tabs.foreach {
        case (dbName, tables) =>
          catalog.getTablesByName(tables).foreach { catalogTable =>
            catalogTable.schema.foreach { column =>
              if (columnPattern != null && !columnPattern.matcher(column.name).matches()) {
              } else {
                val rowData = Array[AnyRef](
                  null, // TABLE_CAT
                  dbName, // TABLE_SCHEM
                  catalogTable.identifier.table, // TABLE_NAME
                  column.name, // COLUMN_NAME
                  toJavaSQLType(column.dataType.sql).asInstanceOf[AnyRef], // DATA_TYPE
                  column.dataType.sql, // TYPE_NAME
                  null, // COLUMN_SIZE
                  null, // BUFFER_LENGTH, unused
                  null, // DECIMAL_DIGITS
                  null, // NUM_PREC_RADIX
                  (if (column.nullable) 1 else 0).asInstanceOf[AnyRef], // NULLABLE
                  column.getComment().getOrElse(""), // REMARKS
                  null, // COLUMN_DEF
                  null, // SQL_DATA_TYPE
                  null, // SQL_DATETIME_SUB
                  null, // CHAR_OCTET_LENGTH
                  null, // ORDINAL_POSITION
                  "YES", // IS_NULLABLE
                  null, // SCOPE_CATALOG
                  null, // SCOPE_SCHEMA
                  null, // SCOPE_TABLE
                  null, // SOURCE_DATA_TYPE
                  "NO" // IS_AUTO_INCREMENT
                )
                rowSet.addRow(rowData)
              }
            }
          }
      }
      setState(OperationState.FINISHED)
    } catch {
      case e: HiveSQLException =>
        setState(OperationState.ERROR)
        throw e
    }
  }

  private def getPrivObjs(db2Tabs: Map[String, Seq[TableIdentifier]]): Seq[HivePrivilegeObject] = {
    db2Tabs.foldLeft(Seq.empty[HivePrivilegeObject])({
      case (i, (dbName, tables)) => i ++ tables.map { tableId =>
        new HivePrivilegeObject(HivePrivilegeObjectType.TABLE_OR_VIEW, dbName, tableId.table)
      }
    })
  }
}
```
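
A note on the column filter above: `convertIdentifierPattern(columnName, false)` is inherited from Hive's `MetadataOperation` and turns the JDBC identifier pattern into a Java regex, which `runInternal` then applies to each column name. A rough standalone sketch of that translation, assuming plain JDBC wildcard semantics (`%` for any run of characters, `_` for exactly one) and ignoring escape handling; `jdbcPatternToRegex` is a hypothetical stand-in, not the Hive helper itself:

```scala
import java.util.regex.Pattern

object JdbcPatternSketch {
  // Hypothetical stand-in for convertIdentifierPattern(pattern, false);
  // it skips the escape handling the real helper performs.
  def jdbcPatternToRegex(jdbcPattern: String): Pattern =
    Pattern.compile(jdbcPattern.replace("%", ".*").replace("_", "."))

  def main(args: Array[String]): Unit = {
    assert(jdbcPatternToRegex("key").matcher("key").matches())   // exact name
    assert(jdbcPatternToRegex("k%").matcher("key").matches())    // '%' matches any run
    assert(!jdbcPatternToRegex("k_y").matcher("kexy").matches()) // '_' matches one char
  }
}
```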

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 18 additions & 2 deletions
```diff
@@ -21,13 +21,13 @@ import java.util.{List => JList, Map => JMap}
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hive.service.cli._
-import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager}
+import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetColumnsOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetSchemasOperation, SparkGetTablesOperation}
+import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetColumnsOperation, SparkGetSchemasOperation, SparkGetTablesOperation}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -92,6 +92,22 @@ private[thriftserver] class SparkSQLOperationManager()
     operation
   }
 
+  override def newGetColumnsOperation(
+      parentSession: HiveSession,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      columnName: String): GetColumnsOperation = synchronized {
+    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
+    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
+      s" initialized or had already closed.")
+    val operation = new SparkGetColumnsOperation(sqlContext, parentSession,
+      catalogName, schemaName, tableName, columnName)
+    handleToOperation.put(operation.getHandle, operation)
+    logDebug(s"Created GetColumnsOperation with session=$parentSession.")
+    operation
+  }
+
   def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
     val iterator = confMap.entrySet().iterator()
     while (iterator.hasNext) {
```

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -21,7 +21,7 @@ import java.io.{File, FilenameFilter}
 import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, DriverManager, SQLException, Statement}
-import java.util.UUID
+import java.util.{Locale, UUID}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -820,7 +820,12 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
       statements.zip(fs).foreach { case (s, f) => f(s) }
     } finally {
       tableNames.foreach { name =>
-        statements(0).execute(s"DROP TABLE IF EXISTS $name")
+        // TODO: Need a better way to drop the view.
+        if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
+          statements(0).execute(s"DROP VIEW IF EXISTS $name")
+        } else {
+          statements(0).execute(s"DROP TABLE IF EXISTS $name")
+        }
       }
       statements.foreach(_.close())
       connections.foreach(_.close())
```

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala

Lines changed: 91 additions & 1 deletion
```diff
@@ -146,7 +146,7 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
       }
     }
 
-    withJdbcStatement("table1", "table2") { statement =>
+    withJdbcStatement("table1", "table2", "view1") { statement =>
       Seq(
         "CREATE TABLE table1(key INT, val STRING)",
         "CREATE TABLE table2(key INT, val STRING)",
@@ -177,4 +177,94 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
       }
     }
   }
+
+  test("Spark's own GetColumnsOperation(SparkGetColumnsOperation)") {
+    def testGetColumnsOperation(
+        schema: String,
+        tableNamePattern: String,
+        columnNamePattern: String)(f: HiveQueryResultSet => Unit): Unit = {
+      val rawTransport = new TSocket("localhost", serverPort)
+      val connection = new HiveConnection(s"jdbc:hive2://localhost:$serverPort", new Properties)
+      val user = System.getProperty("user.name")
+      val transport = PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
+      val client = new ThriftserverShimUtils.Client(new TBinaryProtocol(transport))
+      transport.open()
+
+      var rs: HiveQueryResultSet = null
+
+      try {
+        val openResp = client.OpenSession(new ThriftserverShimUtils.TOpenSessionReq)
+        val sessHandle = openResp.getSessionHandle
+
+        val getColumnsReq = new ThriftserverShimUtils.TGetColumnsReq(sessHandle)
+        getColumnsReq.setSchemaName(schema)
+        getColumnsReq.setTableName(tableNamePattern)
+        getColumnsReq.setColumnName(columnNamePattern)
+
+        rs = new HiveQueryResultSet.Builder(connection)
+          .setClient(client)
+          .setSessionHandle(sessHandle)
+          .setStmtHandle(client.GetColumns(getColumnsReq).getOperationHandle)
+          .build()
+
+        f(rs)
+      } finally {
+        rs.close()
+        connection.close()
+        transport.close()
+        rawTransport.close()
+      }
+    }
+
+    def checkResult(
+        columns: Seq[(String, String, String, String, String)],
+        rs: HiveQueryResultSet): Unit = {
+      if (columns.nonEmpty) {
+        for (i <- columns.indices) {
+          assert(rs.next())
+          val col = columns(i)
+          assert(rs.getString("TABLE_NAME") === col._1)
+          assert(rs.getString("COLUMN_NAME") === col._2)
+          assert(rs.getString("DATA_TYPE") === col._3)
+          assert(rs.getString("TYPE_NAME") === col._4)
+          assert(rs.getString("REMARKS") === col._5)
+        }
+      } else {
+        assert(!rs.next())
+      }
+    }
+
+    withJdbcStatement("table1", "table2", "view1") { statement =>
+      Seq(
+        "CREATE TABLE table1(key INT comment 'Int column', val STRING comment 'String column')",
+        "CREATE TABLE table2(key INT, val DECIMAL comment 'Decimal column')",
+        "CREATE VIEW view1 AS SELECT key FROM table1"
+      ).foreach(statement.execute)
+
+      testGetColumnsOperation("%", "%", null) { rs =>
+        checkResult(
+          Seq(
+            ("table1", "key", "4", "INT", "Int column"),
+            ("table1", "val", "12", "STRING", "String column"),
+            ("table2", "key", "4", "INT", ""),
+            ("table2", "val", "3", "DECIMAL(10,0)", "Decimal column"),
+            ("view1", "key", "4", "INT", "Int column")), rs)
+      }
+
+      testGetColumnsOperation("%", "table1", null) { rs =>
+        checkResult(
+          Seq(
+            ("table1", "key", "4", "INT", "Int column"),
+            ("table1", "val", "12", "STRING", "String column")), rs)
+      }
+
+      testGetColumnsOperation("%", "table1", "key") { rs =>
+        checkResult(Seq(("table1", "key", "4", "INT", "Int column")), rs)
+      }
+
+      testGetColumnsOperation("%", "table_not_exist", null) { rs =>
+        checkResult(Seq.empty, rs)
+      }
+    }
+  }
 }
```
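
The quoted `DATA_TYPE` strings in the assertions above ("4", "12", "3") are the standard `java.sql.Types` codes for the corresponding SQL types, surfaced through the `toJavaSQLType` shim; nothing Spark-specific:

```scala
import java.sql.Types

// JDBC-standard constants behind the string-compared codes in the test.
assert(Types.INTEGER == 4)  // INT columns
assert(Types.VARCHAR == 12) // STRING columns
assert(Types.DECIMAL == 3)  // DECIMAL columns
```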

sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -49,7 +49,7 @@
  */
 public class GetColumnsOperation extends MetadataOperation {
 
-  private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
+  protected static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
   .addPrimitiveColumn("TABLE_CAT", Type.STRING_TYPE,
       "Catalog name. NULL if not applicable")
   .addPrimitiveColumn("TABLE_SCHEM", Type.STRING_TYPE,
@@ -109,7 +109,7 @@ public class GetColumnsOperation extends MetadataOperation {
   private final String tableName;
   private final String columnName;
 
-  private final RowSet rowSet;
+  protected final RowSet rowSet;
 
   protected GetColumnsOperation(HiveSession parentSession, String catalogName, String schemaName,
       String tableName, String columnName) {
```

sql/hive-thriftserver/v1.2.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema}
+import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema, Type}
 
 /**
  * Various utilities for hive-thriftserver used to upgrade the built-in Hive.
@@ -31,6 +31,7 @@ private[thriftserver] object ThriftserverShimUtils {
   private[thriftserver] type TOpenSessionReq = org.apache.hive.service.cli.thrift.TOpenSessionReq
   private[thriftserver] type TGetSchemasReq = org.apache.hive.service.cli.thrift.TGetSchemasReq
   private[thriftserver] type TGetTablesReq = org.apache.hive.service.cli.thrift.TGetTablesReq
+  private[thriftserver] type TGetColumnsReq = org.apache.hive.service.cli.thrift.TGetColumnsReq
 
   private[thriftserver] def getConsole: SessionState.LogHelper = {
     val LOG = LogFactory.getLog(classOf[SparkSQLCLIDriver])
@@ -43,4 +44,6 @@ private[thriftserver] object ThriftserverShimUtils {
     RowSetFactory.create(getResultSetSchema, getProtocolVersion)
   }
 
+  private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType
+
 }
```
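
The new `toJavaSQLType` shim resolves a SQL type name string through Hive's `Type` enum (`org.apache.hive.service.cli.Type` in this v1.2.1 shim). A small sketch of the expected mappings; which exact name strings `Type.getType` accepts is Hive's contract, so treat the inputs as assumptions, though the test suite above corroborates these three:

```scala
import org.apache.hive.service.cli.Type

// Expected resolutions, assuming Type.getType handles Spark's type names,
// including parameterized ones like DECIMAL(10,0):
val intCode = Type.getType("INT").toJavaSQLType           // 4  == java.sql.Types.INTEGER
val strCode = Type.getType("STRING").toJavaSQLType        // 12 == java.sql.Types.VARCHAR
val decCode = Type.getType("DECIMAL(10,0)").toJavaSQLType // 3  == java.sql.Types.DECIMAL
```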

sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -56,7 +56,7 @@
  */
 public class GetColumnsOperation extends MetadataOperation {
 
-  private static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
+  protected static final TableSchema RESULT_SET_SCHEMA = new TableSchema()
   .addPrimitiveColumn("TABLE_CAT", Type.STRING_TYPE,
       "Catalog name. NULL if not applicable")
   .addPrimitiveColumn("TABLE_SCHEM", Type.STRING_TYPE,
@@ -116,7 +116,7 @@ public class GetColumnsOperation extends MetadataOperation {
   private final String tableName;
   private final String columnName;
 
-  private final RowSet rowSet;
+  protected final RowSet rowSet;
 
   protected GetColumnsOperation(HiveSession parentSession, String catalogName, String schemaName,
       String tableName, String columnName) {
```

sql/hive-thriftserver/v2.3.5/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.serde2.thrift.Type
 import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema}
 import org.slf4j.LoggerFactory
 
@@ -31,6 +32,7 @@ private[thriftserver] object ThriftserverShimUtils {
   private[thriftserver] type TOpenSessionReq = org.apache.hive.service.rpc.thrift.TOpenSessionReq
   private[thriftserver] type TGetSchemasReq = org.apache.hive.service.rpc.thrift.TGetSchemasReq
   private[thriftserver] type TGetTablesReq = org.apache.hive.service.rpc.thrift.TGetTablesReq
+  private[thriftserver] type TGetColumnsReq = org.apache.hive.service.rpc.thrift.TGetColumnsReq
 
   private[thriftserver] def getConsole: SessionState.LogHelper = {
     val LOG = LoggerFactory.getLogger(classOf[SparkSQLCLIDriver])
@@ -43,4 +45,6 @@ private[thriftserver] object ThriftserverShimUtils {
     RowSetFactory.create(getResultSetSchema, getProtocolVersion, false)
   }
 
+  private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType
+
 }
```
