Skip to content

Commit a7486b0

Browse files
authored
[rest] Introduce snapshot loading to REST Catalog (#5147)
1 parent 2439452 commit a7486b0

33 files changed

+450
-56
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,12 @@ private List<Path> createExternalPaths() {
168168

169169
@Override
170170
public SnapshotManager snapshotManager() {
171-
return new SnapshotManager(fileIO, options.path(), options.branch(), snapshotCache);
171+
return new SnapshotManager(
172+
fileIO,
173+
options.path(),
174+
options.branch(),
175+
catalogEnvironment.snapshotLoader(),
176+
snapshotCache);
172177
}
173178

174179
@Override

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,11 @@ public static Table loadTable(
193193

194194
CatalogEnvironment catalogEnv =
195195
new CatalogEnvironment(
196-
identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory);
196+
identifier,
197+
metadata.uuid(),
198+
catalog.catalogLoader(),
199+
commitFactory,
200+
catalog instanceof SupportsSnapshots);
197201
Path path = new Path(schema.options().get(PATH.key()));
198202
FileStoreTable table =
199203
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import org.apache.paimon.Snapshot;
22+
23+
import java.util.Optional;
24+
25+
/** A {@link Catalog} supports loading table snapshots. */
26+
public interface SupportsSnapshots {
27+
28+
/**
29+
* Return the snapshot of table identified by the given {@link Identifier}.
30+
*
31+
* @param identifier Path of the table
32+
* @return The requested snapshot of the table
33+
* @throws Catalog.TableNotExistException if the target does not exist
34+
*/
35+
Optional<Snapshot> loadSnapshot(Identifier identifier) throws Catalog.TableNotExistException;
36+
}

paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.catalog.Database;
2626
import org.apache.paimon.catalog.Identifier;
2727
import org.apache.paimon.catalog.PropertyChange;
28+
import org.apache.paimon.catalog.SupportsSnapshots;
2829
import org.apache.paimon.catalog.TableMetadata;
2930
import org.apache.paimon.fs.FileIO;
3031
import org.apache.paimon.fs.Path;
@@ -58,6 +59,7 @@
5859
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
5960
import org.apache.paimon.rest.responses.GetDatabaseResponse;
6061
import org.apache.paimon.rest.responses.GetTableResponse;
62+
import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
6163
import org.apache.paimon.rest.responses.GetTableTokenResponse;
6264
import org.apache.paimon.rest.responses.GetViewResponse;
6365
import org.apache.paimon.rest.responses.ListDatabasesResponse;
@@ -83,6 +85,7 @@
8385
import java.util.Collections;
8486
import java.util.List;
8587
import java.util.Map;
88+
import java.util.Optional;
8689
import java.util.Set;
8790
import java.util.concurrent.ScheduledExecutorService;
8891

@@ -100,7 +103,7 @@
100103
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
101104

102105
/** A catalog implementation for REST. */
103-
public class RESTCatalog implements Catalog {
106+
public class RESTCatalog implements Catalog, SupportsSnapshots {
104107

105108
public static final String HEADER_PREFIX = "header.";
106109

@@ -297,11 +300,44 @@ private FileIO fileIOFromOptions(Path path) {
297300
}
298301
}
299302

300-
protected GetTableTokenResponse loadTableToken(Identifier identifier) {
301-
return client.get(
302-
resourcePaths.tableToken(identifier.getDatabaseName(), identifier.getObjectName()),
303-
GetTableTokenResponse.class,
304-
restAuthFunction);
303+
protected GetTableTokenResponse loadTableToken(Identifier identifier)
304+
throws TableNotExistException {
305+
GetTableTokenResponse response;
306+
try {
307+
response =
308+
client.get(
309+
resourcePaths.tableToken(
310+
identifier.getDatabaseName(), identifier.getObjectName()),
311+
GetTableTokenResponse.class,
312+
restAuthFunction);
313+
} catch (NoSuchResourceException e) {
314+
throw new TableNotExistException(identifier);
315+
} catch (ForbiddenException e) {
316+
throw new TableNoPermissionException(identifier, e);
317+
}
318+
return response;
319+
}
320+
321+
@Override
322+
public Optional<Snapshot> loadSnapshot(Identifier identifier) throws TableNotExistException {
323+
GetTableSnapshotResponse response;
324+
try {
325+
response =
326+
client.get(
327+
resourcePaths.tableSnapshot(
328+
identifier.getDatabaseName(), identifier.getObjectName()),
329+
GetTableSnapshotResponse.class,
330+
restAuthFunction);
331+
} catch (NoSuchResourceException e) {
332+
if (e.resourceType() == ErrorResponseResourceType.SNAPSHOT) {
333+
return Optional.empty();
334+
}
335+
throw new TableNotExistException(identifier);
336+
} catch (ForbiddenException e) {
337+
throw new TableNoPermissionException(identifier, e);
338+
}
339+
340+
return Optional.of(response.getSnapshot());
305341
}
306342

307343
public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {

paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.rest;
2020

21+
import org.apache.paimon.catalog.Catalog;
2122
import org.apache.paimon.catalog.CatalogContext;
2223
import org.apache.paimon.catalog.Identifier;
2324
import org.apache.paimon.fs.FileIO;
@@ -182,7 +183,11 @@ private boolean shouldRefresh() {
182183
private void refreshToken() {
183184
GetTableTokenResponse response;
184185
if (catalogInstance != null) {
185-
response = catalogInstance.loadTableToken(identifier);
186+
try {
187+
response = catalogInstance.loadTableToken(identifier);
188+
} catch (Catalog.TableNotExistException e) {
189+
throw new RuntimeException(e);
190+
}
186191
} else {
187192
try (RESTCatalog catalog = catalogLoader.load()) {
188193
response = catalog.loadTableToken(identifier);

paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ public String tableToken(String databaseName, String tableName) {
7474
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "token");
7575
}
7676

77+
public String tableSnapshot(String databaseName, String tableName) {
78+
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "snapshot");
79+
}
80+
7781
public String partitions(String databaseName, String tableName) {
7882
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions");
7983
}

paimon-core/src/main/java/org/apache/paimon/rest/responses/ErrorResponseResourceType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ public enum ErrorResponseResourceType {
2323
DATABASE,
2424
TABLE,
2525
COLUMN,
26-
VIEW
26+
VIEW,
27+
SNAPSHOT
2728
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.rest.responses;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.rest.RESTResponse;
23+
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
25+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
26+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
28+
29+
/** Response for table snapshot. */
30+
@JsonIgnoreProperties(ignoreUnknown = true)
31+
public class GetTableSnapshotResponse implements RESTResponse {
32+
33+
private static final String FIELD_SNAPSHOT = "snapshot";
34+
35+
@JsonProperty(FIELD_SNAPSHOT)
36+
private final Snapshot snapshot;
37+
38+
@JsonCreator
39+
public GetTableSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot) {
40+
this.snapshot = snapshot;
41+
}
42+
43+
@JsonGetter(FIELD_SNAPSHOT)
44+
public Snapshot getSnapshot() {
45+
return snapshot;
46+
}
47+
}

paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception {
270270
public TableSchema commitChanges(List<SchemaChange> changes)
271271
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
272272
Catalog.ColumnNotExistException {
273-
SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch);
273+
SnapshotManager snapshotManager =
274+
new SnapshotManager(fileIO, tableRoot, branch, null, null);
274275
boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);
275276

276277
while (true) {

paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.paimon.catalog.CatalogLoader;
2323
import org.apache.paimon.catalog.Identifier;
2424
import org.apache.paimon.catalog.SnapshotCommit;
25+
import org.apache.paimon.tag.SnapshotLoaderImpl;
26+
import org.apache.paimon.utils.SnapshotLoader;
2527
import org.apache.paimon.utils.SnapshotManager;
2628

2729
import javax.annotation.Nullable;
@@ -37,20 +39,23 @@ public class CatalogEnvironment implements Serializable {
3739
@Nullable private final String uuid;
3840
@Nullable private final CatalogLoader catalogLoader;
3941
@Nullable private final SnapshotCommit.Factory commitFactory;
42+
private final boolean supportsSnapshots;
4043

4144
public CatalogEnvironment(
4245
@Nullable Identifier identifier,
4346
@Nullable String uuid,
4447
@Nullable CatalogLoader catalogLoader,
45-
@Nullable SnapshotCommit.Factory commitFactory) {
48+
@Nullable SnapshotCommit.Factory commitFactory,
49+
boolean supportsSnapshots) {
4650
this.identifier = identifier;
4751
this.uuid = uuid;
4852
this.catalogLoader = catalogLoader;
4953
this.commitFactory = commitFactory;
54+
this.supportsSnapshots = supportsSnapshots;
5055
}
5156

5257
public static CatalogEnvironment empty() {
53-
return new CatalogEnvironment(null, null, null, null);
58+
return new CatalogEnvironment(null, null, null, null, false);
5459
}
5560

5661
@Nullable
@@ -79,6 +84,14 @@ public PartitionHandler partitionHandler() {
7984
return PartitionHandler.create(catalogLoader.load(), identifier);
8085
}
8186

87+
@Nullable
88+
public SnapshotLoader snapshotLoader() {
89+
if (catalogLoader == null || !supportsSnapshots) {
90+
return null;
91+
}
92+
return new SnapshotLoaderImpl(catalogLoader, identifier);
93+
}
94+
8295
@VisibleForTesting
8396
public SnapshotCommit.Factory commitFactory() {
8497
return commitFactory;

0 commit comments

Comments
 (0)