Skip to content

Commit 1df0e9a

Browse files
Fix Unity Catalog Creds and Iceberg REST endpoint construction (delta-io#5904)
## Summary This PR fixes Unity Catalog credential detection and refactors Iceberg REST catalog endpoint construction to properly implement the Iceberg REST catalog specification. **Key Changes:** - Fix credential detection to recognize Unity Catalog's `option.fs.*` credential properties - Refactor `/v1/config` endpoint handling to follow Iceberg REST catalog spec - Move prefix discovery logic from `UnityCatalogMetadata` to `IcebergRESTCatalogPlanningClient` - Improve robustness with trailing slash handling and HTTP client reuse - More test coverage with explicit path verification - Fix module separation issues and improve code organization ## Testing Tests now explicitly verify: - Correct `/v1/config` endpoint calls with `warehouse` query parameter - Proper prefix application in `/plan` request paths - Behavior when no prefix is returned (uses baseUri directly per Iceberg REST spec) - Token handling consistency between tests and production - Trailing slash handling robustness --- ## Compatibility This is an internal refactoring with no changes to public APIs or user-facing behavior. The changes: - Improve correctness of credential detection for Unity Catalog - Fix internal endpoint construction to follow Iceberg REST spec - Maintain backward compatibility with existing Unity Catalog configurations --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 37fff90 commit 1df0e9a

File tree

8 files changed

+170
-175
lines changed

8 files changed

+170
-175
lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/IcebergRESTCatalogPlanningClient.scala

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Locale
2323
import scala.jdk.CollectionConverters._
2424
import scala.util.Try
2525

26-
import org.apache.http.client.methods.HttpPost
26+
import org.apache.http.client.methods.{HttpGet, HttpPost}
2727
import org.apache.http.entity.{ContentType, StringEntity}
2828
import org.apache.http.util.EntityUtils
2929
import org.apache.http.{HttpHeaders, HttpStatus}
@@ -39,6 +39,15 @@ import shadedForDelta.org.apache.iceberg.PartitionSpec
3939
import shadedForDelta.org.apache.iceberg.rest.requests.{PlanTableScanRequest, PlanTableScanRequestParser}
4040
import shadedForDelta.org.apache.iceberg.rest.responses.PlanTableScanResponse
4141

42+
/**
43+
* Case class for parsing Iceberg REST catalog /v1/config response.
44+
* Per the Iceberg REST spec, the config endpoint returns defaults and overrides.
45+
* The optional "prefix" in overrides is used for multi-tenant catalog paths.
46+
*/
47+
private case class CatalogConfigResponse(
48+
defaults: Map[String, String],
49+
overrides: Map[String, String])
50+
4251
/**
4352
* Iceberg REST implementation of ServerSidePlanningClient that calls Iceberg REST catalog server.
4453
*
@@ -49,29 +58,71 @@ import shadedForDelta.org.apache.iceberg.rest.responses.PlanTableScanResponse
4958
* Thread safety: This class creates a shared HTTP client that is thread-safe for concurrent
5059
* requests. The HTTP client should be explicitly closed by calling close() when done.
5160
*
52-
* @param icebergRestCatalogUriRoot Base URI of the Iceberg REST catalog server, e.g.,
53-
* "http://localhost:8181". Should not include trailing slash
54-
* or "/v1" prefix.
61+
* @param baseUriRaw Base URI of the Iceberg REST catalog up to /v1, e.g.,
62+
* "http://<catalog-URL>/iceberg/v1". Trailing slashes are handled automatically.
63+
* @param catalogName Name of the catalog for config endpoint query parameter.
5564
* @param token Authentication token for the catalog server.
5665
*/
5766
class IcebergRESTCatalogPlanningClient(
58-
icebergRestCatalogUriRoot: String,
67+
baseUriRaw: String,
68+
catalogName: String,
5969
token: String) extends ServerSidePlanningClient with AutoCloseable {
6070

71+
// Normalize baseUri to handle trailing slashes
72+
private val baseUri = baseUriRaw.stripSuffix("/")
73+
6174
// Sentinel value indicating "use current snapshot" in Iceberg REST API
6275
private val CURRENT_SNAPSHOT_ID = 0L
6376

6477
// Partition spec ID for unpartitioned tables
6578
private val UNPARTITIONED_SPEC_ID = 0
6679

80+
/**
81+
* Lazily fetch the catalog configuration and construct the endpoint URI root.
82+
* Calls /v1/config?warehouse=<catalogName> per Iceberg REST catalog spec to get the prefix.
83+
* If no prefix is returned, uses baseUri directly without any prefix per Iceberg spec.
84+
*/
85+
private lazy val icebergRestCatalogUriRoot: String = {
86+
fetchCatalogPrefix() match {
87+
case Some(prefix) => s"$baseUri/$prefix"
88+
case None => baseUri
89+
}
90+
}
91+
92+
/**
93+
* Fetch catalog prefix from /v1/config endpoint per Iceberg REST catalog spec.
94+
* Returns None on any error or if no prefix is defined in the config.
95+
*/
96+
private def fetchCatalogPrefix(): Option[String] = {
97+
try {
98+
val configUri = s"$baseUri/config?warehouse=$catalogName"
99+
val httpGet = new HttpGet(configUri)
100+
val response = httpClient.execute(httpGet)
101+
try {
102+
if (response.getStatusLine.getStatusCode == HttpStatus.SC_OK) {
103+
val body = EntityUtils.toString(response.getEntity)
104+
val config = JsonUtils.fromJson[CatalogConfigResponse](body)
105+
// Apply overrides on top of defaults per Iceberg REST spec
106+
config.overrides.get("prefix").orElse(config.defaults.get("prefix"))
107+
} else {
108+
None
109+
}
110+
} finally {
111+
response.close()
112+
}
113+
} catch {
114+
case _: Exception => None
115+
}
116+
}
117+
67118
private val httpHeaders = {
68119
val baseHeaders = Map(
69120
HttpHeaders.ACCEPT -> ContentType.APPLICATION_JSON.getMimeType,
70121
HttpHeaders.CONTENT_TYPE -> ContentType.APPLICATION_JSON.getMimeType,
71122
HttpHeaders.USER_AGENT -> buildUserAgent()
72123
)
73124
// Add Bearer token authentication if token is provided
74-
val headersWithAuth = if (token != null && token.nonEmpty) {
125+
val headersWithAuth = if (token.nonEmpty) {
75126
baseHeaders + (HttpHeaders.AUTHORIZATION -> s"Bearer $token")
76127
} else {
77128
baseHeaders
@@ -186,13 +237,11 @@ class IcebergRESTCatalogPlanningClient(
186237
sparkProjectionOption: Option[Seq[String]] = None,
187238
sparkLimitOption: Option[Int] = None): ScanPlan = {
188239
// Construct the /plan endpoint URI. For Unity Catalog tables, the
189-
// icebergRestCatalogUriRoot is constructed by UnityCatalogMetadata which calls
190-
// /v1/config to get the optional prefix and builds the proper endpoint
191-
// (e.g., {ucUri}/api/2.1/unity-catalog/iceberg-rest/v1/{prefix}).
192-
// For other catalogs, the endpoint is passed directly via metadata.
240+
// Call /v1/config to get the catalog prefix, then construct the full endpoint.
241+
// icebergRestCatalogUriRoot is lazily constructed as: {baseUri}/{prefix}
242+
// where prefix comes from /v1/config?warehouse=<catalogName> per Iceberg REST spec.
193243
// See: https://iceberg.apache.org/rest-catalog-spec/
194-
val planTableScanUri =
195-
s"$icebergRestCatalogUriRoot/v1/namespaces/$database/tables/$table/plan"
244+
val planTableScanUri = s"$icebergRestCatalogUriRoot/namespaces/$database/tables/$table/plan"
196245

197246
// Request planning for current snapshot. snapshotId = 0 means "use current snapshot"
198247
// in the Iceberg REST API spec. Time-travel queries are not yet supported.

iceberg/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/IcebergRESTCatalogPlanningClientFactory.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ class IcebergRESTCatalogPlanningClientFactory extends ServerSidePlanningClientFa
2727
spark: SparkSession,
2828
metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
2929

30-
val endpointUri = metadata.planningEndpointUri
30+
val baseUri = metadata.planningEndpointUri
3131
val token = metadata.authToken.getOrElse("")
32+
val catalogName = metadata.catalogName
3233

33-
new IcebergRESTCatalogPlanningClient(endpointUri, token)
34+
new IcebergRESTCatalogPlanningClient(baseUri, catalogName, token)
3435
}
3536
}

iceberg/src/test/java/shadedForDelta/org/apache/iceberg/rest/IcebergRESTCatalogAdapterWithPlanSupport.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ class IcebergRESTCatalogAdapterWithPlanSupport extends RESTCatalogAdapter {
6666
// Static field for test credential injection - credentials to inject into /plan responses
6767
// Volatile is used to guarantee correct cross-thread access (test thread and Jetty server thread).
6868
private static volatile Map<String, String> testCredentials = null;
69+
70+
// Static field to capture the request path of /plan requests for test verification
71+
// Volatile is used to guarantee correct cross-thread access (test thread and Jetty server thread).
72+
private static volatile String capturedPlanRequestPath = null;
6973

7074
IcebergRESTCatalogAdapterWithPlanSupport(Catalog catalog) {
7175
super(catalog);
@@ -125,6 +129,14 @@ static Boolean getCapturedCaseSensitive() {
125129
return capturedCaseSensitive;
126130
}
127131

132+
/**
133+
* Get the request path captured from the most recent /plan request.
134+
* Package-private for test access.
135+
*/
136+
static String getCapturedPlanRequestPath() {
137+
return capturedPlanRequestPath;
138+
}
139+
128140
/**
129141
* Set test credentials to inject into /plan responses.
130142
* Package-private for test access.
@@ -153,6 +165,7 @@ static void clearCaptured() {
153165
capturedMinRowsRequested = null;
154166
capturedCaseSensitive = null;
155167
testCredentials = null;
168+
capturedPlanRequestPath = null;
156169
}
157170

158171
@Override
@@ -166,6 +179,7 @@ protected <T extends RESTResponse> T execute(
166179

167180
// Intercept /plan requests before they reach the base adapter
168181
if (isPlanTableScanRequest(request)) {
182+
capturedPlanRequestPath = request.path(); // Capture the path for test verification
169183
try {
170184
PlanTableScanResponse response = handlePlanTableScan(request, parserContext);
171185
return (T) response;

iceberg/src/test/java/shadedForDelta/org/apache/iceberg/rest/IcebergRESTServer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,14 @@ public Boolean getCapturedCaseSensitive() {
186186
return IcebergRESTCatalogAdapterWithPlanSupport.getCapturedCaseSensitive();
187187
}
188188

189+
/**
190+
* Get the request path captured from the most recent /plan request.
191+
* Delegates to adapter. For test verification of endpoint construction.
192+
*/
193+
public String getCapturedPlanRequestPath() {
194+
return IcebergRESTCatalogAdapterWithPlanSupport.getCapturedPlanRequestPath();
195+
}
196+
189197
/**
190198
* Set test credentials to inject into /plan responses.
191199
* Used for testing credential extraction in clients.

iceberg/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/IcebergRESTCatalogPlanningClientSuite.scala

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
8383
// Tests that the REST /plan endpoint returns 0 files for an empty table.
8484
test("basic plan table scan via IcebergRESTCatalogPlanningClient") {
8585
withTempTable("testTable") { table =>
86-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
86+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
8787
try {
8888
val scanPlan = client.planScan(defaultNamespace.toString, "testTable")
8989
assert(scanPlan != null, "Scan plan should not be null")
@@ -111,7 +111,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
111111
.map(row => (new Path(row.getString(0)).getName, row.getLong(1)))
112112
.toMap
113113

114-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
114+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
115115
try {
116116
val scanPlan = client.planScan(defaultNamespace.toString, "tableWithData")
117117
assert(scanPlan != null, "Scan plan should not be null")
@@ -143,49 +143,69 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
143143
// This will test the client's partition validation logic at
144144
// IcebergRESTCatalogPlanningClient:160-164
145145

146-
test("UnityCatalogMetadata uses prefix from /v1/config endpoint") {
147-
import org.apache.spark.sql.delta.serverSidePlanning.UnityCatalogMetadata
146+
test("IcebergRESTCatalogPlanningClient uses prefix from /v1/config endpoint") {
147+
server.clearCaptured() // Clear any previous state
148148

149-
// Configure server to return prefix
150-
server.setCatalogPrefix("catalogs/test-catalog")
149+
server.setCatalogPrefix("catalogs/test-catalog-prefix")
151150

152-
val metadata = UnityCatalogMetadata(
153-
catalogName = "test_catalog",
154-
ucUri = serverUri,
155-
ucToken = "test-token",
156-
tableProps = Map.empty)
151+
withTempTable("testTable") { table =>
152+
// Client expects baseUri to include the /v1 path (per Iceberg REST spec)
153+
val client = new IcebergRESTCatalogPlanningClient(s"$serverUri/v1", "test_catalog", "")
154+
try {
155+
// Make a call that will trigger the lazy initialization of icebergRestCatalogUriRoot
156+
// which internally calls fetchCatalogPrefix()
157+
val scanPlan = client.planScan(defaultNamespace.toString, "testTable")
158+
assert(scanPlan != null, "Scan plan should not be null")
157159

158-
// Verify endpoint includes prefix from /v1/config response
159-
val expectedEndpoint = s"$serverUri/api/2.1/unity-catalog/iceberg-rest/v1/catalogs/test-catalog"
160-
assert(
161-
metadata.planningEndpointUri == expectedEndpoint,
162-
s"Expected endpoint to include prefix: ${metadata.planningEndpointUri}")
160+
// Verify the server received a /plan request with the correct prefix
161+
// This confirms that the config endpoint returned the correct prefix and that the client
162+
// correctly constructed the full plan request path.
163+
val capturedPath = server.getCapturedPlanRequestPath()
164+
assert(capturedPath != null, "Server should have captured the request path")
165+
assert(capturedPath.startsWith("v1/catalogs/test-catalog-prefix/"),
166+
s"Expected path to start with 'v1/catalogs/test-catalog-prefix/' but got: $capturedPath")
167+
} finally {
168+
client.close()
169+
}
170+
}
163171
}
164172

165-
test("UnityCatalogMetadata falls back when /v1/config returns no prefix") {
166-
import org.apache.spark.sql.delta.serverSidePlanning.UnityCatalogMetadata
173+
test("IcebergRESTCatalogPlanningClient uses baseUri directly when /v1/config returns no prefix") {
174+
server.clearCaptured() // Clear any previous state
167175

168-
// Configure server to return no prefix (fallback case)
176+
// Configure server to return no prefix
169177
server.setCatalogPrefix(null)
170178

171-
val metadata = UnityCatalogMetadata(
172-
catalogName = "test_catalog",
173-
ucUri = serverUri,
174-
ucToken = "test-token",
175-
tableProps = Map.empty)
176-
177-
// Verify endpoint uses simple path without prefix
178-
val expectedEndpoint = s"$serverUri/api/2.1/unity-catalog/iceberg-rest"
179-
assert(
180-
metadata.planningEndpointUri == expectedEndpoint,
181-
s"Expected endpoint without prefix: ${metadata.planningEndpointUri}")
179+
withTempTable("testTable") { table =>
180+
// Client expects baseUri to include the /v1 path (per Iceberg REST spec)
181+
val client = new IcebergRESTCatalogPlanningClient(s"$serverUri/v1", "test_catalog", "")
182+
try {
183+
// Make a call that will trigger the lazy initialization
184+
val scanPlan = client.planScan(defaultNamespace.toString, "testTable")
185+
assert(scanPlan != null, "Scan plan should not be null")
186+
187+
// Verify the server received a /plan request using baseUri directly (no prefix)
188+
val capturedPath = server.getCapturedPlanRequestPath()
189+
assert(capturedPath != null, "Server should have captured the request path")
190+
// When no prefix is returned, use baseUri directly without adding prefix
191+
assert(
192+
!capturedPath.contains("catalogs/"),
193+
s"Expected path to NOT contain 'catalogs/' when no prefix, but got: $capturedPath")
194+
assert(
195+
capturedPath.startsWith("v1/namespaces/"),
196+
s"Expected path to start with 'v1/namespaces/' (using baseUri directly), but got: " +
197+
s"$capturedPath")
198+
} finally {
199+
client.close()
200+
}
201+
}
182202
}
183203

184204
test("filter sent to IRC server over HTTP") {
185205
withTempTable("filterTest") { table =>
186206
populateTestData(s"rest_catalog.${defaultNamespace}.filterTest")
187207

188-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
208+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
189209
try {
190210
val testCases = Seq(
191211
(EqualTo("longCol", 2L), "EqualTo numeric (long)"),
@@ -295,7 +315,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
295315
Set("`address.city`"))
296316
)
297317

298-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
318+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
299319
try {
300320
testCases.foreach { testCase =>
301321
// Clear previous captured projection
@@ -328,7 +348,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
328348
val tableName = s"rest_catalog.${defaultNamespace}.limitTest"
329349
populateTestData(tableName)
330350

331-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
351+
val client = new IcebergRESTCatalogPlanningClient(serverUri, null, "")
332352
try {
333353
// Test different limit values
334354
val testCases = Seq(
@@ -363,7 +383,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
363383
val tableName = s"rest_catalog.${defaultNamespace}.filterProjectionLimitTest"
364384
populateTestData(tableName)
365385

366-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
386+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
367387
try {
368388
// Note: Filter types are already tested in "filter sent to IRC server" test.
369389
// Here we verify filter, projection, AND limit are sent together correctly.
@@ -441,7 +461,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
441461
withTempTable("caseSensitiveTest") { table =>
442462
populateTestData(s"rest_catalog.${defaultNamespace}.caseSensitiveTest")
443463

444-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
464+
val client = new IcebergRESTCatalogPlanningClient(serverUri, null, "")
445465
try {
446466
server.clearCaptured()
447467

@@ -472,7 +492,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
472492
withTempTable("credentialsTest") { table =>
473493
populateTestData(s"rest_catalog.${defaultNamespace}.credentialsTest")
474494

475-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
495+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
476496
try {
477497
// Test cases for all three cloud providers
478498
val testCases = Seq(
@@ -531,7 +551,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
531551
withTempTable("noCredentialsTest") { table =>
532552
populateTestData(s"rest_catalog.${defaultNamespace}.noCredentialsTest")
533553

534-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
554+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
535555
try {
536556
// Don't configure any credentials (current default behavior)
537557
val scanPlan = client.planScan(defaultNamespace.toString, "noCredentialsTest")
@@ -549,7 +569,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
549569
withTempTable("incompleteCredsTest") { table =>
550570
populateTestData(s"rest_catalog.${defaultNamespace}.incompleteCredsTest")
551571

552-
val client = new IcebergRESTCatalogPlanningClient(serverUri, null)
572+
val client = new IcebergRESTCatalogPlanningClient(serverUri, "test_catalog", "")
553573
try {
554574
// Test cases for incomplete credentials that should throw errors
555575
val errorTestCases = Seq(
@@ -630,7 +650,7 @@ class IcebergRESTCatalogPlanningClientSuite extends QueryTest with SharedSparkSe
630650
}
631651

632652
test("User-Agent header format") {
633-
val client = new IcebergRESTCatalogPlanningClient("http://localhost:8080", null)
653+
val client = new IcebergRESTCatalogPlanningClient("http://localhost:8080", "test_catalog", "")
634654
try {
635655
val userAgent = client.getUserAgent()
636656

0 commit comments

Comments
 (0)