Skip to content

Commit 7be6916

Browse files
committed
[server] Support alter database comment and custom properties
1 parent 9489cce commit 7be6916

File tree

19 files changed

+780
-7
lines changed

19 files changed

+780
-7
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.fluss.exception.TableNotPartitionedException;
5454
import org.apache.fluss.exception.TooManyBucketsException;
5555
import org.apache.fluss.exception.TooManyPartitionsException;
56+
import org.apache.fluss.metadata.DatabaseChange;
5657
import org.apache.fluss.metadata.DatabaseDescriptor;
5758
import org.apache.fluss.metadata.DatabaseInfo;
5859
import org.apache.fluss.metadata.DatabaseSummary;
@@ -137,6 +138,24 @@ public interface Admin extends AutoCloseable {
137138
CompletableFuture<Void> createDatabase(
138139
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists);
139140

141+
/**
142+
* Alter a database with the given {@code databaseChanges}.
143+
*
144+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
145+
*
146+
* <ul>
147+
* <li>{@link DatabaseNotExistException} when the database does not exist and {@code
148+
* ignoreIfNotExists} is false.
149+
* </ul>
150+
*
151+
* @param databaseName The name of the database.
152+
* @param databaseChanges The database changes.
153+
* @param ignoreIfNotExists if it is true, do nothing if database does not exist. If false,
154+
* throw a {@link DatabaseNotExistException}.
155+
*/
156+
CompletableFuture<Void> alterDatabase(
157+
String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists);
158+
140159
/**
141160
* Get the database with the given database name asynchronously.
142161
*
@@ -275,8 +294,8 @@ CompletableFuture<Void> createTable(
275294
*
276295
* <ul>
277296
* <li>{@link DatabaseNotExistException} when the database does not exist.
278-
* <li>{@link TableNotExistException} when the table does not exist, and ignoreIfNotExists is
279-
* false.
297+
* <li>{@link TableNotExistException} when the table does not exist and {@code
298+
* ignoreIfNotExists} is false.
280299
* <li>{@link InvalidAlterTableException} if the alter operation is invalid, such as alter set
281300
* a table option which is not supported to modify currently.
282301
* </ul>

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.config.cluster.ConfigEntry;
3333
import org.apache.fluss.exception.FlussRuntimeException;
3434
import org.apache.fluss.exception.LeaderNotAvailableException;
35+
import org.apache.fluss.metadata.DatabaseChange;
3536
import org.apache.fluss.metadata.DatabaseDescriptor;
3637
import org.apache.fluss.metadata.DatabaseInfo;
3738
import org.apache.fluss.metadata.DatabaseSummary;
@@ -53,6 +54,7 @@
5354
import org.apache.fluss.rpc.gateway.TabletServerGateway;
5455
import org.apache.fluss.rpc.messages.AddServerTagRequest;
5556
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
57+
import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
5658
import org.apache.fluss.rpc.messages.AlterTableRequest;
5759
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
5860
import org.apache.fluss.rpc.messages.CreateAclsRequest;
@@ -108,6 +110,7 @@
108110
import java.util.Map;
109111
import java.util.Optional;
110112
import java.util.concurrent.CompletableFuture;
113+
import java.util.stream.Collectors;
111114

112115
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
113116
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
@@ -283,6 +286,23 @@ public CompletableFuture<Void> alterTable(
283286
return gateway.alterTable(request).thenApply(r -> null);
284287
}
285288

289+
@Override
290+
public CompletableFuture<Void> alterDatabase(
291+
String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists) {
292+
TablePath.validateDatabaseName(databaseName);
293+
AlterDatabaseRequest request = new AlterDatabaseRequest();
294+
295+
List<PbAlterConfig> pbDatabaseChanges =
296+
databaseChanges.stream()
297+
.map(ClientRpcMessageUtils::toPbAlterConfigsForDatabase)
298+
.collect(Collectors.toList());
299+
300+
request.addAllConfigChanges(pbDatabaseChanges)
301+
.setDatabaseName(databaseName)
302+
.setIgnoreIfNotExists(ignoreIfNotExists);
303+
return gateway.alterDatabase(request).thenApply(r -> null);
304+
}
305+
286306
@Override
287307
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
288308
GetTableInfoRequest request = new GetTableInfoRequest();

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.fluss.fs.FsPath;
3838
import org.apache.fluss.fs.FsPathAndFileName;
3939
import org.apache.fluss.fs.token.ObtainedSecurityToken;
40+
import org.apache.fluss.metadata.DatabaseChange;
4041
import org.apache.fluss.metadata.DatabaseSummary;
4142
import org.apache.fluss.metadata.PartitionInfo;
4243
import org.apache.fluss.metadata.PartitionSpec;
@@ -108,6 +109,7 @@
108109
import java.util.stream.Collectors;
109110

110111
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
112+
import static org.apache.fluss.config.FlussConfigUtils.COMMENT_PROP_NAME;
111113
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
112114
import static org.apache.fluss.utils.Preconditions.checkArgument;
113115
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -652,6 +654,35 @@ public static List<ConfigEntry> toConfigEntries(List<PbDescribeConfig> pbDescrib
652654
.collect(Collectors.toList());
653655
}
654656

657+
public static PbAlterConfig toPbAlterConfigsForDatabase(DatabaseChange databaseChange) {
658+
PbAlterConfig info = new PbAlterConfig();
659+
if (databaseChange instanceof DatabaseChange.SetOption) {
660+
DatabaseChange.SetOption setOption = (DatabaseChange.SetOption) databaseChange;
661+
info.setConfigKey(setOption.getKey());
662+
info.setConfigValue(setOption.getValue());
663+
info.setOpType(AlterConfigOpType.SET.value());
664+
} else if (databaseChange instanceof DatabaseChange.ResetOption) {
665+
DatabaseChange.ResetOption resetOption = (DatabaseChange.ResetOption) databaseChange;
666+
info.setConfigKey(resetOption.getKey());
667+
info.setOpType(AlterConfigOpType.DELETE.value());
668+
} else if (databaseChange instanceof DatabaseChange.UpdateComment) {
669+
DatabaseChange.UpdateComment updateComment =
670+
(DatabaseChange.UpdateComment) databaseChange;
671+
if (updateComment.getComment() != null) {
672+
info.setConfigKey(COMMENT_PROP_NAME);
673+
info.setConfigValue(updateComment.getComment());
674+
info.setOpType(AlterConfigOpType.SET.value());
675+
} else {
676+
info.setConfigKey(COMMENT_PROP_NAME);
677+
info.setOpType(AlterConfigOpType.DELETE.value());
678+
}
679+
} else {
680+
throw new IllegalArgumentException(
681+
"Unsupported database change: " + databaseChange.getClass());
682+
}
683+
return info;
684+
}
685+
655686
/**
656687
* Parses a PbProducerTableOffsets into a map of TableBucket to offset.
657688
*

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.fluss.fs.FsPath;
5656
import org.apache.fluss.fs.FsPathAndFileName;
5757
import org.apache.fluss.metadata.AggFunctions;
58+
import org.apache.fluss.metadata.DatabaseChange;
5859
import org.apache.fluss.metadata.DatabaseDescriptor;
5960
import org.apache.fluss.metadata.DatabaseInfo;
6061
import org.apache.fluss.metadata.DatabaseSummary;
@@ -178,6 +179,76 @@ void testGetDatabaseInfo() throws Exception {
178179
.isBetween(timestampBeforeCreate, timestampAfterCreate);
179180
}
180181

182+
@Test
183+
void testAlterDatabase() throws Exception {
184+
// create database
185+
String dbName = "test_alter_db";
186+
admin.createDatabase(
187+
dbName,
188+
DatabaseDescriptor.builder()
189+
.comment("original comment")
190+
.customProperty("key1", "value1")
191+
.customProperty("key2", "value2")
192+
.build(),
193+
false)
194+
.get();
195+
196+
DatabaseInfo databaseInfo = admin.getDatabaseInfo(dbName).get();
197+
DatabaseDescriptor existingDescriptor = databaseInfo.getDatabaseDescriptor();
198+
199+
// Verify initial state
200+
assertThat(existingDescriptor.getComment().get()).isEqualTo("original comment");
201+
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key1", "value1");
202+
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key2", "value2");
203+
204+
// Alter database: add and modify custom properties
205+
List<DatabaseChange> databaseChanges = new ArrayList<>();
206+
databaseChanges.add(DatabaseChange.set("key3", "value3"));
207+
databaseChanges.add(DatabaseChange.set("key1", "updated_value1"));
208+
databaseChanges.add(DatabaseChange.updateComment("updated comment"));
209+
admin.alterDatabase(dbName, databaseChanges, false).get();
210+
211+
// Verify alterations
212+
DatabaseInfo alteredDatabaseInfo = admin.getDatabaseInfo(dbName).get();
213+
DatabaseDescriptor alteredDescriptor = alteredDatabaseInfo.getDatabaseDescriptor();
214+
assertThat(alteredDescriptor.getComment().get()).isEqualTo("updated comment");
215+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
216+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key2", "value2");
217+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key3", "value3");
218+
assertThat(alteredDescriptor.getCustomProperties()).hasSize(3);
219+
220+
// Alter database: reset a property
221+
databaseChanges = new ArrayList<>();
222+
databaseChanges.add(DatabaseChange.reset("key2"));
223+
databaseChanges.add(DatabaseChange.updateComment(null));
224+
admin.alterDatabase(dbName, databaseChanges, false).get();
225+
226+
// Verify reset
227+
DatabaseInfo resetDatabaseInfo = admin.getDatabaseInfo(dbName).get();
228+
DatabaseDescriptor resetDescriptor = resetDatabaseInfo.getDatabaseDescriptor();
229+
assertThat(resetDescriptor.getComment()).isEmpty();
230+
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
231+
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key3", "value3");
232+
assertThat(resetDescriptor.getCustomProperties()).doesNotContainKey("key2");
233+
assertThat(resetDescriptor.getCustomProperties()).hasSize(2);
234+
235+
// throw exception if database not exist
236+
List<DatabaseChange> finalDatabaseChanges = databaseChanges;
237+
assertThatThrownBy(
238+
() ->
239+
admin.alterDatabase(
240+
"test_alter_db_not_exist",
241+
finalDatabaseChanges,
242+
false)
243+
.get())
244+
.cause()
245+
.isInstanceOf(DatabaseNotExistException.class)
246+
.hasMessage(String.format("Database %s not exists.", "test_alter_db_not_exist"));
247+
248+
// should success if ignore not exist
249+
admin.alterDatabase("test_alter_db_not_exist", databaseChanges, true).get();
250+
}
251+
181252
@Test
182253
void testGetTableInfoAndSchema() throws Exception {
183254
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public class FlussConfigUtils {
3838

3939
public static final List<String> ALTERABLE_TABLE_OPTIONS;
4040

41+
public static final String COMMENT_PROP_NAME = "comment";
42+
4143
static {
4244
TABLE_OPTIONS = extractConfigOptions("table.");
4345
CLIENT_OPTIONS = extractConfigOptions("client.");

0 commit comments

Comments
 (0)