Skip to content

Commit 3a2dbce

Browse files
authored
[#9586] improvement(core): optimize checking in use status of catalogs and metalakes (#9574)
### What changes were proposed in this pull request? This pull request refactors catalog usage validation in the Gravitino codebase, improving how the system checks whether catalogs and their parent metalakes are enabled ("in use"). The main changes include removing the dynamic proxy-based `OperationDispatcherInterceptor`, centralizing usage checks within the catalog implementation, and introducing a new property to track metalake usage status. Additionally, the code now ensures that operations on catalogs consistently verify both catalog and metalake usage status. ### Why are the changes needed? To clean and simply code path Fix: #9574 ### Does this PR introduce _any_ user-facing change? N/A. ### How was this patch tested? UTs
1 parent fbffe05 commit 3a2dbce

File tree

10 files changed

+392
-289
lines changed

10 files changed

+392
-289
lines changed

catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void testPropertyMeta() {
6060
Map<String, PropertyEntry<?>> propertyEntryMap =
6161
HIVE_PROPERTIES_METADATA.catalogPropertiesMetadata().propertyEntries();
6262

63-
Assertions.assertEquals(17, propertyEntryMap.size());
63+
Assertions.assertEquals(18, propertyEntryMap.size());
6464
Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
6565
Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
6666
Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));

clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,7 @@ public void testCatalogAvailable() {
227227
Assertions.assertThrows(
228228
CatalogNotInUseException.class,
229229
() -> metalake.alterCatalog(catalogName, CatalogChange.updateComment("new comment")));
230-
Assertions.assertTrue(
231-
exception.getMessage().contains("please enable it first"), exception.getMessage());
230+
Assertions.assertTrue(exception.getMessage().contains("is not in use"), exception.getMessage());
232231

233232
// test schema operations under non-in-use catalog
234233
SupportsSchemas schemaOps = loadedCatalog.asSchemas();

core/src/main/java/org/apache/gravitino/GravitinoEnv.java

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.gravitino;
2020

2121
import com.google.common.base.Preconditions;
22-
import java.lang.reflect.Proxy;
2322
import org.apache.gravitino.audit.AuditLogManager;
2423
import org.apache.gravitino.authorization.AccessControlDispatcher;
2524
import org.apache.gravitino.authorization.AccessControlManager;
@@ -41,7 +40,6 @@
4140
import org.apache.gravitino.catalog.ModelDispatcher;
4241
import org.apache.gravitino.catalog.ModelNormalizeDispatcher;
4342
import org.apache.gravitino.catalog.ModelOperationDispatcher;
44-
import org.apache.gravitino.catalog.OperationDispatcherInterceptor;
4543
import org.apache.gravitino.catalog.PartitionDispatcher;
4644
import org.apache.gravitino.catalog.PartitionNormalizeDispatcher;
4745
import org.apache.gravitino.catalog.PartitionOperationDispatcher;
@@ -555,28 +553,14 @@ private void initGravitinoServerComponents() {
555553

556554
SchemaOperationDispatcher schemaOperationDispatcher =
557555
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
558-
SchemaDispatcher schemaDispatcherProxy =
559-
(SchemaDispatcher)
560-
Proxy.newProxyInstance(
561-
SchemaDispatcher.class.getClassLoader(),
562-
new Class[] {SchemaDispatcher.class},
563-
new OperationDispatcherInterceptor(
564-
schemaOperationDispatcher, catalogManager, entityStore));
565-
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaDispatcherProxy);
556+
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaOperationDispatcher);
566557
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
567558
new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
568559
this.schemaDispatcher = new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);
569560

570561
TableOperationDispatcher tableOperationDispatcher =
571562
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
572-
TableDispatcher tableDispatcherProxy =
573-
(TableDispatcher)
574-
Proxy.newProxyInstance(
575-
TableDispatcher.class.getClassLoader(),
576-
new Class[] {TableDispatcher.class},
577-
new OperationDispatcherInterceptor(
578-
tableOperationDispatcher, catalogManager, entityStore));
579-
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableDispatcherProxy);
563+
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableOperationDispatcher);
580564
TableNormalizeDispatcher tableNormalizeDispatcher =
581565
new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
582566
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);
@@ -585,55 +569,28 @@ private void initGravitinoServerComponents() {
585569
// partition doesn't have ownership, so we don't need it now.
586570
PartitionOperationDispatcher partitionOperationDispatcher =
587571
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);
588-
PartitionDispatcher partitionDispatcherProxy =
589-
(PartitionDispatcher)
590-
Proxy.newProxyInstance(
591-
PartitionDispatcher.class.getClassLoader(),
592-
new Class[] {PartitionDispatcher.class},
593-
new OperationDispatcherInterceptor(
594-
partitionOperationDispatcher, catalogManager, entityStore));
595572
PartitionNormalizeDispatcher partitionNormalizeDispatcher =
596-
new PartitionNormalizeDispatcher(partitionDispatcherProxy, catalogManager);
573+
new PartitionNormalizeDispatcher(partitionOperationDispatcher, catalogManager);
597574
this.partitionDispatcher = new PartitionEventDispatcher(eventBus, partitionNormalizeDispatcher);
598575

