Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
059b82c
Fix bypass error in `LocalFileSystemProvider`
yuqi1129 Nov 26, 2025
8222866
feat(metadata): add indexes for improved query performance on metalak…
yuqi1129 Dec 9, 2025
46972c6
fix
yuqi1129 Dec 9, 2025
7759aa0
fix
yuqi1129 Dec 9, 2025
f14fa0a
fix
yuqi1129 Dec 9, 2025
d17009a
fix sql syntax problem
yuqi1129 Dec 9, 2025
3588943
Fix bugs
yuqi1129 Dec 9, 2025
61cdf41
fix
yuqi1129 Dec 10, 2025
202d36c
fix
yuqi1129 Dec 10, 2025
61cdc4c
Merge branch 'main' of github.com:datastrato/graviton into issue_9429
yuqi1129 Dec 23, 2025
02f4c95
Merge branch 'issue_9429' of github.com:yuqi1129/gravitino into issue…
yuqi1129 Dec 23, 2025
e251d20
fix
yuqi1129 Dec 23, 2025
0b76784
Polish the code.
yuqi1129 Dec 23, 2025
0d7b2b4
fix
yuqi1129 Dec 23, 2025
3d192ec
fix
yuqi1129 Dec 24, 2025
ac9aea4
fix
yuqi1129 Dec 24, 2025
b96155a
fix
yuqi1129 Dec 24, 2025
5b72cfb
fix
yuqi1129 Dec 24, 2025
fe1b70a
fix
yuqi1129 Dec 24, 2025
8920e84
fix
yuqi1129 Dec 24, 2025
29ac6fa
fix
yuqi1129 Dec 24, 2025
afdd63b
fix
yuqi1129 Dec 24, 2025
45d50b4
fix
yuqi1129 Dec 29, 2025
bbb28cb
Fix ci problem
yuqi1129 Dec 29, 2025
415dec6
Polish code.
yuqi1129 Dec 30, 2025
9c4e6b8
fix
yuqi1129 Dec 31, 2025
7bdfa0b
Fix
yuqi1129 Jan 4, 2026
5f93a66
Merge remote-tracking branch 'me/issue_9429' into issue_9429
yuqi1129 Jan 4, 2026
d077091
revert
yuqi1129 Jan 4, 2026
6d204be
fix
yuqi1129 Jan 4, 2026
156eaf9
Merge branch 'main' into issue_9429
yuqi1129 Jan 4, 2026
c555744
fix
yuqi1129 Jan 4, 2026
3fe94dc
Merge branch 'main' of github.com:apache/gravitino into issue_9429
yuqi1129 Jan 4, 2026
b74beb0
Merge remote-tracking branch 'me/issue_9429' into issue_9429
yuqi1129 Jan 4, 2026
2cdb7b9
Merge branch 'main' into issue_9429
yuqi1129 Jan 4, 2026
9bb993d
fix
yuqi1129 Jan 4, 2026
5bcfbbf
fix
yuqi1129 Jan 4, 2026
1b78a02
fix
yuqi1129 Jan 4, 2026
5275bc7
fix
yuqi1129 Jan 4, 2026
caecad6
fix
yuqi1129 Jan 4, 2026
135ac3f
fix
yuqi1129 Jan 5, 2026
6fc53c6
fix
yuqi1129 Jan 5, 2026
3f1d272
fix
yuqi1129 Jan 5, 2026
9c1c3d8
fix
yuqi1129 Jan 5, 2026
6ea5f26
fix
yuqi1129 Jan 5, 2026
6b72177
fix
yuqi1129 Jan 5, 2026
a029a2d
fix
yuqi1129 Jan 5, 2026
885195b
fix test error
yuqi1129 Jan 7, 2026
4b7d7dd
fix
yuqi1129 Jan 8, 2026
0457b9d
Merge branch 'issue_9249' of github.com:yuqi1129/gravitino into issue…
yuqi1129 Jan 8, 2026
1e05018
Fix.
yuqi1129 Jan 8, 2026
53b39da
fix format problem
yuqi1129 Jan 8, 2026
696a1c9
Merge branch 'issue_9429' of github.com:yuqi1129/gravitino into issue…
yuqi1129 Jan 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@

