Skip to content

Commit b311a97

Browse files
Add batch deletion DAO support for stale metadata cleanup (#604)
* feat(dao): add batch deletion DAO support for stale metadata cleanup Add readDeletionInfoBatch() and batchSoftDeleteAssets() to IEbeanLocalAccess to support bulk soft-deletion with exactly 2 DB round-trips per batch. - EntityDeletionInfo: new @value @builder data class in dao-api holding deletion eligibility fields and aspect columns for Kafka archival - readDeletionInfoBatch: single SELECT * to read all URNs and parse a_status for statusRemoved/statusLastModifiedOn - batchSoftDeleteAssets: single guarded UPDATE with defense-in-depth WHERE clauses (deleted_ts IS NULL, removed=true, lastmodifiedon < cutoff) - SQL templates in SQLStatementUtils, SqlRow parsing in EBeanDAOUtils (convertSqlRowsToEntityDeletionInfoMap / toEntityDeletionInfo) - InstrumentedEbeanLocalAccess: delegation with instrument() wrapper - Integration tests in EbeanLocalAccessTest (11 tests against embedded MariaDB) and mock-based tests in InstrumentedEbeanLocalAccessTest (2)
1 parent 2fac19d commit b311a97

File tree

16 files changed

+760
-1
lines changed

16 files changed

+760
-1
lines changed

CLAUDE.md

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# CLAUDE.md
2+
3+
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
4+
5+
## Project Overview
6+
7+
DataHub GMA (General Metadata Architecture) is the backend framework for LinkedIn's DataHub metadata search & discovery
8+
platform. It provides type-safe, schema-driven metadata management with event-driven data consistency across multiple
9+
storage backends (SQL via Ebean, Elasticsearch for search). Built on LinkedIn's Pegasus data framework and Rest.li for
10+
REST APIs.
11+
12+
**Java version**: 1.8 (Java 8) — required, newer versions will cause build failures.
13+
14+
## Build Commands
15+
16+
```bash
17+
./gradlew build # Full build + all tests
18+
./gradlew :module-name:build # Build a specific module (e.g., :dao-api, :dao-impl:ebean-dao)
19+
./gradlew :module-name:test # Run tests for a specific module
20+
./gradlew spotlessCheck # Check code formatting (CI enforced)
21+
./gradlew spotlessApply # Auto-fix formatting
22+
./gradlew checkstyleMain # Run checkstyle on main sources
23+
./gradlew idea # Generate IntelliJ project files
24+
```
25+
26+
Tests use **TestNG** (not JUnit) as the default test framework across all subprojects. The elasticsearch integration
27+
tests require Docker.
28+
29+
**Apple Silicon (M-series Mac)**: Requires `brew install mariadb` and uncommenting three lines in
30+
`EmbeddedMariaInstance.java` (see `docs/developers.md`).
31+
32+
## Module Architecture
33+
34+
```
35+
core-models/ → Pegasus PDL schemas: Urn, AuditStamp, Url, Time (no Java logic)
36+
core-models-utils/ → URN utility helpers
37+
dao-api/ → DAO abstractions (BaseLocalDAO, BaseSearchDAO, BaseBrowseDAO),
38+
event producers, query utilities, retention policies
39+
dao-impl/
40+
ebean-dao/ → SQL storage via Ebean ORM (EbeanLocalDAO, relationship queries)
41+
elasticsearch-dao/ → ES 5.x/6.x search implementation
42+
elasticsearch-dao-7/→ ES 7.x search implementation
43+
restli-resources/ → Rest.li resource base classes (BaseEntityResource,
44+
BaseSearchableEntityResource) mapping DAOs to REST endpoints
45+
validators/ → Schema validators ensuring PDL models conform to GMA conventions
46+
(AspectValidator, EntityValidator, SnapshotValidator, etc.)
47+
gradle-plugins/ → Annotation parsing (@gma) and code generation for metadata events
48+
testing/ → Test infrastructure, ES integration test harness, test models
49+
```
50+
51+
## Key Architectural Patterns
52+
53+
- **Urn (Universal Resource Name)**: `urn:li:entityType:entityKey` — the universal identifier for all entities. Typed
54+
URN subclasses provide entity-specific keys.
55+
- **Aspect Union Pattern**: Each entity type defines a Pegasus union of its supported aspects. Validators enforce that
56+
union members are record types only.
57+
- **Aspect Versioning**: Version 0 = latest. Each aspect write creates a new immutable version. Retention policies
58+
(indefinite, time-based, version-based) control history.
59+
- **Layered Storage**: BaseLocalDAO (SQL, source of truth) → BaseSearchDAO (Elasticsearch, derived index) →
60+
BaseBrowseDAO (hierarchical navigation). BaseRemoteDAO proxies to other GMS instances.
61+
- **Event Sourcing**: Writes to LocalDAO trigger MCE/MAE event emission via BaseMetadataEventProducer. The
62+
`gradle-plugins` auto-generate event PDL schemas from `@gma` annotations on aspect PDL files.
63+
- **Generic Type Binding**: DAOs are heavily parameterized with generics (`<ASPECT_UNION, URN>`) and validate type
64+
constraints at construction time using reflection via `ModelUtils`.
65+
66+
## Pegasus/Rest.li Data Models
67+
68+
PDL (Pegasus Data Language) schemas live in `src/main/pegasus/` directories and compile to Java `RecordTemplate`
69+
classes. Key namespaces:
70+
71+
- `com.linkedin.common.*` — Core types (Urn, AuditStamp)
72+
- `com.linkedin.metadata.aspect.*` — Aspect wrappers
73+
- `com.linkedin.metadata.query.*` — Search/filter structures
74+
- `com.linkedin.metadata.events.*` — Change tracking types
75+
- `com.linkedin.metadata.snapshot.*` — Entity snapshots (versioned aspect collections)
76+
77+
When modifying PDL schemas, the Pegasus gradle plugin regenerates Java bindings automatically during build.
78+
79+
## Commit Convention
80+
81+
Follow [Conventional Commits](https://www.conventionalcommits.org/): `<type>(scope): description`
82+
83+
Types: `feat`, `fix`, `refactor`, `docs`, `test`, `perf`, `style`, `build`, `ci`
84+
85+
Max line length: 88 characters. Use imperative present tense, no capitalized first letter, no trailing dot.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.linkedin.metadata.dao;
2+
3+
import java.sql.Timestamp;
4+
import java.util.Map;
5+
import lombok.Builder;
6+
import lombok.Value;
7+
8+
9+
/**
10+
* A value class that holds deletion-relevant fields for a single entity, used by batch deletion validation.
11+
* Contains status flags for deletion eligibility checks and all aspect column values for Kafka archival.
12+
*/
13+
@Value
14+
@Builder
15+
public class EntityDeletionInfo {
16+
17+
Timestamp deletedTs;
18+
19+
boolean statusRemoved;
20+
21+
String statusLastModifiedOn;
22+
23+
/**
24+
* All aspect column values (column name → raw JSON string) for Kafka archival by the service layer.
25+
*/
26+
Map<String, String> aspectColumns;
27+
}

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public class EbeanLocalAccess<URN extends Urn> implements IEbeanLocalAccess<URN>
7272

7373
// TODO confirm if the default page size is 1000 in other code context.
7474
private static final int DEFAULT_PAGE_SIZE = 1000;
75+
private static final int MAX_BATCH_DELETE_SIZE = 2000;
76+
private static final String STATUS_ASPECT_FQCN = "com.linkedin.common.Status";
7577
private static final String ASPECT_JSON_PLACEHOLDER = "__PLACEHOLDER__";
7678
private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN";
7779
private static final String EBEAN_SERVER_CONFIG = "EbeanServerConfig";
@@ -344,6 +346,58 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) {
344346
return _server.createSqlUpdate(deleteSqlStatement).execute();
345347
}
346348

349+
@Override
350+
public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
351+
if (urns.isEmpty()) {
352+
return Collections.emptyMap();
353+
}
354+
if (urns.size() > MAX_BATCH_DELETE_SIZE) {
355+
throw new IllegalArgumentException(
356+
String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE));
357+
}
358+
359+
final String statusColumnName = getStatusColumnName();
360+
final Urn firstUrn = urns.get(0);
361+
final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn);
362+
final List<String> columns = getDeletionInfoColumns(tableName);
363+
final String sql = SQLStatementUtils.createReadDeletionInfoByUrnsSql(urns, columns, isTestMode);
364+
return EBeanDAOUtils.convertSqlRowsToEntityDeletionInfoMap(
365+
_server.createSqlQuery(sql).findList(), _urnClass, statusColumnName);
366+
}
367+
368+
/**
369+
* Returns the column list needed for batch deletion info: urn, deleted_ts, and all aspect columns (a_*).
370+
* Excludes index columns (i_*) and other derived columns to reduce data transfer.
371+
*/
372+
private List<String> getDeletionInfoColumns(@Nonnull String tableName) {
373+
return validator.getColumns(tableName).stream()
374+
.filter(c -> c.equals("urn") || c.equals("deleted_ts") || c.startsWith(ASPECT_PREFIX))
375+
.collect(Collectors.toList());
376+
}
377+
378+
/**
379+
* Resolves the entity table column name for the Status aspect via {@link SQLSchemaUtils}.
380+
*/
381+
private String getStatusColumnName() {
382+
return getAspectColumnName(_entityType, STATUS_ASPECT_FQCN);
383+
}
384+
385+
@Override
386+
public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
387+
if (urns.isEmpty()) {
388+
return 0;
389+
}
390+
if (urns.size() > MAX_BATCH_DELETE_SIZE) {
391+
throw new IllegalArgumentException(
392+
String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE));
393+
}
394+
395+
final String statusColumnName = getStatusColumnName();
396+
final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, statusColumnName,
397+
isTestMode);
398+
return _server.createSqlUpdate(sql).execute();
399+
}
400+
347401
@Override
348402
public List<URN> listUrns(@Nullable IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion,
349403
@Nullable URN lastUrn, int pageSize) {

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,20 @@ protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) {
750750
return _localAccess.softDeleteAsset(urn, isTestMode);
751751
}
752752

