Commit 1c40d43

Basic S3 test and properties support

1 parent 3371cc1 commit 1c40d43

6 files changed: +361 -77 lines changed

dev/ci/check-suites.py

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ def file_to_class_name(path: Path) -> str | None:
 ignore_list = [
     "org.apache.comet.parquet.ParquetReadSuite", # abstract
     "org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite
+    "org.apache.comet.IcebergReadFromS3Suite", # manual test suite
     "org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract
     "org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract
     "org.apache.comet.exec.CometColumnarShuffleSuite" # abstract

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 71 additions & 2 deletions
@@ -269,6 +269,57 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[VariancePop] -> CometVariancePop,
     classOf[VarianceSamp] -> CometVarianceSamp)

+  /**
+   * Transforms Hadoop S3A configuration keys to Iceberg FileIO property keys.
+   *
+   * Iceberg-rust's FileIO expects Iceberg-format keys (e.g., s3.access-key-id), not Hadoop keys
+   * (e.g., fs.s3a.access.key). This function converts Hadoop keys extracted from Spark's
+   * configuration to the format expected by iceberg-rust.
+   *
+   * @param hadoopProps
+   *   Map of Hadoop configuration properties (fs.s3a.* keys)
+   * @return
+   *   Map with keys transformed to Iceberg format (s3.* keys)
+   */
+  private def hadoopToIcebergS3Properties(
+      hadoopProps: Map[String, String]): Map[String, String] = {
+    hadoopProps.flatMap { case (key, value) =>
+      key match {
+        // Global S3A configuration keys
+        case "fs.s3a.access.key" => Some("s3.access-key-id" -> value)
+        case "fs.s3a.secret.key" => Some("s3.secret-access-key" -> value)
+        case "fs.s3a.endpoint" => Some("s3.endpoint" -> value)
+        case "fs.s3a.path.style.access" => Some("s3.path-style-access" -> value)
+        case "fs.s3a.endpoint.region" => Some("s3.region" -> value)
+
+        // Per-bucket configuration keys (e.g., fs.s3a.bucket.mybucket.access.key)
+        // Extract bucket name and property, then transform to s3.* format
+        case k if k.startsWith("fs.s3a.bucket.") =>
+          val parts = k.stripPrefix("fs.s3a.bucket.").split("\\.", 2)
+          if (parts.length == 2) {
+            val bucket = parts(0)
+            val property = parts(1)
+            property match {
+              case "access.key" => Some(s"s3.bucket.$bucket.access-key-id" -> value)
+              case "secret.key" => Some(s"s3.bucket.$bucket.secret-access-key" -> value)
+              case "endpoint" => Some(s"s3.bucket.$bucket.endpoint" -> value)
+              case "path.style.access" => Some(s"s3.bucket.$bucket.path-style-access" -> value)
+              case "endpoint.region" => Some(s"s3.bucket.$bucket.region" -> value)
+              case _ => None // Ignore unrecognized per-bucket properties
+            }
+          } else {
+            None
+          }
+
+        // Pass through any keys that are already in Iceberg format
+        case k if k.startsWith("s3.") => Some(key -> value)
+
+        // Ignore all other keys
+        case _ => None
+      }
+    }
+  }
+
   def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
       _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
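For illustration only (not part of the commit), a minimal sketch of the mapping this helper performs; the credential value is a placeholder, and fs.s3a.connection.maximum stands in for any key the helper does not recognize:

val hadoopProps = Map(
  "fs.s3a.access.key" -> "test-access-key",
  "fs.s3a.bucket.my-bucket.endpoint" -> "http://localhost:9000",
  "fs.s3a.connection.maximum" -> "96")

// hadoopToIcebergS3Properties(hadoopProps) would return:
//   Map("s3.access-key-id" -> "test-access-key",
//       "s3.bucket.my-bucket.endpoint" -> "http://localhost:9000")
// The connection.maximum key has no Iceberg FileIO equivalent here and is dropped.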
@@ -1113,8 +1164,26 @@
       // Set metadata location
       icebergScanBuilder.setMetadataLocation(metadataLocation)

-      // Serialize catalog properties (for authentication - currently empty)
-      // TODO: Extract credentials, S3 config, etc.
+      val catalogProperties =
+        try {
+          val session = org.apache.spark.sql.SparkSession.active
+          val hadoopConf = session.sessionState.newHadoopConf()
+
+          val metadataUri = new java.net.URI(metadataLocation)
+          val hadoopS3Options =
+            NativeConfig.extractObjectStoreOptions(hadoopConf, metadataUri)
+
+          hadoopToIcebergS3Properties(hadoopS3Options)
+        } catch {
+          case e: Exception =>
+            logWarning(
+              s"Failed to extract catalog properties from Iceberg scan: ${e.getMessage}")
+            e.printStackTrace()
+            Map.empty[String, String]
+        }
+      catalogProperties.foreach { case (key, value) =>
+        icebergScanBuilder.putCatalogProperties(key, value)
+      }

       // Determine number of partitions from Iceberg's output partitioning
       // TODO: Add a test case for both partitioning schemes
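Combined with the MinIO test configuration added below, and assuming NativeConfig.extractObjectStoreOptions surfaces the session's fs.s3a.* settings for the metadata location's bucket, the catalog properties forwarded to the native scan would look roughly like this sketch (values taken from CometS3TestBase):

// Illustrative only; mirrors the putCatalogProperties loop above.
// s3.access-key-id     -> "minio-test-user"
// s3.secret-access-key -> "minio-test-password"
// s3.endpoint          -> <MinIO container S3 URL>
// s3.path-style-access -> "true"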

spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala

Lines changed: 0 additions & 2 deletions
@@ -159,8 +159,6 @@ object CometIcebergNativeScanExec {
   /**
    * Extracts metadata location from Iceberg table.
    *
-   * TODO: Also extract catalog properties (credentials, S3 config, etc.) for authentication
-   *
    * @param scanExec
    *   The Spark BatchScanExec containing an Iceberg scan
    * @return
Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet

import java.net.URI

import scala.util.Try

import org.testcontainers.containers.MinIOContainer
import org.testcontainers.utility.DockerImageName

import org.apache.spark.SparkConf
import org.apache.spark.sql.CometTestBase

import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, HeadBucketRequest}

trait CometS3TestBase extends CometTestBase {

  protected var minioContainer: MinIOContainer = _
  protected val userName = "minio-test-user"
  protected val password = "minio-test-password"

  protected def testBucketName: String

  override def beforeAll(): Unit = {
    minioContainer = new MinIOContainer(DockerImageName.parse("minio/minio:latest"))
      .withUserName(userName)
      .withPassword(password)
    minioContainer.start()
    createBucketIfNotExists(testBucketName)

    super.beforeAll()
  }

  override def afterAll(): Unit = {
    super.afterAll()
    if (minioContainer != null) {
      minioContainer.stop()
    }
  }

  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf
    conf.set("spark.hadoop.fs.s3a.access.key", userName)
    conf.set("spark.hadoop.fs.s3a.secret.key", password)
    conf.set("spark.hadoop.fs.s3a.endpoint", minioContainer.getS3URL)
    conf.set("spark.hadoop.fs.s3a.path.style.access", "true")
  }

  protected def createBucketIfNotExists(bucketName: String): Unit = {
    val credentials = AwsBasicCredentials.create(userName, password)
    val s3Client = S3Client
      .builder()
      .endpointOverride(URI.create(minioContainer.getS3URL))
      .credentialsProvider(StaticCredentialsProvider.create(credentials))
      .forcePathStyle(true)
      .build()
    try {
      val bucketExists = Try {
        s3Client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build())
        true
      }.getOrElse(false)

      if (!bucketExists) {
        val request = CreateBucketRequest.builder().bucket(bucketName).build()
        s3Client.createBucket(request)
      }
    } finally {
      s3Client.close()
    }
  }
}
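A consuming suite only has to name a bucket; the trait handles the container lifecycle, bucket creation, and fs.s3a.* settings. A minimal hypothetical example (suite and bucket names invented for illustration; the real usage follows in the next file):

class ParquetRoundTripS3Suite extends CometS3TestBase {
  // Hypothetical bucket name; beforeAll() starts MinIO and creates it.
  override protected val testBucketName = "round-trip-bucket"

  test("write and read a Parquet file through MinIO") {
    val path = s"s3a://$testBucketName/data"
    spark.range(10).write.mode("overwrite").parquet(path)
    assert(spark.read.parquet(path).count() == 10)
  }
}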
Lines changed: 193 additions & 0 deletions
@@ -0,0 +1,193 @@ (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet

import org.apache.spark.SparkConf
import org.apache.spark.sql.comet.CometIcebergNativeScanExec
import org.apache.spark.sql.execution.SparkPlan

class IcebergReadFromS3Suite extends CometS3TestBase {

  override protected val testBucketName = "test-iceberg-bucket"

  private def icebergAvailable: Boolean = {
    try {
      Class.forName("org.apache.iceberg.catalog.Catalog")
      true
    } catch {
      case _: ClassNotFoundException => false
    }
  }

  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf

    conf.set("spark.sql.catalog.s3_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.s3_catalog.type", "hadoop")
    conf.set("spark.sql.catalog.s3_catalog.warehouse", s"s3a://$testBucketName/warehouse")

    conf.set(CometConf.COMET_ENABLED.key, "true")
    conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
    conf.set(CometConf.COMET_ICEBERG_NATIVE_ENABLED.key, "true")

    conf
  }

  /** Collects all CometIcebergNativeScanExec nodes from a plan */
  private def collectIcebergNativeScans(plan: SparkPlan): Seq[CometIcebergNativeScanExec] = {
    collect(plan) { case scan: CometIcebergNativeScanExec =>
      scan
    }
  }

  /**
   * Helper to verify query correctness and that exactly one CometIcebergNativeScanExec is used.
   */
  private def checkIcebergNativeScan(query: String): Unit = {
    val (_, cometPlan) = checkSparkAnswer(query)
    val icebergScans = collectIcebergNativeScans(cometPlan)
    assert(
      icebergScans.length == 1,
      s"Expected exactly 1 CometIcebergNativeScanExec but found ${icebergScans.length}. Plan:\n$cometPlan")
  }

  test("create and query simple Iceberg table from MinIO") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    spark.sql("""
      CREATE TABLE s3_catalog.db.simple_table (
        id INT,
        name STRING,
        value DOUBLE
      ) USING iceberg
    """)

    spark.sql("""
      INSERT INTO s3_catalog.db.simple_table
      VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)
    """)

    checkIcebergNativeScan("SELECT * FROM s3_catalog.db.simple_table ORDER BY id")

    spark.sql("DROP TABLE s3_catalog.db.simple_table")
  }

  test("read partitioned Iceberg table from MinIO") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    spark.sql("""
      CREATE TABLE s3_catalog.db.partitioned_table (
        id INT,
        category STRING,
        value DOUBLE
      ) USING iceberg
      PARTITIONED BY (category)
    """)

    spark.sql("""
      INSERT INTO s3_catalog.db.partitioned_table VALUES
      (1, 'A', 10.5), (2, 'B', 20.3), (3, 'C', 30.7),
      (4, 'A', 15.2), (5, 'B', 25.8), (6, 'C', 35.0)
    """)

    checkIcebergNativeScan("SELECT * FROM s3_catalog.db.partitioned_table ORDER BY id")
    checkIcebergNativeScan(
      "SELECT * FROM s3_catalog.db.partitioned_table WHERE category = 'A' ORDER BY id")

    spark.sql("DROP TABLE s3_catalog.db.partitioned_table")
  }

  test("filter pushdown to S3-backed Iceberg table") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    spark.sql("""
      CREATE TABLE s3_catalog.db.filter_test (
        id INT,
        name STRING,
        value DOUBLE
      ) USING iceberg
    """)

    spark.sql("""
      INSERT INTO s3_catalog.db.filter_test VALUES
      (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7),
      (4, 'Diana', 15.2), (5, 'Eve', 25.8)
    """)

    checkIcebergNativeScan("SELECT * FROM s3_catalog.db.filter_test WHERE id = 3")
    checkIcebergNativeScan("SELECT * FROM s3_catalog.db.filter_test WHERE value > 20.0")
    checkIcebergNativeScan("SELECT * FROM s3_catalog.db.filter_test WHERE name = 'Alice'")

    spark.sql("DROP TABLE s3_catalog.db.filter_test")
  }

  test("multiple files in S3 - verify no duplicates") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    withSQLConf("spark.sql.files.maxRecordsPerFile" -> "50") {
      spark.sql("""
        CREATE TABLE s3_catalog.db.multifile_test (
          id INT,
          data STRING
        ) USING iceberg
      """)

      spark.sql("""
        INSERT INTO s3_catalog.db.multifile_test
        SELECT id, CONCAT('data_', CAST(id AS STRING)) as data
        FROM range(200)
      """)

      checkIcebergNativeScan("SELECT COUNT(DISTINCT id) FROM s3_catalog.db.multifile_test")
      checkIcebergNativeScan(
        "SELECT * FROM s3_catalog.db.multifile_test WHERE id < 10 ORDER BY id")

      spark.sql("DROP TABLE s3_catalog.db.multifile_test")
    }
  }

  test("MOR table with deletes in S3") {
    assume(icebergAvailable, "Iceberg not available in classpath")

    spark.sql("""
      CREATE TABLE s3_catalog.db.mor_delete_test (
        id INT,
        name STRING,
        value DOUBLE
      ) USING iceberg
      TBLPROPERTIES (
        'write.delete.mode' = 'merge-on-read',
        'write.merge.mode' = 'merge-on-read'
      )
    """)

    spark.sql("""
      INSERT INTO s3_catalog.db.mor_delete_test VALUES
      (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7),
      (4, 'Diana', 15.2), (5, 'Eve', 25.8)
    """)

    spark.sql("DELETE FROM s3_catalog.db.mor_delete_test WHERE id IN (2, 4)")

    checkIcebergNativeScan("SELECT * FROM s3_catalog.db.mor_delete_test ORDER BY id")

    spark.sql("DROP TABLE s3_catalog.db.mor_delete_test")
  }
}
