Skip to content

Commit a51b281

Browse files
authored
feat: [iceberg] REST catalog support for CometNativeIcebergScan (#2895)
1 parent ca7e728 commit a51b281

File tree

9 files changed

+1407
-21
lines changed

9 files changed

+1407
-21
lines changed

spark/pom.xml

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ under the License.
173173
<groupId>software.amazon.awssdk</groupId>
174174
<artifactId>s3</artifactId>
175175
</dependency>
176-
<!-- Iceberg dependencies for testing native Iceberg scan -->
177-
<!-- Note: The specific Iceberg artifact is defined in profiles below based on Spark version -->
176+
<!-- Jetty and Iceberg dependencies for testing native Iceberg scan -->
177+
<!-- Note: The specific versions are defined in profiles below based on Spark version -->
178178
</dependencies>
179179

180180
<profiles>
@@ -188,6 +188,19 @@ under the License.
188188
<version>1.5.2</version>
189189
<scope>test</scope>
190190
</dependency>
191+
<!-- Jetty 9.4.x for Spark 3.4 (JDK 11, javax.* packages) -->
192+
<dependency>
193+
<groupId>org.eclipse.jetty</groupId>
194+
<artifactId>jetty-server</artifactId>
195+
<version>9.4.53.v20231009</version>
196+
<scope>test</scope>
197+
</dependency>
198+
<dependency>
199+
<groupId>org.eclipse.jetty</groupId>
200+
<artifactId>jetty-servlet</artifactId>
201+
<version>9.4.53.v20231009</version>
202+
<scope>test</scope>
203+
</dependency>
191204
</dependencies>
192205
</profile>
193206

@@ -203,6 +216,19 @@ under the License.
203216
<version>1.8.1</version>
204217
<scope>test</scope>
205218
</dependency>
219+
<!-- Jetty 9.4.x for Spark 3.5 (JDK 11, javax.* packages) -->
220+
<dependency>
221+
<groupId>org.eclipse.jetty</groupId>
222+
<artifactId>jetty-server</artifactId>
223+
<version>9.4.53.v20231009</version>
224+
<scope>test</scope>
225+
</dependency>
226+
<dependency>
227+
<groupId>org.eclipse.jetty</groupId>
228+
<artifactId>jetty-servlet</artifactId>
229+
<version>9.4.53.v20231009</version>
230+
<scope>test</scope>
231+
</dependency>
206232
</dependencies>
207233
</profile>
208234

@@ -215,6 +241,19 @@ under the License.
215241
<version>1.10.0</version>
216242
<scope>test</scope>
217243
</dependency>
244+
<!-- Jetty 11.x for Spark 4.0 (jakarta.servlet) -->
245+
<dependency>
246+
<groupId>org.eclipse.jetty</groupId>
247+
<artifactId>jetty-server</artifactId>
248+
<version>11.0.24</version>
249+
<scope>test</scope>
250+
</dependency>
251+
<dependency>
252+
<groupId>org.eclipse.jetty</groupId>
253+
<artifactId>jetty-servlet</artifactId>
254+
<version>11.0.24</version>
255+
<scope>test</scope>
256+
</dependency>
218257
</dependencies>
219258
</profile>
220259
</profiles>

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,11 @@ object IcebergReflection extends Logging {
204204
case _: NoSuchMethodException =>
205205
try {
206206
// If not directly available, access via operations/metadata
207-
val opsMethod = table.getClass.getMethod("operations")
207+
val opsMethod = table.getClass.getDeclaredMethod("operations")
208+
opsMethod.setAccessible(true)
208209
val ops = opsMethod.invoke(table)
209-
val currentMethod = ops.getClass.getMethod("current")
210+
val currentMethod = ops.getClass.getDeclaredMethod("current")
211+
currentMethod.setAccessible(true)
210212
val metadata = currentMethod.invoke(ops)
211213
val formatVersionMethod = metadata.getClass.getMethod("formatVersion")
212214
Some(formatVersionMethod.invoke(metadata).asInstanceOf[Int])
@@ -274,10 +276,12 @@ object IcebergReflection extends Logging {
274276
*/
275277
def getTableMetadata(table: Any): Option[Any] = {
276278
try {
277-
val operationsMethod = table.getClass.getMethod("operations")
279+
val operationsMethod = table.getClass.getDeclaredMethod("operations")
280+
operationsMethod.setAccessible(true)
278281
val operations = operationsMethod.invoke(table)
279282

280-
val currentMethod = operations.getClass.getMethod("current")
283+
val currentMethod = operations.getClass.getDeclaredMethod("current")
284+
currentMethod.setAccessible(true)
281285
Some(currentMethod.invoke(operations))
282286
} catch {
283287
case e: Exception =>

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -284,22 +284,56 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
284284
// Extract all Iceberg metadata once using reflection.
285285
// If any required reflection fails, this returns None, and we fall back to Spark.
286286
// First get metadataLocation and catalogProperties which are needed by the factory.
287-
val metadataLocationOpt = IcebergReflection
288-
.getTable(scanExec.scan)
289-
.flatMap(IcebergReflection.getMetadataLocation)
287+
val tableOpt = IcebergReflection.getTable(scanExec.scan)
288+
289+
val metadataLocationOpt = tableOpt.flatMap { table =>
290+
IcebergReflection.getMetadataLocation(table)
291+
}
290292

291293
val metadataOpt = metadataLocationOpt.flatMap { metadataLocation =>
292294
try {
293295
val session = org.apache.spark.sql.SparkSession.active
294296
val hadoopConf = session.sessionState.newHadoopConf()
297+
298+
// For REST catalogs, the metadata file may not exist on disk since metadata
299+
// is fetched via HTTP. For file:// URIs only, check whether the file exists; if not, use the table location instead.
295300
val metadataUri = new java.net.URI(metadataLocation)
296-
val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, metadataUri)
301+
302+
val metadataFile = new java.io.File(metadataUri.getPath)
303+
304+
val effectiveLocation =
305+
if (!metadataFile.exists() && metadataUri.getScheme == "file") {
306+
// Metadata file doesn't exist (REST catalog with InMemoryFileIO or similar)
307+
// Use table location instead for FileIO initialization
308+
309+
tableOpt
310+
.flatMap { table =>
311+
try {
312+
val locationMethod = table.getClass.getMethod("location")
313+
val tableLocation = locationMethod.invoke(table).asInstanceOf[String]
314+
Some(tableLocation)
315+
} catch {
316+
case _: Exception =>
317+
Some(metadataLocation)
318+
}
319+
}
320+
.getOrElse(metadataLocation)
321+
} else {
322+
metadataLocation
323+
}
324+
325+
val effectiveUri = new java.net.URI(effectiveLocation)
326+
327+
val hadoopS3Options = NativeConfig.extractObjectStoreOptions(hadoopConf, effectiveUri)
328+
297329
val catalogProperties =
298330
org.apache.comet.serde.operator.CometIcebergNativeScan
299331
.hadoopToIcebergS3Properties(hadoopS3Options)
300332

301-
CometIcebergNativeScanMetadata
302-
.extract(scanExec.scan, metadataLocation, catalogProperties)
333+
val result = CometIcebergNativeScanMetadata
334+
.extract(scanExec.scan, effectiveLocation, catalogProperties)
335+
336+
result
303337
} catch {
304338
case e: Exception =>
305339
logError(
@@ -319,21 +353,18 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
319353

320354
// Now perform all validation using the pre-extracted metadata
321355
// Check if table uses a FileIO implementation compatible with iceberg-rust
356+
322357
val fileIOCompatible = IcebergReflection.getFileIO(metadata.table) match {
323-
case Some(fileIO) =>
324-
val fileIOClassName = fileIO.getClass.getName
325-
if (fileIOClassName == "org.apache.iceberg.inmemory.InMemoryFileIO") {
326-
fallbackReasons += "Comet does not support InMemoryFileIO table locations"
327-
false
328-
} else {
329-
true
330-
}
358+
case Some(_) =>
359+
// InMemoryFileIO is now supported with table location fallback for REST catalogs
360+
true
331361
case None =>
332362
fallbackReasons += "Could not check FileIO compatibility"
333363
false
334364
}
335365

336366
// Check Iceberg table format version
367+
337368
val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match {
338369
case Some(formatVersion) =>
339370
if (formatVersion > 2) {

0 commit comments

Comments
 (0)