Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/gravitino/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ enum CloudName {
/** The property indicates the catalog is in use. */
String PROPERTY_IN_USE = "in-use";

/**
* The property indicates the Metalake catalog is in use and this catalog is managed by Metalake.
*/
String PROPERTY_METALAKE_IN_USE = "metalake-in-use";

/**
* The property name for the catalog location. This property indicates the physical location of
* the catalog's data, such as a file path or a URI.
Expand Down
63 changes: 38 additions & 25 deletions core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.apache.gravitino.exceptions.CatalogInUseException;
import org.apache.gravitino.exceptions.CatalogNotInUseException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
Expand Down Expand Up @@ -115,17 +114,6 @@ public class CatalogManager implements CatalogDispatcher, Closeable {

private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);

public void checkCatalogInUse(EntityStore store, NameIdentifier ident)
throws NoSuchMetalakeException, NoSuchCatalogException, CatalogNotInUseException,
MetalakeNotInUseException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
checkMetalake(metalakeIdent, store);

if (!getCatalogInUseValue(store, ident)) {
throw new CatalogNotInUseException("Catalog %s is not in use, please enable it first", ident);
}
}

/** Wrapper class for a catalog instance and its class loader. */
public static class CatalogWrapper {

Expand Down Expand Up @@ -559,13 +547,7 @@ public void enableCatalog(NameIdentifier ident)
metalakeIdent,
LockType.WRITE,
() -> {
checkMetalake(metalakeIdent, store);

try {
if (catalogInUse(store, ident)) {
return null;
}

store.update(
ident,
CatalogEntity.class,
Expand All @@ -591,6 +573,44 @@ public void enableCatalog(NameIdentifier ident)
});
}

public void updateCatalogProperty(
NameIdentifier nameIdentifier, String propertyKey, String propertyValue) {
try {
store.update(
nameIdentifier,
CatalogEntity.class,
EntityType.CATALOG,
catalog -> {
CatalogEntity.Builder newCatalogBuilder =
newCatalogBuilder(nameIdentifier.namespace(), catalog);

Map<String, String> newProps =
catalog.getProperties() == null
? new HashMap<>()
: new HashMap<>(catalog.getProperties());
newProps.put(propertyKey, propertyValue);
newCatalogBuilder.withProperties(newProps);

return newCatalogBuilder.build();
});
catalogCache.invalidate(nameIdentifier);

} catch (NoSuchCatalogException e) {
LOG.error("Catalog {} does not exist", nameIdentifier, e);
throw new RuntimeException(e);
} catch (IllegalArgumentException e) {
LOG.error(
"Failed to update catalog {} property {} with unknown change",
nameIdentifier,
propertyKey,
e);
throw e;
} catch (IOException ioe) {
LOG.error("Failed to update catalog {} property {}", nameIdentifier, propertyKey, ioe);
throw new RuntimeException(ioe);
}
}

@Override
public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException {
NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels());
Expand All @@ -599,12 +619,7 @@ public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException {
metalakeIdent,
LockType.WRITE,
() -> {
checkMetalake(metalakeIdent, store);

try {
if (!catalogInUse(store, ident)) {
return null;
}
store.update(
ident,
CatalogEntity.class,
Expand Down Expand Up @@ -646,8 +661,6 @@ public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
ident,
LockType.READ,
() -> {
checkCatalogInUse(store, ident);

// There could be a race issue that someone is using the catalog from cache while we are
// updating it.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@
*/
public class OperationDispatcherInterceptor implements InvocationHandler {
private final Object dispatcher;

@SuppressWarnings("unused")
private final CatalogManager catalogManager;

@SuppressWarnings("unused")
private final EntityStore store;

/**
Expand Down Expand Up @@ -87,6 +91,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}

if (catalogIdent != null) {
@SuppressWarnings("unused")
final NameIdentifier finalCatalogIdent = catalogIdent;
// Note: In this implementation, the catalog-in-use check is performed separately
// under a tree lock before invoking the main operation. In the original code,
Expand All @@ -101,7 +106,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
catalogIdent,
LockType.READ,
() -> {
catalogManager.checkCatalogInUse(store, finalCatalogIdent);
return null;
});
}
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.gravitino.connector.authorization.BaseAuthorization;
import org.apache.gravitino.connector.capability.Capability;
import org.apache.gravitino.credential.CatalogCredentialManager;
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.utils.IsolatedClassLoader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -175,6 +176,8 @@ public CatalogOperations ops() {
Preconditions.checkArgument(
entity != null && conf != null, "entity and conf must be set before calling ops()");
CatalogOperations newOps = createOps(conf);
// Check metalake and catalogInuse
checkMetalakeAndCatalogInUse(entity);
newOps.initialize(conf, entity.toCatalogInfo(), this);
ops =
newProxyPlugin(conf)
Expand Down Expand Up @@ -376,4 +379,21 @@ public Audit auditInfo() {
private CatalogOperations asProxyOps(CatalogOperations ops, ProxyPlugin plugin) {
return OperationsProxy.createProxy(ops, plugin);
}

private void checkMetalakeAndCatalogInUse(CatalogEntity catalogEntity) {
boolean metalakeInuse =
Boolean.parseBoolean(
catalogEntity.getProperties().getOrDefault(Catalog.PROPERTY_METALAKE_IN_USE, "true"));
if (!metalakeInuse) {
throw new MetalakeNotInUseException(
String.format("The metalake that holds catalog %s is not in use", catalogEntity.name()));
}

boolean catalogInuse =
Boolean.parseBoolean(catalogEntity.getProperties().getOrDefault(PROPERTY_IN_USE, "true"));
if (!catalogInuse) {
throw new MetalakeNotInUseException(
String.format("The catalog %s is not in use", catalogEntity.name()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
PROPERTY_IN_USE,
"The property indicating the catalog is in use",
true /* default value */,
false /* hidden */)),
false /* hidden */),
PropertyEntry.booleanReservedPropertyEntry(
PROPERTY_IN_USE,
"The property indicating the metalake that holds the catalog is in use",
true /* default value */,
true /* hidden */)),
PropertyEntry::getName);

@Override
Expand Down
106 changes: 72 additions & 34 deletions core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.MetalakeChange;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
Expand Down Expand Up @@ -342,25 +344,43 @@ public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
() -> {
try {
boolean inUse = metalakeInUse(store, ident);
if (!inUse) {
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "true");
builder.withProperties(newProps);

return builder.build();
});
if (inUse) {
return null;
}

store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "true");
builder.withProperties(newProps);

return builder.build();
});

// The only problem is that we can't make sure we can change all catalog properties
// in a transaction, if any of them fails, the metalake is already enabled and the value
// in catalog is inconsistent.
store
.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG)
.forEach(
catalogEntity -> {
// update the properties metalake-in-use in catalog to false
GravitinoEnv.getInstance()
.catalogManager()
.updateCatalogProperty(
catalogEntity.nameIdentifier(),
Catalog.PROPERTY_METALAKE_IN_USE,
"false");
});

return null;
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -376,24 +396,42 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
() -> {
try {
boolean inUse = metalakeInUse(store, ident);
if (inUse) {
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "false");
builder.withProperties(newProps);

return builder.build();
});
if (!inUse) {
return null;
}

store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "false");
builder.withProperties(newProps);

return builder.build();
});

// The only problem is that we can't make sure we can change all catalog properties
// in a transaction, if any of them fails, the metalake is already enabled and the value
// in catalog is inconsistent.
store
.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG)
.forEach(
catalogEntity -> {
// update the properties metalake-in-use in catalog to false
GravitinoEnv.getInstance()
.catalogManager()
.updateCatalogProperty(
catalogEntity.nameIdentifier(),
Catalog.PROPERTY_METALAKE_IN_USE,
"false");
});
return null;
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Loading