599576
FilesetOperationDispatcher filesetOperationDispatcher =
600577
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
601-
FilesetDispatcher filesetDispatcherProxy =
602-
(FilesetDispatcher)
603-
Proxy.newProxyInstance(
604-
FilesetDispatcher.class.getClassLoader(),
605-
new Class[] {FilesetDispatcher.class},
606-
new OperationDispatcherInterceptor(
607-
filesetOperationDispatcher, catalogManager, entityStore));
608-
FilesetHookDispatcher filesetHookDispatcher = new FilesetHookDispatcher(filesetDispatcherProxy);
578+
FilesetHookDispatcher filesetHookDispatcher =
579+
new FilesetHookDispatcher(filesetOperationDispatcher);
609580
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
610581
new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
611582
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);
612583

613584
TopicOperationDispatcher topicOperationDispatcher =
614585
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
615-
TopicDispatcher topicDispatcherProxy =
616-
(TopicDispatcher)
617-
Proxy.newProxyInstance(
618-
TopicDispatcher.class.getClassLoader(),
619-
new Class[] {TopicDispatcher.class},
620-
new OperationDispatcherInterceptor(
621-
topicOperationDispatcher, catalogManager, entityStore));
622-
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicDispatcherProxy);
586+
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicOperationDispatcher);
623587
TopicNormalizeDispatcher topicNormalizeDispatcher =
624588
new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
625589
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);
626590

627591
ModelOperationDispatcher modelOperationDispatcher =
628592
new ModelOperationDispatcher(catalogManager, entityStore, idGenerator);
629-
ModelDispatcher modelDispatcherProxy =
630-
(ModelDispatcher)
631-
Proxy.newProxyInstance(
632-
ModelDispatcher.class.getClassLoader(),
633-
new Class[] {ModelDispatcher.class},
634-
new OperationDispatcherInterceptor(
635-
modelOperationDispatcher, catalogManager, entityStore));
636-
ModelHookDispatcher modelHookDispatcher = new ModelHookDispatcher(modelDispatcherProxy);
593+
ModelHookDispatcher modelHookDispatcher = new ModelHookDispatcher(modelOperationDispatcher);
637594
ModelNormalizeDispatcher modelNormalizeDispatcher =
638595
new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
639596
this.modelDispatcher = new ModelEventDispatcher(eventBus, modelNormalizeDispatcher);

core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java

Lines changed: 72 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
import static org.apache.gravitino.StringIdentifier.DUMMY_ID;
2424
import static org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;
2525
import static org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
26-
import static org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.BASIC_CATALOG_PROPERTIES_METADATA;
26+
import static org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.PROPERTY_METALAKE_IN_USE;
2727
import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
28-
import static org.apache.gravitino.metalake.MetalakeManager.metalakeInUse;
2928