import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
Expand Down Expand Up @@ -65,6 +71,19 @@ public class MetalakeManager implements MetalakeDispatcher, Closeable {

private final IdGenerator idGenerator;

// Cache of metalake entities keyed by identifier; entries expire 10 minutes
// after last access. A dedicated single-thread daemon scheduler evicts expired
// entries promptly instead of piggybacking eviction on cache reads.
// NOTE(review): the cache is static while the entity store is per-manager —
// assumes a single EntityStore per process; confirm if multiple stores exist.
private static final Cache<NameIdentifier, BaseMetalake> metalakeCache =
    Caffeine.newBuilder()
        // Use TimeUnit.MINUTES directly instead of hand-computed milliseconds
        // (10 * 60 * 1000) — identical duration, no magic arithmetic.
        .expireAfterAccess(10, TimeUnit.MINUTES)
        .scheduler(
            Scheduler.forScheduledExecutorService(
                new ScheduledThreadPoolExecutor(
                    1,
                    new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("metalake-cleaner-%d")
                        .build())))
        .build();

@Override
public void close() {
// do nothing
Expand All @@ -84,7 +103,8 @@ public MetalakeManager(EntityStore store, IdGenerator idGenerator) {
// directly without list/get metalake first.
BaseMetalake[] baseMetalakes = listMetalakes();
for (BaseMetalake baseMetalake : baseMetalakes) {
loadMetalake(baseMetalake.nameIdentifier());
BaseMetalake newBaseMetalake = loadMetalake(baseMetalake.nameIdentifier());
metalakeCache.put(baseMetalake.nameIdentifier(), newBaseMetalake);
}
}

Expand Down Expand Up @@ -115,18 +135,24 @@ public static void checkMetalake(NameIdentifier ident, EntityStore store)
*/
public static boolean metalakeInUse(EntityStore store, NameIdentifier ident)
    throws NoSuchMetalakeException {

  // Serve the metalake from the cache when present; on a miss, load it from
  // the entity store and populate the cache atomically via the loader.
  BaseMetalake metalake =
      metalakeCache.get(
          ident,
          key -> {
            try {
              return store.get(key, EntityType.METALAKE, BaseMetalake.class);
            } catch (NoSuchEntityException e) {
              LOG.warn("Metalake {} does not exist", key, e);
              // Preserve the method's declared contract: callers catch
              // NoSuchMetalakeException (unchecked in this codebase), so do
              // not hide the miss behind a generic RuntimeException.
              throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, key);
            } catch (IOException e) {
              LOG.error("Failed to do store operation", e);
              throw new RuntimeException(e);
            }
          });

  // PROPERTY_IN_USE is resolved through the property metadata so the default
  // applies when the property is not explicitly set on the entity.
  return (boolean)
      metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE);
}

