Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void testPropertyMeta() {
Map<String, PropertyEntry<?>> propertyEntryMap =
HIVE_PROPERTIES_METADATA.catalogPropertiesMetadata().propertyEntries();

Assertions.assertEquals(17, propertyEntryMap.size());
Assertions.assertEquals(18, propertyEntryMap.size());
Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ public void testCatalogAvailable() {
Assertions.assertThrows(
CatalogNotInUseException.class,
() -> metalake.alterCatalog(catalogName, CatalogChange.updateComment("new comment")));
Assertions.assertTrue(
exception.getMessage().contains("please enable it first"), exception.getMessage());
Assertions.assertTrue(exception.getMessage().contains("is not in use"), exception.getMessage());

// test schema operations under non-in-use catalog
SupportsSchemas schemaOps = loadedCatalog.asSchemas();
Expand Down
57 changes: 7 additions & 50 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.gravitino;

import com.google.common.base.Preconditions;
import java.lang.reflect.Proxy;
import org.apache.gravitino.audit.AuditLogManager;
import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.AccessControlManager;
Expand All @@ -38,7 +37,6 @@
import org.apache.gravitino.catalog.ModelDispatcher;
import org.apache.gravitino.catalog.ModelNormalizeDispatcher;
import org.apache.gravitino.catalog.ModelOperationDispatcher;
import org.apache.gravitino.catalog.OperationDispatcherInterceptor;
import org.apache.gravitino.catalog.PartitionDispatcher;
import org.apache.gravitino.catalog.PartitionNormalizeDispatcher;
import org.apache.gravitino.catalog.PartitionOperationDispatcher;
Expand Down Expand Up @@ -541,28 +539,14 @@ private void initGravitinoServerComponents() {

SchemaOperationDispatcher schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
SchemaDispatcher schemaDispatcherProxy =
(SchemaDispatcher)
Proxy.newProxyInstance(
SchemaDispatcher.class.getClassLoader(),
new Class[] {SchemaDispatcher.class},
new OperationDispatcherInterceptor(
schemaOperationDispatcher, catalogManager, entityStore));
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaDispatcherProxy);
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaOperationDispatcher);
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
this.schemaDispatcher = new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);

TableOperationDispatcher tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
TableDispatcher tableDispatcherProxy =
(TableDispatcher)
Proxy.newProxyInstance(
TableDispatcher.class.getClassLoader(),
new Class[] {TableDispatcher.class},
new OperationDispatcherInterceptor(
tableOperationDispatcher, catalogManager, entityStore));
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableDispatcherProxy);
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableOperationDispatcher);
TableNormalizeDispatcher tableNormalizeDispatcher =
new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);
Expand All @@ -571,55 +555,28 @@ private void initGravitinoServerComponents() {
// partition doesn't have ownership, so we don't need it now.
PartitionOperationDispatcher partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);
PartitionDispatcher partitionDispatcherProxy =
(PartitionDispatcher)
Proxy.newProxyInstance(
PartitionDispatcher.class.getClassLoader(),
new Class[] {PartitionDispatcher.class},
new OperationDispatcherInterceptor(
partitionOperationDispatcher, catalogManager, entityStore));
PartitionNormalizeDispatcher partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(partitionDispatcherProxy, catalogManager);
new PartitionNormalizeDispatcher(partitionOperationDispatcher, catalogManager);
this.partitionDispatcher = new PartitionEventDispatcher(eventBus, partitionNormalizeDispatcher);

FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
FilesetDispatcher filesetDispatcherProxy =
(FilesetDispatcher)
Proxy.newProxyInstance(
FilesetDispatcher.class.getClassLoader(),
new Class[] {FilesetDispatcher.class},
new OperationDispatcherInterceptor(
filesetOperationDispatcher, catalogManager, entityStore));
FilesetHookDispatcher filesetHookDispatcher = new FilesetHookDispatcher(filesetDispatcherProxy);
FilesetHookDispatcher filesetHookDispatcher =
new FilesetHookDispatcher(filesetOperationDispatcher);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);

TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
TopicDispatcher topicDispatcherProxy =
(TopicDispatcher)
Proxy.newProxyInstance(
TopicDispatcher.class.getClassLoader(),
new Class[] {TopicDispatcher.class},
new OperationDispatcherInterceptor(
topicOperationDispatcher, catalogManager, entityStore));
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicDispatcherProxy);
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicOperationDispatcher);
TopicNormalizeDispatcher topicNormalizeDispatcher =
new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);

