Commit 0b7ee27

Fix Mocked FS parse non-default container root path issue
Also introduce a Windows file status class, fixing the issue with the referenced class RawLocalFileSystem and its loadPermissionInfo() method.

Signed-off-by: Wei Zhang <[email protected]>
1 parent 5061313 commit 0b7ee27
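The Windows file status class itself lands in a part of the change not shown below. A minimal sketch of the idea, assuming the goal is a FileStatus whose permission, owner, and group are filled in eagerly so a mocked local run never reaches RawLocalFileSystem's lazy loadPermissionInfo(), which shells out to 'ls -ld' (winutils on Windows). The class name and field values here are illustrative, not the commit's code:

// Illustrative sketch only; MockWindowsFileStatus is a hypothetical name,
// not a class from this commit. Everything is resolved from java.io.File
// up front, so no caller ever triggers the shell-based permission lookup.
import java.io.File

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.permission.FsPermission

class MockWindowsFileStatus(localFile: File, hadoopPath: Path) extends FileStatus(
  localFile.length,                  // length in bytes
  localFile.isDirectory,             // isdir
  1,                                 // block replication: a single local copy
  32 * 1024 * 1024,                  // block size, arbitrary for a mock
  localFile.lastModified,            // modification time
  localFile.lastModified,            // access time, approximated
  FsPermission.getDefault,           // permission set eagerly, no shell call
  System.getProperty("user.name"),   // owner
  System.getProperty("user.name"),   // group, placeholder for a local mock
  hadoopPath                         // the Hadoop-side path of this entry
)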

6 files changed: +306 -26 lines changed

Utils/hdinsight-node-common/Test/java/com/microsoft/azure/hdinsight/spark/mock/SparkLocalRunnerITScenario.kt

Lines changed: 22 additions & 9 deletions
@@ -23,6 +23,7 @@
 package com.microsoft.azure.hdinsight.spark.mock
 
 import com.microsoft.azure.hdinsight.spark.common.SparkLocalJvmProcess
+import com.microsoft.azure.hdinsight.spark.mock.jobapp.CatCmd
 import cucumber.api.java.en.And
 import cucumber.api.java.en.Given
 import cucumber.api.java.en.Then
@@ -46,14 +47,10 @@ class SparkLocalRunnerITScenario {
         sparkLocalJob = jvmProcess.createProcess("", SparkLocalRunner::class.java, args)
     }
 
-    private fun runToGetStdoutLines(): List<String> {
-        assertThat(sparkLocalJob)
-                .describedAs("Run Spark Job locally firstly")
-                .isNotNull()
+    private fun runToGetStdoutLines(job: ProcessBuilder): List<String> {
+        job.redirectOutput(ProcessBuilder.Redirect.PIPE)
 
-        sparkLocalJob!!.redirectOutput(ProcessBuilder.Redirect.PIPE)
-
-        val process = sparkLocalJob!!.start()
+        val process = job.start()
 
         assertThat(process.waitFor())
                 .describedAs("Spark job exist with error.")
@@ -66,15 +63,15 @@ class SparkLocalRunnerITScenario {
 
     @Then("^locally run stand output should be")
     fun checkLocallyRunStdout(expectOutputs: List<String>) {
-        val outputLines = runToGetStdoutLines()
+        val outputLines = runToGetStdoutLines(sparkLocalJob!!)
 
        assertThat(outputLines).containsAll(expectOutputs)
        assertThat(outputLines).hasSameSizeAs(expectOutputs)
    }
 
     @Then("^locally run stand output table should be")
     fun checkLocallyRunStdoutTable(expectOutputs: DataTable) {
-        val outputTable = runToGetStdoutLines()
+        val outputTable = runToGetStdoutLines(sparkLocalJob!!)
                 .filter { !it.matches("""^\+[-+]*\+$""".toRegex()) }
                 .map {
                     it.split("|")
@@ -86,4 +83,20 @@ class SparkLocalRunnerITScenario {
 
         assertThat(outputTable).isEqualTo(expectOutputs.raw())
     }
+
+    @Then("^locally cat '(.*)' should be")
+    fun checkLocallyRunFile(fileUri: String, expectOutputs: List<String>) {
+        runToGetStdoutLines(sparkLocalJob!!)
+
+        // Start another job to read the file
+
+        val args = arrayOf("--master local[1]", CatCmd::class.java.canonicalName, fileUri)
+
+        val catJob = jvmProcess.createProcess("", SparkLocalRunner::class.java, args)
+
+        val outputLines = runToGetStdoutLines(catJob!!)
+
+        assertThat(outputLines).containsAll(expectOutputs)
+        assertThat(outputLines).hasSameSizeAs(expectOutputs)
+    }
 }

Utils/hdinsight-node-common/Test/resources/com/microsoft/azure/hdinsight/spark/mock/MockRawLocalFileSystemScenario.feature

Lines changed: 4 additions & 0 deletions
@@ -18,3 +18,7 @@ Feature: Mock File System Unit Test
   Scenario: WASB container blob path
     Given set mocked file system local working directory to '/data/__default__/user/current'
     Then convert mocked file system path 'wasb://account@blob1/abc/def' to File should be '/data/account@blob1/abc/def'
+
+  Scenario: WASB container blob root path
+    Given set mocked file system local working directory to '/data/__default__/user/current'
+    Then convert mocked file system path 'wasb://account@blob1/' to File should be '/data/account@blob1/'
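A minimal sketch of the conversion these scenarios pin down, assuming the authority of the wasb:// URI becomes a directory under the mocked root. The object and helper names below are hypothetical, not the commit's actual mock file system code:

import java.io.File
import java.net.URI

object MockedPathMapping {
  // Illustrative helper only: the URI authority ('account@blob1', or a
  // default marker when absent) becomes a directory under the mocked root, so
  //   wasb://account@blob1/abc/def  ->  /data/account@blob1/abc/def
  //   wasb://account@blob1/         ->  /data/account@blob1/
  def toMockedLocalFile(mockedRoot: String, uri: URI): File = {
    val container = Option(uri.getAuthority).filter(_.nonEmpty).getOrElse("__default__")
    // A bare container root has an empty or "/" path; keep it as "/" instead
    // of rejecting it, which is the root-path case the new scenario covers.
    val relativePath = Option(uri.getPath).filter(_.nonEmpty).getOrElse("/")

    new File(new File(mockedRoot, container), relativePath)
  }
}

// e.g. MockedPathMapping.toMockedLocalFile("/data", new URI("wasb://account@blob1/"))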

Utils/hdinsight-node-common/Test/resources/com/microsoft/azure/hdinsight/spark/mock/SparkLocalRunnerScenario.feature

Lines changed: 14 additions & 0 deletions
@@ -38,6 +38,20 @@ Feature: Spark Local Runner Integration Test
       | local,2 |
       | world,1 |
 
+  Scenario: Mocked blob container file write
+    Given locally run job 'com.microsoft.azure.hdinsight.spark.mock.jobapp.WordCountTest' with args
+      | /word_count_input.txt | wasb://container1/wc_result |
+    Then locally cat 'wasb://container1/wc_result' should be
+      | (a,1) |
+      | (mocked,1) |
+      | (fs,1) |
+      | (with,1) |
+      | (Spark,1) |
+      | (run,1) |
+      | (Hello,1) |
+      | (local,2) |
+      | (world,1) |
+
   Scenario: SparkSQL test
     Given locally run job 'com.microsoft.azure.hdinsight.spark.mock.jobapp.SparkSQLTest' with args
       | wasb:///people.json | people | SELECT * FROM people |
Utils/hdinsight-node-common/Test/scala/com/microsoft/azure/hdinsight/spark/mock/jobapp/CatCmd.scala

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) Microsoft Corporation
+ * <p/>
+ * All rights reserved.
+ * <p/>
+ * MIT License
+ * <p/>
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
+ * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+ * <p/>
+ * The above copyright notice and this permission notice shall be included in all copies or substantial portions of
+ * the Software.
+ * <p/>
+ * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+ * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.microsoft.azure.hdinsight.spark.mock.jobapp
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+object CatCmd {
+  def main(args: Array[String]) {
+    val inputFile = args(0)
+
+    val conf = new SparkConf().setAppName("DFS cat command")
+    // Create a Scala Spark Context.
+    val sc = new SparkContext(conf)
+    // Load our input data.
+    val input = sc.textFile(inputFile)
+    // output line by line
+    input
+      .foreach(l => println(l))
+  }
+}

Utils/hdinsight-node-common/Test/scala/com/microsoft/azure/hdinsight/spark/mock/jobapp/WordCountTest.scala

Lines changed: 6 additions & 2 deletions
@@ -23,7 +23,6 @@
 package com.microsoft.azure.hdinsight.spark.mock.jobapp
 
 import org.apache.spark._
-import org.apache.spark.SparkContext._
 
 object WordCountTest {
   def main(args: Array[String]) {
@@ -41,7 +40,12 @@ object WordCountTest {
 
     outputFileOption match {
       // Save the word count back out to a text file, causing evaluation.
-      case Some(outputFile) => counts.saveAsTextFile(outputFile)
+      case Some(outputFile) => {
+        val dfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(outputFile), sc.hadoopConfiguration)
+        dfs.delete(new org.apache.hadoop.fs.Path(outputFile), true)
+
+        counts.saveAsTextFile(outputFile)
+      }
       // just output to stdout
       case None =>
         counts.collect().foreach { case (word, count) => println(s"$word,$count") }
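The added pre-delete is presumably what keeps the new 'Mocked blob container file write' scenario re-runnable: Hadoop's FileOutputFormat refuses to write into an output path that already exists (FileAlreadyExistsException), so the job clears wasb://container1/wc_result on the mocked file system before counts.saveAsTextFile(outputFile) runs.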
