Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,18 @@
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.connector.BaseCatalog;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchFunctionException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.exceptions.NoSuchPartitionException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.function.Function;
import org.apache.gravitino.function.FunctionChange;
import org.apache.gravitino.function.FunctionDefinitions;
import org.apache.gravitino.function.FunctionImpl;
import org.apache.gravitino.function.FunctionImpls;
import org.apache.gravitino.function.FunctionParams;
import org.apache.gravitino.function.FunctionType;
import org.apache.gravitino.hive.HiveClientPool;
import org.apache.gravitino.hive.HivePartition;
import org.apache.gravitino.hive.HiveSchema;
Expand All @@ -78,6 +86,7 @@
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
Expand Down Expand Up @@ -762,6 +771,71 @@ public void testListTables() {
Assertions.assertEquals(1, tables.length);
}

@Test
public void testFunctions() throws InterruptedException {
// test list functions in a schema which not created by Gravitino
String schemaName1 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
hiveClientPool.run(
client -> {
client.createDatabase(
HiveSchema.builder()
.withName(schemaName1)
.withCatalogName(hmsCatalog)
.withAuditInfo(AuditInfo.EMPTY)
.build());
return null;
});
NameIdentifier[] functionIdents =
catalog.asFunctionCatalog().listFunctions(Namespace.of(schemaName1));
Assertions.assertEquals(0, functionIdents.length);

// test register functions in a schema which not created by Gravitino
String schemaName2 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
hiveClientPool.run(
client -> {
client.createDatabase(
HiveSchema.builder()
.withName(schemaName2)
.withCatalogName(hmsCatalog)
.withAuditInfo(AuditInfo.EMPTY)
.build());
return null;
});
Function function =
catalog
.asFunctionCatalog()
.registerFunction(
NameIdentifier.of(schemaName2, "test_func"),
"test comment",
FunctionType.SCALAR,
true,
Types.StringType.get(),
FunctionDefinitions.of(
FunctionDefinitions.of(
FunctionParams.of(FunctionParams.of("input", Types.StringType.get())),
FunctionImpls.of(
FunctionImpls.ofJava(
FunctionImpl.RuntimeType.SPARK, "mock.udf.class.name")))));
Assertions.assertEquals("test_func", function.name());

// test alter a non-existing function under a schema which not created by Gravitino
String schemaName3 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
NameIdentifier id = NameIdentifier.of(schemaName3, "test_func");
NoSuchFunctionException exception =
assertThrows(
NoSuchFunctionException.class,
() ->
catalog
.asFunctionCatalog()
.alterFunction(id, FunctionChange.updateComment("new comment")));
Assertions.assertTrue(exception.getMessage().contains("does not exist"));

// test drop a non-existing function under a chema which not created by Gravitino
String schemaName4 = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
NameIdentifier id2 = NameIdentifier.of(schemaName4, "test_func");
Assertions.assertFalse(catalog.asFunctionCatalog().dropFunction(id2));
}