ModelOperationDispatcher modelOperationDispatcher =
new ModelOperationDispatcher(catalogManager, entityStore, idGenerator);
ModelDispatcher modelDispatcherProxy =
(ModelDispatcher)
Proxy.newProxyInstance(
ModelDispatcher.class.getClassLoader(),
new Class[] {ModelDispatcher.class},
new OperationDispatcherInterceptor(
modelOperationDispatcher, catalogManager, entityStore));
ModelHookDispatcher modelHookDispatcher = new ModelHookDispatcher(modelDispatcherProxy);
ModelHookDispatcher modelHookDispatcher = new ModelHookDispatcher(modelOperationDispatcher);
ModelNormalizeDispatcher modelNormalizeDispatcher =
new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
this.modelDispatcher = new ModelEventDispatcher(eventBus, modelNormalizeDispatcher);
Expand Down
153 changes: 72 additions & 81 deletions core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import static org.apache.gravitino.StringIdentifier.DUMMY_ID;
import static org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;
import static org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
import static org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.BASIC_CATALOG_PROPERTIES_METADATA;
import static org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.PROPERTY_METALAKE_IN_USE;
import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
import static org.apache.gravitino.metalake.MetalakeManager.metalakeInUse;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down Expand Up @@ -76,15 +75,13 @@
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertyEntry;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.connector.authorization.BaseAuthorization;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
import org.apache.gravitino.exceptions.CatalogInUseException;
import org.apache.gravitino.exceptions.CatalogNotInUseException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
Expand Down Expand Up @@ -116,17 +113,6 @@ public class CatalogManager implements CatalogDispatcher, Closeable {

private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);

public void checkCatalogInUse(EntityStore store, NameIdentifier ident)
throws NoSuchMetalakeException, NoSuchCatalogException, CatalogNotInUseException,
MetalakeNotInUseException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
checkMetalake(metalakeIdent, store);

if (!getCatalogInUseValue(store, ident)) {
throw new CatalogNotInUseException("Catalog %s is not in use, please enable it first", ident);
}
}

/** Wrapper class for a catalog instance and its class loader. */
public static class CatalogWrapper {

Expand Down Expand Up @@ -388,13 +374,13 @@ public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeExce
*/
@Override
public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
return TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> {
checkMetalake(metalakeIdent, store);
return loadCatalogAndWrap(ident).catalog;
BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
baseCatalog.checkMetalakeInUse();
return baseCatalog;
});
}

Expand Down Expand Up @@ -555,18 +541,18 @@ public void testConnection(
public void enableCatalog(NameIdentifier ident)
throws NoSuchCatalogException, CatalogNotInUseException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());