753+
/**
754+
* Delegates to {@link IEbeanLocalAccess#readDeletionInfoBatch}.
755+
*/
756+
public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
757+
return _localAccess.readDeletionInfoBatch(urns, isTestMode);
758+
}
759+
760+
/**
761+
* Delegates to {@link IEbeanLocalAccess#batchSoftDeleteAssets}.
762+
*/
763+
public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
764+
return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode);
765+
}
766+
753767
@Override
754768
public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
755769
if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,30 @@ <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(@Nonnull
9797
*/
9898
int softDeleteAsset(@Nonnull URN urn, boolean isTestMode);
9999

100+
/**
101+
* Read deletion-relevant fields for a batch of URNs in a single SELECT.
102+
* Returns deletion-relevant fields for validation and all aspect columns for Kafka archival.
103+
* URNs not found in the database will not have entries in the returned map.
104+
*
105+
* @param urns list of URNs to check
106+
* @param isTestMode whether to use test schema
107+
* @return map of URN to {@link EntityDeletionInfo}
108+
*/
109+
Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode);
110+
111+
/**
112+
* Batch soft-delete entities by setting deleted_ts = NOW() for URNs that meet all deletion criteria.
113+
* The UPDATE includes guard clauses (deleted_ts IS NULL, Status.removed = true, lastmodifiedon &lt; cutoff)
114+
* as defense-in-depth against race conditions. The Status aspect column name is resolved internally via
115+
* {@link com.linkedin.metadata.dao.utils.SQLSchemaUtils#getAspectColumnName}.
116+
*
117+
* @param urns list of URNs to soft-delete
118+
* @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp
119+
* @param isTestMode whether to use test schema
120+
* @return number of rows actually soft-deleted
121+
*/
122+
int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode);
123+
100124
/**
101125
* Returns list of urns that satisfy the given filter conditions.
102126
*

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,18 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) {
9999
return instrument("softDeleteAsset", () -> _delegate.softDeleteAsset(urn, isTestMode));
100100
}
101101

102+
@Override
103+
public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
104+
return instrument("readDeletionInfoBatch.urns_" + urns.size(),
105+
() -> _delegate.readDeletionInfoBatch(urns, isTestMode));
106+
}
107+
108+
@Override
109+
public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
110+
return instrument("batchSoftDeleteAssets.urns_" + urns.size(),
111+
() -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode));
112+
}
113+
102114
@Override
103115
public List<URN> listUrns(@Nullable IndexFilter indexFilter,
104116
@Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) {

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/EBeanDAOUtils.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.linkedin.metadata.dao.utils;
22

33
import com.linkedin.common.urn.Urn;
4+
import com.linkedin.data.DataMap;
45
import com.linkedin.data.schema.RecordDataSchema;
56
import com.linkedin.data.template.DataTemplateUtil;
67
import com.linkedin.data.template.RecordTemplate;
@@ -12,6 +13,7 @@
1213
import com.linkedin.metadata.aspect.AuditedAspect;
1314
import com.linkedin.metadata.aspect.SoftDeletedAspect;
1415
import com.linkedin.metadata.dao.EbeanMetadataAspect;
16+
import com.linkedin.metadata.dao.EntityDeletionInfo;
1517
import com.linkedin.metadata.dao.ListResult;
1618
import com.linkedin.metadata.query.AspectField;
1719
import com.linkedin.metadata.query.Condition;
@@ -257,6 +259,73 @@ public static <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> readSqlR
257259
}).collect(Collectors.toList());
258260
}
259261

262+
/**
263+
* Parse a list of {@link SqlRow} results from an entity table into a map of
264+
* URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and the Status aspect column.
265+
* Rows that cannot be parsed as a valid URN are skipped with a warning.
266+
*
267+
* @param sqlRows list of {@link SqlRow} from entity table query
268+
* @param urnClass URN class for deserialization
269+
* @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status")
270+
* @param <URN> URN type
271+
* @return map of URN to {@link EntityDeletionInfo}
272+
*/
273+
public static <URN extends Urn> Map<URN, EntityDeletionInfo> convertSqlRowsToEntityDeletionInfoMap(
274+
@Nonnull List<SqlRow> sqlRows, @Nonnull Class<URN> urnClass, @Nonnull String statusColumnName) {
275+
final Map<URN, EntityDeletionInfo> result = new HashMap<>();
276+
for (SqlRow row : sqlRows) {
277+
final String urnStr = row.getString("urn");
278+
try {
279+
result.put(getUrn(urnStr, urnClass), toEntityDeletionInfo(row, statusColumnName));
280+
} catch (IllegalArgumentException e) {
281+
log.warn("Failed to parse URN string: {}, skipping row", urnStr, e);
282+
}
283+
}
284+
return result;
285+
}
286+
287+
/**
288+
* Parse a single {@link SqlRow} from an entity table query into an {@link EntityDeletionInfo}.
289+
* Extracts deletion eligibility fields from the Status aspect column (statusRemoved, statusLastModifiedOn)
290+
* and collects all aspect columns for Kafka archival.
291+
*
292+
* @param row {@link SqlRow} from entity table query
293+
* @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status")
294+
* @return {@link EntityDeletionInfo}
295+
*/
296+
@Nonnull
297+
static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row, @Nonnull String statusColumnName) {
298+
// Collect all aspect columns (a_* prefixed, non-null), same pattern as readSqlRows()
299+
final Map<String, String> aspectColumnValues = new HashMap<>();
300+
for (String key : row.keySet()) {
301+
if (key.startsWith(SQLSchemaUtils.ASPECT_PREFIX) && row.get(key) != null) {
302+
aspectColumnValues.put(key, row.getString(key));
303+
}
304+
}
305+
306+
boolean statusRemoved = false;
307+
String statusLastModifiedOn = null;
308+
final String statusJson = row.getString(statusColumnName);
309+
if (statusJson != null) {
310+
final DataMap statusData = RecordUtils.toDataMap(statusJson);
311+
final Object lastModObj = statusData.get("lastmodifiedon");
312+
if (lastModObj != null) {
313+
statusLastModifiedOn = lastModObj.toString();
314+
}
315+
final Object aspectObj = statusData.get("aspect");
316+
if (aspectObj instanceof DataMap) {
317+
statusRemoved = Boolean.TRUE.equals(((DataMap) aspectObj).get("removed"));
318+
}
319+
}
320+
321+
return EntityDeletionInfo.builder()
322+
.deletedTs(row.getTimestamp("deleted_ts"))
323+
.statusRemoved(statusRemoved)
324+
.statusLastModifiedOn(statusLastModifiedOn)
325+
.aspectColumns(aspectColumnValues)
326+
.build();
327+
}
328+
260329
/**
261330
* Read EbeanMetadataAspect from {@link SqlRow}.
262331
* @param sqlRow {@link SqlRow}

0 commit comments

Comments
 (0)