diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..5c3be3250 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,85 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +DataHub GMA (General Metadata Architecture) is the backend framework for LinkedIn's DataHub metadata search & discovery +platform. It provides type-safe, schema-driven metadata management with event-driven data consistency across multiple +storage backends (SQL via Ebean, Elasticsearch for search). Built on LinkedIn's Pegasus data framework and Rest.li for +REST APIs. + +**Java version**: 1.8 (Java 8) — required, newer versions will cause build failures. + +## Build Commands + +```bash +./gradlew build # Full build + all tests +./gradlew :module-name:build # Build a specific module (e.g., :dao-api, :dao-impl:ebean-dao) +./gradlew :module-name:test # Run tests for a specific module +./gradlew spotlessCheck # Check code formatting (CI enforced) +./gradlew spotlessApply # Auto-fix formatting +./gradlew checkstyleMain # Run checkstyle on main sources +./gradlew idea # Generate IntelliJ project files +``` + +Tests use **TestNG** (not JUnit) as the default test framework across all subprojects. The elasticsearch integration +tests require Docker. + +**Apple Silicon (M-series Mac)**: Requires `brew install mariadb` and uncommenting three lines in +`EmbeddedMariaInstance.java` (see `docs/developers.md`). + +## Module Architecture + +``` +core-models/ → Pegasus PDL schemas: Urn, AuditStamp, Url, Time (no Java logic) +core-models-utils/ → URN utility helpers +dao-api/ → DAO abstractions (BaseLocalDAO, BaseSearchDAO, BaseBrowseDAO), + event producers, query utilities, retention policies +dao-impl/ + ebean-dao/ → SQL storage via Ebean ORM (EbeanLocalDAO, relationship queries) + elasticsearch-dao/ → ES 5.x/6.x search implementation + elasticsearch-dao-7/→ ES 7.x search implementation +restli-resources/ → Rest.li resource base classes (BaseEntityResource, + BaseSearchableEntityResource) mapping DAOs to REST endpoints +validators/ → Schema validators ensuring PDL models conform to GMA conventions + (AspectValidator, EntityValidator, SnapshotValidator, etc.) +gradle-plugins/ → Annotation parsing (@gma) and code generation for metadata events +testing/ → Test infrastructure, ES integration test harness, test models +``` + +## Key Architectural Patterns + +- **Urn (Universal Resource Name)**: `urn:li:entityType:entityKey` — the universal identifier for all entities. Typed + URN subclasses provide entity-specific keys. +- **Aspect Union Pattern**: Each entity type defines a Pegasus union of its supported aspects. Validators enforce that + union members are record types only. +- **Aspect Versioning**: Version 0 = latest. Each aspect write creates a new immutable version. Retention policies + (indefinite, time-based, version-based) control history. +- **Layered Storage**: BaseLocalDAO (SQL, source of truth) → BaseSearchDAO (Elasticsearch, derived index) → + BaseBrowseDAO (hierarchical navigation). BaseRemoteDAO proxies to other GMS instances. +- **Event Sourcing**: Writes to LocalDAO trigger MCE/MAE event emission via BaseMetadataEventProducer. The + `gradle-plugins` auto-generate event PDL schemas from `@gma` annotations on aspect PDL files. +- **Generic Type Binding**: DAOs are heavily parameterized with generics (``) and validate type + constraints at construction time using reflection via `ModelUtils`. + +## Pegasus/Rest.li Data Models + +PDL (Pegasus Data Language) schemas live in `src/main/pegasus/` directories and compile to Java `RecordTemplate` +classes. Key namespaces: + +- `com.linkedin.common.*` — Core types (Urn, AuditStamp) +- `com.linkedin.metadata.aspect.*` — Aspect wrappers +- `com.linkedin.metadata.query.*` — Search/filter structures +- `com.linkedin.metadata.events.*` — Change tracking types +- `com.linkedin.metadata.snapshot.*` — Entity snapshots (versioned aspect collections) + +When modifying PDL schemas, the Pegasus gradle plugin regenerates Java bindings automatically during build. + +## Commit Convention + +Follow [Conventional Commits](https://www.conventionalcommits.org/): `(scope): description` + +Types: `feat`, `fix`, `refactor`, `docs`, `test`, `perf`, `style`, `build`, `ci` + +Max line length: 88 characters. Use imperative present tense, no capitalized first letter, no trailing dot. diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java b/dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java new file mode 100644 index 000000000..6dbf46a74 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java @@ -0,0 +1,27 @@ +package com.linkedin.metadata.dao; + +import java.sql.Timestamp; +import java.util.Map; +import lombok.Builder; +import lombok.Value; + + +/** + * A value class that holds deletion-relevant fields for a single entity, used by batch deletion validation. + * Contains status flags for deletion eligibility checks and all aspect column values for Kafka archival. + */ +@Value +@Builder +public class EntityDeletionInfo { + + Timestamp deletedTs; + + boolean statusRemoved; + + String statusLastModifiedOn; + + /** + * All aspect column values (column name → raw JSON string) for Kafka archival by the service layer. + */ + Map aspectColumns; +} diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 73acd122e..6b9a312b8 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -72,6 +72,8 @@ public class EbeanLocalAccess implements IEbeanLocalAccess // TODO confirm if the default page size is 1000 in other code context. private static final int DEFAULT_PAGE_SIZE = 1000; + private static final int MAX_BATCH_DELETE_SIZE = 2000; + private static final String STATUS_ASPECT_FQCN = "com.linkedin.common.Status"; private static final String ASPECT_JSON_PLACEHOLDER = "__PLACEHOLDER__"; private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN"; private static final String EBEAN_SERVER_CONFIG = "EbeanServerConfig"; @@ -344,6 +346,58 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { return _server.createSqlUpdate(deleteSqlStatement).execute(); } + @Override + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + if (urns.isEmpty()) { + return Collections.emptyMap(); + } + if (urns.size() > MAX_BATCH_DELETE_SIZE) { + throw new IllegalArgumentException( + String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE)); + } + + final String statusColumnName = getStatusColumnName(); + final Urn firstUrn = urns.get(0); + final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); + final List columns = getDeletionInfoColumns(tableName); + final String sql = SQLStatementUtils.createReadDeletionInfoByUrnsSql(urns, columns, isTestMode); + return EBeanDAOUtils.convertSqlRowsToEntityDeletionInfoMap( + _server.createSqlQuery(sql).findList(), _urnClass, statusColumnName); + } + + /** + * Returns the column list needed for batch deletion info: urn, deleted_ts, and all aspect columns (a_*). + * Excludes index columns (i_*) and other derived columns to reduce data transfer. + */ + private List getDeletionInfoColumns(@Nonnull String tableName) { + return validator.getColumns(tableName).stream() + .filter(c -> c.equals("urn") || c.equals("deleted_ts") || c.startsWith(ASPECT_PREFIX)) + .collect(Collectors.toList()); + } + + /** + * Resolves the entity table column name for the Status aspect via {@link SQLSchemaUtils}. + */ + private String getStatusColumnName() { + return getAspectColumnName(_entityType, STATUS_ASPECT_FQCN); + } + + @Override + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + if (urns.isEmpty()) { + return 0; + } + if (urns.size() > MAX_BATCH_DELETE_SIZE) { + throw new IllegalArgumentException( + String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE)); + } + + final String statusColumnName = getStatusColumnName(); + final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, statusColumnName, + isTestMode); + return _server.createSqlUpdate(sql).execute(); + } + @Override public List listUrns(@Nullable IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 389c4c809..61e5ff091 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -750,6 +750,20 @@ protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) { return _localAccess.softDeleteAsset(urn, isTestMode); } + /** + * Delegates to {@link IEbeanLocalAccess#readDeletionInfoBatch}. + */ + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + return _localAccess.readDeletionInfoBatch(urns, isTestMode); + } + + /** + * Delegates to {@link IEbeanLocalAccess#batchSoftDeleteAssets}. + */ + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode); + } + @Override public void updateEntityTables(@Nonnull URN urn, @Nonnull Class aspectClass) { if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 67532ced0..d807522f4 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -97,6 +97,30 @@ List batchGetUnion(@Nonnull */ int softDeleteAsset(@Nonnull URN urn, boolean isTestMode); + /** + * Read deletion-relevant fields for a batch of URNs in a single SELECT. + * Returns deletion-relevant fields for validation and all aspect columns for Kafka archival. + * URNs not found in the database will not have entries in the returned map. + * + * @param urns list of URNs to check + * @param isTestMode whether to use test schema + * @return map of URN to {@link EntityDeletionInfo} + */ + Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode); + + /** + * Batch soft-delete entities by setting deleted_ts = NOW() for URNs that meet all deletion criteria. + * The UPDATE includes guard clauses (deleted_ts IS NULL, Status.removed = true, lastmodifiedon < cutoff) + * as defense-in-depth against race conditions. The Status aspect column name is resolved internally via + * {@link com.linkedin.metadata.dao.utils.SQLSchemaUtils#getAspectColumnName}. + * + * @param urns list of URNs to soft-delete + * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp + * @param isTestMode whether to use test schema + * @return number of rows actually soft-deleted + */ + int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode); + /** * Returns list of urns that satisfy the given filter conditions. * diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java index 518b6be10..d9bda4898 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java @@ -99,6 +99,18 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { return instrument("softDeleteAsset", () -> _delegate.softDeleteAsset(urn, isTestMode)); } + @Override + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + return instrument("readDeletionInfoBatch.urns_" + urns.size(), + () -> _delegate.readDeletionInfoBatch(urns, isTestMode)); + } + + @Override + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + return instrument("batchSoftDeleteAssets.urns_" + urns.size(), + () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode)); + } + @Override public List listUrns(@Nullable IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java index abb40335b..fdf84106f 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.dao.utils; import com.linkedin.common.urn.Urn; +import com.linkedin.data.DataMap; import com.linkedin.data.schema.RecordDataSchema; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; @@ -12,6 +13,7 @@ import com.linkedin.metadata.aspect.AuditedAspect; import com.linkedin.metadata.aspect.SoftDeletedAspect; import com.linkedin.metadata.dao.EbeanMetadataAspect; +import com.linkedin.metadata.dao.EntityDeletionInfo; import com.linkedin.metadata.dao.ListResult; import com.linkedin.metadata.query.AspectField; import com.linkedin.metadata.query.Condition; @@ -257,6 +259,73 @@ public static List readSqlR }).collect(Collectors.toList()); } + /** + * Parse a list of {@link SqlRow} results from an entity table into a map of + * URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and the Status aspect column. + * Rows that cannot be parsed as a valid URN are skipped with a warning. + * + * @param sqlRows list of {@link SqlRow} from entity table query + * @param urnClass URN class for deserialization + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") + * @param URN type + * @return map of URN to {@link EntityDeletionInfo} + */ + public static Map convertSqlRowsToEntityDeletionInfoMap( + @Nonnull List sqlRows, @Nonnull Class urnClass, @Nonnull String statusColumnName) { + final Map result = new HashMap<>(); + for (SqlRow row : sqlRows) { + final String urnStr = row.getString("urn"); + try { + result.put(getUrn(urnStr, urnClass), toEntityDeletionInfo(row, statusColumnName)); + } catch (IllegalArgumentException e) { + log.warn("Failed to parse URN string: {}, skipping row", urnStr, e); + } + } + return result; + } + + /** + * Parse a single {@link SqlRow} from an entity table query into an {@link EntityDeletionInfo}. + * Extracts deletion eligibility fields from the Status aspect column (statusRemoved, statusLastModifiedOn) + * and collects all aspect columns for Kafka archival. + * + * @param row {@link SqlRow} from entity table query + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") + * @return {@link EntityDeletionInfo} + */ + @Nonnull + static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row, @Nonnull String statusColumnName) { + // Collect all aspect columns (a_* prefixed, non-null), same pattern as readSqlRows() + final Map aspectColumnValues = new HashMap<>(); + for (String key : row.keySet()) { + if (key.startsWith(SQLSchemaUtils.ASPECT_PREFIX) && row.get(key) != null) { + aspectColumnValues.put(key, row.getString(key)); + } + } + + boolean statusRemoved = false; + String statusLastModifiedOn = null; + final String statusJson = row.getString(statusColumnName); + if (statusJson != null) { + final DataMap statusData = RecordUtils.toDataMap(statusJson); + final Object lastModObj = statusData.get("lastmodifiedon"); + if (lastModObj != null) { + statusLastModifiedOn = lastModObj.toString(); + } + final Object aspectObj = statusData.get("aspect"); + if (aspectObj instanceof DataMap) { + statusRemoved = Boolean.TRUE.equals(((DataMap) aspectObj).get("removed")); + } + } + + return EntityDeletionInfo.builder() + .deletedTs(row.getTimestamp("deleted_ts")) + .statusRemoved(statusRemoved) + .statusLastModifiedOn(statusLastModifiedOn) + .aspectColumns(aspectColumnValues) + .build(); + } + /** * Read EbeanMetadataAspect from {@link SqlRow}. * @param sqlRow {@link SqlRow} diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 45cc8b32d..c8877c25e 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -44,6 +44,8 @@ public class SQLStatementUtils { .addEscape('\'', "''") .addEscape('\\', "\\\\").build(); + private static final String TIMESTAMP_FORMAT_PATTERN = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}"; + public static final String SOFT_DELETED_CHECK = "JSON_EXTRACT(%s, '$.gma_deleted') IS NULL"; // true when not soft deleted public static final String DELETED_TS_IS_NULL_CHECK = "deleted_ts IS NULL"; // true when the deleted_ts is NULL, meaning the record is not soft deleted @@ -83,6 +85,15 @@ public class SQLStatementUtils { public static final String SQL_INSERT_ASSET_VALUES = "VALUES (:urn, :lastmodifiedon, :lastmodifiedby,"; // Delete prefix of the sql statement for deleting from metadata_aspect table public static final String SQL_SOFT_DELETE_ASSET_WITH_URN = "UPDATE %s SET deleted_ts = NOW() WHERE urn = '%s';"; + + private static final String SQL_READ_COLUMNS_BY_URNS_TEMPLATE = "SELECT %s FROM %s WHERE urn IN (%s)"; + + private static final String SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE = + "UPDATE %s SET deleted_ts = NOW()" + + " WHERE urn IN (%s)" + + " AND deleted_ts IS NULL" + + " AND JSON_EXTRACT(%s, '$.aspect.removed') = true" + + " AND JSON_EXTRACT(%s, '$.lastmodifiedon') < '%s'"; // closing bracket for the sql statement INSERT prefix // e.g. INSERT INTO metadata_aspect (urn, a_urn, lastmodifiedon, lastmodifiedby) public static final String CLOSING_BRACKET = ") "; @@ -302,6 +313,53 @@ public static String createSoftDeleteAssetSql(@N return String.format(SQL_SOFT_DELETE_ASSET_WITH_URN, tableName, urn); } + /** + * Create SELECT SQL statement for reading deletion-relevant columns for a batch of URNs. + * Selects only urn, deleted_ts, and aspect columns (a_* prefix), excluding index columns (i_*) + * and other derived columns to reduce data transfer. + * + * @param urns list of URNs to read + * @param columns list of column names to select + * @param isTestMode whether the test mode is enabled or not + * @return select columns sql + */ + public static String createReadDeletionInfoByUrnsSql(@Nonnull List urns, + @Nonnull List columns, boolean isTestMode) { + final Urn firstUrn = urns.get(0); + final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); + final String urnList = urns.stream() + .map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'") + .collect(Collectors.joining(", ")); + final String columnList = String.join(", ", columns); + return String.format(SQL_READ_COLUMNS_BY_URNS_TEMPLATE, columnList, tableName, urnList); + } + + /** + * Create batch soft-delete SQL statement with guard clauses for defense-in-depth. + * The UPDATE includes conditions (deleted_ts IS NULL, Status.removed = true, lastmodifiedon < cutoff) + * to protect against race conditions between validation SELECT and this UPDATE. + * + * @param urns list of URNs to soft-delete + * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") + * @param isTestMode whether the test mode is enabled or not + * @return batch soft-delete sql + */ + public static String createBatchSoftDeleteAssetSql(@Nonnull List urns, + @Nonnull String cutoffTimestamp, @Nonnull String statusColumnName, boolean isTestMode) { + if (!cutoffTimestamp.matches(TIMESTAMP_FORMAT_PATTERN)) { + throw new IllegalArgumentException( + "cutoffTimestamp must be in yyyy-MM-dd HH:mm:ss.SSS format, got: " + cutoffTimestamp); + } + final Urn firstUrn = urns.get(0); + final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); + final String urnList = urns.stream() + .map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'") + .collect(Collectors.joining(", ")); + return String.format(SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE, tableName, urnList, statusColumnName, + statusColumnName, cutoffTimestamp); + } + /** * Create Update with optimistic locking SQL statement. The SQL UPDATE use old_timestamp as a compareAndSet to check * if the current update is made on an unchange record. For example: UPDATE table WHERE modifiedon = :oldTimestamp. diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java index 2a7aacaa2..83d4012fb 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java @@ -98,6 +98,21 @@ public boolean columnExists(@Nonnull String tableName, @Nonnull String columnNam return columns.contains(lowerColumn); } + /** + * Returns all column names (lowercase) for the given table, using the cache. + * + * @param tableName Table name + * @return set of lowercase column names + */ + @Nonnull + public Set getColumns(@Nonnull String tableName) { + String lowerTable = tableName.toLowerCase(); + return columnCache.get(lowerTable, tbl -> { + log.info("Refreshing column cache for table '{}'", tbl); + return loadColumns(tbl); + }); + } + /** * Checks whether the given index exists in the specified table. * diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 345960305..ecf40f924 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -20,6 +20,7 @@ import com.linkedin.testing.AspectBar; import com.linkedin.testing.AspectBaz; import com.linkedin.testing.AspectFoo; +import com.linkedin.testing.FooAsset; import com.linkedin.testing.urn.BurgerUrn; import com.linkedin.testing.urn.FooUrn; import io.ebean.Ebean; @@ -72,6 +73,7 @@ public static Object[][] inputList() { @BeforeClass public void init() { + GlobalAssetRegistry.register(FooUrn.ENTITY_TYPE, FooAsset.class); _server = EmbeddedMariaInstance.getServer(EbeanLocalAccessTest.class.getSimpleName()); _ebeanLocalAccessFoo = new EbeanLocalAccess<>(_server, EmbeddedMariaInstance.SERVER_CONFIG_MAP.get(_server.getName()), FooUrn.class, new FooUrnPathExtractor(), _ebeanConfig.isNonDollarVirtualColumnsEnabled()); @@ -520,4 +522,222 @@ public void testDeleteAll() { int numRowsDeleted = _ebeanLocalAccessFoo.softDeleteAsset(fooUrn, false); assertEquals(numRowsDeleted, 1); } -} \ No newline at end of file + + // ==================== readDeletionInfoBatch tests ==================== + + /** + * Helper to insert a row into metadata_entity_foo with specific a_status JSON via raw SQL. + * Uses URN IDs starting at 500+ to avoid collisions with the 0-99 range from setupTest(). + */ + private void insertFooEntityWithStatus(int id, String statusJson, String deletedTs) { + String urn = "urn:li:foo:" + id; + String deletedTsClause = deletedTs != null ? "'" + deletedTs + "'" : "NULL"; + String statusClause = statusJson != null ? "'" + statusJson + "'" : "NULL"; + String sql = String.format( + "INSERT INTO metadata_entity_foo (urn, lastmodifiedon, lastmodifiedby, a_status, deleted_ts) " + + "VALUES ('%s', NOW(), 'testActor', %s, %s) " + + "ON DUPLICATE KEY UPDATE a_status = %s, deleted_ts = %s", + urn, statusClause, deletedTsClause, statusClause, deletedTsClause); + _server.createSqlUpdate(sql).execute(); + } + + private static String makeStatusJson(boolean removed, String lastModifiedOn) { + return String.format("{\"aspect\":{\"removed\":%s},\"lastmodifiedon\":\"%s\",\"lastmodifiedby\":\"urn:li:corpuser:testActor\"}", + removed, lastModifiedOn); + } + + @Test + public void testReadDeletionInfoBatchHappyPath() { + // Given: 3 URNs with known status + String oldTimestamp = "2025-01-01 00:00:00.000"; + insertFooEntityWithStatus(500, makeStatusJson(true, oldTimestamp), null); + insertFooEntityWithStatus(501, makeStatusJson(false, oldTimestamp), null); + insertFooEntityWithStatus(502, makeStatusJson(true, oldTimestamp), null); + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(500)); + urns.add(makeFooUrn(501)); + urns.add(makeFooUrn(502)); + + // When + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + + // Then: all 3 returned with correct fields + assertEquals(result.size(), 3); + + EntityDeletionInfo info500 = result.get(makeFooUrn(500)); + assertNotNull(info500); + assertNull(info500.getDeletedTs()); + assertTrue(info500.isStatusRemoved()); + assertNotNull(info500.getAspectColumns()); + assertTrue(info500.getAspectColumns().containsKey("a_status")); + + EntityDeletionInfo info501 = result.get(makeFooUrn(501)); + assertNotNull(info501); + assertFalse(info501.isStatusRemoved()); + } + + @Test + public void testReadDeletionInfoBatchEmptyList() { + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), false); + assertTrue(result.isEmpty()); + } + + @Test + public void testReadDeletionInfoBatchNonExistentUrns() { + List urns = Collections.singletonList(makeFooUrn(9998)); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + // Non-existent URNs are simply absent from the map + assertFalse(result.containsKey(makeFooUrn(9998))); + } + + @Test + public void testReadDeletionInfoBatchMixedExistAndNonExist() { + String statusJson = makeStatusJson(true, "2025-01-01 00:00:00.000"); + insertFooEntityWithStatus(510, statusJson, null); + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(510)); // exists + urns.add(makeFooUrn(9997)); // does not exist + + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + + assertEquals(result.size(), 1); + assertTrue(result.containsKey(makeFooUrn(510))); + assertFalse(result.containsKey(makeFooUrn(9997))); + } + + @Test + public void testReadDeletionInfoBatchAlreadySoftDeleted() { + String statusJson = makeStatusJson(true, "2025-01-01 00:00:00.000"); + insertFooEntityWithStatus(520, statusJson, "2025-06-01 00:00:00.000"); + + List urns = Collections.singletonList(makeFooUrn(520)); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + + assertEquals(result.size(), 1); + EntityDeletionInfo info = result.get(makeFooUrn(520)); + assertNotNull(info); + assertNotNull(info.getDeletedTs()); + } + + // ==================== batchSoftDeleteAssets tests ==================== + + @Test + public void testBatchSoftDeleteAssetsHappyPath() { + // Given: URNs with Status.removed=true and old lastmodifiedon + String oldTimestamp = "2025-01-01 00:00:00.000"; + insertFooEntityWithStatus(600, makeStatusJson(true, oldTimestamp), null); + insertFooEntityWithStatus(601, makeStatusJson(true, oldTimestamp), null); + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(600)); + urns.add(makeFooUrn(601)); + + // Cutoff is after the lastmodifiedon, so these should be eligible + String cutoffTimestamp = "2026-01-01 00:00:00.000"; + + // When + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, cutoffTimestamp, false); + + // Then: both rows soft-deleted + assertEquals(rowsAffected, 2); + + // Verify deleted_ts is set in DB + SqlRow row600 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:600'").findOne(); + assertNotNull(row600.getTimestamp("deleted_ts")); + SqlRow row601 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:601'").findOne(); + assertNotNull(row601.getTimestamp("deleted_ts")); + } + + @Test + public void testBatchSoftDeleteAssetsEmptyList() { + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", false); + assertEquals(rowsAffected, 0); + } + + @Test + public void testBatchSoftDeleteAssetsGuardsStatusNotRemoved() { + // Given: URN with Status.removed=false + insertFooEntityWithStatus(610, makeStatusJson(false, "2025-01-01 00:00:00.000"), null); + + List urns = Collections.singletonList(makeFooUrn(610)); + + // When: cutoff is in the future (would pass retention check) + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + + // Then: guard clause prevents deletion + assertEquals(rowsAffected, 0); + SqlRow row = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:610'").findOne(); + assertNull(row.getTimestamp("deleted_ts")); + } + + @Test + public void testBatchSoftDeleteAssetsGuardsRetentionNotMet() { + // Given: URN with Status.removed=true but recent lastmodifiedon + String recentTimestamp = "2026-03-01 00:00:00.000"; + insertFooEntityWithStatus(620, makeStatusJson(true, recentTimestamp), null); + + List urns = Collections.singletonList(makeFooUrn(620)); + + // When: cutoff is BEFORE the lastmodifiedon (retention window not met) + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-02-01 00:00:00.000", false); + + // Then: guard clause prevents deletion + assertEquals(rowsAffected, 0); + } + + @Test + public void testBatchSoftDeleteAssetsGuardsAlreadyDeleted() { + // Given: URN already soft-deleted + insertFooEntityWithStatus(630, makeStatusJson(true, "2025-01-01 00:00:00.000"), "2025-06-01 00:00:00.000"); + + List urns = Collections.singletonList(makeFooUrn(630)); + + // When + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + + // Then: guard clause prevents re-deletion + assertEquals(rowsAffected, 0); + } + + @Test + public void testBatchSoftDeleteAssetsMixedEligibility() { + // Given: mix of eligible and ineligible URNs + String oldTimestamp = "2025-01-01 00:00:00.000"; + insertFooEntityWithStatus(640, makeStatusJson(true, oldTimestamp), null); // eligible + insertFooEntityWithStatus(641, makeStatusJson(false, oldTimestamp), null); // ineligible: not removed + insertFooEntityWithStatus(642, makeStatusJson(true, oldTimestamp), null); // eligible + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(640)); + urns.add(makeFooUrn(641)); + urns.add(makeFooUrn(642)); + + // When + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + + // Then: only 2 eligible rows deleted + assertEquals(rowsAffected, 2); + + // Verify: 640 and 642 deleted, 641 not + SqlRow row640 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:640'").findOne(); + assertNotNull(row640.getTimestamp("deleted_ts")); + SqlRow row641 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:641'").findOne(); + assertNull(row641.getTimestamp("deleted_ts")); + SqlRow row642 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:642'").findOne(); + assertNotNull(row642.getTimestamp("deleted_ts")); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testBatchSoftDeleteAssetsRejectsInvalidTimestampFormat() { + List urns = Collections.singletonList(makeFooUrn(0)); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01T00:00:00Z", false); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testBatchSoftDeleteAssetsRejectsSqlInjection() { + List urns = Collections.singletonList(makeFooUrn(0)); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", false); + } +} diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java index 57b115a64..842bc5441 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java @@ -215,4 +215,31 @@ public void testSetUrnPathExtractorDelegates() { _instrumented.setUrnPathExtractor(null); verify(_mockDelegate).setUrnPathExtractor(null); } + + @Test + public void testReadDeletionInfoBatchDelegatesAndRecordsLatency() { + Map expected = new HashMap<>(); + when(_mockDelegate.readDeletionInfoBatch(any(), anyBoolean())).thenReturn(expected); + + List urns = Collections.singletonList(null); + Map result = _instrumented.readDeletionInfoBatch(urns, false); + + assertSame(result, expected); + verify(_mockDelegate).readDeletionInfoBatch(urns, false); + verify(_mockMetrics).recordOperationLatency(eq("readDeletionInfoBatch.urns_1"), eq("test"), anyLong()); + verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); + } + + @Test + public void testBatchSoftDeleteAssetsDelegatesAndRecordsLatency() { + when(_mockDelegate.batchSoftDeleteAssets(any(), any(), anyBoolean())).thenReturn(5); + + List urns = Collections.singletonList(null); + int result = _instrumented.batchSoftDeleteAssets(urns, "2026-01-01", false); + + assertEquals(result, 5); + verify(_mockDelegate).batchSoftDeleteAssets(urns, "2026-01-01", false); + verify(_mockMetrics).recordOperationLatency(eq("batchSoftDeleteAssets.urns_1"), eq("test"), anyLong()); + verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); + } } diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql index a8761aa9d..6ecbfcdb2 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql @@ -73,6 +73,9 @@ ALTER TABLE metadata_entity_bar ADD a_urn JSON; ALTER TABLE metadata_entity_foo ADD COLUMN i_urn0fooId VARCHAR(255) GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(a_urn, '$."\\\/fooId"'))); +-- add status aspect to foo entity (used by batch delete tests) +ALTER TABLE metadata_entity_foo ADD a_status JSON; + -- add foo aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectfoo JSON; diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql index 55042f2da..c5c6ab7e0 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql @@ -73,6 +73,9 @@ ALTER TABLE metadata_entity_bar ADD a_urn JSON; ALTER TABLE metadata_entity_foo ADD COLUMN i_urn$fooId VARCHAR(255) GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(a_urn, '$."\\\/fooId"'))); +-- add status aspect to foo entity (used by batch delete tests) +ALTER TABLE metadata_entity_foo ADD a_status JSON; + -- add foo aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectfoo JSON; diff --git a/spec/batch_delete_dao_changes.md b/spec/batch_delete_dao_changes.md new file mode 100644 index 000000000..7fda662fa --- /dev/null +++ b/spec/batch_delete_dao_changes.md @@ -0,0 +1,115 @@ +# Batch Deletion DAO Support + +**PR scope:** datahub-gma only **Project:** META-23501 — Metadata Graph Stale Metadata Cleanup Phase 2 + +--- + +## Why + +Stale metadata cleanup jobs need to soft-delete entities in bulk. The existing `softDeleteAsset()` method operates on a +single URN. This change adds two new DAO operations that work on a batch of URNs in exactly two DB round-trips, +regardless of batch size. + +The DAO layer is intentionally kept simple: pure SQL, no business logic, no Kafka. The consuming service +(`metadata-graph-assets`) handles validation, Kafka archival, and per-URN result reporting. + +--- + +## What Was Added + +### `EntityDeletionInfo` (new, `dao-api`) + +An immutable value object returned by the batch read operation. Contains the fields a caller needs to determine whether +each entity is eligible for deletion: + +- `deletedTs` — whether the entity is already soft-deleted +- `statusRemoved` — whether the entity's Status aspect has `removed = true` +- `statusLastModifiedOn` — when the Status was last changed (for retention window checks) +- `aspectColumns` — all aspect column values as raw JSON strings, for use by the service layer's Kafka archival step + +Presence in the returned map means the entity exists. Absence means it was not found. + +### Two new `IEbeanLocalAccess` methods + +**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single SELECT (explicit column +list: `urn`, `deleted_ts`, and all `a_*` aspect columns — index columns are excluded). The Status aspect column name is +resolved internally via `SQLSchemaUtils.getAspectColumnName()`. Returns a map of URN → `EntityDeletionInfo` for all URNs +found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not found." + +**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The Status aspect column name is +resolved internally, and is used in the WHERE clause guard conditions (not already deleted, Status.removed = true, +lastmodifiedon before cutoff) as defense-in-depth against race conditions between the SELECT and UPDATE. + +### Layered implementation (no logic in `EbeanLocalAccess`) + +- **`SQLStatementUtils`**: two new factory methods that build the SELECT and UPDATE statements +- **`EBeanDAOUtils`**: two new methods that parse `SqlRow` results into `EntityDeletionInfo`. Status fields are + extracted using `RecordUtils.toDataMap()` (the same Pegasus data framework used throughout the codebase — no manual + JSON parsing). + +### `InstrumentedEbeanLocalAccess` + +Both new methods are wired through the existing `instrument()` decorator, consistent with every other method on this +class. + +--- + +## Intended Usage + +``` +caller → readDeletionInfoBatch(urns) // 1 SELECT + → validate each URN, partition into eligible / skipped + → archive eligible to Kafka // service layer responsibility + → batchSoftDeleteAssets(eligible, cutoff) // 1 UPDATE +``` + +The two methods are designed to be called together in sequence. The `aspectColumns` field in `EntityDeletionInfo` +carries the full entity state needed for Kafka archival between the two calls. + +--- + +## Tests + +### `EbeanLocalAccessTest` — integration tests against embedded MariaDB + +Tests verify actual SQL execution and result correctness end-to-end. Each test inserts rows directly via raw SQL to +control `a_status` and `deleted_ts` precisely, then asserts on the returned `EntityDeletionInfo` values or on DB state +after the UPDATE. + +**`readDeletionInfoBatch`:** + +- Happy path: returns correct `statusRemoved`, `statusLastModifiedOn`, `deletedTs`, and `aspectColumns` for found URNs +- Empty input: returns empty map +- URNs not in DB: absent from result map +- Mixed found/not-found: only found URNs in result +- Already soft-deleted URN: `deletedTs` is non-null in result + +**`batchSoftDeleteAssets`:** + +- Happy path: eligible URNs are soft-deleted, returns correct affected row count +- Empty input: returns 0 +- `Status.removed = false`: guard clause blocks deletion +- Retention window not met (recent `lastmodifiedon`): guard clause blocks deletion +- Already soft-deleted (`deleted_ts` set): guard clause blocks re-deletion +- Mixed batch: only eligible URNs are deleted; ineligible ones are untouched + +The test SQL schema (both `ebean-local-access-create-all.sql` files) was extended with an `a_status` column on +`metadata_entity_foo` to support these tests. + +### `InstrumentedEbeanLocalAccessTest` — mock-based unit tests + +Verifies that `InstrumentedEbeanLocalAccess` correctly delegates both new methods to the underlying `IEbeanLocalAccess` +and records latency via `BaseDaoBenchmarkMetrics`. No database required. + +--- + +## Design Constraints + +- **DAO layer is Kafka-free.** No archival, no event publishing. The shared library stays generic. +- **Exactly 2 DB calls per batch.** No per-URN queries. +- **Guard clauses in the UPDATE.** Even if a caller skips the SELECT validation, the UPDATE will not soft-delete + entities that don't meet all safety conditions. +- **No hardcoded column names.** The Status aspect column name is resolved internally via + `SQLSchemaUtils.getAspectColumnName()`, consistent with how all other DAO methods resolve column names. Entity types + that map Status to a different column (e.g. `a_foo_bar`) work without changes. +- **Batch size limit.** Both methods reject batches exceeding 2000 URNs as defense-in-depth against overwhelming the DB. diff --git a/testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl b/testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl new file mode 100644 index 000000000..8d4ac4287 --- /dev/null +++ b/testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl @@ -0,0 +1,10 @@ +namespace com.linkedin.common + +/** + * Test stub for com.linkedin.common.Status — the real Status aspect lives in metadata-models. + * This stub exists so that batch deletion tests can resolve the Status column name via + * SQLSchemaUtils.getAspectColumnName() without depending on the downstream metadata-models MP. + */ +record Status { + removed: boolean = false +} diff --git a/testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl b/testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl new file mode 100644 index 000000000..97d323761 --- /dev/null +++ b/testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl @@ -0,0 +1,23 @@ +namespace com.linkedin.testing + +import com.linkedin.common.Status +import com.linkedin.testing.localrelationship.AspectFooBar +import com.linkedin.testing.localrelationship.AspectFooBaz +import com.linkedin.testing.localrelationship.AspectFooBarBaz + +/** + * For unit tests — asset model for FooUrn entity type. + * Must include all aspects used across test classes for the foo entity, + * since GlobalAssetRegistry is a static singleton shared across test classes. + */ +record FooAsset { + urn: optional FooUrn + aspectfoo: optional AspectFoo, + aspectbar: optional AspectBar, + aspectbaz: optional AspectBaz, + aspectfoobar: optional AspectFooBar, + aspectfoobaz: optional AspectFooBaz, + aspectfoobarbaz: optional AspectFooBarBaz, + aspectattributes: optional AspectAttributes, + status: optional Status +}