Skip to content

Commit 35d66a3

Browse files
authored
Core: Support Custom Table/View Operations in RESTCatalog (apache#14465)
1 parent da76b87 commit 35d66a3

File tree

4 files changed

+383
-9
lines changed

4 files changed

+383
-9
lines changed

core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,13 +69,27 @@ public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
6969
public RESTCatalog(
    SessionCatalog.SessionContext context,
    Function<Map<String, String>, RESTClient> clientBuilder) {
  // The session catalog comes from the overridable newSessionCatalog hook. The hook runs while
  // this constructor is still executing, i.e. before a subclass constructor body has run.
  RESTSessionCatalog sessions = newSessionCatalog(clientBuilder);
  this.sessionCatalog = sessions;

  // Table and namespace calls share one delegate created for the given session context.
  this.delegate = sessions.asCatalog(context);
  this.nsDelegate = (SupportsNamespaces) this.delegate;

  this.context = context;

  // View calls use a separate view of the same session catalog, for the same context.
  this.viewSessionCatalog = sessions.asViewCatalog(context);
}
7878

79+
/**
80+
* Create a new {@link RESTSessionCatalog} instance.
81+
*
82+
* <p>This method can be overridden in subclasses to provide custom {@link RESTSessionCatalog}
83+
* implementations.
84+
*
85+
* @param clientBuilder a function to build REST clients
86+
* @return a new RESTSessionCatalog instance
87+
*/
88+
protected RESTSessionCatalog newSessionCatalog(
89+
Function<Map<String, String>, RESTClient> clientBuilder) {
90+
return new RESTSessionCatalog(clientBuilder, null);
91+
}
92+
7993
@Override
8094
public void initialize(String name, Map<String, String> props) {
8195
Preconditions.checkArgument(props != null, "Invalid configuration: null");

core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java

Lines changed: 85 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import java.util.Set;
2828
import java.util.function.BiFunction;
2929
import java.util.function.Function;
30+
import java.util.function.Supplier;
3031
import java.util.stream.Collectors;
3132
import org.apache.iceberg.BaseTable;
3233
import org.apache.iceberg.CatalogProperties;
@@ -450,7 +451,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
450451

451452
RESTClient tableClient = client.withAuthSession(tableSession);
452453
RESTTableOperations ops =
453-
new RESTTableOperations(
454+
newTableOps(
454455
tableClient,
455456
paths.table(finalIdentifier),
456457
Map::of,
@@ -529,7 +530,7 @@ public Table registerTable(
529530
AuthSession tableSession = authManager.tableSession(ident, tableConf, contextualSession);
530531
RESTClient tableClient = client.withAuthSession(tableSession);
531532
RESTTableOperations ops =
532-
new RESTTableOperations(
533+
newTableOps(
533534
tableClient,
534535
paths.table(ident),
535536
Map::of,
@@ -788,7 +789,7 @@ public Table create() {
788789
AuthSession tableSession = authManager.tableSession(ident, tableConf, contextualSession);
789790
RESTClient tableClient = client.withAuthSession(tableSession);
790791
RESTTableOperations ops =
791-
new RESTTableOperations(
792+
newTableOps(
792793
tableClient,
793794
paths.table(ident),
794795
Map::of,
@@ -815,7 +816,7 @@ public Transaction createTransaction() {
815816

816817
RESTClient tableClient = client.withAuthSession(tableSession);
817818
RESTTableOperations ops =
818-
new RESTTableOperations(
819+
newTableOps(
819820
tableClient,
820821
paths.table(ident),
821822
Map::of,
@@ -878,7 +879,7 @@ public Transaction replaceTransaction() {
878879

879880
RESTClient tableClient = client.withAuthSession(tableSession);
880881
RESTTableOperations ops =
881-
new RESTTableOperations(
882+
newTableOps(
882883
tableClient,
883884
paths.table(ident),
884885
Map::of,
@@ -1010,6 +1011,82 @@ private FileIO tableFileIO(
10101011
return newFileIO(context, fullConf, storageCredentials);
10111012
}
10121013

1014+
/**
1015+
* Create a new {@link RESTTableOperations} instance for simple table operations.
1016+
*
1017+
* <p>This method can be overridden in subclasses to provide custom table operations
1018+
* implementations.
1019+
*
1020+
* @param restClient the REST client to use for communicating with the catalog server
1021+
* @param path the REST path for the table
1022+
* @param headers a supplier for additional HTTP headers to include in requests
1023+
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
1024+
* @param current the current table metadata
1025+
* @param supportedEndpoints the set of supported REST endpoints
1026+
* @return a new RESTTableOperations instance
1027+
*/
1028+
protected RESTTableOperations newTableOps(
1029+
RESTClient restClient,
1030+
String path,
1031+
Supplier<Map<String, String>> headers,
1032+
FileIO fileIO,
1033+
TableMetadata current,
1034+
Set<Endpoint> supportedEndpoints) {
1035+
return new RESTTableOperations(restClient, path, headers, fileIO, current, supportedEndpoints);
1036+
}
1037+
1038+
/**
1039+
* Create a new {@link RESTTableOperations} instance for transaction-based operations (create or
1040+
* replace).
1041+
*
1042+
* <p>This method can be overridden in subclasses to provide custom table operations
1043+
* implementations for transaction-based operations.
1044+
*
1045+
* @param restClient the REST client to use for communicating with the catalog server
1046+
* @param path the REST path for the table
1047+
* @param headers a supplier for additional HTTP headers to include in requests
1048+
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
1049+
* @param updateType the {@link RESTTableOperations.UpdateType} being performed
1050+
* @param createChanges the list of metadata updates to apply during table creation or replacement
1051+
* @param current the current table metadata (may be null for CREATE operations)
1052+
* @param supportedEndpoints the set of supported REST endpoints
1053+
* @return a new RESTTableOperations instance
1054+
*/
1055+
protected RESTTableOperations newTableOps(
1056+
RESTClient restClient,
1057+
String path,
1058+
Supplier<Map<String, String>> headers,
1059+
FileIO fileIO,
1060+
RESTTableOperations.UpdateType updateType,
1061+
List<MetadataUpdate> createChanges,
1062+
TableMetadata current,
1063+
Set<Endpoint> supportedEndpoints) {
1064+
return new RESTTableOperations(
1065+
restClient, path, headers, fileIO, updateType, createChanges, current, supportedEndpoints);
1066+
}
1067+
1068+
/**
1069+
* Create a new {@link RESTViewOperations} instance.
1070+
*
1071+
* <p>This method can be overridden in subclasses to provide custom view operations
1072+
* implementations.
1073+
*
1074+
* @param restClient the REST client to use for communicating with the catalog server
1075+
* @param path the REST path for the view
1076+
* @param headers a supplier for additional HTTP headers to include in requests
1077+
* @param current the current view metadata
1078+
* @param supportedEndpoints the set of supported REST endpoints
1079+
* @return a new RESTViewOperations instance
1080+
*/
1081+
protected RESTViewOperations newViewOps(
1082+
RESTClient restClient,
1083+
String path,
1084+
Supplier<Map<String, String>> headers,
1085+
ViewMetadata current,
1086+
Set<Endpoint> supportedEndpoints) {
1087+
return new RESTViewOperations(restClient, path, headers, current, supportedEndpoints);
1088+
}
1089+
10131090
private static ConfigResponse fetchConfig(
10141091
RESTClient client, AuthSession initialAuth, Map<String, String> properties) {
10151092
// send the client's warehouse location to the service to keep in sync
@@ -1154,7 +1231,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) {
11541231
ViewMetadata metadata = response.metadata();
11551232

11561233
RESTViewOperations ops =
1157-
new RESTViewOperations(
1234+
newViewOps(
11581235
client.withAuthSession(tableSession),
11591236
paths.view(identifier),
11601237
Map::of,
@@ -1333,7 +1410,7 @@ public View create() {
13331410
Map<String, String> tableConf = response.config();
13341411
AuthSession tableSession = authManager.tableSession(identifier, tableConf, contextualSession);
13351412
RESTViewOperations ops =
1336-
new RESTViewOperations(
1413+
newViewOps(
13371414
client.withAuthSession(tableSession),
13381415
paths.view(identifier),
13391416
Map::of,
@@ -1424,7 +1501,7 @@ private View replace(LoadViewResponse response) {
14241501
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
14251502
AuthSession tableSession = authManager.tableSession(identifier, tableConf, contextualSession);
14261503
RESTViewOperations ops =
1427-
new RESTViewOperations(
1504+
newViewOps(
14281505
client.withAuthSession(tableSession),
14291506
paths.view(identifier),
14301507
Map::of,

core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java

Lines changed: 173 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,9 +41,15 @@
4141
import java.util.Map;
4242
import java.util.Objects;
4343
import java.util.Optional;
44+
import java.util.Set;
4445
import java.util.UUID;
4546
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicBoolean;
48+
import java.util.concurrent.atomic.AtomicReference;
49+
import java.util.function.BiFunction;
4650
import java.util.function.Consumer;
51+
import java.util.function.Function;
52+
import java.util.function.Supplier;
4753
import org.apache.hadoop.conf.Configuration;
4854
import org.apache.http.HttpHeaders;
4955
import org.apache.iceberg.BaseTable;
@@ -72,6 +78,7 @@
7278
import org.apache.iceberg.exceptions.ServiceFailureException;
7379
import org.apache.iceberg.expressions.Expressions;
7480
import org.apache.iceberg.inmemory.InMemoryCatalog;
81+
import org.apache.iceberg.io.FileIO;
7582
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
7683
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
7784
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -3107,6 +3114,153 @@ public void testCommitStateUnknownNotReconciled() {
31073114
.satisfies(ex -> assertThat(((CommitStateUnknownException) ex).getSuppressed()).isEmpty());
31083115
}
31093116

3117+
@Test
3118+
public void testCustomTableOperationsInjection() throws IOException {
3119+
AtomicBoolean customTableOpsCalled = new AtomicBoolean();
3120+
AtomicBoolean customTransactionTableOpsCalled = new AtomicBoolean();
3121+
AtomicReference<RESTTableOperations> capturedOps = new AtomicReference<>();
3122+
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
3123+
Map<String, String> customHeaders =
3124+
ImmutableMap.of("X-Custom-Table-Header", "custom-value-12345");
3125+
3126+
// Custom RESTTableOperations that adds a custom header
3127+
class CustomRESTTableOperations extends RESTTableOperations {
3128+
CustomRESTTableOperations(
3129+
RESTClient client,
3130+
String path,
3131+
Supplier<Map<String, String>> headers,
3132+
FileIO fileIO,
3133+
TableMetadata current,
3134+
Set<Endpoint> supportedEndpoints) {
3135+
super(client, path, () -> customHeaders, fileIO, current, supportedEndpoints);
3136+
customTableOpsCalled.set(true);
3137+
}
3138+
3139+
CustomRESTTableOperations(
3140+
RESTClient client,
3141+
String path,
3142+
Supplier<Map<String, String>> headers,
3143+
FileIO fileIO,
3144+
RESTTableOperations.UpdateType updateType,
3145+
List<MetadataUpdate> createChanges,
3146+
TableMetadata current,
3147+
Set<Endpoint> supportedEndpoints) {
3148+
super(
3149+
client,
3150+
path,
3151+
() -> customHeaders,
3152+
fileIO,
3153+
updateType,
3154+
createChanges,
3155+
current,
3156+
supportedEndpoints);
3157+
customTransactionTableOpsCalled.set(true);
3158+
}
3159+
}
3160+
3161+
// Custom RESTSessionCatalog that overrides table operations creation
3162+
class CustomRESTSessionCatalog extends RESTSessionCatalog {
3163+
CustomRESTSessionCatalog(
3164+
Function<Map<String, String>, RESTClient> clientBuilder,
3165+
BiFunction<SessionCatalog.SessionContext, Map<String, String>, FileIO> ioBuilder) {
3166+
super(clientBuilder, ioBuilder);
3167+
}
3168+
3169+
@Override
3170+
protected RESTTableOperations newTableOps(
3171+
RESTClient restClient,
3172+
String path,
3173+
Supplier<Map<String, String>> headers,
3174+
FileIO fileIO,
3175+
TableMetadata current,
3176+
Set<Endpoint> supportedEndpoints) {
3177+
RESTTableOperations ops =
3178+
new CustomRESTTableOperations(
3179+
restClient, path, headers, fileIO, current, supportedEndpoints);
3180+
RESTTableOperations spy = Mockito.spy(ops);
3181+
capturedOps.set(spy);
3182+
return spy;
3183+
}
3184+
3185+
@Override
3186+
protected RESTTableOperations newTableOps(
3187+
RESTClient restClient,
3188+
String path,
3189+
Supplier<Map<String, String>> headers,
3190+
FileIO fileIO,
3191+
RESTTableOperations.UpdateType updateType,
3192+
List<MetadataUpdate> createChanges,
3193+
TableMetadata current,
3194+
Set<Endpoint> supportedEndpoints) {
3195+
RESTTableOperations ops =
3196+
new CustomRESTTableOperations(
3197+
restClient,
3198+
path,
3199+
headers,
3200+
fileIO,
3201+
updateType,
3202+
createChanges,
3203+
current,
3204+
supportedEndpoints);
3205+
RESTTableOperations spy = Mockito.spy(ops);
3206+
capturedOps.set(spy);
3207+
return spy;
3208+
}
3209+
}
3210+
3211+
try (RESTCatalog catalog =
3212+
catalog(adapter, clientBuilder -> new CustomRESTSessionCatalog(clientBuilder, null))) {
3213+
catalog.createNamespace(NS);
3214+
3215+
// Test table operations without UpdateType
3216+
assertThat(customTableOpsCalled).isFalse();
3217+
assertThat(customTransactionTableOpsCalled).isFalse();
3218+
Table table = catalog.createTable(TABLE, SCHEMA);
3219+
assertThat(customTableOpsCalled).isTrue();
3220+
assertThat(customTransactionTableOpsCalled).isFalse();
3221+
3222+
// Trigger a commit through the custom operations
3223+
table.updateProperties().set("test-key", "test-value").commit();
3224+
3225+
// Verify the custom operations object was created and used
3226+
assertThat(capturedOps.get()).isNotNull();
3227+
Mockito.verify(capturedOps.get(), Mockito.atLeastOnce()).current();
3228+
Mockito.verify(capturedOps.get(), Mockito.atLeastOnce()).commit(any(), any());
3229+
3230+
// Verify the custom operations were used with custom headers
3231+
Mockito.verify(adapter, Mockito.atLeastOnce())
3232+
.execute(
3233+
reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE), customHeaders),
3234+
eq(LoadTableResponse.class),
3235+
any(),
3236+
any());
3237+
3238+
// Test table operations with UpdateType and createChanges
3239+
capturedOps.set(null);
3240+
customTableOpsCalled.set(false);
3241+
TableIdentifier table2 = TableIdentifier.of(NS, "table2");
3242+
catalog.buildTable(table2, SCHEMA).createTransaction().commitTransaction();
3243+
assertThat(customTableOpsCalled).isFalse();
3244+
assertThat(customTransactionTableOpsCalled).isTrue();
3245+
3246+
// Trigger another commit to verify transaction operations also work
3247+
catalog.loadTable(table2).updateProperties().set("test-key-2", "test-value-2").commit();
3248+
3249+
// Verify the custom operations object was created and used
3250+
assertThat(capturedOps.get()).isNotNull();
3251+
Mockito.verify(capturedOps.get(), Mockito.atLeastOnce()).current();
3252+
Mockito.verify(capturedOps.get(), Mockito.atLeastOnce()).commit(any(), any());
3253+
3254+
// Verify the custom operations were used with custom headers
3255+
Mockito.verify(adapter, Mockito.atLeastOnce())
3256+
.execute(
3257+
reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(table2), customHeaders),
3258+
eq(LoadTableResponse.class),
3259+
any(),
3260+
any());
3261+
}
3262+
}
3263+
31103264
private RESTCatalog catalog(RESTCatalogAdapter adapter) {
31113265
RESTCatalog catalog =
31123266
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
@@ -3117,6 +3271,25 @@ private RESTCatalog catalog(RESTCatalogAdapter adapter) {
31173271
return catalog;
31183272
}
31193273

3274+
private RESTCatalog catalog(
3275+
RESTCatalogAdapter adapter,
3276+
Function<Function<Map<String, String>, RESTClient>, RESTSessionCatalog>
3277+
sessionCatalogFactory) {
3278+
RESTCatalog catalog =
3279+
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter) {
3280+
@Override
3281+
protected RESTSessionCatalog newSessionCatalog(
3282+
Function<Map<String, String>, RESTClient> clientBuilder) {
3283+
return sessionCatalogFactory.apply(clientBuilder);
3284+
}
3285+
};
3286+
catalog.initialize(
3287+
"test",
3288+
ImmutableMap.of(
3289+
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
3290+
return catalog;
3291+
}
3292+
31203293
static HTTPRequest reqMatcher(HTTPMethod method) {
31213294
return argThat(req -> req.method() == method);
31223295
}

0 commit comments

Comments
 (0)