Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,40 +34,116 @@ public interface SupportsBranches extends Catalog {
* @param branch the branch name
* @param fromTag from the tag
* @throws TableNotExistException if the table in identifier doesn't exist
* @throws DatabaseNotExistException if the database in identifier doesn't exist
* @throws BranchAlreadyExistException if the branch already exists
* @throws TagNotExistException if the tag doesn't exist
*/
void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
throws TableNotExistException, DatabaseNotExistException;
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException;

/**
* Drop the branch for this table.
*
* @param identifier path of the table, cannot be system or branch name.
* @param branch the branch name
* @throws TableNotExistException if the table in identifier doesn't exist
* @throws DatabaseNotExistException if the database in identifier doesn't exist
* @throws BranchNotExistException if the branch doesn't exist
*/
void dropBranch(Identifier identifier, String branch)
throws TableNotExistException, DatabaseNotExistException;
void dropBranch(Identifier identifier, String branch) throws BranchNotExistException;

/**
* Fast-forward a branch to main branch.
*
* @param identifier path of the table, cannot be system or branch name.
* @param branch the branch name
* @throws TableNotExistException if the table in identifier doesn't exist
* @throws DatabaseNotExistException if the database in identifier doesn't exist
* @throws BranchNotExistException if the branch doesn't exist
*/
void fastForward(Identifier identifier, String branch)
throws TableNotExistException, DatabaseNotExistException;
void fastForward(Identifier identifier, String branch) throws BranchNotExistException;

/**
* List all branches of the table.
*
* @param identifier path of the table, cannot be system or branch name.
* @throws TableNotExistException if the table in identifier doesn't exist
* @throws DatabaseNotExistException if the database in identifier doesn't exist
*/
List<String> listBranches(Identifier identifier)
throws TableNotExistException, DatabaseNotExistException;
List<String> listBranches(Identifier identifier) throws TableNotExistException;

/** Exception for trying to create a branch that already exists. */
class BranchAlreadyExistException extends Exception {

private static final String MSG = "Branch %s in table %s already exists.";

private final Identifier identifier;
private final String branch;

public BranchAlreadyExistException(Identifier identifier, String branch) {
this(identifier, branch, null);
}

public BranchAlreadyExistException(Identifier identifier, String branch, Throwable cause) {
super(String.format(MSG, branch, identifier.getFullName()), cause);
this.identifier = identifier;
this.branch = branch;
}

public Identifier identifier() {
return identifier;
}

public String branch() {
return branch;
}
}

/** Exception for trying to operate on a branch that doesn't exist. */
class BranchNotExistException extends Exception {

private static final String MSG = "Branch %s in table %s doesn't exist.";

private final Identifier identifier;
private final String branch;

public BranchNotExistException(Identifier identifier, String branch) {
this(identifier, branch, null);
}

public BranchNotExistException(Identifier identifier, String branch, Throwable cause) {
super(String.format(MSG, branch, identifier.getFullName()), cause);
this.identifier = identifier;
this.branch = branch;
}

public Identifier identifier() {
return identifier;
}

public String branch() {
return branch;
}
}

/** Exception for trying to operate on a tag that doesn't exist. */
class TagNotExistException extends Exception {

private static final String MSG = "Tag %s in table %s doesn't exist.";

private final Identifier identifier;
private final String tag;

public TagNotExistException(Identifier identifier, String tag) {
this(identifier, tag, null);
}

public TagNotExistException(Identifier identifier, String tag, Throwable cause) {
super(String.format(MSG, tag, identifier.getFullName()), cause);
this.identifier = identifier;
this.tag = tag;
}

public Identifier identifier() {
return identifier;
}

public String tag() {
return tag;
}
}
}
96 changes: 86 additions & 10 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.SupportsBranches;
import org.apache.paimon.catalog.SupportsSnapshots;
import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
Expand All @@ -45,11 +46,13 @@
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CommitTableRequest;
import org.apache.paimon.rest.requests.CreateBranchRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreatePartitionsRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.requests.CreateViewRequest;
import org.apache.paimon.rest.requests.DropPartitionsRequest;
import org.apache.paimon.rest.requests.ForwardBranchRequest;
import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
Expand All @@ -62,6 +65,7 @@
import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListBranchesResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
Expand All @@ -79,6 +83,8 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
Expand All @@ -103,7 +109,7 @@
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;

