
Commit fa9671c

Kontinuation authored and Steve Vaughan Jr committed
fix: Fix repeated URL-decoding of path when reading Parquet from S3 using the native Parquet reader (apache#2138)
* Fix repeated URL-decoding of path when reading Parquet from S3 using the native Parquet reader
* Make ParquetReadFromS3Suite run using all scan impls
* test: support URL escape sequences in write path
* Improve S3 parquet read tests
1 parent a30d3f0 commit fa9671c

File tree: 2 files changed (+40, -15)


native/core/src/parquet/objectstore/s3.rs

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ pub fn create_store(
             source: format!("Scheme of URL is not S3: {url}").into(),
         });
     }
-    let path = Path::from_url_path(path)?;
+    let path = Path::parse(path)?;
 
     let mut builder = AmazonS3Builder::new()
         .with_url(url.to_string())
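
For context, a minimal standalone sketch of the difference between the two object_store path constructors that this one-line change swaps. It assumes object_store's behavior that Path::from_url_path percent-decodes its input while Path::parse takes it verbatim; the example key is illustrative only, modeled on the key used in the new test below.

// Sketch only: contrasts object_store's two Path constructors.
use object_store::path::Path;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative object key whose name contains a literal percent sequence.
    let key = "data/Brand%2321/test file.parquet";

    // Path::parse keeps the key verbatim; no percent-decoding is applied.
    println!("Path::parse         -> {}", Path::parse(key)?);

    // Path::from_url_path percent-decodes its input, so a key that was already
    // decoded upstream would be decoded a second time ("%23" becomes "#").
    println!("Path::from_url_path -> {}", Path::from_url_path(key)?);

    Ok(())
}

Since the path handed to create_store has already been decoded upstream, parsing it verbatim avoids the repeated decode that the commit title refers to.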

spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala

Lines changed: 39 additions & 14 deletions
@@ -28,15 +28,14 @@ import org.testcontainers.utility.DockerImageName
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.comet.CometNativeScanExec
 import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.functions.{col, sum}
+import org.apache.spark.sql.functions.{col, expr, max, sum}
 import org.apache.spark.tags.DockerTest
 
-import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
 import software.amazon.awssdk.regions.Region
@@ -117,25 +116,51 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
     df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
   }
 
+  private def writePartitionedParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000).withColumn("val", expr("concat('val#', id % 10)"))
+    df.write.format("parquet").partitionBy("val").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometScan(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) {
+      case p: CometScanExec => p
+      case p: CometNativeScanExec => p
+    }
+    assert(scans.size == 1)
+  }
+
   test("read parquet file from MinIO") {
     assume(minioContainer != null, "No Docker API is available")
 
-    // native_iceberg_compat mode does not have comprehensive S3 support, so we don't run tests
-    // under this mode.
-    assume(sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT)
-
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
 
     val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
-    val scans = collect(df.queryExecution.executedPlan) {
-      case p: CometScanExec =>
-        p
-      case p: CometNativeScanExec =>
-        p
-    }
-    assert(scans.size == 1)
+    assertCometScan(df)
+    assert(df.first().getLong(0) == 499500)
+  }
+
+  test("read partitioned parquet file from MinIO") {
+    assume(minioContainer != null, "No Docker API is available")
+
+    val testFilePath = s"s3a://$testBucketName/data/test-partitioned-file.parquet"
+    writePartitionedParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")), max(col("val")))
+    val firstRow = df.first()
+    assert(firstRow.getLong(0) == 499500)
+    assert(firstRow.getString(1) == "val#9")
+  }
+
+  test("read parquet file from MinIO with URL escape sequences in path") {
+    assume(minioContainer != null, "No Docker API is available")
 
+    // Path with '%23' and '%20' which are URL escape sequences for '#' and ' '
+    val testFilePath = s"s3a://$testBucketName/data/Brand%2321/test%20file.parquet"
+    writeTestParquetFile(testFilePath)
+
+    val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+    assertCometScan(df)
     assert(df.first().getLong(0) == 499500)
   }
 }