/**
Expand Down Expand Up @@ -165,16 +191,21 @@ public BaseMetalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeExce
ident,
LockType.READ,
() -> {
try {
BaseMetalake baseMetalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class);
return newMetalakeWithResolvedProperties(baseMetalake);
} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
} catch (IOException ioe) {
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe);
throw new RuntimeException(ioe);
}
BaseMetalake baseMetalake =
metalakeCache.get(
ident,
key -> {
try {
return store.get(key, EntityType.METALAKE, BaseMetalake.class);
} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", key, e);
throw new RuntimeException(e);
} catch (IOException e) {
LOG.error("Failed to do store operation", e);
throw new RuntimeException(e);
}
});
return newMetalakeWithResolvedProperties(baseMetalake);
});
}

Expand Down Expand Up @@ -267,6 +298,8 @@ public BaseMetalake alterMetalake(NameIdentifier ident, MetalakeChange... change
"Metalake %s is not in use, please enable it first", ident);
}

metalakeCache.invalidate(ident);

return store.update(
ident,
BaseMetalake.class,
Expand Down Expand Up @@ -317,6 +350,8 @@ public boolean dropMetalake(NameIdentifier ident, boolean force)
"Metalake %s is in use, please disable it first or use force option", ident);
}

metalakeCache.invalidate(ident);

List<CatalogEntity> catalogEntities =
store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG);
if (!catalogEntities.isEmpty() && !force) {
Expand All @@ -343,6 +378,7 @@ public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
try {
boolean inUse = metalakeInUse(store, ident);
if (!inUse) {
metalakeCache.invalidate(ident);
store.update(
ident,
BaseMetalake.class,
Expand Down Expand Up @@ -377,6 +413,7 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
try {
boolean inUse = metalakeInUse(store, ident);
if (inUse) {
metalakeCache.invalidate(ident);
store.update(
ident,
BaseMetalake.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ Long selectTableIdBySchemaIdAndName(
TablePO selectTableMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name);

/**
 * Selects a table row by its fully qualified name in a single query, joining
 * metalake, catalog, schema, table and table-version tables.
 *
 * @param metalakeName the metalake name
 * @param catalogName the catalog name under the metalake
 * @param schemaName the schema name under the catalog
 * @param tableName the table name under the schema
 * @return the matching {@code TablePO}, or {@code null} if no row matches
 */
@SelectProvider(
    type = TableMetaSQLProviderFactory.class,
    method = "selectTableByFullQualifiedName")
TablePO selectTableByFullQualifiedName(
    @Param("metalakeName") String metalakeName,
    @Param("catalogName") String catalogName,
    @Param("schemaName") String schemaName,
    @Param("tableName") String tableName);

