Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
Expand Down Expand Up @@ -137,6 +138,24 @@ public interface Admin extends AutoCloseable {
CompletableFuture<Void> createDatabase(
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists);

/**
* Alter a database with the given {@code databaseChanges}.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link DatabaseNotExistException} when the database does not exist and {@code
* ignoreIfNotExists} is false.
* </ul>
*
* @param databaseName The name of the database.
* @param databaseChanges The database changes.
* @param ignoreIfNotExists if it is true, do nothing if database does not exist. If false,
* throw a {@link DatabaseNotExistException}.
*/
CompletableFuture<Void> alterDatabase(
String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists);

/**
* Get the database with the given database name asynchronously.
*
Expand Down Expand Up @@ -275,8 +294,8 @@ CompletableFuture<Void> createTable(
*
* <ul>
* <li>{@link DatabaseNotExistException} when the database does not exist.
* <li>{@link TableNotExistException} when the table does not exist, and ignoreIfNotExists is
* false.
* <li>{@link TableNotExistException} when the table does not exist and {@code
* ignoreIfNotExists} is false.
* <li>{@link InvalidAlterTableException} if the alter operation is invalid, such as alter set
* a table option which is not supported to modify currently.
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
Expand All @@ -53,6 +54,7 @@
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AddServerTagRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
Expand Down Expand Up @@ -108,6 +110,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
Expand Down Expand Up @@ -283,6 +286,23 @@ public CompletableFuture<Void> alterTable(
return gateway.alterTable(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Void> alterDatabase(
String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists) {
TablePath.validateDatabaseName(databaseName);
AlterDatabaseRequest request = new AlterDatabaseRequest();

List<PbAlterConfig> pbDatabaseChanges =
databaseChanges.stream()
.map(ClientRpcMessageUtils::toPbAlterConfigsForDatabase)
.collect(Collectors.toList());

request.addAllConfigChanges(pbDatabaseChanges)
.setDatabaseName(databaseName)
.setIgnoreIfNotExists(ignoreIfNotExists);
return gateway.alterDatabase(request).thenApply(r -> null);
}

@Override
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
GetTableInfoRequest request = new GetTableInfoRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
Expand Down Expand Up @@ -108,6 +109,7 @@
import java.util.stream.Collectors;

import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
import static org.apache.fluss.config.FlussConfigUtils.COMMENT_PROP_NAME;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkState;
Expand Down Expand Up @@ -652,6 +654,35 @@ public static List<ConfigEntry> toConfigEntries(List<PbDescribeConfig> pbDescrib
.collect(Collectors.toList());
}

public static PbAlterConfig toPbAlterConfigsForDatabase(DatabaseChange databaseChange) {
PbAlterConfig info = new PbAlterConfig();
if (databaseChange instanceof DatabaseChange.SetOption) {
DatabaseChange.SetOption setOption = (DatabaseChange.SetOption) databaseChange;
info.setConfigKey(setOption.getKey());
info.setConfigValue(setOption.getValue());
info.setOpType(AlterConfigOpType.SET.value());
} else if (databaseChange instanceof DatabaseChange.ResetOption) {
DatabaseChange.ResetOption resetOption = (DatabaseChange.ResetOption) databaseChange;
info.setConfigKey(resetOption.getKey());
info.setOpType(AlterConfigOpType.DELETE.value());
} else if (databaseChange instanceof DatabaseChange.UpdateComment) {
DatabaseChange.UpdateComment updateComment =
(DatabaseChange.UpdateComment) databaseChange;
if (updateComment.getComment() != null) {
info.setConfigKey(COMMENT_PROP_NAME);
info.setConfigValue(updateComment.getComment());
info.setOpType(AlterConfigOpType.SET.value());
} else {
info.setConfigKey(COMMENT_PROP_NAME);
info.setOpType(AlterConfigOpType.DELETE.value());
}
} else {
throw new IllegalArgumentException(
"Unsupported database change: " + databaseChange.getClass());
}
return info;
}

/**
* Parses a PbProducerTableOffsets into a map of TableBucket to offset.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.metadata.AggFunctions;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
Expand Down Expand Up @@ -178,6 +179,76 @@ void testGetDatabaseInfo() throws Exception {
.isBetween(timestampBeforeCreate, timestampAfterCreate);
}

@Test
void testAlterDatabase() throws Exception {
// create database
String dbName = "test_alter_db";
admin.createDatabase(
dbName,
DatabaseDescriptor.builder()
.comment("original comment")
.customProperty("key1", "value1")
.customProperty("key2", "value2")
.build(),
false)
.get();

DatabaseInfo databaseInfo = admin.getDatabaseInfo(dbName).get();
DatabaseDescriptor existingDescriptor = databaseInfo.getDatabaseDescriptor();

// Verify initial state
assertThat(existingDescriptor.getComment().get()).isEqualTo("original comment");
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key1", "value1");
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key2", "value2");

// Alter database: add and modify custom properties
List<DatabaseChange> databaseChanges = new ArrayList<>();
databaseChanges.add(DatabaseChange.set("key3", "value3"));
databaseChanges.add(DatabaseChange.set("key1", "updated_value1"));
databaseChanges.add(DatabaseChange.updateComment("updated comment"));
admin.alterDatabase(dbName, databaseChanges, false).get();

// Verify alterations
DatabaseInfo alteredDatabaseInfo = admin.getDatabaseInfo(dbName).get();
DatabaseDescriptor alteredDescriptor = alteredDatabaseInfo.getDatabaseDescriptor();
assertThat(alteredDescriptor.getComment().get()).isEqualTo("updated comment");
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key2", "value2");
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key3", "value3");
assertThat(alteredDescriptor.getCustomProperties()).hasSize(3);

// Alter database: reset a property
databaseChanges = new ArrayList<>();
databaseChanges.add(DatabaseChange.reset("key2"));
databaseChanges.add(DatabaseChange.updateComment(null));
admin.alterDatabase(dbName, databaseChanges, false).get();

// Verify reset
DatabaseInfo resetDatabaseInfo = admin.getDatabaseInfo(dbName).get();
DatabaseDescriptor resetDescriptor = resetDatabaseInfo.getDatabaseDescriptor();
assertThat(resetDescriptor.getComment()).isEmpty();
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key3", "value3");
assertThat(resetDescriptor.getCustomProperties()).doesNotContainKey("key2");
assertThat(resetDescriptor.getCustomProperties()).hasSize(2);

// throw exception if database not exist
List<DatabaseChange> finalDatabaseChanges = databaseChanges;
assertThatThrownBy(
() ->
admin.alterDatabase(
"test_alter_db_not_exist",
finalDatabaseChanges,
false)
.get())
.cause()
.isInstanceOf(DatabaseNotExistException.class)
.hasMessage(String.format("Database %s not exists.", "test_alter_db_not_exist"));

// should success if ignore not exist
admin.alterDatabase("test_alter_db_not_exist", databaseChanges, true).get();
}

@Test
void testGetTableInfoAndSchema() throws Exception {
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class FlussConfigUtils {

public static final List<String> ALTERABLE_TABLE_OPTIONS;

public static final String COMMENT_PROP_NAME = "comment";

static {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
Expand Down
Loading
Loading