 package org.apache.spark.sql.hive.execution

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc

 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -91,6 +92,34 @@ case class InsertIntoHiveTable(
     )
     val tableLocation = hiveQlTable.getDataLocation
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+
+    try {
+      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation)
+    } finally {
+      // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+      // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+      deleteExternalTmpPath(hadoopConf)
+    }
+
+    // un-cache this table.
+    sparkSession.catalog.uncacheTable(table.identifier.quotedString)
+    sparkSession.sessionState.catalog.refreshTable(table.identifier)
+
+    CommandUtils.updateTableStats(sparkSession, table)
+
+    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
+    // however for now we return an empty list to simplify compatibility checks with hive, which
+    // does not return anything for insert operations.
+    // TODO: implement hive compatibility as rules.
+    Seq.empty[Row]
+  }
+
+  private def processInsert(
+      sparkSession: SparkSession,
+      externalCatalog: ExternalCatalog,
+      hadoopConf: Configuration,
+      tableDesc: TableDesc,
+      tmpLocation: Path): Unit = {
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

     val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -231,21 +260,5 @@ case class InsertIntoHiveTable(
         overwrite,
         isSrcLocal = false)
     }
-
-    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
-    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-    deleteExternalTmpPath(hadoopConf)
-
-    // un-cache this table.
-    sparkSession.catalog.uncacheTable(table.identifier.quotedString)
-    sparkSession.sessionState.catalog.refreshTable(table.identifier)
-
-    CommandUtils.updateTableStats(sparkSession, table)
-
-    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
-    // however for now we return an empty list to simplify compatibility checks with hive, which
-    // does not return anything for insert operations.
-    // TODO: implement hive compatibility as rules.
-    Seq.empty[Row]
   }
 }
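Net effect of the change: the actual write logic moves into a new processInsert helper, while the staging-directory cleanup, cache invalidation, and table-stats update stay in run; because the cleanup now sits in a try/finally around processInsert, the temporary directory is removed even when the insert fails, instead of only on the success path as before. Below is a minimal, self-contained sketch of that try/finally cleanup pattern using plain java.nio file operations; StagingCleanupSketch, writeToStaging, and deleteStaging are hypothetical stand-ins for illustration, not Spark APIs.

import java.nio.file.{Files, Path}
import scala.util.control.NonFatal

object StagingCleanupSketch {

  // Stand-in for processInsert: performs the failure-prone write into the staging directory.
  private def writeToStaging(stagingDir: Path, rows: Seq[String]): Path =
    Files.write(stagingDir.resolve("part-00000"), rows.mkString("\n").getBytes("UTF-8"))

  // Stand-in for deleteExternalTmpPath: best-effort removal of the staging directory.
  private def deleteStaging(stagingDir: Path): Unit = {
    try {
      Files.deleteIfExists(stagingDir.resolve("part-00000"))
      Files.deleteIfExists(stagingDir)
    } catch {
      case NonFatal(_) => // ignored: leftovers are expected to disappear at VM exit
    }
  }

  def insert(rows: Seq[String]): Unit = {
    val stagingDir = Files.createTempDirectory("hive-staging-")
    try {
      writeToStaging(stagingDir, rows) // all failure-prone work happens here
    } finally {
      deleteStaging(stagingDir)        // runs whether the write succeeded or threw
    }
  }

  def main(args: Array[String]): Unit = {
    insert(Seq("a", "b", "c"))
    println("insert finished; staging directory removed")
  }
}

In the diff itself, processInsert plays the role of writeToStaging (loading the staged data through the external catalog) and deleteExternalTmpPath plays the role of deleteStaging.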