From 93ed2908b1c4b65bd8685ae94b3e3cf51a773dba Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Tue, 17 Mar 2026 14:39:01 -0700 Subject: [PATCH 01/10] feat(dao): add batch deletion DAO support for stale metadata cleanup Add readDeletionInfoBatch() and batchSoftDeleteAssets() to IEbeanLocalAccess to support bulk soft-deletion with exactly 2 DB round-trips per batch. - EntityDeletionInfo: new @Value @Builder data class in dao-api holding deletion eligibility fields and aspect columns for Kafka archival - readDeletionInfoBatch: single SELECT * to read all URNs and parse a_status for statusRemoved/statusLastModifiedOn - batchSoftDeleteAssets: single guarded UPDATE with defense-in-depth WHERE clauses (deleted_ts IS NULL, removed=true, lastmodifiedon < cutoff) - SQL templates in SQLStatementUtils, SqlRow parsing in EBeanDAOUtils (convertSqlRowsToEntityDeletionInfoMap / toEntityDeletionInfo) - InstrumentedEbeanLocalAccess: delegation with instrument() wrapper - Integration tests in EbeanLocalAccessTest (11 tests against embedded MariaDB) and mock-based tests in InstrumentedEbeanLocalAccessTest (2) Part of META-23501 -- Metadata Graph Stale Metadata Cleanup Phase 2. DAO layer is intentionally Kafka-free; archival lives in metadata-graph-assets. Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 72 ++++++ .../metadata/dao/EntityDeletionInfo.java | 27 +++ .../metadata/dao/EbeanLocalAccess.java | 21 ++ .../metadata/dao/IEbeanLocalAccess.java | 23 ++ .../dao/InstrumentedEbeanLocalAccess.java | 12 + .../metadata/dao/utils/EBeanDAOUtils.java | 68 ++++++ .../metadata/dao/utils/SQLStatementUtils.java | 46 ++++ .../metadata/dao/EbeanLocalAccessTest.java | 206 ++++++++++++++++++ .../dao/InstrumentedEbeanLocalAccessTest.java | 27 +++ .../dao/utils/EmbeddedMariaInstance.java | 11 +- ...l-with-non-dollar-virtual-column-names.sql | 3 + .../ebean-local-access-create-all.sql | 3 + spec/batch_delete_dao_changes.md | 95 ++++++++ 13 files changed, 607 insertions(+), 7 deletions(-) create mode 100644 CLAUDE.md create mode 100644 dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java create mode 100644 spec/batch_delete_dao_changes.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..22cdfd9d4 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,72 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +DataHub GMA (General Metadata Architecture) is the backend framework for LinkedIn's DataHub metadata search & discovery platform. It provides type-safe, schema-driven metadata management with event-driven data consistency across multiple storage backends (SQL via Ebean, Elasticsearch for search). Built on LinkedIn's Pegasus data framework and Rest.li for REST APIs. + +**Java version**: 1.8 (Java 8) — required, newer versions will cause build failures. + +## Build Commands + +```bash +./gradlew build # Full build + all tests +./gradlew :module-name:build # Build a specific module (e.g., :dao-api, :dao-impl:ebean-dao) +./gradlew :module-name:test # Run tests for a specific module +./gradlew spotlessCheck # Check code formatting (CI enforced) +./gradlew spotlessApply # Auto-fix formatting +./gradlew checkstyleMain # Run checkstyle on main sources +./gradlew idea # Generate IntelliJ project files +``` + +Tests use **TestNG** (not JUnit) as the default test framework across all subprojects. The elasticsearch integration tests require Docker. + +**Apple Silicon (M-series Mac)**: Requires `brew install mariadb` and uncommenting three lines in `EmbeddedMariaInstance.java` (see `docs/developers.md`). + +## Module Architecture + +``` +core-models/ → Pegasus PDL schemas: Urn, AuditStamp, Url, Time (no Java logic) +core-models-utils/ → URN utility helpers +dao-api/ → DAO abstractions (BaseLocalDAO, BaseSearchDAO, BaseBrowseDAO), + event producers, query utilities, retention policies +dao-impl/ + ebean-dao/ → SQL storage via Ebean ORM (EbeanLocalDAO, relationship queries) + elasticsearch-dao/ → ES 5.x/6.x search implementation + elasticsearch-dao-7/→ ES 7.x search implementation +restli-resources/ → Rest.li resource base classes (BaseEntityResource, + BaseSearchableEntityResource) mapping DAOs to REST endpoints +validators/ → Schema validators ensuring PDL models conform to GMA conventions + (AspectValidator, EntityValidator, SnapshotValidator, etc.) +gradle-plugins/ → Annotation parsing (@gma) and code generation for metadata events +testing/ → Test infrastructure, ES integration test harness, test models +``` + +## Key Architectural Patterns + +- **Urn (Universal Resource Name)**: `urn:li:entityType:entityKey` — the universal identifier for all entities. Typed URN subclasses provide entity-specific keys. +- **Aspect Union Pattern**: Each entity type defines a Pegasus union of its supported aspects. Validators enforce that union members are record types only. +- **Aspect Versioning**: Version 0 = latest. Each aspect write creates a new immutable version. Retention policies (indefinite, time-based, version-based) control history. +- **Layered Storage**: BaseLocalDAO (SQL, source of truth) → BaseSearchDAO (Elasticsearch, derived index) → BaseBrowseDAO (hierarchical navigation). BaseRemoteDAO proxies to other GMS instances. +- **Event Sourcing**: Writes to LocalDAO trigger MCE/MAE event emission via BaseMetadataEventProducer. The `gradle-plugins` auto-generate event PDL schemas from `@gma` annotations on aspect PDL files. +- **Generic Type Binding**: DAOs are heavily parameterized with generics (``) and validate type constraints at construction time using reflection via `ModelUtils`. + +## Pegasus/Rest.li Data Models + +PDL (Pegasus Data Language) schemas live in `src/main/pegasus/` directories and compile to Java `RecordTemplate` classes. Key namespaces: +- `com.linkedin.common.*` — Core types (Urn, AuditStamp) +- `com.linkedin.metadata.aspect.*` — Aspect wrappers +- `com.linkedin.metadata.query.*` — Search/filter structures +- `com.linkedin.metadata.events.*` — Change tracking types +- `com.linkedin.metadata.snapshot.*` — Entity snapshots (versioned aspect collections) + +When modifying PDL schemas, the Pegasus gradle plugin regenerates Java bindings automatically during build. + +## Commit Convention + +Follow [Conventional Commits](https://www.conventionalcommits.org/): `(scope): description` + +Types: `feat`, `fix`, `refactor`, `docs`, `test`, `perf`, `style`, `build`, `ci` + +Max line length: 88 characters. Use imperative present tense, no capitalized first letter, no trailing dot. diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java b/dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java new file mode 100644 index 000000000..6dbf46a74 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/EntityDeletionInfo.java @@ -0,0 +1,27 @@ +package com.linkedin.metadata.dao; + +import java.sql.Timestamp; +import java.util.Map; +import lombok.Builder; +import lombok.Value; + + +/** + * A value class that holds deletion-relevant fields for a single entity, used by batch deletion validation. + * Contains status flags for deletion eligibility checks and all aspect column values for Kafka archival. + */ +@Value +@Builder +public class EntityDeletionInfo { + + Timestamp deletedTs; + + boolean statusRemoved; + + String statusLastModifiedOn; + + /** + * All aspect column values (column name → raw JSON string) for Kafka archival by the service layer. + */ + Map aspectColumns; +} diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 73acd122e..b9c28fbc5 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -344,6 +344,27 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { return _server.createSqlUpdate(deleteSqlStatement).execute(); } + @Override + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + if (urns.isEmpty()) { + return Collections.emptyMap(); + } + + final String sql = SQLStatementUtils.createReadAllColumnsByUrnsSql(urns, isTestMode); + return EBeanDAOUtils.convertSqlRowsToEntityDeletionInfoMap( + _server.createSqlQuery(sql).findList(), _urnClass); + } + + @Override + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + if (urns.isEmpty()) { + return 0; + } + + final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, isTestMode); + return _server.createSqlUpdate(sql).execute(); + } + @Override public List listUrns(@Nullable IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 67532ced0..0ceb0723b 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -97,6 +97,29 @@ List batchGetUnion(@Nonnull */ int softDeleteAsset(@Nonnull URN urn, boolean isTestMode); + /** + * Read deletion-relevant fields for a batch of URNs in a single SELECT. + * Returns deletion-relevant fields for validation and all aspect columns for Kafka archival. + * URNs not found in the database will not have entries in the returned map. + * + * @param urns list of URNs to check + * @param isTestMode whether to use test schema + * @return map of URN to {@link EntityDeletionInfo} + */ + Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode); + + /** + * Batch soft-delete entities by setting deleted_ts = NOW() for URNs that meet all deletion criteria. + * The UPDATE includes guard clauses (deleted_ts IS NULL, Status.removed = true, lastmodifiedon < cutoff) + * as defense-in-depth against race conditions. + * + * @param urns list of URNs to soft-delete + * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp + * @param isTestMode whether to use test schema + * @return number of rows actually soft-deleted + */ + int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode); + /** * Returns list of urns that satisfy the given filter conditions. * diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java index 518b6be10..d9bda4898 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java @@ -99,6 +99,18 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { return instrument("softDeleteAsset", () -> _delegate.softDeleteAsset(urn, isTestMode)); } + @Override + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + return instrument("readDeletionInfoBatch.urns_" + urns.size(), + () -> _delegate.readDeletionInfoBatch(urns, isTestMode)); + } + + @Override + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + return instrument("batchSoftDeleteAssets.urns_" + urns.size(), + () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode)); + } + @Override public List listUrns(@Nullable IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java index abb40335b..53fdd9a01 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.dao.utils; import com.linkedin.common.urn.Urn; +import com.linkedin.data.DataMap; import com.linkedin.data.schema.RecordDataSchema; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; @@ -12,6 +13,7 @@ import com.linkedin.metadata.aspect.AuditedAspect; import com.linkedin.metadata.aspect.SoftDeletedAspect; import com.linkedin.metadata.dao.EbeanMetadataAspect; +import com.linkedin.metadata.dao.EntityDeletionInfo; import com.linkedin.metadata.dao.ListResult; import com.linkedin.metadata.query.AspectField; import com.linkedin.metadata.query.Condition; @@ -257,6 +259,72 @@ public static List readSqlR }).collect(Collectors.toList()); } + /** + * Parse a list of {@link SqlRow} results (from a SELECT * on an entity table) into a map of + * URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and a_status columns. + * Rows that cannot be parsed as a valid URN are skipped with a warning. + * + * @param sqlRows list of {@link SqlRow} from entity table query + * @param urnClass URN class for deserialization + * @param URN type + * @return map of URN to {@link EntityDeletionInfo} + */ + public static Map convertSqlRowsToEntityDeletionInfoMap( + @Nonnull List sqlRows, @Nonnull Class urnClass) { + final Map result = new HashMap<>(); + for (SqlRow row : sqlRows) { + final String urnStr = row.getString("urn"); + try { + result.put(getUrn(urnStr, urnClass), toEntityDeletionInfo(row)); + } catch (IllegalArgumentException e) { + log.warn("Failed to parse URN string: {}, skipping row", urnStr, e); + } + } + return result; + } + + /** + * Parse a single {@link SqlRow} from an entity table SELECT * into an {@link EntityDeletionInfo}. + * Extracts deletion eligibility fields from a_status (statusRemoved, statusLastModifiedOn) + * and collects all aspect columns for Kafka archival. + * + * @param row {@link SqlRow} from entity table query + * @return {@link EntityDeletionInfo} + */ + @Nonnull + static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row) { + // Collect all aspect columns (a_* prefixed, non-null), same pattern as readSqlRows() + final Map aspectColumnValues = new HashMap<>(); + for (String key : row.keySet()) { + if (key.startsWith(SQLSchemaUtils.ASPECT_PREFIX) && row.get(key) != null) { + aspectColumnValues.put(key, row.getString(key)); + } + } + + // Parse a_status using RecordUtils (same pattern as readSqlRows / isSoftDeletedAspect) + boolean statusRemoved = false; + String statusLastModifiedOn = null; + final String statusJson = row.getString("a_status"); + if (statusJson != null) { + final DataMap statusData = RecordUtils.toDataMap(statusJson); + final Object lastModObj = statusData.get("lastmodifiedon"); + if (lastModObj != null) { + statusLastModifiedOn = lastModObj.toString(); + } + final Object aspectObj = statusData.get("aspect"); + if (aspectObj instanceof DataMap) { + statusRemoved = Boolean.TRUE.equals(((DataMap) aspectObj).get("removed")); + } + } + + return EntityDeletionInfo.builder() + .deletedTs(row.getTimestamp("deleted_ts")) + .statusRemoved(statusRemoved) + .statusLastModifiedOn(statusLastModifiedOn) + .aspectColumns(aspectColumnValues) + .build(); + } + /** * Read EbeanMetadataAspect from {@link SqlRow}. * @param sqlRow {@link SqlRow} diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 45cc8b32d..279499371 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -83,6 +83,15 @@ public class SQLStatementUtils { public static final String SQL_INSERT_ASSET_VALUES = "VALUES (:urn, :lastmodifiedon, :lastmodifiedby,"; // Delete prefix of the sql statement for deleting from metadata_aspect table public static final String SQL_SOFT_DELETE_ASSET_WITH_URN = "UPDATE %s SET deleted_ts = NOW() WHERE urn = '%s';"; + + private static final String SQL_READ_ALL_COLUMNS_BY_URNS_TEMPLATE = "SELECT * FROM %s WHERE urn IN (%s)"; + + private static final String SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE = + "UPDATE %s SET deleted_ts = NOW()" + + " WHERE urn IN (%s)" + + " AND deleted_ts IS NULL" + + " AND JSON_EXTRACT(a_status, '$.aspect.removed') = true" + + " AND JSON_EXTRACT(a_status, '$.lastmodifiedon') < '%s'"; // closing bracket for the sql statement INSERT prefix // e.g. INSERT INTO metadata_aspect (urn, a_urn, lastmodifiedon, lastmodifiedby) public static final String CLOSING_BRACKET = ") "; @@ -302,6 +311,43 @@ public static String createSoftDeleteAssetSql(@N return String.format(SQL_SOFT_DELETE_ASSET_WITH_URN, tableName, urn); } + /** + * Create SELECT * SQL statement for reading all columns for a batch of URNs. + * Used by batch deletion to read entity data for validation and Kafka archival. + * + * @param urns list of URNs to read + * @param isTestMode whether the test mode is enabled or not + * @return select all columns sql + */ + public static String createReadAllColumnsByUrnsSql(@Nonnull List urns, boolean isTestMode) { + final Urn firstUrn = urns.get(0); + final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); + final String urnList = urns.stream() + .map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'") + .collect(Collectors.joining(", ")); + return String.format(SQL_READ_ALL_COLUMNS_BY_URNS_TEMPLATE, tableName, urnList); + } + + /** + * Create batch soft-delete SQL statement with guard clauses for defense-in-depth. + * The UPDATE includes conditions (deleted_ts IS NULL, Status.removed = true, lastmodifiedon < cutoff) + * to protect against race conditions between validation SELECT and this UPDATE. + * + * @param urns list of URNs to soft-delete + * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp + * @param isTestMode whether the test mode is enabled or not + * @return batch soft-delete sql + */ + public static String createBatchSoftDeleteAssetSql(@Nonnull List urns, + @Nonnull String cutoffTimestamp, boolean isTestMode) { + final Urn firstUrn = urns.get(0); + final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); + final String urnList = urns.stream() + .map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'") + .collect(Collectors.joining(", ")); + return String.format(SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE, tableName, urnList, cutoffTimestamp); + } + /** * Create Update with optimistic locking SQL statement. The SQL UPDATE use old_timestamp as a compareAndSet to check * if the current update is made on an unchange record. For example: UPDATE table WHERE modifiedon = :oldTimestamp. diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 345960305..f7eb56db8 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -520,4 +520,210 @@ public void testDeleteAll() { int numRowsDeleted = _ebeanLocalAccessFoo.softDeleteAsset(fooUrn, false); assertEquals(numRowsDeleted, 1); } + + // ==================== readDeletionInfoBatch tests ==================== + + /** + * Helper to insert a row into metadata_entity_foo with specific a_status JSON via raw SQL. + * Uses URN IDs starting at 500+ to avoid collisions with the 0-99 range from setupTest(). + */ + private void insertFooEntityWithStatus(int id, String statusJson, String deletedTs) { + String urn = "urn:li:foo:" + id; + String deletedTsClause = deletedTs != null ? "'" + deletedTs + "'" : "NULL"; + String statusClause = statusJson != null ? "'" + statusJson + "'" : "NULL"; + String sql = String.format( + "INSERT INTO metadata_entity_foo (urn, lastmodifiedon, lastmodifiedby, a_status, deleted_ts) " + + "VALUES ('%s', NOW(), 'testActor', %s, %s) " + + "ON DUPLICATE KEY UPDATE a_status = %s, deleted_ts = %s", + urn, statusClause, deletedTsClause, statusClause, deletedTsClause); + _server.createSqlUpdate(sql).execute(); + } + + private static String makeStatusJson(boolean removed, String lastModifiedOn) { + return String.format("{\"aspect\":{\"removed\":%s},\"lastmodifiedon\":\"%s\",\"lastmodifiedby\":\"urn:li:corpuser:testActor\"}", + removed, lastModifiedOn); + } + + @Test + public void testReadDeletionInfoBatch_happyPath() { + // Given: 3 URNs with known status + String oldTimestamp = "2025-01-01 00:00:00.000"; + insertFooEntityWithStatus(500, makeStatusJson(true, oldTimestamp), null); + insertFooEntityWithStatus(501, makeStatusJson(false, oldTimestamp), null); + insertFooEntityWithStatus(502, makeStatusJson(true, oldTimestamp), null); + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(500)); + urns.add(makeFooUrn(501)); + urns.add(makeFooUrn(502)); + + // When + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + + // Then: all 3 returned with correct fields + assertEquals(result.size(), 3); + + EntityDeletionInfo info500 = result.get(makeFooUrn(500)); + assertNotNull(info500); + assertNull(info500.getDeletedTs()); + assertTrue(info500.isStatusRemoved()); + assertNotNull(info500.getAspectColumns()); + assertTrue(info500.getAspectColumns().containsKey("a_status")); + + EntityDeletionInfo info501 = result.get(makeFooUrn(501)); + assertNotNull(info501); + assertFalse(info501.isStatusRemoved()); + } + + @Test + public void testReadDeletionInfoBatch_emptyList() { + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), false); + assertTrue(result.isEmpty()); + } + + @Test + public void testReadDeletionInfoBatch_nonExistentUrns() { + List urns = Collections.singletonList(makeFooUrn(9998)); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + // Non-existent URNs are simply absent from the map + assertFalse(result.containsKey(makeFooUrn(9998))); + } + + @Test + public void testReadDeletionInfoBatch_mixedExistAndNonExist() { + String statusJson = makeStatusJson(true, "2025-01-01 00:00:00.000"); + insertFooEntityWithStatus(510, statusJson, null); + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(510)); // exists + urns.add(makeFooUrn(9997)); // does not exist + + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + + assertEquals(result.size(), 1); + assertTrue(result.containsKey(makeFooUrn(510))); + assertFalse(result.containsKey(makeFooUrn(9997))); + } + + @Test + public void testReadDeletionInfoBatch_alreadySoftDeleted() { + String statusJson = makeStatusJson(true, "2025-01-01 00:00:00.000"); + insertFooEntityWithStatus(520, statusJson, "2025-06-01 00:00:00.000"); + + List urns = Collections.singletonList(makeFooUrn(520)); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + + assertEquals(result.size(), 1); + EntityDeletionInfo info = result.get(makeFooUrn(520)); + assertNotNull(info); + assertNotNull(info.getDeletedTs()); + } + + // ==================== batchSoftDeleteAssets tests ==================== + + @Test + public void testBatchSoftDeleteAssets_happyPath() { + // Given: URNs with Status.removed=true and old lastmodifiedon + String oldTimestamp = "2025-01-01 00:00:00.000"; + insertFooEntityWithStatus(600, makeStatusJson(true, oldTimestamp), null); + insertFooEntityWithStatus(601, makeStatusJson(true, oldTimestamp), null); + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(600)); + urns.add(makeFooUrn(601)); + + // Cutoff is after the lastmodifiedon, so these should be eligible + String cutoffTimestamp = "2026-01-01 00:00:00.000"; + + // When + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, cutoffTimestamp, false); + + // Then: both rows soft-deleted + assertEquals(rowsAffected, 2); + + // Verify deleted_ts is set in DB + SqlRow row600 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:600'").findOne(); + assertNotNull(row600.getTimestamp("deleted_ts")); + SqlRow row601 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:601'").findOne(); + assertNotNull(row601.getTimestamp("deleted_ts")); + } + + @Test + public void testBatchSoftDeleteAssets_emptyList() { + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", false); + assertEquals(rowsAffected, 0); + } + + @Test + public void testBatchSoftDeleteAssets_guardsPreventDeletion_statusNotRemoved() { + // Given: URN with Status.removed=false + insertFooEntityWithStatus(610, makeStatusJson(false, "2025-01-01 00:00:00.000"), null); + + List urns = Collections.singletonList(makeFooUrn(610)); + + // When: cutoff is in the future (would pass retention check) + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + + // Then: guard clause prevents deletion + assertEquals(rowsAffected, 0); + SqlRow row = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:610'").findOne(); + assertNull(row.getTimestamp("deleted_ts")); + } + + @Test + public void testBatchSoftDeleteAssets_guardsPreventDeletion_retentionNotMet() { + // Given: URN with Status.removed=true but recent lastmodifiedon + String recentTimestamp = "2026-03-01 00:00:00.000"; + insertFooEntityWithStatus(620, makeStatusJson(true, recentTimestamp), null); + + List urns = Collections.singletonList(makeFooUrn(620)); + + // When: cutoff is BEFORE the lastmodifiedon (retention window not met) + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-02-01 00:00:00.000", false); + + // Then: guard clause prevents deletion + assertEquals(rowsAffected, 0); + } + + @Test + public void testBatchSoftDeleteAssets_guardsPreventDeletion_alreadyDeleted() { + // Given: URN already soft-deleted + insertFooEntityWithStatus(630, makeStatusJson(true, "2025-01-01 00:00:00.000"), "2025-06-01 00:00:00.000"); + + List urns = Collections.singletonList(makeFooUrn(630)); + + // When + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + + // Then: guard clause prevents re-deletion + assertEquals(rowsAffected, 0); + } + + @Test + public void testBatchSoftDeleteAssets_mixedEligibility() { + // Given: mix of eligible and ineligible URNs + String oldTimestamp = "2025-01-01 00:00:00.000"; + insertFooEntityWithStatus(640, makeStatusJson(true, oldTimestamp), null); // eligible + insertFooEntityWithStatus(641, makeStatusJson(false, oldTimestamp), null); // ineligible: not removed + insertFooEntityWithStatus(642, makeStatusJson(true, oldTimestamp), null); // eligible + + List urns = new ArrayList<>(); + urns.add(makeFooUrn(640)); + urns.add(makeFooUrn(641)); + urns.add(makeFooUrn(642)); + + // When + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + + // Then: only 2 eligible rows deleted + assertEquals(rowsAffected, 2); + + // Verify: 640 and 642 deleted, 641 not + SqlRow row640 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:640'").findOne(); + assertNotNull(row640.getTimestamp("deleted_ts")); + SqlRow row641 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:641'").findOne(); + assertNull(row641.getTimestamp("deleted_ts")); + SqlRow row642 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:642'").findOne(); + assertNotNull(row642.getTimestamp("deleted_ts")); + } } \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java index 57b115a64..842bc5441 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java @@ -215,4 +215,31 @@ public void testSetUrnPathExtractorDelegates() { _instrumented.setUrnPathExtractor(null); verify(_mockDelegate).setUrnPathExtractor(null); } + + @Test + public void testReadDeletionInfoBatchDelegatesAndRecordsLatency() { + Map expected = new HashMap<>(); + when(_mockDelegate.readDeletionInfoBatch(any(), anyBoolean())).thenReturn(expected); + + List urns = Collections.singletonList(null); + Map result = _instrumented.readDeletionInfoBatch(urns, false); + + assertSame(result, expected); + verify(_mockDelegate).readDeletionInfoBatch(urns, false); + verify(_mockMetrics).recordOperationLatency(eq("readDeletionInfoBatch.urns_1"), eq("test"), anyLong()); + verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); + } + + @Test + public void testBatchSoftDeleteAssetsDelegatesAndRecordsLatency() { + when(_mockDelegate.batchSoftDeleteAssets(any(), any(), anyBoolean())).thenReturn(5); + + List urns = Collections.singletonList(null); + int result = _instrumented.batchSoftDeleteAssets(urns, "2026-01-01", false); + + assertEquals(result, 5); + verify(_mockDelegate).batchSoftDeleteAssets(urns, "2026-01-01", false); + verify(_mockMetrics).recordOperationLatency(eq("batchSoftDeleteAssets.urns_1"), eq("test"), anyLong()); + verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); + } } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java index b4bcda51a..12fcbca0c 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java @@ -88,13 +88,10 @@ private static void initDB() { configurationBuilder.setDataDir(baseDbDir + File.separator + "data"); configurationBuilder.setBaseDir(baseDbDir + File.separator + "base"); - /* - * Add below 3 lines of code if building datahub-gma on a M1 / M2 chip Apple computer. - * - * configurationBuilder.setBaseDir("/opt/homebrew"); - * configurationBuilder.setUnpackingFromClasspath(false); - * configurationBuilder.setLibDir(System.getProperty("java.io.tmpdir") + "/MariaDB4j/no-libs"); - */ + // Apple Silicon (M1/M2/M3) configuration — uses locally installed MariaDB + configurationBuilder.setBaseDir("/opt/homebrew"); + configurationBuilder.setUnpackingFromClasspath(false); + configurationBuilder.setLibDir(System.getProperty("java.io.tmpdir") + "/MariaDB4j/no-libs"); try { // ensure the DB directory is deleted before we start to have a clean start diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql index a8761aa9d..6ecbfcdb2 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all-with-non-dollar-virtual-column-names.sql @@ -73,6 +73,9 @@ ALTER TABLE metadata_entity_bar ADD a_urn JSON; ALTER TABLE metadata_entity_foo ADD COLUMN i_urn0fooId VARCHAR(255) GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(a_urn, '$."\\\/fooId"'))); +-- add status aspect to foo entity (used by batch delete tests) +ALTER TABLE metadata_entity_foo ADD a_status JSON; + -- add foo aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectfoo JSON; diff --git a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql index 55042f2da..c5c6ab7e0 100644 --- a/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql +++ b/dao-impl/ebean-dao/src/test/resources/ebean-local-access-create-all.sql @@ -73,6 +73,9 @@ ALTER TABLE metadata_entity_bar ADD a_urn JSON; ALTER TABLE metadata_entity_foo ADD COLUMN i_urn$fooId VARCHAR(255) GENERATED ALWAYS AS (JSON_UNQUOTE(JSON_EXTRACT(a_urn, '$."\\\/fooId"'))); +-- add status aspect to foo entity (used by batch delete tests) +ALTER TABLE metadata_entity_foo ADD a_status JSON; + -- add foo aspect to foo entity ALTER TABLE metadata_entity_foo ADD a_aspectfoo JSON; diff --git a/spec/batch_delete_dao_changes.md b/spec/batch_delete_dao_changes.md new file mode 100644 index 000000000..8d3e88e06 --- /dev/null +++ b/spec/batch_delete_dao_changes.md @@ -0,0 +1,95 @@ +# Batch Deletion DAO Support + +**PR scope:** datahub-gma only +**Project:** META-23501 — Metadata Graph Stale Metadata Cleanup Phase 2 + +--- + +## Why + +Stale metadata cleanup jobs need to soft-delete entities in bulk. The existing `softDeleteAsset()` method operates on a single URN. This change adds two new DAO operations that work on a batch of URNs in exactly two DB round-trips, regardless of batch size. + +The DAO layer is intentionally kept simple: pure SQL, no business logic, no Kafka. The consuming service (`metadata-graph-assets`) handles validation, Kafka archival, and per-URN result reporting. + +--- + +## What Was Added + +### `EntityDeletionInfo` (new, `dao-api`) + +An immutable value object returned by the batch read operation. Contains the fields a caller needs to determine whether each entity is eligible for deletion: + +- `deletedTs` — whether the entity is already soft-deleted +- `statusRemoved` — whether the entity's Status aspect has `removed = true` +- `statusLastModifiedOn` — when the Status was last changed (for retention window checks) +- `aspectColumns` — all aspect column values as raw JSON strings, for use by the service layer's Kafka archival step + +Presence in the returned map means the entity exists. Absence means it was not found. + +### Two new `IEbeanLocalAccess` methods + +**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single `SELECT *`. Returns a map of URN → `EntityDeletionInfo` for all URNs found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not found." + +**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The WHERE clause embeds all safety guard conditions (not already deleted, Status.removed = true, lastmodifiedon before cutoff) as defense-in-depth against race conditions between the SELECT and UPDATE. + +### Layered implementation (no logic in `EbeanLocalAccess`) + +Following the established pattern of `batchGetUnion`, `EbeanLocalAccess` methods are thin: they delegate SQL generation to `SQLStatementUtils` and result parsing to `EBeanDAOUtils`. No SQL strings or parsing logic live directly in `EbeanLocalAccess`. + +- **`SQLStatementUtils`**: two new factory methods that build the SELECT and UPDATE statements +- **`EBeanDAOUtils`**: two new methods that parse `SqlRow` results into `EntityDeletionInfo`. Status fields are extracted using `RecordUtils.toDataMap()` (the same Pegasus data framework used throughout the codebase — no manual JSON parsing). + +### `InstrumentedEbeanLocalAccess` + +Both new methods are wired through the existing `instrument()` decorator, consistent with every other method on this class. + +--- + +## Intended Usage + +``` +caller → readDeletionInfoBatch(urns) // 1 SELECT + → validate each URN, partition into eligible / skipped + → archive eligible to Kafka // service layer responsibility + → batchSoftDeleteAssets(eligible, cutoff) // 1 UPDATE +``` + +The two methods are designed to be called together in sequence. The `aspectColumns` field in `EntityDeletionInfo` carries the full entity state needed for Kafka archival between the two calls. + +--- + +## Tests + +### `EbeanLocalAccessTest` — integration tests against embedded MariaDB + +Tests verify actual SQL execution and result correctness end-to-end. Each test inserts rows directly via raw SQL to control `a_status` and `deleted_ts` precisely, then asserts on the returned `EntityDeletionInfo` values or on DB state after the UPDATE. + +**`readDeletionInfoBatch`:** +- Happy path: returns correct `statusRemoved`, `statusLastModifiedOn`, `deletedTs`, and `aspectColumns` for found URNs +- Empty input: returns empty map +- URNs not in DB: absent from result map +- Mixed found/not-found: only found URNs in result +- Already soft-deleted URN: `deletedTs` is non-null in result + +**`batchSoftDeleteAssets`:** +- Happy path: eligible URNs are soft-deleted, returns correct affected row count +- Empty input: returns 0 +- `Status.removed = false`: guard clause blocks deletion +- Retention window not met (recent `lastmodifiedon`): guard clause blocks deletion +- Already soft-deleted (`deleted_ts` set): guard clause blocks re-deletion +- Mixed batch: only eligible URNs are deleted; ineligible ones are untouched + +The test SQL schema (both `ebean-local-access-create-all.sql` files) was extended with an `a_status` column on `metadata_entity_foo` to support these tests. + +### `InstrumentedEbeanLocalAccessTest` — mock-based unit tests + +Verifies that `InstrumentedEbeanLocalAccess` correctly delegates both new methods to the underlying `IEbeanLocalAccess` and records latency via `BaseDaoBenchmarkMetrics`. No database required. + +--- + +## Design Constraints + +- **DAO layer is Kafka-free.** No archival, no event publishing. The shared library stays generic. +- **Exactly 2 DB calls per batch.** No per-URN queries. +- **Guard clauses in the UPDATE.** Even if a caller skips the SELECT validation, the UPDATE will not soft-delete entities that don't meet all safety conditions. +- **`a_status` column is the only schema assumption.** The two methods rely on `a_status` existing in the entity table. All other aspect columns are collected generically by column name prefix. From aead6f1a78008e049524eddd5b93a43fd48aa115 Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Wed, 18 Mar 2026 09:23:35 -0700 Subject: [PATCH 02/10] fix: revert EmbeddedMariaInstance Apple Silicon config, fix checkstyle and spotless - Revert EmbeddedMariaInstance Apple Silicon config back to commented-out (was activated for local testing only, must not be in PR) - Rename test methods to camelCase to satisfy checkstyle method naming rule - Apply spotless formatting to CLAUDE.md and spec/batch_delete_dao_changes.md Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 33 ++++++++---- .../metadata/dao/utils/EBeanDAOUtils.java | 1 - .../metadata/dao/EbeanLocalAccessTest.java | 22 ++++---- .../dao/utils/EmbeddedMariaInstance.java | 11 ++-- spec/batch_delete_dao_changes.md | 51 ++++++++++++------- 5 files changed, 75 insertions(+), 43 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 22cdfd9d4..5c3be3250 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,10 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -DataHub GMA (General Metadata Architecture) is the backend framework for LinkedIn's DataHub metadata search & discovery platform. It provides type-safe, schema-driven metadata management with event-driven data consistency across multiple storage backends (SQL via Ebean, Elasticsearch for search). Built on LinkedIn's Pegasus data framework and Rest.li for REST APIs. +DataHub GMA (General Metadata Architecture) is the backend framework for LinkedIn's DataHub metadata search & discovery +platform. It provides type-safe, schema-driven metadata management with event-driven data consistency across multiple +storage backends (SQL via Ebean, Elasticsearch for search). Built on LinkedIn's Pegasus data framework and Rest.li for +REST APIs. **Java version**: 1.8 (Java 8) — required, newer versions will cause build failures. @@ -20,9 +23,11 @@ DataHub GMA (General Metadata Architecture) is the backend framework for LinkedI ./gradlew idea # Generate IntelliJ project files ``` -Tests use **TestNG** (not JUnit) as the default test framework across all subprojects. The elasticsearch integration tests require Docker. +Tests use **TestNG** (not JUnit) as the default test framework across all subprojects. The elasticsearch integration +tests require Docker. -**Apple Silicon (M-series Mac)**: Requires `brew install mariadb` and uncommenting three lines in `EmbeddedMariaInstance.java` (see `docs/developers.md`). +**Apple Silicon (M-series Mac)**: Requires `brew install mariadb` and uncommenting three lines in +`EmbeddedMariaInstance.java` (see `docs/developers.md`). ## Module Architecture @@ -45,16 +50,24 @@ testing/ → Test infrastructure, ES integration test harness, test ## Key Architectural Patterns -- **Urn (Universal Resource Name)**: `urn:li:entityType:entityKey` — the universal identifier for all entities. Typed URN subclasses provide entity-specific keys. -- **Aspect Union Pattern**: Each entity type defines a Pegasus union of its supported aspects. Validators enforce that union members are record types only. -- **Aspect Versioning**: Version 0 = latest. Each aspect write creates a new immutable version. Retention policies (indefinite, time-based, version-based) control history. -- **Layered Storage**: BaseLocalDAO (SQL, source of truth) → BaseSearchDAO (Elasticsearch, derived index) → BaseBrowseDAO (hierarchical navigation). BaseRemoteDAO proxies to other GMS instances. -- **Event Sourcing**: Writes to LocalDAO trigger MCE/MAE event emission via BaseMetadataEventProducer. The `gradle-plugins` auto-generate event PDL schemas from `@gma` annotations on aspect PDL files. -- **Generic Type Binding**: DAOs are heavily parameterized with generics (``) and validate type constraints at construction time using reflection via `ModelUtils`. +- **Urn (Universal Resource Name)**: `urn:li:entityType:entityKey` — the universal identifier for all entities. Typed + URN subclasses provide entity-specific keys. +- **Aspect Union Pattern**: Each entity type defines a Pegasus union of its supported aspects. Validators enforce that + union members are record types only. +- **Aspect Versioning**: Version 0 = latest. Each aspect write creates a new immutable version. Retention policies + (indefinite, time-based, version-based) control history. +- **Layered Storage**: BaseLocalDAO (SQL, source of truth) → BaseSearchDAO (Elasticsearch, derived index) → + BaseBrowseDAO (hierarchical navigation). BaseRemoteDAO proxies to other GMS instances. +- **Event Sourcing**: Writes to LocalDAO trigger MCE/MAE event emission via BaseMetadataEventProducer. The + `gradle-plugins` auto-generate event PDL schemas from `@gma` annotations on aspect PDL files. +- **Generic Type Binding**: DAOs are heavily parameterized with generics (``) and validate type + constraints at construction time using reflection via `ModelUtils`. ## Pegasus/Rest.li Data Models -PDL (Pegasus Data Language) schemas live in `src/main/pegasus/` directories and compile to Java `RecordTemplate` classes. Key namespaces: +PDL (Pegasus Data Language) schemas live in `src/main/pegasus/` directories and compile to Java `RecordTemplate` +classes. Key namespaces: + - `com.linkedin.common.*` — Core types (Urn, AuditStamp) - `com.linkedin.metadata.aspect.*` — Aspect wrappers - `com.linkedin.metadata.query.*` — Search/filter structures diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java index 53fdd9a01..d118b01e7 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java @@ -301,7 +301,6 @@ static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row) { } } - // Parse a_status using RecordUtils (same pattern as readSqlRows / isSoftDeletedAspect) boolean statusRemoved = false; String statusLastModifiedOn = null; final String statusJson = row.getString("a_status"); diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index f7eb56db8..6222ddc78 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -545,7 +545,7 @@ private static String makeStatusJson(boolean removed, String lastModifiedOn) { } @Test - public void testReadDeletionInfoBatch_happyPath() { + public void testReadDeletionInfoBatchHappyPath() { // Given: 3 URNs with known status String oldTimestamp = "2025-01-01 00:00:00.000"; insertFooEntityWithStatus(500, makeStatusJson(true, oldTimestamp), null); @@ -576,13 +576,13 @@ public void testReadDeletionInfoBatch_happyPath() { } @Test - public void testReadDeletionInfoBatch_emptyList() { + public void testReadDeletionInfoBatchEmptyList() { Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), false); assertTrue(result.isEmpty()); } @Test - public void testReadDeletionInfoBatch_nonExistentUrns() { + public void testReadDeletionInfoBatchNonExistentUrns() { List urns = Collections.singletonList(makeFooUrn(9998)); Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); // Non-existent URNs are simply absent from the map @@ -590,7 +590,7 @@ public void testReadDeletionInfoBatch_nonExistentUrns() { } @Test - public void testReadDeletionInfoBatch_mixedExistAndNonExist() { + public void testReadDeletionInfoBatchMixedExistAndNonExist() { String statusJson = makeStatusJson(true, "2025-01-01 00:00:00.000"); insertFooEntityWithStatus(510, statusJson, null); @@ -606,7 +606,7 @@ public void testReadDeletionInfoBatch_mixedExistAndNonExist() { } @Test - public void testReadDeletionInfoBatch_alreadySoftDeleted() { + public void testReadDeletionInfoBatchAlreadySoftDeleted() { String statusJson = makeStatusJson(true, "2025-01-01 00:00:00.000"); insertFooEntityWithStatus(520, statusJson, "2025-06-01 00:00:00.000"); @@ -622,7 +622,7 @@ public void testReadDeletionInfoBatch_alreadySoftDeleted() { // ==================== batchSoftDeleteAssets tests ==================== @Test - public void testBatchSoftDeleteAssets_happyPath() { + public void testBatchSoftDeleteAssetsHappyPath() { // Given: URNs with Status.removed=true and old lastmodifiedon String oldTimestamp = "2025-01-01 00:00:00.000"; insertFooEntityWithStatus(600, makeStatusJson(true, oldTimestamp), null); @@ -649,13 +649,13 @@ public void testBatchSoftDeleteAssets_happyPath() { } @Test - public void testBatchSoftDeleteAssets_emptyList() { + public void testBatchSoftDeleteAssetsEmptyList() { int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", false); assertEquals(rowsAffected, 0); } @Test - public void testBatchSoftDeleteAssets_guardsPreventDeletion_statusNotRemoved() { + public void testBatchSoftDeleteAssetsGuardsStatusNotRemoved() { // Given: URN with Status.removed=false insertFooEntityWithStatus(610, makeStatusJson(false, "2025-01-01 00:00:00.000"), null); @@ -671,7 +671,7 @@ public void testBatchSoftDeleteAssets_guardsPreventDeletion_statusNotRemoved() { } @Test - public void testBatchSoftDeleteAssets_guardsPreventDeletion_retentionNotMet() { + public void testBatchSoftDeleteAssetsGuardsRetentionNotMet() { // Given: URN with Status.removed=true but recent lastmodifiedon String recentTimestamp = "2026-03-01 00:00:00.000"; insertFooEntityWithStatus(620, makeStatusJson(true, recentTimestamp), null); @@ -686,7 +686,7 @@ public void testBatchSoftDeleteAssets_guardsPreventDeletion_retentionNotMet() { } @Test - public void testBatchSoftDeleteAssets_guardsPreventDeletion_alreadyDeleted() { + public void testBatchSoftDeleteAssetsGuardsAlreadyDeleted() { // Given: URN already soft-deleted insertFooEntityWithStatus(630, makeStatusJson(true, "2025-01-01 00:00:00.000"), "2025-06-01 00:00:00.000"); @@ -700,7 +700,7 @@ public void testBatchSoftDeleteAssets_guardsPreventDeletion_alreadyDeleted() { } @Test - public void testBatchSoftDeleteAssets_mixedEligibility() { + public void testBatchSoftDeleteAssetsMixedEligibility() { // Given: mix of eligible and ineligible URNs String oldTimestamp = "2025-01-01 00:00:00.000"; insertFooEntityWithStatus(640, makeStatusJson(true, oldTimestamp), null); // eligible diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java index 12fcbca0c..b4bcda51a 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/utils/EmbeddedMariaInstance.java @@ -88,10 +88,13 @@ private static void initDB() { configurationBuilder.setDataDir(baseDbDir + File.separator + "data"); configurationBuilder.setBaseDir(baseDbDir + File.separator + "base"); - // Apple Silicon (M1/M2/M3) configuration — uses locally installed MariaDB - configurationBuilder.setBaseDir("/opt/homebrew"); - configurationBuilder.setUnpackingFromClasspath(false); - configurationBuilder.setLibDir(System.getProperty("java.io.tmpdir") + "/MariaDB4j/no-libs"); + /* + * Add below 3 lines of code if building datahub-gma on a M1 / M2 chip Apple computer. + * + * configurationBuilder.setBaseDir("/opt/homebrew"); + * configurationBuilder.setUnpackingFromClasspath(false); + * configurationBuilder.setLibDir(System.getProperty("java.io.tmpdir") + "/MariaDB4j/no-libs"); + */ try { // ensure the DB directory is deleted before we start to have a clean start diff --git a/spec/batch_delete_dao_changes.md b/spec/batch_delete_dao_changes.md index 8d3e88e06..f5d154317 100644 --- a/spec/batch_delete_dao_changes.md +++ b/spec/batch_delete_dao_changes.md @@ -1,15 +1,17 @@ # Batch Deletion DAO Support -**PR scope:** datahub-gma only -**Project:** META-23501 — Metadata Graph Stale Metadata Cleanup Phase 2 +**PR scope:** datahub-gma only **Project:** META-23501 — Metadata Graph Stale Metadata Cleanup Phase 2 --- ## Why -Stale metadata cleanup jobs need to soft-delete entities in bulk. The existing `softDeleteAsset()` method operates on a single URN. This change adds two new DAO operations that work on a batch of URNs in exactly two DB round-trips, regardless of batch size. +Stale metadata cleanup jobs need to soft-delete entities in bulk. The existing `softDeleteAsset()` method operates on a +single URN. This change adds two new DAO operations that work on a batch of URNs in exactly two DB round-trips, +regardless of batch size. -The DAO layer is intentionally kept simple: pure SQL, no business logic, no Kafka. The consuming service (`metadata-graph-assets`) handles validation, Kafka archival, and per-URN result reporting. +The DAO layer is intentionally kept simple: pure SQL, no business logic, no Kafka. The consuming service +(`metadata-graph-assets`) handles validation, Kafka archival, and per-URN result reporting. --- @@ -17,7 +19,8 @@ The DAO layer is intentionally kept simple: pure SQL, no business logic, no Kafk ### `EntityDeletionInfo` (new, `dao-api`) -An immutable value object returned by the batch read operation. Contains the fields a caller needs to determine whether each entity is eligible for deletion: +An immutable value object returned by the batch read operation. Contains the fields a caller needs to determine whether +each entity is eligible for deletion: - `deletedTs` — whether the entity is already soft-deleted - `statusRemoved` — whether the entity's Status aspect has `removed = true` @@ -28,20 +31,25 @@ Presence in the returned map means the entity exists. Absence means it was not f ### Two new `IEbeanLocalAccess` methods -**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single `SELECT *`. Returns a map of URN → `EntityDeletionInfo` for all URNs found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not found." +**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single `SELECT *`. Returns a map of +URN → `EntityDeletionInfo` for all URNs found. URNs not present in the DB are simply absent from the result — the caller +treats absence as "not found." -**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The WHERE clause embeds all safety guard conditions (not already deleted, Status.removed = true, lastmodifiedon before cutoff) as defense-in-depth against race conditions between the SELECT and UPDATE. +**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The WHERE clause embeds all safety guard +conditions (not already deleted, Status.removed = true, lastmodifiedon before cutoff) as defense-in-depth against race +conditions between the SELECT and UPDATE. ### Layered implementation (no logic in `EbeanLocalAccess`) -Following the established pattern of `batchGetUnion`, `EbeanLocalAccess` methods are thin: they delegate SQL generation to `SQLStatementUtils` and result parsing to `EBeanDAOUtils`. No SQL strings or parsing logic live directly in `EbeanLocalAccess`. - - **`SQLStatementUtils`**: two new factory methods that build the SELECT and UPDATE statements -- **`EBeanDAOUtils`**: two new methods that parse `SqlRow` results into `EntityDeletionInfo`. Status fields are extracted using `RecordUtils.toDataMap()` (the same Pegasus data framework used throughout the codebase — no manual JSON parsing). +- **`EBeanDAOUtils`**: two new methods that parse `SqlRow` results into `EntityDeletionInfo`. Status fields are + extracted using `RecordUtils.toDataMap()` (the same Pegasus data framework used throughout the codebase — no manual + JSON parsing). ### `InstrumentedEbeanLocalAccess` -Both new methods are wired through the existing `instrument()` decorator, consistent with every other method on this class. +Both new methods are wired through the existing `instrument()` decorator, consistent with every other method on this +class. --- @@ -54,7 +62,8 @@ caller → readDeletionInfoBatch(urns) // 1 SELECT → batchSoftDeleteAssets(eligible, cutoff) // 1 UPDATE ``` -The two methods are designed to be called together in sequence. The `aspectColumns` field in `EntityDeletionInfo` carries the full entity state needed for Kafka archival between the two calls. +The two methods are designed to be called together in sequence. The `aspectColumns` field in `EntityDeletionInfo` +carries the full entity state needed for Kafka archival between the two calls. --- @@ -62,9 +71,12 @@ The two methods are designed to be called together in sequence. The `aspectColum ### `EbeanLocalAccessTest` — integration tests against embedded MariaDB -Tests verify actual SQL execution and result correctness end-to-end. Each test inserts rows directly via raw SQL to control `a_status` and `deleted_ts` precisely, then asserts on the returned `EntityDeletionInfo` values or on DB state after the UPDATE. +Tests verify actual SQL execution and result correctness end-to-end. Each test inserts rows directly via raw SQL to +control `a_status` and `deleted_ts` precisely, then asserts on the returned `EntityDeletionInfo` values or on DB state +after the UPDATE. **`readDeletionInfoBatch`:** + - Happy path: returns correct `statusRemoved`, `statusLastModifiedOn`, `deletedTs`, and `aspectColumns` for found URNs - Empty input: returns empty map - URNs not in DB: absent from result map @@ -72,6 +84,7 @@ Tests verify actual SQL execution and result correctness end-to-end. Each test i - Already soft-deleted URN: `deletedTs` is non-null in result **`batchSoftDeleteAssets`:** + - Happy path: eligible URNs are soft-deleted, returns correct affected row count - Empty input: returns 0 - `Status.removed = false`: guard clause blocks deletion @@ -79,11 +92,13 @@ Tests verify actual SQL execution and result correctness end-to-end. Each test i - Already soft-deleted (`deleted_ts` set): guard clause blocks re-deletion - Mixed batch: only eligible URNs are deleted; ineligible ones are untouched -The test SQL schema (both `ebean-local-access-create-all.sql` files) was extended with an `a_status` column on `metadata_entity_foo` to support these tests. +The test SQL schema (both `ebean-local-access-create-all.sql` files) was extended with an `a_status` column on +`metadata_entity_foo` to support these tests. ### `InstrumentedEbeanLocalAccessTest` — mock-based unit tests -Verifies that `InstrumentedEbeanLocalAccess` correctly delegates both new methods to the underlying `IEbeanLocalAccess` and records latency via `BaseDaoBenchmarkMetrics`. No database required. +Verifies that `InstrumentedEbeanLocalAccess` correctly delegates both new methods to the underlying `IEbeanLocalAccess` +and records latency via `BaseDaoBenchmarkMetrics`. No database required. --- @@ -91,5 +106,7 @@ Verifies that `InstrumentedEbeanLocalAccess` correctly delegates both new method - **DAO layer is Kafka-free.** No archival, no event publishing. The shared library stays generic. - **Exactly 2 DB calls per batch.** No per-URN queries. -- **Guard clauses in the UPDATE.** Even if a caller skips the SELECT validation, the UPDATE will not soft-delete entities that don't meet all safety conditions. -- **`a_status` column is the only schema assumption.** The two methods rely on `a_status` existing in the entity table. All other aspect columns are collected generically by column name prefix. +- **Guard clauses in the UPDATE.** Even if a caller skips the SELECT validation, the UPDATE will not soft-delete + entities that don't meet all safety conditions. +- **`a_status` column is the only schema assumption.** The two methods rely on `a_status` existing in the entity table. + All other aspect columns are collected generically by column name prefix. From 0d1443e357871f724023b3407a6595e287762dc1 Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Wed, 18 Mar 2026 10:22:59 -0700 Subject: [PATCH 03/10] fix(dao): validate cutoffTimestamp format to prevent SQL injection Add format validation in createBatchSoftDeleteAssetSql() that rejects any cutoffTimestamp not matching yyyy-MM-dd HH:mm:ss.SSS pattern with IllegalArgumentException. Defense-in-depth for shared library API surface. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metadata/dao/utils/SQLStatementUtils.java | 6 ++++++ .../linkedin/metadata/dao/EbeanLocalAccessTest.java | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 279499371..0268281d4 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -44,6 +44,8 @@ public class SQLStatementUtils { .addEscape('\'', "''") .addEscape('\\', "\\\\").build(); + private static final String TIMESTAMP_FORMAT_PATTERN = "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}"; + public static final String SOFT_DELETED_CHECK = "JSON_EXTRACT(%s, '$.gma_deleted') IS NULL"; // true when not soft deleted public static final String DELETED_TS_IS_NULL_CHECK = "deleted_ts IS NULL"; // true when the deleted_ts is NULL, meaning the record is not soft deleted @@ -340,6 +342,10 @@ public static String createReadAllColumnsByUrnsSql(@Nonnull List */ public static String createBatchSoftDeleteAssetSql(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + if (!cutoffTimestamp.matches(TIMESTAMP_FORMAT_PATTERN)) { + throw new IllegalArgumentException( + "cutoffTimestamp must be in yyyy-MM-dd HH:mm:ss.SSS format, got: " + cutoffTimestamp); + } final Urn firstUrn = urns.get(0); final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); final String urnList = urns.stream() diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 6222ddc78..089a7a015 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -726,4 +726,16 @@ public void testBatchSoftDeleteAssetsMixedEligibility() { SqlRow row642 = _server.createSqlQuery("SELECT deleted_ts FROM metadata_entity_foo WHERE urn = 'urn:li:foo:642'").findOne(); assertNotNull(row642.getTimestamp("deleted_ts")); } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testBatchSoftDeleteAssetsRejectsInvalidTimestampFormat() { + List urns = Collections.singletonList(makeFooUrn(0)); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01T00:00:00Z", false); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testBatchSoftDeleteAssetsRejectsSqlInjection() { + List urns = Collections.singletonList(makeFooUrn(0)); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", false); + } } \ No newline at end of file From be1c3d148f1c1dc6c93b3da40b47da54bc259716 Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Wed, 18 Mar 2026 14:10:42 -0700 Subject: [PATCH 04/10] feat(dao): add batch deletion delegation methods to EbeanLocalDAO Add readDeletionInfoBatch() and batchSoftDeleteAssets() delegation methods so BaseAssetServiceImpl can access the new DAO operations through EbeanLocalDAO (which it already uses for all other operations). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../com/linkedin/metadata/dao/EbeanLocalDAO.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 389c4c809..61e5ff091 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -750,6 +750,20 @@ protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) { return _localAccess.softDeleteAsset(urn, isTestMode); } + /** + * Delegates to {@link IEbeanLocalAccess#readDeletionInfoBatch}. + */ + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + return _localAccess.readDeletionInfoBatch(urns, isTestMode); + } + + /** + * Delegates to {@link IEbeanLocalAccess#batchSoftDeleteAssets}. + */ + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode); + } + @Override public void updateEntityTables(@Nonnull URN urn, @Nonnull Class aspectClass) { if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) { From 960e52f35923aa7a1039a300418efed03f26a08e Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Tue, 24 Mar 2026 13:40:17 -0700 Subject: [PATCH 05/10] refactor(dao): parameterize Status column name in batch deletion SQL Replace hardcoded `a_status` column name with a `statusColumnName` parameter throughout the batch deletion DAO layer. Different entity types may map the Status aspect to different column names (e.g. `a_foo_bar` instead of `a_status`), so the caller must resolve and pass the correct column name. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metadata/dao/EbeanLocalAccess.java | 11 +++++--- .../linkedin/metadata/dao/EbeanLocalDAO.java | 10 ++++--- .../metadata/dao/IEbeanLocalAccess.java | 8 ++++-- .../dao/InstrumentedEbeanLocalAccess.java | 10 ++++--- .../metadata/dao/utils/EBeanDAOUtils.java | 14 +++++----- .../metadata/dao/utils/SQLStatementUtils.java | 10 ++++--- .../metadata/dao/EbeanLocalAccessTest.java | 26 +++++++++---------- .../dao/InstrumentedEbeanLocalAccessTest.java | 12 ++++----- 8 files changed, 58 insertions(+), 43 deletions(-) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index b9c28fbc5..e64e1aeca 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -345,23 +345,26 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { } @Override - public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + public Map readDeletionInfoBatch(@Nonnull List urns, + @Nonnull String statusColumnName, boolean isTestMode) { if (urns.isEmpty()) { return Collections.emptyMap(); } final String sql = SQLStatementUtils.createReadAllColumnsByUrnsSql(urns, isTestMode); return EBeanDAOUtils.convertSqlRowsToEntityDeletionInfoMap( - _server.createSqlQuery(sql).findList(), _urnClass); + _server.createSqlQuery(sql).findList(), _urnClass, statusColumnName); } @Override - public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, + @Nonnull String statusColumnName, boolean isTestMode) { if (urns.isEmpty()) { return 0; } - final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, isTestMode); + final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, statusColumnName, + isTestMode); return _server.createSqlUpdate(sql).execute(); } diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 61e5ff091..a7e48fce6 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -753,15 +753,17 @@ protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) { /** * Delegates to {@link IEbeanLocalAccess#readDeletionInfoBatch}. */ - public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { - return _localAccess.readDeletionInfoBatch(urns, isTestMode); + public Map readDeletionInfoBatch(@Nonnull List urns, + @Nonnull String statusColumnName, boolean isTestMode) { + return _localAccess.readDeletionInfoBatch(urns, statusColumnName, isTestMode); } /** * Delegates to {@link IEbeanLocalAccess#batchSoftDeleteAssets}. */ - public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { - return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode); + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, + @Nonnull String statusColumnName, boolean isTestMode) { + return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, statusColumnName, isTestMode); } @Override diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 0ceb0723b..5e37a6310 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -103,10 +103,12 @@ List batchGetUnion(@Nonnull * URNs not found in the database will not have entries in the returned map. * * @param urns list of URNs to check + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @param isTestMode whether to use test schema * @return map of URN to {@link EntityDeletionInfo} */ - Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode); + Map readDeletionInfoBatch(@Nonnull List urns, @Nonnull String statusColumnName, + boolean isTestMode); /** * Batch soft-delete entities by setting deleted_ts = NOW() for URNs that meet all deletion criteria. @@ -115,10 +117,12 @@ List batchGetUnion(@Nonnull * * @param urns list of URNs to soft-delete * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @param isTestMode whether to use test schema * @return number of rows actually soft-deleted */ - int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode); + int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, + @Nonnull String statusColumnName, boolean isTestMode); /** * Returns list of urns that satisfy the given filter conditions. diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java index d9bda4898..50452dc80 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java @@ -100,15 +100,17 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { } @Override - public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + public Map readDeletionInfoBatch(@Nonnull List urns, + @Nonnull String statusColumnName, boolean isTestMode) { return instrument("readDeletionInfoBatch.urns_" + urns.size(), - () -> _delegate.readDeletionInfoBatch(urns, isTestMode)); + () -> _delegate.readDeletionInfoBatch(urns, statusColumnName, isTestMode)); } @Override - public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, + @Nonnull String statusColumnName, boolean isTestMode) { return instrument("batchSoftDeleteAssets.urns_" + urns.size(), - () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode)); + () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, statusColumnName, isTestMode)); } @Override diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java index d118b01e7..bbc23be89 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java @@ -261,21 +261,22 @@ public static List readSqlR /** * Parse a list of {@link SqlRow} results (from a SELECT * on an entity table) into a map of - * URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and a_status columns. + * URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and the Status aspect column. * Rows that cannot be parsed as a valid URN are skipped with a warning. * * @param sqlRows list of {@link SqlRow} from entity table query * @param urnClass URN class for deserialization + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @param URN type * @return map of URN to {@link EntityDeletionInfo} */ public static Map convertSqlRowsToEntityDeletionInfoMap( - @Nonnull List sqlRows, @Nonnull Class urnClass) { + @Nonnull List sqlRows, @Nonnull Class urnClass, @Nonnull String statusColumnName) { final Map result = new HashMap<>(); for (SqlRow row : sqlRows) { final String urnStr = row.getString("urn"); try { - result.put(getUrn(urnStr, urnClass), toEntityDeletionInfo(row)); + result.put(getUrn(urnStr, urnClass), toEntityDeletionInfo(row, statusColumnName)); } catch (IllegalArgumentException e) { log.warn("Failed to parse URN string: {}, skipping row", urnStr, e); } @@ -285,14 +286,15 @@ public static Map convertSqlRowsToEnt /** * Parse a single {@link SqlRow} from an entity table SELECT * into an {@link EntityDeletionInfo}. - * Extracts deletion eligibility fields from a_status (statusRemoved, statusLastModifiedOn) + * Extracts deletion eligibility fields from the Status aspect column (statusRemoved, statusLastModifiedOn) * and collects all aspect columns for Kafka archival. * * @param row {@link SqlRow} from entity table query + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @return {@link EntityDeletionInfo} */ @Nonnull - static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row) { + static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row, @Nonnull String statusColumnName) { // Collect all aspect columns (a_* prefixed, non-null), same pattern as readSqlRows() final Map aspectColumnValues = new HashMap<>(); for (String key : row.keySet()) { @@ -303,7 +305,7 @@ static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row) { boolean statusRemoved = false; String statusLastModifiedOn = null; - final String statusJson = row.getString("a_status"); + final String statusJson = row.getString(statusColumnName); if (statusJson != null) { final DataMap statusData = RecordUtils.toDataMap(statusJson); final Object lastModObj = statusData.get("lastmodifiedon"); diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 0268281d4..7ca64c3f0 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -92,8 +92,8 @@ public class SQLStatementUtils { "UPDATE %s SET deleted_ts = NOW()" + " WHERE urn IN (%s)" + " AND deleted_ts IS NULL" - + " AND JSON_EXTRACT(a_status, '$.aspect.removed') = true" - + " AND JSON_EXTRACT(a_status, '$.lastmodifiedon') < '%s'"; + + " AND JSON_EXTRACT(%s, '$.aspect.removed') = true" + + " AND JSON_EXTRACT(%s, '$.lastmodifiedon') < '%s'"; // closing bracket for the sql statement INSERT prefix // e.g. INSERT INTO metadata_aspect (urn, a_urn, lastmodifiedon, lastmodifiedby) public static final String CLOSING_BRACKET = ") "; @@ -337,11 +337,12 @@ public static String createReadAllColumnsByUrnsSql(@Nonnull List * * @param urns list of URNs to soft-delete * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp + * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @param isTestMode whether the test mode is enabled or not * @return batch soft-delete sql */ public static String createBatchSoftDeleteAssetSql(@Nonnull List urns, - @Nonnull String cutoffTimestamp, boolean isTestMode) { + @Nonnull String cutoffTimestamp, @Nonnull String statusColumnName, boolean isTestMode) { if (!cutoffTimestamp.matches(TIMESTAMP_FORMAT_PATTERN)) { throw new IllegalArgumentException( "cutoffTimestamp must be in yyyy-MM-dd HH:mm:ss.SSS format, got: " + cutoffTimestamp); @@ -351,7 +352,8 @@ public static String createBatchSoftDeleteAssetSql(@Nonnull List final String urnList = urns.stream() .map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'") .collect(Collectors.joining(", ")); - return String.format(SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE, tableName, urnList, cutoffTimestamp); + return String.format(SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE, tableName, urnList, statusColumnName, + statusColumnName, cutoffTimestamp); } /** diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 089a7a015..b05c33a4e 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -558,7 +558,7 @@ public void testReadDeletionInfoBatchHappyPath() { urns.add(makeFooUrn(502)); // When - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); // Then: all 3 returned with correct fields assertEquals(result.size(), 3); @@ -577,14 +577,14 @@ public void testReadDeletionInfoBatchHappyPath() { @Test public void testReadDeletionInfoBatchEmptyList() { - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), "a_status", false); assertTrue(result.isEmpty()); } @Test public void testReadDeletionInfoBatchNonExistentUrns() { List urns = Collections.singletonList(makeFooUrn(9998)); - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); // Non-existent URNs are simply absent from the map assertFalse(result.containsKey(makeFooUrn(9998))); } @@ -598,7 +598,7 @@ public void testReadDeletionInfoBatchMixedExistAndNonExist() { urns.add(makeFooUrn(510)); // exists urns.add(makeFooUrn(9997)); // does not exist - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); assertEquals(result.size(), 1); assertTrue(result.containsKey(makeFooUrn(510))); @@ -611,7 +611,7 @@ public void testReadDeletionInfoBatchAlreadySoftDeleted() { insertFooEntityWithStatus(520, statusJson, "2025-06-01 00:00:00.000"); List urns = Collections.singletonList(makeFooUrn(520)); - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); assertEquals(result.size(), 1); EntityDeletionInfo info = result.get(makeFooUrn(520)); @@ -636,7 +636,7 @@ public void testBatchSoftDeleteAssetsHappyPath() { String cutoffTimestamp = "2026-01-01 00:00:00.000"; // When - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, cutoffTimestamp, false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, cutoffTimestamp, "a_status", false); // Then: both rows soft-deleted assertEquals(rowsAffected, 2); @@ -650,7 +650,7 @@ public void testBatchSoftDeleteAssetsHappyPath() { @Test public void testBatchSoftDeleteAssetsEmptyList() { - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", "a_status", false); assertEquals(rowsAffected, 0); } @@ -662,7 +662,7 @@ public void testBatchSoftDeleteAssetsGuardsStatusNotRemoved() { List urns = Collections.singletonList(makeFooUrn(610)); // When: cutoff is in the future (would pass retention check) - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", "a_status", false); // Then: guard clause prevents deletion assertEquals(rowsAffected, 0); @@ -679,7 +679,7 @@ public void testBatchSoftDeleteAssetsGuardsRetentionNotMet() { List urns = Collections.singletonList(makeFooUrn(620)); // When: cutoff is BEFORE the lastmodifiedon (retention window not met) - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-02-01 00:00:00.000", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-02-01 00:00:00.000", "a_status", false); // Then: guard clause prevents deletion assertEquals(rowsAffected, 0); @@ -693,7 +693,7 @@ public void testBatchSoftDeleteAssetsGuardsAlreadyDeleted() { List urns = Collections.singletonList(makeFooUrn(630)); // When - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", "a_status", false); // Then: guard clause prevents re-deletion assertEquals(rowsAffected, 0); @@ -713,7 +713,7 @@ public void testBatchSoftDeleteAssetsMixedEligibility() { urns.add(makeFooUrn(642)); // When - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", "a_status", false); // Then: only 2 eligible rows deleted assertEquals(rowsAffected, 2); @@ -730,12 +730,12 @@ public void testBatchSoftDeleteAssetsMixedEligibility() { @Test(expectedExceptions = IllegalArgumentException.class) public void testBatchSoftDeleteAssetsRejectsInvalidTimestampFormat() { List urns = Collections.singletonList(makeFooUrn(0)); - _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01T00:00:00Z", false); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01T00:00:00Z", "a_status", false); } @Test(expectedExceptions = IllegalArgumentException.class) public void testBatchSoftDeleteAssetsRejectsSqlInjection() { List urns = Collections.singletonList(makeFooUrn(0)); - _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", false); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", "a_status", false); } } \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java index 842bc5441..f8efc4261 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java @@ -219,26 +219,26 @@ public void testSetUrnPathExtractorDelegates() { @Test public void testReadDeletionInfoBatchDelegatesAndRecordsLatency() { Map expected = new HashMap<>(); - when(_mockDelegate.readDeletionInfoBatch(any(), anyBoolean())).thenReturn(expected); + when(_mockDelegate.readDeletionInfoBatch(any(), any(), anyBoolean())).thenReturn(expected); List urns = Collections.singletonList(null); - Map result = _instrumented.readDeletionInfoBatch(urns, false); + Map result = _instrumented.readDeletionInfoBatch(urns, "a_status", false); assertSame(result, expected); - verify(_mockDelegate).readDeletionInfoBatch(urns, false); + verify(_mockDelegate).readDeletionInfoBatch(urns, "a_status", false); verify(_mockMetrics).recordOperationLatency(eq("readDeletionInfoBatch.urns_1"), eq("test"), anyLong()); verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); } @Test public void testBatchSoftDeleteAssetsDelegatesAndRecordsLatency() { - when(_mockDelegate.batchSoftDeleteAssets(any(), any(), anyBoolean())).thenReturn(5); + when(_mockDelegate.batchSoftDeleteAssets(any(), any(), any(), anyBoolean())).thenReturn(5); List urns = Collections.singletonList(null); - int result = _instrumented.batchSoftDeleteAssets(urns, "2026-01-01", false); + int result = _instrumented.batchSoftDeleteAssets(urns, "2026-01-01", "a_status", false); assertEquals(result, 5); - verify(_mockDelegate).batchSoftDeleteAssets(urns, "2026-01-01", false); + verify(_mockDelegate).batchSoftDeleteAssets(urns, "2026-01-01", "a_status", false); verify(_mockMetrics).recordOperationLatency(eq("batchSoftDeleteAssets.urns_1"), eq("test"), anyLong()); verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); } From d3c2a53d46743c8f36f0f0d0ca814cf217d2ecac Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Tue, 24 Mar 2026 13:47:52 -0700 Subject: [PATCH 06/10] refactor(dao): replace SELECT * with explicit column list in batch deletion read Replace SELECT * with an explicit column list (urn, deleted_ts, a_*) in readDeletionInfoBatch to avoid fetching index columns (i_*) and other derived columns unnecessarily. Uses SchemaValidatorUtil's cached column metadata to resolve the column list at runtime. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../linkedin/metadata/dao/EbeanLocalAccess.java | 15 ++++++++++++++- .../metadata/dao/utils/SQLStatementUtils.java | 16 ++++++++++------ .../metadata/dao/utils/SchemaValidatorUtil.java | 15 +++++++++++++++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index e64e1aeca..93624705c 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -351,11 +351,24 @@ public Map readDeletionInfoBatch(@Nonnull List urn return Collections.emptyMap(); } - final String sql = SQLStatementUtils.createReadAllColumnsByUrnsSql(urns, isTestMode); + final Urn firstUrn = urns.get(0); + final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); + final List columns = getDeletionInfoColumns(tableName); + final String sql = SQLStatementUtils.createReadDeletionInfoByUrnsSql(urns, columns, isTestMode); return EBeanDAOUtils.convertSqlRowsToEntityDeletionInfoMap( _server.createSqlQuery(sql).findList(), _urnClass, statusColumnName); } + /** + * Returns the column list needed for batch deletion info: urn, deleted_ts, and all aspect columns (a_*). + * Excludes index columns (i_*) and other derived columns to reduce data transfer. + */ + private List getDeletionInfoColumns(@Nonnull String tableName) { + return validator.getColumns(tableName).stream() + .filter(c -> c.equals("urn") || c.equals("deleted_ts") || c.startsWith(ASPECT_PREFIX)) + .collect(Collectors.toList()); + } + @Override public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, @Nonnull String statusColumnName, boolean isTestMode) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java index 7ca64c3f0..c8877c25e 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java @@ -86,7 +86,7 @@ public class SQLStatementUtils { // Delete prefix of the sql statement for deleting from metadata_aspect table public static final String SQL_SOFT_DELETE_ASSET_WITH_URN = "UPDATE %s SET deleted_ts = NOW() WHERE urn = '%s';"; - private static final String SQL_READ_ALL_COLUMNS_BY_URNS_TEMPLATE = "SELECT * FROM %s WHERE urn IN (%s)"; + private static final String SQL_READ_COLUMNS_BY_URNS_TEMPLATE = "SELECT %s FROM %s WHERE urn IN (%s)"; private static final String SQL_BATCH_SOFT_DELETE_ASSET_TEMPLATE = "UPDATE %s SET deleted_ts = NOW()" @@ -314,20 +314,24 @@ public static String createSoftDeleteAssetSql(@N } /** - * Create SELECT * SQL statement for reading all columns for a batch of URNs. - * Used by batch deletion to read entity data for validation and Kafka archival. + * Create SELECT SQL statement for reading deletion-relevant columns for a batch of URNs. + * Selects only urn, deleted_ts, and aspect columns (a_* prefix), excluding index columns (i_*) + * and other derived columns to reduce data transfer. * * @param urns list of URNs to read + * @param columns list of column names to select * @param isTestMode whether the test mode is enabled or not - * @return select all columns sql + * @return select columns sql */ - public static String createReadAllColumnsByUrnsSql(@Nonnull List urns, boolean isTestMode) { + public static String createReadDeletionInfoByUrnsSql(@Nonnull List urns, + @Nonnull List columns, boolean isTestMode) { final Urn firstUrn = urns.get(0); final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); final String urnList = urns.stream() .map(urn -> "'" + escapeReservedCharInUrn(urn.toString()) + "'") .collect(Collectors.joining(", ")); - return String.format(SQL_READ_ALL_COLUMNS_BY_URNS_TEMPLATE, tableName, urnList); + final String columnList = String.join(", ", columns); + return String.format(SQL_READ_COLUMNS_BY_URNS_TEMPLATE, columnList, tableName, urnList); } /** diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java index 2a7aacaa2..83d4012fb 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SchemaValidatorUtil.java @@ -98,6 +98,21 @@ public boolean columnExists(@Nonnull String tableName, @Nonnull String columnNam return columns.contains(lowerColumn); } + /** + * Returns all column names (lowercase) for the given table, using the cache. + * + * @param tableName Table name + * @return set of lowercase column names + */ + @Nonnull + public Set getColumns(@Nonnull String tableName) { + String lowerTable = tableName.toLowerCase(); + return columnCache.get(lowerTable, tbl -> { + log.info("Refreshing column cache for table '{}'", tbl); + return loadColumns(tbl); + }); + } + /** * Checks whether the given index exists in the specified table. * From c448571328ae06fcb74608e28ded8a6f07b475fc Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Tue, 24 Mar 2026 14:32:48 -0700 Subject: [PATCH 07/10] fix(dao): add batch size guard to batch deletion DAO methods Reject batches exceeding 2000 URNs in readDeletionInfoBatch and batchSoftDeleteAssets as defense-in-depth against overwhelming the DB. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../java/com/linkedin/metadata/dao/EbeanLocalAccess.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 93624705c..38583b79f 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -72,6 +72,7 @@ public class EbeanLocalAccess implements IEbeanLocalAccess // TODO confirm if the default page size is 1000 in other code context. private static final int DEFAULT_PAGE_SIZE = 1000; + private static final int MAX_BATCH_DELETE_SIZE = 2000; private static final String ASPECT_JSON_PLACEHOLDER = "__PLACEHOLDER__"; private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN"; private static final String EBEAN_SERVER_CONFIG = "EbeanServerConfig"; @@ -350,6 +351,10 @@ public Map readDeletionInfoBatch(@Nonnull List urn if (urns.isEmpty()) { return Collections.emptyMap(); } + if (urns.size() > MAX_BATCH_DELETE_SIZE) { + throw new IllegalArgumentException( + String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE)); + } final Urn firstUrn = urns.get(0); final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); @@ -375,6 +380,10 @@ public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoff if (urns.isEmpty()) { return 0; } + if (urns.size() > MAX_BATCH_DELETE_SIZE) { + throw new IllegalArgumentException( + String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE)); + } final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, statusColumnName, isTestMode); From 178181dec34ae8c6f9f62e34e9ac1f416f6d8d70 Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Tue, 24 Mar 2026 14:43:57 -0700 Subject: [PATCH 08/10] fix(dao): update docs to reflect parameterized column name and explicit SELECT Update spec and Javadocs to reflect that readDeletionInfoBatch uses an explicit column list (not SELECT *), statusColumnName is caller-provided (not hardcoded a_status), and both methods enforce a 2000 URN batch size limit. Fix missing trailing newline in test file. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metadata/dao/utils/EBeanDAOUtils.java | 4 ++-- .../metadata/dao/EbeanLocalAccessTest.java | 2 +- spec/batch_delete_dao_changes.md | 19 +++++++++++-------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java index bbc23be89..fdf84106f 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java @@ -260,7 +260,7 @@ public static List readSqlR } /** - * Parse a list of {@link SqlRow} results (from a SELECT * on an entity table) into a map of + * Parse a list of {@link SqlRow} results from an entity table into a map of * URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and the Status aspect column. * Rows that cannot be parsed as a valid URN are skipped with a warning. * @@ -285,7 +285,7 @@ public static Map convertSqlRowsToEnt } /** - * Parse a single {@link SqlRow} from an entity table SELECT * into an {@link EntityDeletionInfo}. + * Parse a single {@link SqlRow} from an entity table query into an {@link EntityDeletionInfo}. * Extracts deletion eligibility fields from the Status aspect column (statusRemoved, statusLastModifiedOn) * and collects all aspect columns for Kafka archival. * diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index b05c33a4e..41e96841c 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -738,4 +738,4 @@ public void testBatchSoftDeleteAssetsRejectsSqlInjection() { List urns = Collections.singletonList(makeFooUrn(0)); _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", "a_status", false); } -} \ No newline at end of file +} diff --git a/spec/batch_delete_dao_changes.md b/spec/batch_delete_dao_changes.md index f5d154317..676dbd674 100644 --- a/spec/batch_delete_dao_changes.md +++ b/spec/batch_delete_dao_changes.md @@ -31,13 +31,14 @@ Presence in the returned map means the entity exists. Absence means it was not f ### Two new `IEbeanLocalAccess` methods -**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single `SELECT *`. Returns a map of -URN → `EntityDeletionInfo` for all URNs found. URNs not present in the DB are simply absent from the result — the caller -treats absence as "not found." +**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single SELECT (explicit column list: +`urn`, `deleted_ts`, and all `a_*` aspect columns — index columns are excluded). The caller passes the Status aspect's +column name so the DAO can parse it without hardcoding `a_status`. Returns a map of URN → `EntityDeletionInfo` for all +URNs found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not found." -**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The WHERE clause embeds all safety guard -conditions (not already deleted, Status.removed = true, lastmodifiedon before cutoff) as defense-in-depth against race -conditions between the SELECT and UPDATE. +**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The caller passes the Status aspect's +column name, which is used in the WHERE clause guard conditions (not already deleted, Status.removed = true, +lastmodifiedon before cutoff) as defense-in-depth against race conditions between the SELECT and UPDATE. ### Layered implementation (no logic in `EbeanLocalAccess`) @@ -108,5 +109,7 @@ and records latency via `BaseDaoBenchmarkMetrics`. No database required. - **Exactly 2 DB calls per batch.** No per-URN queries. - **Guard clauses in the UPDATE.** Even if a caller skips the SELECT validation, the UPDATE will not soft-delete entities that don't meet all safety conditions. -- **`a_status` column is the only schema assumption.** The two methods rely on `a_status` existing in the entity table. - All other aspect columns are collected generically by column name prefix. +- **No hardcoded column names.** The Status aspect column name is passed by the caller, so entity types that map Status + to a different column (e.g. `a_foo_bar`) work without changes. All other aspect columns are collected generically by + the `a_` prefix. +- **Batch size limit.** Both methods reject batches exceeding 2000 URNs as defense-in-depth against overwhelming the DB. From c8635a4c1cf3a0a4da152702e67a8bcb0fc6a0cb Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Tue, 24 Mar 2026 15:27:49 -0700 Subject: [PATCH 09/10] style: fix spotless markdown formatting in spec Co-Authored-By: Claude Opus 4.6 (1M context) --- spec/batch_delete_dao_changes.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/spec/batch_delete_dao_changes.md b/spec/batch_delete_dao_changes.md index 676dbd674..e1fe72fc3 100644 --- a/spec/batch_delete_dao_changes.md +++ b/spec/batch_delete_dao_changes.md @@ -31,10 +31,11 @@ Presence in the returned map means the entity exists. Absence means it was not f ### Two new `IEbeanLocalAccess` methods -**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single SELECT (explicit column list: -`urn`, `deleted_ts`, and all `a_*` aspect columns — index columns are excluded). The caller passes the Status aspect's -column name so the DAO can parse it without hardcoding `a_status`. Returns a map of URN → `EntityDeletionInfo` for all -URNs found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not found." +**`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single SELECT (explicit column +list: `urn`, `deleted_ts`, and all `a_*` aspect columns — index columns are excluded). The caller passes the Status +aspect's column name so the DAO can parse it without hardcoding `a_status`. Returns a map of URN → `EntityDeletionInfo` +for all URNs found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not +found." **`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The caller passes the Status aspect's column name, which is used in the WHERE clause guard conditions (not already deleted, Status.removed = true, From 69e341f6fc2f7115d0834bbd04d5a2fa92c10a2d Mon Sep 17 00:00:00 2001 From: Natallia Ulashchick Date: Fri, 27 Mar 2026 09:21:48 -0700 Subject: [PATCH 10/10] refactor(dao): resolve Status column name internally in EbeanLocalAccess Remove statusColumnName parameter from public DAO API. The Status aspect column name is now resolved internally via SQLSchemaUtils.getAspectColumnName(), consistent with how all other DAO methods resolve column names. Add test stub com.linkedin.common.Status PDL and FooAsset PDL so that the test entity type can resolve the Status column name through the standard GlobalAssetRegistry path. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metadata/dao/EbeanLocalAccess.java | 16 ++++++++--- .../linkedin/metadata/dao/EbeanLocalDAO.java | 10 +++---- .../metadata/dao/IEbeanLocalAccess.java | 11 +++----- .../dao/InstrumentedEbeanLocalAccess.java | 10 +++---- .../metadata/dao/EbeanLocalAccessTest.java | 28 ++++++++++--------- .../dao/InstrumentedEbeanLocalAccessTest.java | 12 ++++---- spec/batch_delete_dao_changes.md | 17 ++++++----- .../pegasus/com/linkedin/common/Status.pdl | 10 +++++++ .../pegasus/com/linkedin/testing/FooAsset.pdl | 23 +++++++++++++++ 9 files changed, 86 insertions(+), 51 deletions(-) create mode 100644 testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl create mode 100644 testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 38583b79f..6b9a312b8 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -73,6 +73,7 @@ public class EbeanLocalAccess implements IEbeanLocalAccess // TODO confirm if the default page size is 1000 in other code context. private static final int DEFAULT_PAGE_SIZE = 1000; private static final int MAX_BATCH_DELETE_SIZE = 2000; + private static final String STATUS_ASPECT_FQCN = "com.linkedin.common.Status"; private static final String ASPECT_JSON_PLACEHOLDER = "__PLACEHOLDER__"; private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN"; private static final String EBEAN_SERVER_CONFIG = "EbeanServerConfig"; @@ -346,8 +347,7 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { } @Override - public Map readDeletionInfoBatch(@Nonnull List urns, - @Nonnull String statusColumnName, boolean isTestMode) { + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { if (urns.isEmpty()) { return Collections.emptyMap(); } @@ -356,6 +356,7 @@ public Map readDeletionInfoBatch(@Nonnull List urn String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE)); } + final String statusColumnName = getStatusColumnName(); final Urn firstUrn = urns.get(0); final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn); final List columns = getDeletionInfoColumns(tableName); @@ -374,9 +375,15 @@ private List getDeletionInfoColumns(@Nonnull String tableName) { .collect(Collectors.toList()); } + /** + * Resolves the entity table column name for the Status aspect via {@link SQLSchemaUtils}. + */ + private String getStatusColumnName() { + return getAspectColumnName(_entityType, STATUS_ASPECT_FQCN); + } + @Override - public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, - @Nonnull String statusColumnName, boolean isTestMode) { + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { if (urns.isEmpty()) { return 0; } @@ -385,6 +392,7 @@ public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoff String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE)); } + final String statusColumnName = getStatusColumnName(); final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, statusColumnName, isTestMode); return _server.createSqlUpdate(sql).execute(); diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index a7e48fce6..61e5ff091 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -753,17 +753,15 @@ protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) { /** * Delegates to {@link IEbeanLocalAccess#readDeletionInfoBatch}. */ - public Map readDeletionInfoBatch(@Nonnull List urns, - @Nonnull String statusColumnName, boolean isTestMode) { - return _localAccess.readDeletionInfoBatch(urns, statusColumnName, isTestMode); + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { + return _localAccess.readDeletionInfoBatch(urns, isTestMode); } /** * Delegates to {@link IEbeanLocalAccess#batchSoftDeleteAssets}. */ - public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, - @Nonnull String statusColumnName, boolean isTestMode) { - return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, statusColumnName, isTestMode); + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { + return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode); } @Override diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 5e37a6310..d807522f4 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -103,26 +103,23 @@ List batchGetUnion(@Nonnull * URNs not found in the database will not have entries in the returned map. * * @param urns list of URNs to check - * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @param isTestMode whether to use test schema * @return map of URN to {@link EntityDeletionInfo} */ - Map readDeletionInfoBatch(@Nonnull List urns, @Nonnull String statusColumnName, - boolean isTestMode); + Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode); /** * Batch soft-delete entities by setting deleted_ts = NOW() for URNs that meet all deletion criteria. * The UPDATE includes guard clauses (deleted_ts IS NULL, Status.removed = true, lastmodifiedon < cutoff) - * as defense-in-depth against race conditions. + * as defense-in-depth against race conditions. The Status aspect column name is resolved internally via + * {@link com.linkedin.metadata.dao.utils.SQLSchemaUtils#getAspectColumnName}. * * @param urns list of URNs to soft-delete * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp - * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status") * @param isTestMode whether to use test schema * @return number of rows actually soft-deleted */ - int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, - @Nonnull String statusColumnName, boolean isTestMode); + int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode); /** * Returns list of urns that satisfy the given filter conditions. diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java index 50452dc80..d9bda4898 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java @@ -100,17 +100,15 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { } @Override - public Map readDeletionInfoBatch(@Nonnull List urns, - @Nonnull String statusColumnName, boolean isTestMode) { + public Map readDeletionInfoBatch(@Nonnull List urns, boolean isTestMode) { return instrument("readDeletionInfoBatch.urns_" + urns.size(), - () -> _delegate.readDeletionInfoBatch(urns, statusColumnName, isTestMode)); + () -> _delegate.readDeletionInfoBatch(urns, isTestMode)); } @Override - public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, - @Nonnull String statusColumnName, boolean isTestMode) { + public int batchSoftDeleteAssets(@Nonnull List urns, @Nonnull String cutoffTimestamp, boolean isTestMode) { return instrument("batchSoftDeleteAssets.urns_" + urns.size(), - () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, statusColumnName, isTestMode)); + () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode)); } @Override diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 41e96841c..ecf40f924 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -20,6 +20,7 @@ import com.linkedin.testing.AspectBar; import com.linkedin.testing.AspectBaz; import com.linkedin.testing.AspectFoo; +import com.linkedin.testing.FooAsset; import com.linkedin.testing.urn.BurgerUrn; import com.linkedin.testing.urn.FooUrn; import io.ebean.Ebean; @@ -72,6 +73,7 @@ public static Object[][] inputList() { @BeforeClass public void init() { + GlobalAssetRegistry.register(FooUrn.ENTITY_TYPE, FooAsset.class); _server = EmbeddedMariaInstance.getServer(EbeanLocalAccessTest.class.getSimpleName()); _ebeanLocalAccessFoo = new EbeanLocalAccess<>(_server, EmbeddedMariaInstance.SERVER_CONFIG_MAP.get(_server.getName()), FooUrn.class, new FooUrnPathExtractor(), _ebeanConfig.isNonDollarVirtualColumnsEnabled()); @@ -558,7 +560,7 @@ public void testReadDeletionInfoBatchHappyPath() { urns.add(makeFooUrn(502)); // When - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); // Then: all 3 returned with correct fields assertEquals(result.size(), 3); @@ -577,14 +579,14 @@ public void testReadDeletionInfoBatchHappyPath() { @Test public void testReadDeletionInfoBatchEmptyList() { - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), "a_status", false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(Collections.emptyList(), false); assertTrue(result.isEmpty()); } @Test public void testReadDeletionInfoBatchNonExistentUrns() { List urns = Collections.singletonList(makeFooUrn(9998)); - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); // Non-existent URNs are simply absent from the map assertFalse(result.containsKey(makeFooUrn(9998))); } @@ -598,7 +600,7 @@ public void testReadDeletionInfoBatchMixedExistAndNonExist() { urns.add(makeFooUrn(510)); // exists urns.add(makeFooUrn(9997)); // does not exist - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); assertEquals(result.size(), 1); assertTrue(result.containsKey(makeFooUrn(510))); @@ -611,7 +613,7 @@ public void testReadDeletionInfoBatchAlreadySoftDeleted() { insertFooEntityWithStatus(520, statusJson, "2025-06-01 00:00:00.000"); List urns = Collections.singletonList(makeFooUrn(520)); - Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, "a_status", false); + Map result = _ebeanLocalAccessFoo.readDeletionInfoBatch(urns, false); assertEquals(result.size(), 1); EntityDeletionInfo info = result.get(makeFooUrn(520)); @@ -636,7 +638,7 @@ public void testBatchSoftDeleteAssetsHappyPath() { String cutoffTimestamp = "2026-01-01 00:00:00.000"; // When - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, cutoffTimestamp, "a_status", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, cutoffTimestamp, false); // Then: both rows soft-deleted assertEquals(rowsAffected, 2); @@ -650,7 +652,7 @@ public void testBatchSoftDeleteAssetsHappyPath() { @Test public void testBatchSoftDeleteAssetsEmptyList() { - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", "a_status", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(Collections.emptyList(), "2026-01-01 00:00:00.000", false); assertEquals(rowsAffected, 0); } @@ -662,7 +664,7 @@ public void testBatchSoftDeleteAssetsGuardsStatusNotRemoved() { List urns = Collections.singletonList(makeFooUrn(610)); // When: cutoff is in the future (would pass retention check) - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", "a_status", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); // Then: guard clause prevents deletion assertEquals(rowsAffected, 0); @@ -679,7 +681,7 @@ public void testBatchSoftDeleteAssetsGuardsRetentionNotMet() { List urns = Collections.singletonList(makeFooUrn(620)); // When: cutoff is BEFORE the lastmodifiedon (retention window not met) - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-02-01 00:00:00.000", "a_status", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-02-01 00:00:00.000", false); // Then: guard clause prevents deletion assertEquals(rowsAffected, 0); @@ -693,7 +695,7 @@ public void testBatchSoftDeleteAssetsGuardsAlreadyDeleted() { List urns = Collections.singletonList(makeFooUrn(630)); // When - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", "a_status", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); // Then: guard clause prevents re-deletion assertEquals(rowsAffected, 0); @@ -713,7 +715,7 @@ public void testBatchSoftDeleteAssetsMixedEligibility() { urns.add(makeFooUrn(642)); // When - int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", "a_status", false); + int rowsAffected = _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01 00:00:00.000", false); // Then: only 2 eligible rows deleted assertEquals(rowsAffected, 2); @@ -730,12 +732,12 @@ public void testBatchSoftDeleteAssetsMixedEligibility() { @Test(expectedExceptions = IllegalArgumentException.class) public void testBatchSoftDeleteAssetsRejectsInvalidTimestampFormat() { List urns = Collections.singletonList(makeFooUrn(0)); - _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01T00:00:00Z", "a_status", false); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "2026-01-01T00:00:00Z", false); } @Test(expectedExceptions = IllegalArgumentException.class) public void testBatchSoftDeleteAssetsRejectsSqlInjection() { List urns = Collections.singletonList(makeFooUrn(0)); - _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", "a_status", false); + _ebeanLocalAccessFoo.batchSoftDeleteAssets(urns, "'; DROP TABLE metadata_entity_foo; --", false); } } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java index f8efc4261..842bc5441 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccessTest.java @@ -219,26 +219,26 @@ public void testSetUrnPathExtractorDelegates() { @Test public void testReadDeletionInfoBatchDelegatesAndRecordsLatency() { Map expected = new HashMap<>(); - when(_mockDelegate.readDeletionInfoBatch(any(), any(), anyBoolean())).thenReturn(expected); + when(_mockDelegate.readDeletionInfoBatch(any(), anyBoolean())).thenReturn(expected); List urns = Collections.singletonList(null); - Map result = _instrumented.readDeletionInfoBatch(urns, "a_status", false); + Map result = _instrumented.readDeletionInfoBatch(urns, false); assertSame(result, expected); - verify(_mockDelegate).readDeletionInfoBatch(urns, "a_status", false); + verify(_mockDelegate).readDeletionInfoBatch(urns, false); verify(_mockMetrics).recordOperationLatency(eq("readDeletionInfoBatch.urns_1"), eq("test"), anyLong()); verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); } @Test public void testBatchSoftDeleteAssetsDelegatesAndRecordsLatency() { - when(_mockDelegate.batchSoftDeleteAssets(any(), any(), any(), anyBoolean())).thenReturn(5); + when(_mockDelegate.batchSoftDeleteAssets(any(), any(), anyBoolean())).thenReturn(5); List urns = Collections.singletonList(null); - int result = _instrumented.batchSoftDeleteAssets(urns, "2026-01-01", "a_status", false); + int result = _instrumented.batchSoftDeleteAssets(urns, "2026-01-01", false); assertEquals(result, 5); - verify(_mockDelegate).batchSoftDeleteAssets(urns, "2026-01-01", "a_status", false); + verify(_mockDelegate).batchSoftDeleteAssets(urns, "2026-01-01", false); verify(_mockMetrics).recordOperationLatency(eq("batchSoftDeleteAssets.urns_1"), eq("test"), anyLong()); verify(_mockMetrics, never()).recordOperationError(anyString(), anyString(), anyString()); } diff --git a/spec/batch_delete_dao_changes.md b/spec/batch_delete_dao_changes.md index e1fe72fc3..7fda662fa 100644 --- a/spec/batch_delete_dao_changes.md +++ b/spec/batch_delete_dao_changes.md @@ -32,13 +32,12 @@ Presence in the returned map means the entity exists. Absence means it was not f ### Two new `IEbeanLocalAccess` methods **`readDeletionInfoBatch`** — reads deletion-relevant fields for a list of URNs in a single SELECT (explicit column -list: `urn`, `deleted_ts`, and all `a_*` aspect columns — index columns are excluded). The caller passes the Status -aspect's column name so the DAO can parse it without hardcoding `a_status`. Returns a map of URN → `EntityDeletionInfo` -for all URNs found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not -found." +list: `urn`, `deleted_ts`, and all `a_*` aspect columns — index columns are excluded). The Status aspect column name is +resolved internally via `SQLSchemaUtils.getAspectColumnName()`. Returns a map of URN → `EntityDeletionInfo` for all URNs +found. URNs not present in the DB are simply absent from the result — the caller treats absence as "not found." -**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The caller passes the Status aspect's -column name, which is used in the WHERE clause guard conditions (not already deleted, Status.removed = true, +**`batchSoftDeleteAssets`** — soft-deletes a list of URNs in a single `UPDATE`. The Status aspect column name is +resolved internally, and is used in the WHERE clause guard conditions (not already deleted, Status.removed = true, lastmodifiedon before cutoff) as defense-in-depth against race conditions between the SELECT and UPDATE. ### Layered implementation (no logic in `EbeanLocalAccess`) @@ -110,7 +109,7 @@ and records latency via `BaseDaoBenchmarkMetrics`. No database required. - **Exactly 2 DB calls per batch.** No per-URN queries. - **Guard clauses in the UPDATE.** Even if a caller skips the SELECT validation, the UPDATE will not soft-delete entities that don't meet all safety conditions. -- **No hardcoded column names.** The Status aspect column name is passed by the caller, so entity types that map Status - to a different column (e.g. `a_foo_bar`) work without changes. All other aspect columns are collected generically by - the `a_` prefix. +- **No hardcoded column names.** The Status aspect column name is resolved internally via + `SQLSchemaUtils.getAspectColumnName()`, consistent with how all other DAO methods resolve column names. Entity types + that map Status to a different column (e.g. `a_foo_bar`) work without changes. - **Batch size limit.** Both methods reject batches exceeding 2000 URNs as defense-in-depth against overwhelming the DB. diff --git a/testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl b/testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl new file mode 100644 index 000000000..8d4ac4287 --- /dev/null +++ b/testing/test-models/src/main/pegasus/com/linkedin/common/Status.pdl @@ -0,0 +1,10 @@ +namespace com.linkedin.common + +/** + * Test stub for com.linkedin.common.Status — the real Status aspect lives in metadata-models. + * This stub exists so that batch deletion tests can resolve the Status column name via + * SQLSchemaUtils.getAspectColumnName() without depending on the downstream metadata-models MP. + */ +record Status { + removed: boolean = false +} diff --git a/testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl b/testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl new file mode 100644 index 000000000..97d323761 --- /dev/null +++ b/testing/test-models/src/main/pegasus/com/linkedin/testing/FooAsset.pdl @@ -0,0 +1,23 @@ +namespace com.linkedin.testing + +import com.linkedin.common.Status +import com.linkedin.testing.localrelationship.AspectFooBar +import com.linkedin.testing.localrelationship.AspectFooBaz +import com.linkedin.testing.localrelationship.AspectFooBarBaz + +/** + * For unit tests — asset model for FooUrn entity type. + * Must include all aspects used across test classes for the foo entity, + * since GlobalAssetRegistry is a static singleton shared across test classes. + */ +record FooAsset { + urn: optional FooUrn + aspectfoo: optional AspectFoo, + aspectbar: optional AspectBar, + aspectbaz: optional AspectBaz, + aspectfoobar: optional AspectFooBar, + aspectfoobaz: optional AspectFooBaz, + aspectfoobarbaz: optional AspectFooBarBaz, + aspectattributes: optional AspectAttributes, + status: optional Status +}