
Commit f31cf78

fix: Fix repeatedly url-decode path when reading parquet from s3 using native parquet reader (#2138)
* Fix repeatedly url-decode path when reading parquet from s3 using native parquet reader
* Make ParquetReadFromS3Suite run using all scan impls
* test: support URL escape sequences in write path
* Improve S3 parquet read tests
1 parent 0771de6 commit f31cf78

File tree: 2 files changed, +36 −10 lines

native/core/src/parquet/objectstore/s3.rs

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ pub fn create_store(
             source: format!("Scheme of URL is not S3: {url}").into(),
         });
     }
-    let path = Path::from_url_path(path)?;
+    let path = Path::parse(path)?;
 
     let mut builder = AmazonS3Builder::new()
         .with_url(url.to_string())
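The one-line change swaps object_store's Path::from_url_path, which percent-decodes its input, for Path::parse, which validates the string without decoding it. Because the path reaching create_store has already been URL-decoded upstream (hence "repeatedly url-decode" in the title), decoding it again breaks keys that contain percent sequences. A rough sketch of the difference, assuming only the object_store crate and a made-up key (this is not Comet code):

use object_store::path::Path;

fn main() {
    // Hypothetical key containing literal percent sequences ("%23" escapes '#', "%20" escapes ' ').
    let key = "data/Brand%2321/test%20file.parquet";

    // Path::parse performs no percent-decoding; the string is only validated.
    println!("parse:         {:?}", Path::parse(key));

    // Path::from_url_path percent-decodes its input first, so a path that was already
    // decoded upstream is decoded a second time and no longer names the stored object.
    println!("from_url_path: {:?}", Path::from_url_path(key));
}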

spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala

Lines changed: 35 additions & 9 deletions
@@ -28,11 +28,12 @@ import org.testcontainers.utility.DockerImageName
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.comet.CometNativeScanExec
 import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.functions.{col, sum}
+import org.apache.spark.sql.functions.{col, expr, max, sum}
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider

@@ -102,20 +103,45 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
     df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
   }
 
-  test("read parquet file from MinIO") {
+  private def writePartitionedParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000).withColumn("val", expr("concat('val#', id % 10)"))
+    df.write.format("parquet").partitionBy("val").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometScan(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometScanExec => p
+      case p: CometNativeScanExec => p
+    }
+    assert(scans.size == 1)
+  }
 
+  test("read parquet file from MinIO") {
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
 
     val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
-    val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
-    }
-    assert(scans.size == 1)
+    assertCometScan(df)
+    assert(df.first().getLong(0) == 499500)
+  }
 
+  test("read partitioned parquet file from MinIO") {
+    val testFilePath = s"s3a://$testBucketName/data/test-partitioned-file.parquet"
+    writePartitionedParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
+    val firstRow = df.first()
+    assert(firstRow.getLong(0) == 499500)
+    assert(firstRow.getString(1) == "val#9")
+  }
+
+  test("read parquet file from MinIO with URL escape sequences in path") {
+    // Path with '%23' and '%20' which are URL escape sequences for '#' and ' '
+    val testFilePath = s"s3a://$testBucketName/data/Brand%2321/test%20file.parquet"
+    writeTestParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+    assertCometScan(df)
     assert(df.first().getLong(0) == 499500)
   }
 }
