Skip to content

Commit cc9af59

Browse files
committed
[#9528] feat(core): Add storage layer support for UDF persistence
This PR implements the storage layer for UDF (User-Defined Function) persistence in the relational database backend. Changes include: - Add FunctionPO and FunctionVersionPO for database persistence - Add FunctionMetaMapper and FunctionVersionMetaMapper for MyBatis - Add FunctionMetaService for function metadata operations - Add SQL schema scripts for MySQL, PostgreSQL, and H2 (v1.2.0) - Add upgrade scripts from v1.1.0 to v1.2.0 - Update JDBCBackend to support FUNCTION entity type - Add unit tests for FunctionMetaService
1 parent 5872174 commit cc9af59

28 files changed

+4491
-3
lines changed

common/src/main/java/org/apache/gravitino/config/ConfigConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,5 @@ private ConfigConstants() {}
8787
public static final String VERSION_1_2_0 = "1.2.0";
8888

8989
/** The current version of backend storage initialization script. */
90-
public static final String CURRENT_SCRIPT_VERSION = VERSION_1_1_0;
90+
public static final String CURRENT_SCRIPT_VERSION = VERSION_1_2_0;
9191
}

core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.gravitino.meta.BaseMetalake;
4545
import org.apache.gravitino.meta.CatalogEntity;
4646
import org.apache.gravitino.meta.FilesetEntity;
47+
import org.apache.gravitino.meta.FunctionEntity;
4748
import org.apache.gravitino.meta.GroupEntity;
4849
import org.apache.gravitino.meta.JobEntity;
4950
import org.apache.gravitino.meta.JobTemplateEntity;
@@ -61,6 +62,7 @@
6162
import org.apache.gravitino.storage.relational.database.H2Database;
6263
import org.apache.gravitino.storage.relational.service.CatalogMetaService;
6364
import org.apache.gravitino.storage.relational.service.FilesetMetaService;
65+
import org.apache.gravitino.storage.relational.service.FunctionMetaService;
6466
import org.apache.gravitino.storage.relational.service.GroupMetaService;
6567
import org.apache.gravitino.storage.relational.service.JobMetaService;
6668
import org.apache.gravitino.storage.relational.service.JobTemplateMetaService;
@@ -134,6 +136,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
134136
case MODEL_VERSION:
135137
return (List<E>)
136138
ModelVersionMetaService.getInstance().listModelVersionsByNamespace(namespace);
139+
case FUNCTION:
140+
return (List<E>) FunctionMetaService.getInstance().listFunctionsByNamespace(namespace);
137141
case POLICY:
138142
return (List<E>) PolicyMetaService.getInstance().listPoliciesByNamespace(namespace);
139143
case JOB_TEMPLATE:
@@ -194,6 +198,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
194198
+ "inserting the new model version.");
195199
}
196200
ModelVersionMetaService.getInstance().insertModelVersion((ModelVersionEntity) e);
201+
} else if (e instanceof FunctionEntity) {
202+
FunctionMetaService.getInstance().insertFunction((FunctionEntity) e, overwritten);
197203
} else if (e instanceof PolicyEntity) {
198204
PolicyMetaService.getInstance().insertPolicy((PolicyEntity) e, overwritten);
199205
} else if (e instanceof JobTemplateEntity) {
@@ -235,6 +241,8 @@ public <E extends Entity & HasIdentifier> E update(
235241
return (E) ModelMetaService.getInstance().updateModel(ident, updater);
236242
case MODEL_VERSION:
237243
return (E) ModelVersionMetaService.getInstance().updateModelVersion(ident, updater);
244+
case FUNCTION:
245+
return (E) FunctionMetaService.getInstance().updateFunction(ident, updater);
238246
case POLICY:
239247
return (E) PolicyMetaService.getInstance().updatePolicy(ident, updater);
240248
case JOB_TEMPLATE:
@@ -274,6 +282,8 @@ public <E extends Entity & HasIdentifier> E get(
274282
return (E) ModelMetaService.getInstance().getModelByIdentifier(ident);
275283
case MODEL_VERSION:
276284
return (E) ModelVersionMetaService.getInstance().getModelVersionByIdentifier(ident);
285+
case FUNCTION:
286+
return (E) FunctionMetaService.getInstance().getFunctionByIdentifier(ident);
277287
case POLICY:
278288
return (E) PolicyMetaService.getInstance().getPolicyByIdentifier(ident);
279289
case JOB_TEMPLATE:
@@ -314,6 +324,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
314324
return ModelMetaService.getInstance().deleteModel(ident);
315325
case MODEL_VERSION:
316326
return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
327+
case FUNCTION:
328+
return FunctionMetaService.getInstance().deleteFunction(ident);
317329
case POLICY:
318330
return PolicyMetaService.getInstance().deletePolicy(ident);
319331
case JOB_TEMPLATE:
@@ -385,6 +397,10 @@ public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimelin
385397
return ModelVersionMetaService.getInstance()
386398
.deleteModelVersionMetasByLegacyTimeline(
387399
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
400+
case FUNCTION:
401+
return FunctionMetaService.getInstance()
402+
.deleteFunctionMetasByLegacyTimeline(
403+
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
388404
case TABLE_STATISTIC:
389405
return StatisticMetaService.getInstance()
390406
.deleteStatisticsByLegacyTimeline(
@@ -397,7 +413,6 @@ public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimelin
397413
return JobMetaService.getInstance()
398414
.deleteJobsByLegacyTimeline(legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
399415
case AUDIT:
400-
case FUNCTION:
401416
return 0;
402417
// TODO: Implement hard delete logic for these entity types.
403418

@@ -427,7 +442,6 @@ public int deleteOldVersionData(Entity.EntityType entityType, long versionRetent
427442
case TABLE_STATISTIC:
428443
case JOB_TEMPLATE:
429444
case JOB:
430-
case FUNCTION: // todo: remove once function versioning is supported
431445
// These entity types have not implemented multi-versions, so we can skip.
432446
return 0;
433447

@@ -441,6 +455,11 @@ public int deleteOldVersionData(Entity.EntityType entityType, long versionRetent
441455
.deletePolicyVersionsByRetentionCount(
442456
versionRetentionCount, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
443457

458+
case FUNCTION:
459+
return FunctionMetaService.getInstance()
460+
.deleteFunctionVersionsByRetentionCount(
461+
versionRetentionCount, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
462+
444463
default:
445464
throw new IllegalArgumentException(
446465
"Unsupported entity type when collectAndRemoveOldVersionData: " + entityType);
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.gravitino.storage.relational.mapper;
20+
21+
import java.util.List;
22+
import org.apache.gravitino.storage.relational.po.FunctionPO;
23+
import org.apache.gravitino.storage.relational.po.FunctionVersionPO;
24+
import org.apache.ibatis.annotations.DeleteProvider;
25+
import org.apache.ibatis.annotations.InsertProvider;
26+
import org.apache.ibatis.annotations.One;
27+
import org.apache.ibatis.annotations.Param;
28+
import org.apache.ibatis.annotations.Result;
29+
import org.apache.ibatis.annotations.ResultMap;
30+
import org.apache.ibatis.annotations.Results;
31+
import org.apache.ibatis.annotations.Select;
32+
import org.apache.ibatis.annotations.SelectProvider;
33+
import org.apache.ibatis.annotations.UpdateProvider;
34+
35+
public interface FunctionMetaMapper {
36+
String TABLE_NAME = "function_meta";
37+
String VERSION_TABLE_NAME = "function_version_info";
38+
39+
@Results(
40+
id = "mapToFunctionVersionPO",
41+
value = {
42+
@Result(property = "id", column = "id", id = true),
43+
@Result(property = "metalakeId", column = "version_metalake_id"),
44+
@Result(property = "catalogId", column = "version_catalog_id"),
45+
@Result(property = "schemaId", column = "version_schema_id"),
46+
@Result(property = "functionId", column = "version_function_id"),
47+
@Result(property = "functionVersion", column = "version"),
48+
@Result(property = "functionComment", column = "function_comment"),
49+
@Result(property = "definitions", column = "definitions"),
50+
@Result(property = "auditInfo", column = "version_audit_info"),
51+
@Result(property = "deletedAt", column = "version_deleted_at")
52+
})
53+
@Select("SELECT 1") // Dummy SQL to avoid MyBatis error, never be executed
54+
FunctionVersionPO mapToFunctionVersionPO();
55+
56+
@InsertProvider(type = FunctionMetaSQLProviderFactory.class, method = "insertFunctionMeta")
57+
void insertFunctionMeta(@Param("functionMeta") FunctionPO functionPO);
58+
59+
@InsertProvider(
60+
type = FunctionMetaSQLProviderFactory.class,
61+
method = "insertFunctionMetaOnDuplicateKeyUpdate")
62+
void insertFunctionMetaOnDuplicateKeyUpdate(@Param("functionMeta") FunctionPO functionPO);
63+
64+
@Results({
65+
@Result(property = "functionId", column = "function_id", id = true),
66+
@Result(property = "functionName", column = "function_name"),
67+
@Result(property = "metalakeId", column = "metalake_id"),
68+
@Result(property = "catalogId", column = "catalog_id"),
69+
@Result(property = "schemaId", column = "schema_id"),
70+
@Result(property = "functionType", column = "function_type"),
71+
@Result(property = "deterministic", column = "deterministic"),
72+
@Result(property = "returnType", column = "return_type"),
73+
@Result(property = "functionCurrentVersion", column = "function_current_version"),
74+
@Result(property = "functionLatestVersion", column = "function_latest_version"),
75+
@Result(property = "auditInfo", column = "audit_info"),
76+
@Result(property = "deletedAt", column = "deleted_at"),
77+
@Result(
78+
property = "functionVersionPO",
79+
javaType = FunctionVersionPO.class,
80+
column =
81+
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_function_id,"
82+
+ "version,function_comment,definitions,version_audit_info,version_deleted_at}",
83+
one = @One(resultMap = "mapToFunctionVersionPO"))
84+
})
85+
@SelectProvider(type = FunctionMetaSQLProviderFactory.class, method = "listFunctionPOsBySchemaId")
86+
List<FunctionPO> listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId);
87+
88+
@Results(
89+
id = "functionPOResultMap",
90+
value = {
91+
@Result(property = "functionId", column = "function_id", id = true),
92+
@Result(property = "functionName", column = "function_name"),
93+
@Result(property = "metalakeId", column = "metalake_id"),
94+
@Result(property = "catalogId", column = "catalog_id"),
95+
@Result(property = "schemaId", column = "schema_id"),
96+
@Result(property = "functionType", column = "function_type"),
97+
@Result(property = "deterministic", column = "deterministic"),
98+
@Result(property = "returnType", column = "return_type"),
99+
@Result(property = "functionCurrentVersion", column = "function_current_version"),
100+
@Result(property = "functionLatestVersion", column = "function_latest_version"),
101+
@Result(property = "auditInfo", column = "audit_info"),
102+
@Result(property = "deletedAt", column = "deleted_at"),
103+
@Result(
104+
property = "functionVersionPO",
105+
javaType = FunctionVersionPO.class,
106+
column =
107+
"{id,version_metalake_id,version_catalog_id,version_schema_id,version_function_id,"
108+
+ "version,function_comment,definitions,version_audit_info,version_deleted_at}",
109+
one = @One(resultMap = "mapToFunctionVersionPO"))
110+
})
111+
@SelectProvider(
112+
type = FunctionMetaSQLProviderFactory.class,
113+
method = "listFunctionPOsByFullQualifiedName")
114+
List<FunctionPO> listFunctionPOsByFullQualifiedName(
115+
@Param("metalakeName") String metalakeName,
116+
@Param("catalogName") String catalogName,
117+
@Param("schemaName") String schemaName);
118+
119+
@ResultMap("functionPOResultMap")
120+
@SelectProvider(
121+
type = FunctionMetaSQLProviderFactory.class,
122+
method = "selectFunctionMetaByFullQualifiedName")
123+
FunctionPO selectFunctionMetaByFullQualifiedName(
124+
@Param("metalakeName") String metalakeName,
125+
@Param("catalogName") String catalogName,
126+
@Param("schemaName") String schemaName,
127+
@Param("functionName") String functionName);
128+
129+
@ResultMap("functionPOResultMap")
130+
@SelectProvider(
131+
type = FunctionMetaSQLProviderFactory.class,
132+
method = "selectFunctionMetaBySchemaIdAndName")
133+
FunctionPO selectFunctionMetaBySchemaIdAndName(
134+
@Param("schemaId") Long schemaId, @Param("functionName") String functionName);
135+
136+
@UpdateProvider(
137+
type = FunctionMetaSQLProviderFactory.class,
138+
method = "softDeleteFunctionMetaByFunctionId")
139+
Integer softDeleteFunctionMetaByFunctionId(@Param("functionId") Long functionId);
140+
141+
@UpdateProvider(
142+
type = FunctionMetaSQLProviderFactory.class,
143+
method = "softDeleteFunctionMetasByCatalogId")
144+
Integer softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long catalogId);
145+
146+
@UpdateProvider(
147+
type = FunctionMetaSQLProviderFactory.class,
148+
method = "softDeleteFunctionMetasByMetalakeId")
149+
Integer softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
150+
151+
@UpdateProvider(
152+
type = FunctionMetaSQLProviderFactory.class,
153+
method = "softDeleteFunctionMetasBySchemaId")
154+
Integer softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long schemaId);
155+
156+
@DeleteProvider(
157+
type = FunctionMetaSQLProviderFactory.class,
158+
method = "deleteFunctionMetasByLegacyTimeline")
159+
Integer deleteFunctionMetasByLegacyTimeline(
160+
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
161+
162+
@UpdateProvider(type = FunctionMetaSQLProviderFactory.class, method = "updateFunctionMeta")
163+
Integer updateFunctionMeta(
164+
@Param("newFunctionMeta") FunctionPO newFunctionPO,
165+
@Param("oldFunctionMeta") FunctionPO oldFunctionPO);
166+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.gravitino.storage.relational.mapper;
20+
21+
import com.google.common.collect.ImmutableMap;
22+
import java.util.Map;
23+
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
24+
import org.apache.gravitino.storage.relational.mapper.provider.base.FunctionMetaBaseSQLProvider;
25+
import org.apache.gravitino.storage.relational.mapper.provider.postgresql.FunctionMetaPostgreSQLProvider;
26+
import org.apache.gravitino.storage.relational.po.FunctionPO;
27+
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
28+
import org.apache.ibatis.annotations.Param;
29+
30+
public class FunctionMetaSQLProviderFactory {
31+
32+
static class FunctionMetaMySQLProvider extends FunctionMetaBaseSQLProvider {}
33+
34+
static class FunctionMetaH2Provider extends FunctionMetaBaseSQLProvider {}
35+
36+
private static final Map<JDBCBackendType, FunctionMetaBaseSQLProvider>
37+
FUNCTION_META_SQL_PROVIDER_MAP =
38+
ImmutableMap.of(
39+
JDBCBackendType.MYSQL, new FunctionMetaMySQLProvider(),
40+
JDBCBackendType.H2, new FunctionMetaH2Provider(),
41+
JDBCBackendType.POSTGRESQL, new FunctionMetaPostgreSQLProvider());
42+
43+
public static FunctionMetaBaseSQLProvider getProvider() {
44+
String databaseId =
45+
SqlSessionFactoryHelper.getInstance()
46+
.getSqlSessionFactory()
47+
.getConfiguration()
48+
.getDatabaseId();
49+
50+
JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
51+
return FUNCTION_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
52+
}
53+
54+
public static String insertFunctionMeta(@Param("functionMeta") FunctionPO functionPO) {
55+
return getProvider().insertFunctionMeta(functionPO);
56+
}
57+
58+
public static String insertFunctionMetaOnDuplicateKeyUpdate(
59+
@Param("functionMeta") FunctionPO functionPO) {
60+
return getProvider().insertFunctionMetaOnDuplicateKeyUpdate(functionPO);
61+
}
62+
63+
public static String listFunctionPOsBySchemaId(@Param("schemaId") Long schemaId) {
64+
return getProvider().listFunctionPOsBySchemaId(schemaId);
65+
}
66+
67+
public static String listFunctionPOsByFullQualifiedName(
68+
@Param("metalakeName") String metalakeName,
69+
@Param("catalogName") String catalogName,
70+
@Param("schemaName") String schemaName) {
71+
return getProvider().listFunctionPOsByFullQualifiedName(metalakeName, catalogName, schemaName);
72+
}
73+
74+
public static String selectFunctionMetaByFullQualifiedName(
75+
@Param("metalakeName") String metalakeName,
76+
@Param("catalogName") String catalogName,
77+
@Param("schemaName") String schemaName,
78+
@Param("functionName") String functionName) {
79+
return getProvider()
80+
.selectFunctionMetaByFullQualifiedName(metalakeName, catalogName, schemaName, functionName);
81+
}
82+
83+
public static String selectFunctionMetaBySchemaIdAndName(
84+
@Param("schemaId") Long schemaId, @Param("functionName") String functionName) {
85+
return getProvider().selectFunctionMetaBySchemaIdAndName(schemaId, functionName);
86+
}
87+
88+
public static String softDeleteFunctionMetaByFunctionId(@Param("functionId") Long functionId) {
89+
return getProvider().softDeleteFunctionMetaByFunctionId(functionId);
90+
}
91+
92+
public static String softDeleteFunctionMetasByCatalogId(@Param("catalogId") Long catalogId) {
93+
return getProvider().softDeleteFunctionMetasByCatalogId(catalogId);
94+
}
95+
96+
public static String softDeleteFunctionMetasByMetalakeId(@Param("metalakeId") Long metalakeId) {
97+
return getProvider().softDeleteFunctionMetasByMetalakeId(metalakeId);
98+
}
99+
100+
public static String softDeleteFunctionMetasBySchemaId(@Param("schemaId") Long schemaId) {
101+
return getProvider().softDeleteFunctionMetasBySchemaId(schemaId);
102+
}
103+
104+
public static String deleteFunctionMetasByLegacyTimeline(
105+
@Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
106+
return getProvider().deleteFunctionMetasByLegacyTimeline(legacyTimeline, limit);
107+
}
108+
109+
public static String updateFunctionMeta(
110+
@Param("newFunctionMeta") FunctionPO newFunctionPO,
111+
@Param("oldFunctionMeta") FunctionPO oldFunctionPO) {
112+
return getProvider().updateFunctionMeta(newFunctionPO, oldFunctionPO);
113+
}
114+
}

0 commit comments

Comments
 (0)