This repository was archived by the owner on Mar 24, 2025. It is now read-only.

Commit 1e25d7b

Clone hadoopConf and use (#582)
Co-authored-by: sandeep katta <[email protected]>
1 parent: 729ac27

File tree: 2 files changed, +43 −4 lines

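
For context: before this change, XmlFile.withCharset wrote the row-tag and encoding keys directly into the SparkContext's shared hadoopConfiguration, so two reads running in parallel with different rowTag values could overwrite each other's settings. Below is a minimal standalone sketch of that hazard, not taken from the patch; the key name "xmlinput.start" is illustrative rather than guaranteed to be the library's actual constant.

import org.apache.hadoop.conf.Configuration

// Two readers mutating one shared Configuration: the last writer wins,
// so one reader ends up scanning for the wrong start tag.
// (Uses Scala 2.12+ SAM syntax for Runnable.)
object SharedConfRace extends App {
  val shared = new Configuration()

  val t1 = new Thread(() => shared.set("xmlinput.start", "<person>"))
  val t2 = new Thread(() => shared.set("xmlinput.start", "<book>"))
  t1.start(); t2.start()
  t1.join(); t2.join()

  // Prints "<person>" or "<book>", depending on thread scheduling.
  println(shared.get("xmlinput.start"))
}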

src/main/scala/com/databricks/spark/xml/util/XmlFile.scala (7 additions, 4 deletions)

@@ -23,6 +23,7 @@ import scala.collection.Map
 
 import com.databricks.spark.xml.parsers.StaxXmlGenerator
 import com.sun.xml.txw2.output.IndentingXMLStreamWriter
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{Text, LongWritable}
 
 import org.apache.spark.rdd.RDD
@@ -40,13 +41,15 @@ private[xml] object XmlFile {
       rowTag: String): RDD[String] = {
     // This just checks the charset's validity early, to keep behavior
     Charset.forName(charset)
-    context.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, s"<$rowTag>")
-    context.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, s"</$rowTag>")
-    context.hadoopConfiguration.set(XmlInputFormat.ENCODING_KEY, charset)
+    val config = new Configuration(context.hadoopConfiguration)
+    config.set(XmlInputFormat.START_TAG_KEY, s"<$rowTag>")
+    config.set(XmlInputFormat.END_TAG_KEY, s"</$rowTag>")
+    config.set(XmlInputFormat.ENCODING_KEY, charset)
     context.newAPIHadoopFile(location,
       classOf[XmlInputFormat],
       classOf[LongWritable],
-      classOf[Text]).map { case (_, text) => text.toString }
+      classOf[Text],
+      config).map { case (_, text) => text.toString }
   }
 
   /**
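
The fix leans on two existing APIs: Hadoop's Configuration(Configuration other) copy constructor, which snapshots the shared settings into an independent object, and the SparkContext.newAPIHadoopFile overload that accepts an explicit Configuration instead of defaulting to the shared one. A small sketch of the copy-constructor semantics (key name illustrative):

import org.apache.hadoop.conf.Configuration

object ClonedConfDemo extends App {
  val shared = new Configuration()
  shared.set("xmlinput.start", "<person>")

  // Copy constructor: an independent, per-read snapshot.
  val perRead = new Configuration(shared)
  perRead.set("xmlinput.start", "<book>")

  assert(shared.get("xmlinput.start") == "<person>")  // original untouched
  assert(perRead.get("xmlinput.start") == "<book>")
}

Each read therefore mutates only its own snapshot, which Spark hands to the input format for the lifetime of that RDD.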

src/test/scala/com/databricks/spark/xml/XmlSuite.scala (36 additions, 0 deletions)

@@ -22,6 +22,7 @@ import java.util.TimeZone
 
 import scala.io.Source
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
@@ -1395,6 +1396,41 @@ final class XmlSuite extends AnyFunSuite with BeforeAndAfterAll {
     assert(df.collect()(1).getStruct(0).get(2) === null)
   }
 
+  test("read multiple xml files in parallel") {
+    val failedAgesSet = mutable.Set[Long]()
+    val threads_ages = (1 to 10).map { i =>
+      new Thread {
+        override def run() {
+          val df = spark.read.option("rowTag", "person").format("xml")
+            .load(resDir + "ages.xml")
+          if (df.schema.fields.isEmpty) {
+            failedAgesSet.add(i)
+          }
+        }
+      }
+    }
+
+    val failedBooksSet = mutable.Set[Long]()
+    val threads_books = (11 to 20).map { i =>
+      new Thread {
+        override def run() {
+          val df = spark.read.option("rowTag", "book").format("xml")
+            .load(resDir + "books.xml")
+          if (df.schema.fields.isEmpty) {
+            failedBooksSet.add(i)
+          }
+        }
+      }
+    }
+
+    threads_ages.foreach(_.start())
+    threads_books.foreach(_.start())
+    threads_ages.foreach(_.join())
+    threads_books.foreach(_.join())
+    assert(failedBooksSet.isEmpty)
+    assert(failedAgesSet.isEmpty)
+  }
+
   private def getLines(path: Path): Seq[String] = {
     val source = Source.fromFile(path.toFile)
     try {