TreeLockUtils.doWithTreeLock(
metalakeIdent,
LockType.WRITE,
() -> {
checkMetalake(metalakeIdent, store);
BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
baseCatalog.checkMetalakeInUse();

try {
if (catalogInUse(store, ident)) {
return null;
}
if (baseCatalog.catalogInUse()) {
return null;
}

try {
store.update(
ident,
CatalogEntity.class,
Expand Down Expand Up @@ -595,17 +581,18 @@ public void enableCatalog(NameIdentifier ident)
@Override
public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());

TreeLockUtils.doWithTreeLock(
metalakeIdent,
LockType.WRITE,
() -> {
checkMetalake(metalakeIdent, store);
BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
baseCatalog.checkMetalakeInUse();

if (!baseCatalog.catalogInUse()) {
return null;
}

try {
if (!catalogInUse(store, ident)) {
return null;
}
store.update(
ident,
CatalogEntity.class,
Expand Down Expand Up @@ -643,20 +630,21 @@ public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException {
@Override
public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
throws NoSuchCatalogException, IllegalArgumentException {

TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> {
checkCatalogInUse(store, ident);

// There could be a race issue that someone is using the catalog from cache while we are
// updating it.

CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
if (catalogWrapper == null) {
throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, ident);
}

BaseCatalog catalog = catalogWrapper.catalog();
catalog.checkMetalakeAndCatalogInUse();

try {
catalogWrapper.doWithPropertiesMeta(
f -> {
Expand Down Expand Up @@ -739,9 +727,11 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
metalakeIdent,
LockType.WRITE,
() -> {
checkMetalake(metalakeIdent, store);
try {
boolean catalogInUse = getCatalogInUseValue(store, ident);
CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
catalogWrapper.catalog().checkMetalakeInUse();

boolean catalogInUse = catalogWrapper.catalog().catalogInUse();
if (catalogInUse && !force) {
throw new CatalogInUseException(
"Catalog %s is in use, please disable it first or use force option", ident);
Expand All @@ -750,8 +740,6 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
Namespace schemaNs = Namespace.of(ident.namespace().level(0), ident.name());
List<SchemaEntity> schemaEntities =
store.list(schemaNs, SchemaEntity.class, EntityType.SCHEMA);

CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
if (!force && containsUserCreatedSchemas(schemaEntities, catalogWrapper)) {
throw new NonEmptyCatalogException(
"Catalog %s has schemas, please drop them first or use force option", ident);
Expand Down Expand Up @@ -865,35 +853,6 @@ public CatalogWrapper loadCatalogAndWrap(NameIdentifier ident) throws NoSuchCata
return catalogCache.get(ident, this::loadCatalogInternal);
}

private boolean catalogInUse(EntityStore store, NameIdentifier ident)
throws NoSuchMetalakeException, NoSuchCatalogException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
return metalakeInUse(store, metalakeIdent) && getCatalogInUseValue(store, ident);
}

private boolean getCatalogInUseValue(EntityStore store, NameIdentifier catalogIdent) {
try {
CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent);
CatalogEntity catalogEntity;
if (wrapper != null) {
catalogEntity = wrapper.catalog.entity();
} else {
catalogEntity = store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class);
}
return (boolean)
BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault(
catalogEntity.getProperties(), PROPERTY_IN_USE);

} catch (NoSuchEntityException e) {
LOG.warn("Catalog {} does not exist", catalogIdent, e);
throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, catalogIdent);

} catch (IOException e) {
LOG.error("Failed to do store operation", e);
throw new RuntimeException(e);
}
}

private boolean isManagedStorageCatalog(CatalogWrapper catalogWrapper) {
try {
Capability capability = catalogWrapper.capabilities();
Expand Down Expand Up @@ -1026,22 +985,6 @@ private Map<String, String> getResolvedProperties(CatalogEntity entity) {
}
}

private Set<String> getHiddenPropertyNames(CatalogEntity entity) {
Map<String, String> conf = entity.getProperties();
String provider = entity.getProvider();

try (IsolatedClassLoader classLoader = createClassLoader(provider, conf)) {
BaseCatalog<?> catalog = createBaseCatalog(classLoader, entity);
return classLoader.withClassLoader(
cl ->
catalog.catalogPropertiesMetadata().propertyEntries().values().stream()
.filter(PropertyEntry::isHidden)
.map(PropertyEntry::getName)
.collect(Collectors.toSet()),
RuntimeException.class);
}
}

private BaseCatalog<?> createBaseCatalog(IsolatedClassLoader classLoader, CatalogEntity entity) {
// Load Catalog class instance
BaseCatalog<?> catalog = createCatalogInstance(classLoader, entity.getProvider());
Expand Down Expand Up @@ -1255,4 +1198,52 @@ private CatalogEntity convertFilesetCatalogEntity(CatalogEntity entity) {
// If the provider is not "hadoop", we assume it is already a fileset catalog entity.
return entity;
}

/**
* Set the metalake in-use status in a specified catalog.
*
* @param nameIdentifier The name identifier of the catalog.
* @param status The in-use status to set.
*/
public void setMetalakeInUseStatus(NameIdentifier nameIdentifier, boolean status) {
updateCatalogProperty(nameIdentifier, PROPERTY_METALAKE_IN_USE, String.valueOf(status));
}

private void updateCatalogProperty(
NameIdentifier nameIdentifier, String propertyKey, String propertyValue) {
try {
store.update(
nameIdentifier,
CatalogEntity.class,
EntityType.CATALOG,
catalog -> {
CatalogEntity.Builder newCatalogBuilder =
newCatalogBuilder(nameIdentifier.namespace(), catalog);

Map<String, String> newProps =
catalog.getProperties() == null
? new HashMap<>()
: new HashMap<>(catalog.getProperties());
newProps.put(propertyKey, propertyValue);
newCatalogBuilder.withProperties(newProps);

return newCatalogBuilder.build();
});
catalogCache.invalidate(nameIdentifier);

} catch (NoSuchCatalogException e) {
LOG.error("Catalog {} does not exist", nameIdentifier, e);
throw new RuntimeException(e);
} catch (IllegalArgumentException e) {
LOG.error(
"Failed to update catalog {} property {} with unknown change",
nameIdentifier,
propertyKey,
e);
throw e;
} catch (IOException ioe) {
LOG.error("Failed to update catalog {} property {}", nameIdentifier, propertyKey, ioe);
throw new RuntimeException(ioe);
}
}
}
Loading