85 changes: 85 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,85 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

DataHub GMA (General Metadata Architecture) is the backend framework for LinkedIn's DataHub metadata search & discovery
platform. It provides type-safe, schema-driven metadata management with event-driven data consistency across multiple
storage backends (SQL via Ebean, Elasticsearch for search). Built on LinkedIn's Pegasus data framework and Rest.li for
REST APIs.

**Java version**: Java 8 (1.8) is required; newer JDK versions will cause build failures.

## Build Commands

```bash
./gradlew build # Full build + all tests
./gradlew :module-name:build # Build a specific module (e.g., :dao-api, :dao-impl:ebean-dao)
./gradlew :module-name:test # Run tests for a specific module
./gradlew spotlessCheck # Check code formatting (CI enforced)
./gradlew spotlessApply # Auto-fix formatting
./gradlew checkstyleMain # Run checkstyle on main sources
./gradlew idea # Generate IntelliJ project files
```

Tests use **TestNG** (not JUnit) as the default test framework across all subprojects. The Elasticsearch integration
tests require Docker.

**Apple Silicon (M-series Mac)**: Requires `brew install mariadb` and uncommenting three lines in
`EmbeddedMariaInstance.java` (see `docs/developers.md`).

## Module Architecture

```
core-models/ → Pegasus PDL schemas: Urn, AuditStamp, Url, Time (no Java logic)
core-models-utils/ → URN utility helpers
dao-api/ → DAO abstractions (BaseLocalDAO, BaseSearchDAO, BaseBrowseDAO),
event producers, query utilities, retention policies
dao-impl/
ebean-dao/ → SQL storage via Ebean ORM (EbeanLocalDAO, relationship queries)
elasticsearch-dao/ → ES 5.x/6.x search implementation
elasticsearch-dao-7/→ ES 7.x search implementation
restli-resources/ → Rest.li resource base classes (BaseEntityResource,
BaseSearchableEntityResource) mapping DAOs to REST endpoints
validators/ → Schema validators ensuring PDL models conform to GMA conventions
(AspectValidator, EntityValidator, SnapshotValidator, etc.)
gradle-plugins/ → Annotation parsing (@gma) and code generation for metadata events
testing/ → Test infrastructure, ES integration test harness, test models
```

## Key Architectural Patterns

- **Urn (Universal Resource Name)**: `urn:li:entityType:entityKey` — the universal identifier for all entities. Typed
URN subclasses provide entity-specific keys.
- **Aspect Union Pattern**: Each entity type defines a Pegasus union of its supported aspects. Validators enforce that
union members are record types only.
- **Aspect Versioning**: Version 0 = latest. Each aspect write creates a new immutable version. Retention policies
(indefinite, time-based, version-based) control history.
- **Layered Storage**: BaseLocalDAO (SQL, source of truth) → BaseSearchDAO (Elasticsearch, derived index) →
BaseBrowseDAO (hierarchical navigation). BaseRemoteDAO proxies to other GMS instances.
- **Event Sourcing**: Writes to LocalDAO trigger MCE/MAE event emission via BaseMetadataEventProducer. The
`gradle-plugins` auto-generate event PDL schemas from `@gma` annotations on aspect PDL files.
- **Generic Type Binding**: DAOs are heavily parameterized with generics (`<ASPECT_UNION, URN>`) and validate type
constraints at construction time using reflection via `ModelUtils`.
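The Urn pattern above can be illustrated with a self-contained sketch. This is **not** the real `com.linkedin.common.urn.Urn` API — `UrnSketch` is a hypothetical mini-parser showing only the `urn:li:entityType:entityKey` shape:

```java
// Illustrative only: a hypothetical stand-in for URN parsing, not the real
// com.linkedin.common.urn.Urn class. Shows the urn:li:entityType:entityKey format.
public final class UrnSketch {
  private final String entityType;
  private final String entityKey;

  private UrnSketch(String entityType, String entityKey) {
    this.entityType = entityType;
    this.entityKey = entityKey;
  }

  /** Parses "urn:li:entityType:entityKey"; the key itself may contain colons. */
  public static UrnSketch parse(String raw) {
    // Split into at most 4 parts so colons inside the entity key are preserved.
    String[] parts = raw.split(":", 4);
    if (parts.length != 4 || !parts[0].equals("urn") || !parts[1].equals("li")) {
      throw new IllegalArgumentException("Invalid URN: " + raw);
    }
    return new UrnSketch(parts[2], parts[3]);
  }

  public String getEntityType() {
    return entityType;
  }

  public String getEntityKey() {
    return entityKey;
  }
}
```

In GMA proper, typed URN subclasses wrap this parsing with entity-specific key types; the sketch keeps the key as a raw string.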

## Pegasus/Rest.li Data Models

PDL (Pegasus Data Language) schemas live in `src/main/pegasus/` directories and compile to Java `RecordTemplate`
classes. Key namespaces:

- `com.linkedin.common.*` — Core types (Urn, AuditStamp)
- `com.linkedin.metadata.aspect.*` — Aspect wrappers
- `com.linkedin.metadata.query.*` — Search/filter structures
- `com.linkedin.metadata.events.*` — Change tracking types
- `com.linkedin.metadata.snapshot.*` — Entity snapshots (versioned aspect collections)

When you modify a PDL schema, the Pegasus Gradle plugin regenerates the Java bindings automatically during the build.
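The aspect union pattern described earlier looks roughly like the following in PDL. This is a hypothetical sketch — the namespace and the choice of member aspects are illustrative, not taken from this repository:

```pdl
namespace com.example.metadata.aspect

/**
 * Hypothetical aspect union for an "example" entity.
 * Validators enforce that every member is a record type.
 */
typeref ExampleAspect = union[
  com.linkedin.common.Ownership,
  com.linkedin.common.Status
]
```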

## Commit Convention

Follow [Conventional Commits](https://www.conventionalcommits.org/): `<type>(scope): description`

Types: `feat`, `fix`, `refactor`, `docs`, `test`, `perf`, `style`, `build`, `ci`

Max line length: 88 characters. Use imperative present tense, no capitalized first letter, no trailing dot.
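For example (hypothetical scope and description), a commit adding a batch delete API might read:

```
feat(ebean-dao): add batch soft-delete for entity tables
```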
@@ -0,0 +1,27 @@
package com.linkedin.metadata.dao;

import java.sql.Timestamp;
import java.util.Map;
import lombok.Builder;
import lombok.Value;


/**
* A value class that holds deletion-relevant fields for a single entity, used by batch deletion validation.
* Contains status flags for deletion eligibility checks and all aspect column values for Kafka archival.
*/
@Value
@Builder
public class EntityDeletionInfo {

  /** Soft-deletion timestamp from the entity table; null if the entity has not been soft-deleted. */
  Timestamp deletedTs;

  /** Whether the Status aspect marks the entity as removed. */
  boolean statusRemoved;

  /** The lastmodifiedon value of the Status aspect, used for the deletion cutoff check. */
  String statusLastModifiedOn;

  /**
   * All aspect column values (column name → raw JSON string) for Kafka archival by the service layer.
   */
  Map<String, String> aspectColumns;
}
@@ -72,6 +72,8 @@ public class EbeanLocalAccess<URN extends Urn> implements IEbeanLocalAccess<URN>

  // TODO confirm if the default page size is 1000 in other code context.
  private static final int DEFAULT_PAGE_SIZE = 1000;
  private static final int MAX_BATCH_DELETE_SIZE = 2000;
  private static final String STATUS_ASPECT_FQCN = "com.linkedin.common.Status";
  private static final String ASPECT_JSON_PLACEHOLDER = "__PLACEHOLDER__";
  private static final String DEFAULT_ACTOR = "urn:li:principal:UNKNOWN";
  private static final String EBEAN_SERVER_CONFIG = "EbeanServerConfig";
@@ -344,6 +346,58 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) {
    return _server.createSqlUpdate(deleteSqlStatement).execute();
  }

  @Override
  public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
    if (urns.isEmpty()) {
      return Collections.emptyMap();
    }
    if (urns.size() > MAX_BATCH_DELETE_SIZE) {
      throw new IllegalArgumentException(
          String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE));
    }

    final String statusColumnName = getStatusColumnName();
    final Urn firstUrn = urns.get(0);
    final String tableName = isTestMode ? getTestTableName(firstUrn) : getTableName(firstUrn);
    final List<String> columns = getDeletionInfoColumns(tableName);
    final String sql = SQLStatementUtils.createReadDeletionInfoByUrnsSql(urns, columns, isTestMode);
    return EBeanDAOUtils.convertSqlRowsToEntityDeletionInfoMap(
        _server.createSqlQuery(sql).findList(), _urnClass, statusColumnName);
  }

  /**
   * Returns the column list needed for batch deletion info: urn, deleted_ts, and all aspect columns (a_*).
   * Excludes index columns (i_*) and other derived columns to reduce data transfer.
   */
  private List<String> getDeletionInfoColumns(@Nonnull String tableName) {
    return validator.getColumns(tableName).stream()
        .filter(c -> c.equals("urn") || c.equals("deleted_ts") || c.startsWith(ASPECT_PREFIX))
        .collect(Collectors.toList());
  }

  /**
   * Resolves the entity table column name for the Status aspect via {@link SQLSchemaUtils}.
   */
  private String getStatusColumnName() {
    return getAspectColumnName(_entityType, STATUS_ASPECT_FQCN);
  }

  @Override
  public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
    if (urns.isEmpty()) {
      return 0;
    }
    if (urns.size() > MAX_BATCH_DELETE_SIZE) {
      throw new IllegalArgumentException(
          String.format("Batch size %d exceeds maximum of %d", urns.size(), MAX_BATCH_DELETE_SIZE));
    }

    final String statusColumnName = getStatusColumnName();
    final String sql = SQLStatementUtils.createBatchSoftDeleteAssetSql(urns, cutoffTimestamp, statusColumnName,
        isTestMode);
    return _server.createSqlUpdate(sql).execute();
  }

  @Override
  public List<URN> listUrns(@Nullable IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion,
      @Nullable URN lastUrn, int pageSize) {
@@ -750,6 +750,20 @@ protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) {
    return _localAccess.softDeleteAsset(urn, isTestMode);
  }

  /**
   * Delegates to {@link IEbeanLocalAccess#readDeletionInfoBatch}.
   */
  public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
    return _localAccess.readDeletionInfoBatch(urns, isTestMode);
  }

  /**
   * Delegates to {@link IEbeanLocalAccess#batchSoftDeleteAssets}.
   */
  public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
    return _localAccess.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode);
  }

  @Override
  public <ASPECT extends RecordTemplate> void updateEntityTables(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass) {
    if (_schemaConfig == SchemaConfig.OLD_SCHEMA_ONLY) {
@@ -97,6 +97,30 @@ <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(@Nonnull
   */
  int softDeleteAsset(@Nonnull URN urn, boolean isTestMode);

  /**
   * Reads deletion-relevant fields for a batch of URNs in a single SELECT: status flags for deletion
   * validation plus all aspect columns for Kafka archival.
   * URNs not found in the database will not have entries in the returned map.
   *
   * @param urns list of URNs to check
   * @param isTestMode whether to use test schema
   * @return map of URN to {@link EntityDeletionInfo}
   */
  Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode);

  /**
   * Batch soft-deletes entities by setting deleted_ts = NOW() for URNs that meet all deletion criteria.
   * The UPDATE includes guard clauses (deleted_ts IS NULL, Status.removed = true, lastmodifiedon &lt; cutoff)
   * as defense-in-depth against race conditions. The Status aspect column name is resolved internally via
   * {@link com.linkedin.metadata.dao.utils.SQLSchemaUtils#getAspectColumnName}.
   *
   * @param urns list of URNs to soft-delete
   * @param cutoffTimestamp only delete if Status.lastmodifiedon is before this timestamp
   * @param isTestMode whether to use test schema
   * @return number of rows actually soft-deleted
   */
  int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode);

  /**
   * Returns list of urns that satisfy the given filter conditions.
   *
@@ -99,6 +99,18 @@ public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) {
    return instrument("softDeleteAsset", () -> _delegate.softDeleteAsset(urn, isTestMode));
  }

  @Override
  public Map<URN, EntityDeletionInfo> readDeletionInfoBatch(@Nonnull List<URN> urns, boolean isTestMode) {
    return instrument("readDeletionInfoBatch.urns_" + urns.size(),
        () -> _delegate.readDeletionInfoBatch(urns, isTestMode));
  }

  @Override
  public int batchSoftDeleteAssets(@Nonnull List<URN> urns, @Nonnull String cutoffTimestamp, boolean isTestMode) {
    return instrument("batchSoftDeleteAssets.urns_" + urns.size(),
        () -> _delegate.batchSoftDeleteAssets(urns, cutoffTimestamp, isTestMode));
  }

  @Override
  public List<URN> listUrns(@Nullable IndexFilter indexFilter,
      @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) {
@@ -1,6 +1,7 @@
package com.linkedin.metadata.dao.utils;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.schema.RecordDataSchema;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
@@ -12,6 +13,7 @@
import com.linkedin.metadata.aspect.AuditedAspect;
import com.linkedin.metadata.aspect.SoftDeletedAspect;
import com.linkedin.metadata.dao.EbeanMetadataAspect;
import com.linkedin.metadata.dao.EntityDeletionInfo;
import com.linkedin.metadata.dao.ListResult;
import com.linkedin.metadata.query.AspectField;
import com.linkedin.metadata.query.Condition;
@@ -257,6 +259,73 @@ public static <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> readSqlR
    }).collect(Collectors.toList());
  }

  /**
   * Parses a list of {@link SqlRow} results from an entity table into a map of
   * URN to {@link EntityDeletionInfo}. Each row must contain urn, deleted_ts, and the Status aspect column.
   * Rows whose urn column cannot be parsed as a valid URN are skipped with a warning.
   *
   * @param sqlRows list of {@link SqlRow} from an entity table query
   * @param urnClass URN class for deserialization
   * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status")
   * @param <URN> URN type
   * @return map of URN to {@link EntityDeletionInfo}
   */
  public static <URN extends Urn> Map<URN, EntityDeletionInfo> convertSqlRowsToEntityDeletionInfoMap(
      @Nonnull List<SqlRow> sqlRows, @Nonnull Class<URN> urnClass, @Nonnull String statusColumnName) {
    final Map<URN, EntityDeletionInfo> result = new HashMap<>();
    for (SqlRow row : sqlRows) {
      final String urnStr = row.getString("urn");
      try {
        result.put(getUrn(urnStr, urnClass), toEntityDeletionInfo(row, statusColumnName));
      } catch (IllegalArgumentException e) {
        log.warn("Failed to parse URN string: {}, skipping row", urnStr, e);
      }
    }
    return result;
  }

  /**
   * Parses a single {@link SqlRow} from an entity table query into an {@link EntityDeletionInfo}.
   * Extracts deletion eligibility fields from the Status aspect column (statusRemoved, statusLastModifiedOn)
   * and collects all aspect columns for Kafka archival.
   *
   * @param row {@link SqlRow} from an entity table query
   * @param statusColumnName the entity table column name for the Status aspect (e.g. "a_status")
   * @return {@link EntityDeletionInfo}
   */
  @Nonnull
  static EntityDeletionInfo toEntityDeletionInfo(@Nonnull SqlRow row, @Nonnull String statusColumnName) {
    // Collect all aspect columns (a_* prefixed, non-null), same pattern as readSqlRows()
    final Map<String, String> aspectColumnValues = new HashMap<>();
    for (String key : row.keySet()) {
      if (key.startsWith(SQLSchemaUtils.ASPECT_PREFIX) && row.get(key) != null) {
        aspectColumnValues.put(key, row.getString(key));
      }
    }

    boolean statusRemoved = false;
    String statusLastModifiedOn = null;
    final String statusJson = row.getString(statusColumnName);
    if (statusJson != null) {
      final DataMap statusData = RecordUtils.toDataMap(statusJson);
      final Object lastModObj = statusData.get("lastmodifiedon");
      if (lastModObj != null) {
        statusLastModifiedOn = lastModObj.toString();
      }
      final Object aspectObj = statusData.get("aspect");
      if (aspectObj instanceof DataMap) {
        statusRemoved = Boolean.TRUE.equals(((DataMap) aspectObj).get("removed"));
      }
    }

    return EntityDeletionInfo.builder()
        .deletedTs(row.getTimestamp("deleted_ts"))
        .statusRemoved(statusRemoved)
        .statusLastModifiedOn(statusLastModifiedOn)
        .aspectColumns(aspectColumnValues)
        .build();
  }

  /**
   * Read EbeanMetadataAspect from {@link SqlRow}.
   * @param sqlRow {@link SqlRow}