@InsertProvider(type = TableMetaSQLProviderFactory.class, method = "insertTableMeta")
void insertTableMeta(@Param("tableMeta") TablePO tablePO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ public static String selectTableMetaBySchemaIdAndName(
return getProvider().selectTableMetaBySchemaIdAndName(schemaId, name);
}

/**
 * Builds the SQL for looking up a table by its fully qualified name by
 * delegating to the backend-specific provider.
 */
public static String selectTableByFullQualifiedName(
    @Param("metalakeName") String metalakeName,
    @Param("catalogName") String catalogName,
    @Param("schemaName") String schemaName,
    @Param("tableName") String tableName) {
  TableMetaBaseSQLProvider provider = getProvider();
  return provider.selectTableByFullQualifiedName(
      metalakeName, catalogName, schemaName, tableName);
}

public static String selectTableMetaById(@Param("tableId") Long tableId) {
return getProvider().selectTableMetaById(tableId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;

import java.util.List;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import org.apache.gravitino.storage.relational.po.TablePO;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -230,4 +233,61 @@ public String deleteTableMetasByLegacyTimeline(
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}";
}

/**
 * Builds the SQL that resolves a table by its fully qualified name
 * (metalake.catalog.schema.table) in one round trip, joining the metalake,
 * catalog, schema, table and table-version tables.
 *
 * <p>The name values are bound as MyBatis {@code #{}} prepared-statement
 * parameters rather than interpolated into the SQL text: interpolating
 * user-supplied names with {@code '%s'} was a SQL injection vector. Only the
 * trusted, compile-time table-name constants are formatted into the string.
 *
 * @return the parameterized SQL statement
 */
public String selectTableByFullQualifiedName(
    @Param("metalakeName") String metalakeName,
    @Param("catalogName") String catalogName,
    @Param("schemaName") String schemaName,
    @Param("tableName") String tableName) {
  return """
      SELECT
        tm.table_id AS tableId,
        tm.table_name AS tableName,
        tm.metalake_id AS metalakeId,
        tm.catalog_id AS catalogId,
        tm.schema_id AS schemaId,
        tm.audit_info AS auditInfo,
        tm.current_version AS currentVersion,
        tm.last_version AS lastVersion,
        tm.deleted_at AS deletedAt,
        tvi.format AS format,
        tvi.properties AS properties,
        tvi.partitioning AS partitions,
        tvi.sort_orders AS sortOrders,
        tvi.distribution AS distribution,
        tvi.indexes AS indexes,
        tvi.comment AS comment
      FROM
        %s mm
      INNER JOIN
        %s cm ON mm.metalake_id = cm.metalake_id
      INNER JOIN
        %s sm ON cm.catalog_id = sm.catalog_id
      INNER JOIN
        %s tm ON sm.schema_id = tm.schema_id
      LEFT JOIN
        %s tvi ON tm.table_id = tvi.table_id
        AND tm.current_version = tvi.version
        AND tvi.deleted_at = 0
      WHERE
        mm.metalake_name = #{metalakeName}
        AND mm.deleted_at = 0
        AND cm.catalog_name = #{catalogName}
        AND cm.deleted_at = 0
        AND sm.schema_name = #{schemaName}
        AND sm.deleted_at = 0
        AND tm.table_name = #{tableName}
        AND tm.deleted_at = 0
      """
      .formatted(
          MetalakeMetaMapper.TABLE_NAME,
          CatalogMetaMapper.TABLE_NAME,
          SchemaMetaMapper.TABLE_NAME,
          TABLE_NAME,
          TableVersionMapper.TABLE_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ public Long getTableIdBySchemaIdAndName(Long schemaId, String tableName) {
public TableEntity getTableByIdentifier(NameIdentifier identifier) {
NameIdentifierUtil.checkTable(identifier);

Long schemaId =
EntityIdService.getEntityId(
NameIdentifier.of(identifier.namespace().levels()), Entity.EntityType.SCHEMA);
String[] namespaceLevels = identifier.namespace().levels();
String metalakeName = namespaceLevels[0];
String catalogName = namespaceLevels[1];
String schemaName = namespaceLevels[2];
String tableName = identifier.name();
TablePO tablePO = getTableByFullQualifiedName(metalakeName, catalogName, schemaName, tableName);

TablePO tablePO = getTablePOBySchemaIdAndName(schemaId, identifier.name());
List<ColumnPO> columnPOs =
TableColumnMetaService.getInstance()
.getColumnsByTableIdAndVersion(tablePO.getTableId(), tablePO.getCurrentVersion());
Expand Down Expand Up @@ -342,4 +344,21 @@ private TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) {
}
return tablePO;
}

/**
 * Fetches the table PO matching the given fully qualified name parts.
 *
 * @throws NoSuchEntityException if no such table exists
 */
private TablePO getTableByFullQualifiedName(
    String metalakeName, String catalogName, String schemaName, String tableName) {
  TablePO result =
      SessionUtils.getWithoutCommit(
          TableMetaMapper.class,
          mapper ->
              mapper.selectTableByFullQualifiedName(
                  metalakeName, catalogName, schemaName, tableName));
  if (result != null) {
    return result;
  }
  // No live row matched the full name — surface it as a missing-table entity.
  throw new NoSuchEntityException(
      NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
      Entity.EntityType.TABLE.name().toLowerCase(),
      tableName);
}
}
4 changes: 4 additions & 0 deletions scripts/h2/schema-1.1.0-h2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'metalake deleted at',
PRIMARY KEY (metalake_id),
KEY idx_name_da (metalake_name, deleted_at),
CONSTRAINT uk_mn_del UNIQUE (metalake_name, deleted_at)
) ENGINE = InnoDB;

Expand All @@ -45,6 +46,7 @@ CREATE TABLE IF NOT EXISTS `catalog_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'catalog last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'catalog deleted at',
PRIMARY KEY (catalog_id),
KEY idx_name_da (catalog_name, deleted_at),
CONSTRAINT uk_mid_cn_del UNIQUE (metalake_id, catalog_name, deleted_at)
) ENGINE=InnoDB;

Expand All @@ -61,6 +63,7 @@ CREATE TABLE IF NOT EXISTS `schema_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'schema last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'schema deleted at',
PRIMARY KEY (schema_id),
KEY idx_name_da (schema_name, deleted_at),
CONSTRAINT uk_cid_sn_del UNIQUE (catalog_id, schema_name, deleted_at),
-- Aliases are used here, and indexes with the same name in H2 can only be created once.
KEY idx_smid (metalake_id)
Expand All @@ -77,6 +80,7 @@ CREATE TABLE IF NOT EXISTS `table_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'table last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'table deleted at',
PRIMARY KEY (table_id),
KEY idx_name_da (table_name, deleted_at),
CONSTRAINT uk_sid_tn_del UNIQUE (schema_id, table_name, deleted_at),
-- Aliases are used here, and indexes with the same name in H2 can only be created once.
KEY idx_tmid (metalake_id),
Expand Down
6 changes: 6 additions & 0 deletions scripts/h2/upgrade-1.0.0-to-1.1.0-h2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
`deleted_at` BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion timestamp, 0 means not deleted',
UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`, `deleted_at`)
) ENGINE=InnoDB COMMENT 'table detail information including format, location, properties, partition, distribution, sort order, index and so on';