/** A catalog implementation for REST. */
public class RESTCatalog implements Catalog, SupportsSnapshots {
public class RESTCatalog implements Catalog, SupportsSnapshots, SupportsBranches {

public static final String HEADER_PREFIX = "header.";

Expand Down Expand Up @@ -235,15 +241,11 @@ public void alterDatabase(String name, List<PropertyChange> changes, boolean ign
Set<String> removeKeys = setPropertiesToRemoveKeys.getRight();
AlterDatabaseRequest request =
new AlterDatabaseRequest(new ArrayList<>(removeKeys), updateProperties);
AlterDatabaseResponse response =
client.post(
resourcePaths.databaseProperties(name),
request,
AlterDatabaseResponse.class,
restAuthFunction);
// if (response.getUpdated().isEmpty()) {
// throw new IllegalStateException("Failed to update properties");
// }
client.post(
resourcePaths.databaseProperties(name),
request,
AlterDatabaseResponse.class,
restAuthFunction);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(name);
Expand Down Expand Up @@ -578,6 +580,80 @@ public List<Partition> listPartitions(Identifier identifier) throws TableNotExis
}
}

@Override
public void createBranch(Identifier identifier, String branch, @Nullable String fromTag)
throws TableNotExistException, BranchAlreadyExistException, TagNotExistException {
try {
CreateBranchRequest request = new CreateBranchRequest(branch, fromTag);
client.post(
resourcePaths.branches(identifier.getDatabaseName(), identifier.getTableName()),
request,
restAuthFunction);
} catch (NoSuchResourceException e) {
if (e.resourceType() == ErrorResponseResourceType.TABLE) {
throw new TableNotExistException(identifier, e);
} else if (e.resourceType() == ErrorResponseResourceType.TAG) {
throw new TagNotExistException(identifier, fromTag, e);
} else {
throw e;
}
} catch (AlreadyExistsException e) {
throw new BranchAlreadyExistException(identifier, branch, e);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

@Override
public void dropBranch(Identifier identifier, String branch) throws BranchNotExistException {
try {
client.delete(
resourcePaths.branch(
identifier.getDatabaseName(), identifier.getTableName(), branch),
restAuthFunction);
} catch (NoSuchResourceException e) {
throw new BranchNotExistException(identifier, branch, e);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

@Override
public void fastForward(Identifier identifier, String branch) throws BranchNotExistException {
try {
ForwardBranchRequest request = new ForwardBranchRequest(branch);
client.post(
resourcePaths.forwardBranch(
identifier.getDatabaseName(), identifier.getTableName()),
request,
restAuthFunction);
} catch (NoSuchResourceException e) {
throw new BranchNotExistException(identifier, branch, e);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

@Override
public List<String> listBranches(Identifier identifier) throws TableNotExistException {
try {
ListBranchesResponse response =
client.get(
resourcePaths.branches(
identifier.getDatabaseName(), identifier.getTableName()),
ListBranchesResponse.class,
restAuthFunction);
if (response == null || response.branches() == null) {
return Collections.emptyList();
}
return response.branches();
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the database does not exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not have this exception. According to the current design, we only need to check the upper level exception when creating xx and listing xx (the upper level of branch is table)

} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

@Override
public View getView(Identifier identifier) throws ViewNotExistException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class ResourcePaths {
private static final String V1 = "/v1";
private static final String DATABASES = "databases";
private static final String TABLES = "tables";
private static final String PARTITIONS = "partitions";
private static final String BRANCHES = "branches";
private static final String VIEWS = "views";
public static final String QUERY_PARAMETER_WAREHOUSE_KEY = "warehouse";

public static String config(String warehouse) {
Expand Down Expand Up @@ -82,33 +85,47 @@ public String tableSnapshot(String databaseName, String tableName) {
}

public String partitions(String databaseName, String tableName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions");
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, PARTITIONS);
}

public String dropPartitions(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions", "drop");
V1, prefix, DATABASES, databaseName, TABLES, tableName, PARTITIONS, "drop");
}

public String alterPartitions(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions", "alter");
V1, prefix, DATABASES, databaseName, TABLES, tableName, PARTITIONS, "alter");
}

public String markDonePartitions(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions", "mark");
V1, prefix, DATABASES, databaseName, TABLES, tableName, PARTITIONS, "mark");
}

public String branches(String databaseName, String tableName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, BRANCHES);
}

public String branch(String databaseName, String tableName, String branchName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, BRANCHES, branchName);
}

public String forwardBranch(String databaseName, String tableName) {
return SLASH.join(
V1, prefix, DATABASES, databaseName, TABLES, tableName, BRANCHES, "forward");
}

public String views(String databaseName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, "views");
return SLASH.join(V1, prefix, DATABASES, databaseName, VIEWS);
}

public String view(String databaseName, String viewName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, "views", viewName);
return SLASH.join(V1, prefix, DATABASES, databaseName, VIEWS, viewName);
}

public String renameView(String databaseName) {
return SLASH.join(V1, prefix, DATABASES, databaseName, "views", "rename");
return SLASH.join(V1, prefix, DATABASES, databaseName, VIEWS, "rename");
}
}
Loading
Loading