3029
import com.github.benmanes.caffeine.cache.Cache;
3130
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -76,15 +75,13 @@
7675
import org.apache.gravitino.connector.BaseCatalog;
7776
import org.apache.gravitino.connector.CatalogOperations;
7877
import org.apache.gravitino.connector.HasPropertyMetadata;
79-
import org.apache.gravitino.connector.PropertyEntry;
8078
import org.apache.gravitino.connector.SupportsSchemas;
8179
import org.apache.gravitino.connector.authorization.BaseAuthorization;
8280
import org.apache.gravitino.connector.capability.Capability;
8381
import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
8482
import org.apache.gravitino.exceptions.CatalogInUseException;
8583
import org.apache.gravitino.exceptions.CatalogNotInUseException;
8684
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
87-
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
8885
import org.apache.gravitino.exceptions.NoSuchCatalogException;
8986
import org.apache.gravitino.exceptions.NoSuchEntityException;
9087
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -116,17 +113,6 @@ public class CatalogManager implements CatalogDispatcher, Closeable {
116113

117114
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
118115

119-
public void checkCatalogInUse(EntityStore store, NameIdentifier ident)
120-
throws NoSuchMetalakeException, NoSuchCatalogException, CatalogNotInUseException,
121-
MetalakeNotInUseException {
122-
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
123-
checkMetalake(metalakeIdent, store);
124-
125-
if (!getCatalogInUseValue(store, ident)) {
126-
throw new CatalogNotInUseException("Catalog %s is not in use, please enable it first", ident);
127-
}
128-
}
129-
130116
/** Wrapper class for a catalog instance and its class loader. */
131117
public static class CatalogWrapper {
132118

@@ -388,13 +374,13 @@ public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeExce
388374
*/
389375
@Override
390376
public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
391-
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
392377
return TreeLockUtils.doWithTreeLock(
393378
ident,
394379
LockType.READ,
395380
() -> {
396-
checkMetalake(metalakeIdent, store);
397-
return loadCatalogAndWrap(ident).catalog;
381+
BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
382+
baseCatalog.checkMetalakeInUse();
383+
return baseCatalog;
398384
});
399385
}
400386

@@ -555,18 +541,18 @@ public void testConnection(
555541
public void enableCatalog(NameIdentifier ident)
556542
throws NoSuchCatalogException, CatalogNotInUseException {
557543
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
558-
559544
TreeLockUtils.doWithTreeLock(
560545
metalakeIdent,
561546
LockType.WRITE,
562547
() -> {
563-
checkMetalake(metalakeIdent, store);
548+
BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
549+
baseCatalog.checkMetalakeInUse();
564550

565-
try {
566-
if (catalogInUse(store, ident)) {
567-
return null;
568-
}
551+
if (baseCatalog.catalogInUse()) {
552+
return null;
553+
}
569554

555+
try {
570556
store.update(
571557
ident,
572558
CatalogEntity.class,
@@ -595,17 +581,18 @@ public void enableCatalog(NameIdentifier ident)
595581
@Override
596582
public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException {
597583
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
598-
599584
TreeLockUtils.doWithTreeLock(
600585
metalakeIdent,
601586
LockType.WRITE,
602587
() -> {
603-
checkMetalake(metalakeIdent, store);
588+
BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
589+
baseCatalog.checkMetalakeInUse();
590+
591+
if (!baseCatalog.catalogInUse()) {
592+
return null;
593+
}
604594

605595
try {
606-
if (!catalogInUse(store, ident)) {
607-
return null;
608-
}
609596
store.update(
610597
ident,
611598
CatalogEntity.class,
@@ -643,20 +630,21 @@ public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException {
643630
@Override
644631
public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
645632
throws NoSuchCatalogException, IllegalArgumentException {
633+
646634
TreeLockUtils.doWithTreeLock(
647635
ident,
648636
LockType.READ,
649637
() -> {
650-
checkCatalogInUse(store, ident);
651-
652638
// There could be a race issue that someone is using the catalog from cache while we are
653639
// updating it.
654-
655640
CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
656641
if (catalogWrapper == null) {
657642
throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, ident);
658643
}
659644

645+
BaseCatalog catalog = catalogWrapper.catalog();
646+
catalog.checkMetalakeAndCatalogInUse();
647+
660648
try {
661649
catalogWrapper.doWithPropertiesMeta(
662650
f -> {
@@ -739,9 +727,11 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
739727
metalakeIdent,
740728
LockType.WRITE,
741729
() -> {
742-
checkMetalake(metalakeIdent, store);
743730
try {
744-
boolean catalogInUse = getCatalogInUseValue(store, ident);
731+
CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
732+
catalogWrapper.catalog().checkMetalakeInUse();
733+
734+
boolean catalogInUse = catalogWrapper.catalog().catalogInUse();
745735
if (catalogInUse && !force) {
746736
throw new CatalogInUseException(
747737
"Catalog %s is in use, please disable it first or use force option", ident);
@@ -750,8 +740,6 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
750740
Namespace schemaNs = Namespace.of(ident.namespace().level(0), ident.name());
751741
List<SchemaEntity> schemaEntities =
752742
store.list(schemaNs, SchemaEntity.class, EntityType.SCHEMA);
753-
754-
CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
755743
if (!force && containsUserCreatedSchemas(schemaEntities, catalogWrapper)) {
756744
throw new NonEmptyCatalogException(
757745
"Catalog %s has schemas, please drop them first or use force option", ident);
@@ -865,35 +853,6 @@ public CatalogWrapper loadCatalogAndWrap(NameIdentifier ident) throws NoSuchCata
865853
return catalogCache.get(ident, this::loadCatalogInternal);
866854
}
867855

868-
private boolean catalogInUse(EntityStore store, NameIdentifier ident)
869-
throws NoSuchMetalakeException, NoSuchCatalogException {
870-
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
871-
return metalakeInUse(store, metalakeIdent) && getCatalogInUseValue(store, ident);
872-
}
873-
874-
private boolean getCatalogInUseValue(EntityStore store, NameIdentifier catalogIdent) {
875-
try {
876-
CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent);
877-
CatalogEntity catalogEntity;
878-
if (wrapper != null) {
879-
catalogEntity = wrapper.catalog.entity();
880-
} else {
881-
catalogEntity = store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class);
882-
}
883-
return (boolean)
884-
BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault(
885-
catalogEntity.getProperties(), PROPERTY_IN_USE);
886-
887-
} catch (NoSuchEntityException e) {
888-
LOG.warn("Catalog {} does not exist", catalogIdent, e);
889-
throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, catalogIdent);
890-
891-
} catch (IOException e) {
892-
LOG.error("Failed to do store operation", e);
893-
throw new RuntimeException(e);
894-
}
895-
}
896-
897856
private boolean isManagedStorageCatalog(CatalogWrapper catalogWrapper) {
898857
try {
899858
Capability capability = catalogWrapper.capabilities();
@@ -1026,22 +985,6 @@ private Map<String, String> getResolvedProperties(CatalogEntity entity) {
1026985
}
1027986
}
1028987

1029-
private Set<String> getHiddenPropertyNames(CatalogEntity entity) {
1030-
Map<String, String> conf = entity.getProperties();
1031-
String provider = entity.getProvider();
1032-
1033-
try (IsolatedClassLoader classLoader = createClassLoader(provider, conf)) {
1034-
BaseCatalog<?> catalog = createBaseCatalog(classLoader, entity);
1035-
return classLoader.withClassLoader(
1036-
cl ->
1037-
catalog.catalogPropertiesMetadata().propertyEntries().values().stream()
1038-
.filter(PropertyEntry::isHidden)
1039-
.map(PropertyEntry::getName)
1040-
.collect(Collectors.toSet()),
1041-
RuntimeException.class);
1042-
}
1043-
}
1044-
1045988
private BaseCatalog<?> createBaseCatalog(IsolatedClassLoader classLoader, CatalogEntity entity) {
1046989
// Load Catalog class instance
1047990
BaseCatalog<?> catalog = createCatalogInstance(classLoader, entity.getProvider());
@@ -1255,4 +1198,52 @@ private CatalogEntity convertFilesetCatalogEntity(CatalogEntity entity) {
12551198
// If the provider is not "hadoop", we assume it is already a fileset catalog entity.
12561199
return entity;
12571200
}
1201+
1202+
/**
1203+
* Set the metalake in-use status in a specified catalog.
1204+
*
1205+
* @param nameIdentifier The name identifier of the catalog.
1206+
* @param status The in-use status to set.
1207+
*/
1208+
public void setMetalakeInUseStatus(NameIdentifier nameIdentifier, boolean status) {
1209+
updateCatalogProperty(nameIdentifier, PROPERTY_METALAKE_IN_USE, String.valueOf(status));
1210+
}
1211+
1212+
private void updateCatalogProperty(
1213+
NameIdentifier nameIdentifier, String propertyKey, String propertyValue) {
1214+
try {
1215+
store.update(
1216+
nameIdentifier,
1217+
CatalogEntity.class,
1218+
EntityType.CATALOG,
1219+
catalog -> {
1220+
CatalogEntity.Builder newCatalogBuilder =
1221+
newCatalogBuilder(nameIdentifier.namespace(), catalog);
1222+
1223+
Map<String, String> newProps =
1224+
catalog.getProperties() == null
1225+
? new HashMap<>()
1226+
: new HashMap<>(catalog.getProperties());
1227+
newProps.put(propertyKey, propertyValue);
1228+
newCatalogBuilder.withProperties(newProps);
1229+
1230+
return newCatalogBuilder.build();
1231+
});
1232+
catalogCache.invalidate(nameIdentifier);
1233+
1234+
} catch (NoSuchCatalogException e) {
1235+
LOG.error("Catalog {} does not exist", nameIdentifier, e);
1236+
throw new RuntimeException(e);
1237+
} catch (IllegalArgumentException e) {
1238+
LOG.error(
1239+
"Failed to update catalog {} property {} with unknown change",
1240+
nameIdentifier,
1241+
propertyKey,
1242+
e);
1243+
throw e;
1244+
} catch (IOException ioe) {
1245+
LOG.error("Failed to update catalog {} property {}", nameIdentifier, propertyKey, ioe);
1246+
throw new RuntimeException(ioe);
1247+
}
1248+
}
12581249
}

0 commit comments

Comments
 (0)