alter table `metalake_meta` add index idx_name_da (metalake_name, deleted_at);
alter table `catalog_meta` add index idx_name_da (catalog_name, deleted_at);
alter table `schema_meta` add index idx_name_da (schema_name, deleted_at);
alter table `table_meta` add index idx_name_da (table_name, deleted_at);

4 changes: 4 additions & 0 deletions scripts/mysql/schema-1.1.0-mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'metalake deleted at',
PRIMARY KEY (`metalake_id`),
KEY idx_name_da (metalake_name, deleted_at),
UNIQUE KEY `uk_mn_del` (`metalake_name`, `deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'metalake metadata';

Expand All @@ -44,6 +45,7 @@ CREATE TABLE IF NOT EXISTS `catalog_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'catalog last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'catalog deleted at',
PRIMARY KEY (`catalog_id`),
KEY idx_name_da (catalog_name, deleted_at),
UNIQUE KEY `uk_mid_cn_del` (`metalake_id`, `catalog_name`, `deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'catalog metadata';

Expand All @@ -59,6 +61,7 @@ CREATE TABLE IF NOT EXISTS `schema_meta` (
`last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'schema last version',
`deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'schema deleted at',
PRIMARY KEY (`schema_id`),
KEY idx_name_da (schema_name, deleted_at),
UNIQUE KEY `uk_cid_sn_del` (`catalog_id`, `schema_name`, `deleted_at`),
KEY `idx_mid` (`metalake_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'schema metadata';
Expand All @@ -77,6 +80,7 @@ CREATE TABLE IF NOT EXISTS `table_meta` (
UNIQUE KEY `uk_sid_tn_del` (`schema_id`, `table_name`, `deleted_at`),
KEY `idx_mid` (`metalake_id`),
KEY `idx_cid` (`catalog_id`),
KEY idx_name_da (table_name, deleted_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table metadata';

CREATE TABLE IF NOT EXISTS `table_column_version_info` (
Expand Down
5 changes: 5 additions & 0 deletions scripts/mysql/upgrade-1.0.0-to-1.1.0-mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ CREATE TABLE IF NOT EXISTS `table_version_info` (
`deleted_at` BIGINT(20) UNSIGNED DEFAULT 0 COMMENT 'table deletion timestamp, 0 means not deleted',
UNIQUE KEY `uk_table_id_version_deleted_at` (`table_id`, `version`, `deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table detail information including format, location, properties, partition, distribution, sort order, index and so on';

alter table `metalake_meta` add index idx_name_da (metalake_name, deleted_at);
alter table `catalog_meta` add index idx_name_da (catalog_name, deleted_at);
alter table `schema_meta` add index idx_name_da (schema_name, deleted_at);
alter table `table_meta` add index idx_name_da (table_name, deleted_at);
Loading
Loading