@Test
public void testHiveSchemaProperties() throws TException, InterruptedException {
Assumptions.assumeTrue(enableSparkTest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static HiveSchema fromHiveDB(Database db) {
.build();
return hiveSchema;
}

/**
* Add a comment on lines L57 to L65Add diff commentMarkdown input: edit mode
* selected.WritePreviewHeadingBoldItalicQuoteCodeLinkUnordered listNumbered listTask
Expand All @@ -78,14 +79,17 @@ public static Database toHiveDb(HiveSchema hiveSchema) {
Database hiveDb = new Database();

hiveDb.setName(hiveSchema.name());
Optional.ofNullable(hiveSchema.properties().get(LOCATION)).ifPresent(hiveDb::setLocationUri);
Optional.ofNullable(hiveSchema.properties())
.map(props -> props.get(LOCATION))
.ifPresent(hiveDb::setLocationUri);
Optional.ofNullable(hiveSchema.comment()).ifPresent(hiveDb::setDescription);

// TODO: Add more privilege info to Hive's Database object after Gravitino supports privilege.
hiveDb.setOwnerName(hiveSchema.auditInfo().creator());
hiveDb.setOwnerType(PrincipalType.USER);

Map<String, String> parameters = new HashMap<>(hiveSchema.properties());
Map<String, String> parameters =
Optional.ofNullable(hiveSchema.properties()).map(HashMap::new).orElseGet(HashMap::new);
parameters.remove(LOCATION);
hiveDb.setParameters(parameters);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.PolicyAlreadyAssociatedException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.function.Function;
import org.apache.gravitino.function.FunctionCatalog;
import org.apache.gravitino.function.FunctionChange;
import org.apache.gravitino.function.FunctionColumn;
import org.apache.gravitino.function.FunctionDefinition;
import org.apache.gravitino.function.FunctionType;
import org.apache.gravitino.policy.Policy;
import org.apache.gravitino.policy.SupportsPolicies;
import org.apache.gravitino.rest.RESTUtils;
Expand All @@ -58,7 +64,12 @@
* create, load, alter and drop a schema with specified identifier.
*/
abstract class BaseSchemaCatalog extends CatalogDTO
implements Catalog, SupportsSchemas, SupportsTags, SupportsRoles, SupportsPolicies {
implements Catalog,
SupportsSchemas,
SupportsTags,
SupportsRoles,
SupportsPolicies,
FunctionCatalog {

/** The REST client to send the requests. */
protected final RESTClient restClient;
Expand All @@ -70,6 +81,7 @@ abstract class BaseSchemaCatalog extends CatalogDTO
private final MetadataObjectPolicyOperations objectPolicyOperations;
private final MetadataObjectRoleOperations objectRoleOperations;
protected final MetadataObjectCredentialOperations objectCredentialOperations;
private final FunctionCatalogOperations functionOperations;

BaseSchemaCatalog(
Namespace catalogNamespace,
Expand Down Expand Up @@ -100,6 +112,8 @@ abstract class BaseSchemaCatalog extends CatalogDTO
this.objectCredentialOperations =
new MetadataObjectCredentialOperations(
catalogNamespace.level(0), metadataObject, restClient);
this.functionOperations =
new FunctionCatalogOperations(restClient, catalogNamespace, this.name());
}

@Override
Expand Down Expand Up @@ -317,4 +331,57 @@ static String formatSchemaRequestPath(Namespace ns) {
.append("/schemas")
.toString();
}

@Override
public FunctionCatalog asFunctionCatalog() {
return this;
}

@Override
public NameIdentifier[] listFunctions(Namespace namespace) {
return functionOperations.listFunctions(namespace);
}

@Override
public Function[] listFunctionInfos(Namespace namespace) throws NoSuchSchemaException {
return functionOperations.listFunctionInfos(namespace);
}

@Override
public Function getFunction(NameIdentifier ident) {
return functionOperations.getFunction(ident);
}

@Override
public Function registerFunction(
NameIdentifier ident,
String comment,
FunctionType functionType,
boolean deterministic,
org.apache.gravitino.rel.types.Type returnType,
FunctionDefinition[] definitions) {
return functionOperations.registerFunction(
ident, comment, functionType, deterministic, returnType, definitions);
}

@Override
public Function registerFunction(
NameIdentifier ident,
String comment,
boolean deterministic,
FunctionColumn[] returnColumns,
FunctionDefinition[] definitions) {
return functionOperations.registerFunction(
ident, comment, deterministic, returnColumns, definitions);
}

@Override
public Function alterFunction(NameIdentifier ident, FunctionChange... changes) {
return functionOperations.alterFunction(ident, changes);
}

@Override
public boolean dropFunction(NameIdentifier ident) {
return functionOperations.dropFunction(ident);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import org.apache.gravitino.dto.MetalakeDTO;
import org.apache.gravitino.dto.authorization.PrivilegeDTO;
import org.apache.gravitino.dto.authorization.SecurableObjectDTO;
import org.apache.gravitino.dto.function.FunctionColumnDTO;
import org.apache.gravitino.dto.function.FunctionDefinitionDTO;
import org.apache.gravitino.dto.function.FunctionImplDTO;
import org.apache.gravitino.dto.function.FunctionParamDTO;
import org.apache.gravitino.dto.job.JobTemplateDTO;
import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
import org.apache.gravitino.dto.job.ShellTemplateUpdateDTO;
Expand All @@ -44,6 +48,7 @@
import org.apache.gravitino.dto.job.TemplateUpdateDTO;
import org.apache.gravitino.dto.requests.CatalogUpdateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
import org.apache.gravitino.dto.requests.FunctionUpdateRequest;
import org.apache.gravitino.dto.requests.JobTemplateUpdateRequest;
import org.apache.gravitino.dto.requests.MetalakeUpdateRequest;
import org.apache.gravitino.dto.requests.ModelUpdateRequest;
Expand All @@ -54,6 +59,10 @@
import org.apache.gravitino.dto.requests.TagUpdateRequest;
import org.apache.gravitino.dto.requests.TopicUpdateRequest;
import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.function.FunctionChange;
import org.apache.gravitino.function.FunctionColumn;
import org.apache.gravitino.function.FunctionDefinition;
import org.apache.gravitino.function.FunctionParam;
import org.apache.gravitino.job.JobTemplate;
import org.apache.gravitino.job.JobTemplateChange;
import org.apache.gravitino.job.ShellJobTemplate;
Expand Down Expand Up @@ -552,4 +561,61 @@ static TemplateUpdateDTO toTemplateUpdateDTO(JobTemplateChange.TemplateUpdate ch
"Unknown template update type: " + change.getClass().getSimpleName());
}
}

static FunctionUpdateRequest toFunctionUpdateRequest(FunctionChange change) {
if (change instanceof FunctionChange.UpdateComment) {
return new FunctionUpdateRequest.UpdateCommentRequest(
((FunctionChange.UpdateComment) change).newComment());

} else if (change instanceof FunctionChange.AddDefinition) {
FunctionDefinition def = ((FunctionChange.AddDefinition) change).definition();
return new FunctionUpdateRequest.AddDefinitionRequest(
FunctionDefinitionDTO.fromFunctionDefinition(def));

} else if (change instanceof FunctionChange.RemoveDefinition) {
FunctionParam[] params = ((FunctionChange.RemoveDefinition) change).parameters();
return new FunctionUpdateRequest.RemoveDefinitionRequest(toFunctionParamDTOs(params));

} else if (change instanceof FunctionChange.AddImpl) {
FunctionChange.AddImpl addImpl = (FunctionChange.AddImpl) change;
return new FunctionUpdateRequest.AddImplRequest(
toFunctionParamDTOs(addImpl.parameters()),
FunctionImplDTO.fromFunctionImpl(addImpl.implementation()));

} else if (change instanceof FunctionChange.UpdateImpl) {
FunctionChange.UpdateImpl updateImpl = (FunctionChange.UpdateImpl) change;
return new FunctionUpdateRequest.UpdateImplRequest(
toFunctionParamDTOs(updateImpl.parameters()),
updateImpl.runtime().name(),
FunctionImplDTO.fromFunctionImpl(updateImpl.implementation()));

} else if (change instanceof FunctionChange.RemoveImpl) {
FunctionChange.RemoveImpl removeImpl = (FunctionChange.RemoveImpl) change;
return new FunctionUpdateRequest.RemoveImplRequest(
toFunctionParamDTOs(removeImpl.parameters()), removeImpl.runtime().name());

} else {
throw new IllegalArgumentException(
"Unknown function change type: " + change.getClass().getSimpleName());
}
}

static FunctionDefinitionDTO toFunctionDefinitionDTO(FunctionDefinition definition) {
return FunctionDefinitionDTO.fromFunctionDefinition(definition);
}

static FunctionColumnDTO toFunctionColumnDTO(FunctionColumn column) {
return FunctionColumnDTO.fromFunctionColumn(column);
}

private static FunctionParamDTO[] toFunctionParamDTOs(FunctionParam[] params) {
if (params == null) {
return new FunctionParamDTO[0];
}
FunctionParamDTO[] dtos = new FunctionParamDTO[params.length];
for (int i = 0; i < params.length; i++) {
dtos[i] = FunctionParamDTO.fromFunctionParam(params[i]);
}
return dtos;
}
}
Loading