Skip to content

Commit 8f17ced

Browse files
Updating method for create(), update() and delete() to handle soft delete by setting deleted_ts (#549)
* Updating method for create(), update() and delete() to handle soft delete by setting deleted_ts * Updating unit test to delete asset with 2 aspects * Improving unit test * Updating javadoc and inline comments
1 parent 4675104 commit 8f17ced

File tree

11 files changed

+257
-185
lines changed

11 files changed

+257
-185
lines changed

dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java

Lines changed: 41 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,7 @@ <ASPECT_UNION extends RecordTemplate> URN createAspectsWithCallbacks(@Nonnull UR
740740
boolean isTestModeFalseForAll = aspectCreateLambdas.stream().filter(aspectCreateLambda -> aspectCreateLambda.getIngestionParams().isTestMode()).collect(
741741
Collectors.toList()).isEmpty();
742742

743-
int numRows = createNewAspect(urn, aspectCreateLambdas, aspectValues, auditStamp, trackingContext, isTestModeFalseForAll);
743+
int numRows = createNewAssetWithAspects(urn, aspectCreateLambdas, aspectValues, auditStamp, trackingContext, isTestModeFalseForAll);
744744
for (RecordTemplate aspectValue : aspectValues) {
745745
// For each aspect, we need to trigger emit MAE
746746
// In new asset creation, old value is null
@@ -962,54 +962,6 @@ public <ASPECT extends RecordTemplate> URN create(@Nonnull URN urn,
962962
);
963963
}
964964

965-
/**
966-
* The common method that can be used for both: deletion of aspects and entity.
967-
*
968-
* @param urn the URN for the entity the aspects are attached to
969-
* @param aspectClasses Aspect Classes of the aspects being deleted, must be supported aspect types in
970-
* {@code ASPECT_UNION}
971-
* @param auditStamp the audit stamp of this action
972-
* @param trackingContext the tracking context for the operation
973-
* @param ingestionParams ingestion parameters
974-
* @param deleteAll if true, delete the entire asset, else mark aspects as deleted iteratively in a
975-
* transaction
976-
* @param maxTransactionRetry maximum number of transaction retries before throwing an exception
977-
* @return a collection of the deleted aspects (their value before deletion), each wrapped in an instance of
978-
* {@link ASPECT_UNION}
979-
*/
980-
protected Collection<ASPECT_UNION> deleteCommon(@Nonnull URN urn,
981-
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
982-
@Nonnull AuditStamp auditStamp, int maxTransactionRetry,
983-
@Nullable IngestionTrackingContext trackingContext,
984-
@Nullable IngestionParams ingestionParams,
985-
boolean deleteAll) {
986-
987-
// TODO: Handle pre-deletion callbacks if any
988-
989-
// If deleteAll is true, delete entire asset, else mark aspects as deleted iteratively in a transaction
990-
if (deleteAll) {
991-
Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>
992-
results = permanentDelete(urn, aspectClasses, auditStamp, maxTransactionRetry, trackingContext, ingestionParams.isTestMode());
993-
Collection<RecordTemplate> deletedAspects = new ArrayList<>();
994-
results.forEach((key, value) -> {
995-
// Check if aspect value present to avoid null pointer exception
996-
if (value.isPresent()) {
997-
DeleteResult deleteResult = new DeleteResult(value.get(), key);
998-
deletedAspects.add(unwrapDeleteResult(urn, deleteResult, auditStamp, trackingContext, ChangeType.DELETE_ALL));
999-
}
1000-
});
1001-
1002-
return deletedAspects.stream()
1003-
.filter(Objects::nonNull)
1004-
.map(x -> ModelUtils.newEntityUnion(_aspectUnionClass, x)).collect(Collectors.toList());
1005-
} else {
1006-
// TODO: delete aspects implementation can be moved here instead of in addCommon()
1007-
// Add common method should be used only for create and update
1008-
return Collections.emptyList();
1009-
}
1010-
1011-
//TODO: Handle post-ingestion callbacks if any
1012-
}
1013965

1014966
/**
1015967
* Delete asset and all its aspects atomically.
@@ -1076,9 +1028,36 @@ public Collection<ASPECT_UNION> deleteAll(@Nonnull URN urn,
10761028
int maxTransactionRetry,
10771029
@Nullable IngestionTrackingContext trackingContext,
10781030
@Nullable IngestionParams ingestionParams) {
1031+
10791032
IngestionParams nonNullIngestionParams = ingestionParams == null
10801033
? new IngestionParams().setIngestionMode(IngestionMode.LIVE).setTestMode(false) : ingestionParams;
1081-
return deleteCommon(urn, aspectClasses, auditStamp, maxTransactionRetry, trackingContext, nonNullIngestionParams, true);
1034+
1035+
final Map<Class<?>, RecordTemplate> results = new HashMap<>();
1036+
runInTransactionWithRetry(() -> {
1037+
aspectClasses.forEach(aspectClass -> {
1038+
try {
1039+
RecordTemplate deletedAspect = delete(urn, aspectClass, auditStamp, maxTransactionRetry, trackingContext);
1040+
results.put(aspectClass, deletedAspect);
1041+
} catch (NullPointerException e) {
1042+
log.warn("Aspect {} for urn {} does not exist", aspectClass.getName(), urn);
1043+
}
1044+
});
1045+
1046+
permanentDelete(urn, nonNullIngestionParams.isTestMode());
1047+
return results;
1048+
}, maxTransactionRetry);
1049+
1050+
1051+
Collection<RecordTemplate> deletedAspects = new ArrayList<>();
1052+
results.forEach((key, value) -> {
1053+
// Check if aspect value present to avoid null pointer exception
1054+
DeleteResult deleteResult = new DeleteResult(value, key);
1055+
deletedAspects.add(unwrapDeleteResult(urn, deleteResult, auditStamp, trackingContext, ChangeType.DELETE_ALL));
1056+
});
1057+
1058+
return deletedAspects.stream()
1059+
.filter(Objects::nonNull)
1060+
.map(x -> ModelUtils.newEntityUnion(_aspectUnionClass, x)).collect(Collectors.toList());
10821061
}
10831062

10841063
/**
@@ -1340,24 +1319,19 @@ protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN
13401319
@Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted,
13411320
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode);
13421321

1343-
protected abstract <ASPECT_UNION extends RecordTemplate> int createNewAspect(@NonNull URN urn,
1322+
protected abstract <ASPECT_UNION extends RecordTemplate> int createNewAssetWithAspects(@NonNull URN urn,
13441323
@Nonnull List<AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas,
13451324
@Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp newAuditStamp,
13461325
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode);
13471326

13481327
/**
1349-
* Permanently deletes the entity from the table.
1350-
* @param urn the URN for the entity the aspect is attached to
1351-
* @param aspectClasses Aspect Classes of the aspects being deleted, must be supported aspect types in {@code ASPECT_UNION}
1352-
* @param auditStamp the audit stamp of this action
1353-
* @param maxTransactionRetry maximum number of transaction retries before throwing an exception
1354-
* @param trackingContext the tracking context for the operation
1328+
* Mark the asset as deleted.
1329+
*
1330+
* @param urn the URN for the entity the aspect is attached to
13551331
* @param isTestMode whether the test mode is enabled or not
1356-
* @return a map of the deleted aspects (their value before deletion), each wrapped in an instance of {@link ASPECT_UNION}
1332+
* @return the number of rows updated in delete operation.
13571333
*/
1358-
protected abstract Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> permanentDelete(@Nonnull URN urn,
1359-
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nullable AuditStamp auditStamp,
1360-
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode);
1334+
protected abstract int permanentDelete(@Nonnull URN urn, boolean isTestMode);
13611335

13621336
/**
13631337
* Saves the new value of an aspect to entity tables. This is used when backfilling metadata from the old schema to
@@ -1572,13 +1546,13 @@ protected abstract <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn,
15721546
/**
15731547
* Update an aspect for an entity with specific version and {@link AuditStamp} with optimistic locking.
15741548
*
1575-
* @param urn {@link Urn} for the entity
1576-
* @param value the aspect to update
1577-
* @param aspectClass the type of aspect to update
1549+
* @param urn {@link Urn} for the entity
1550+
* @param value the aspect to update
1551+
* @param aspectClass the type of aspect to update
15781552
* @param newAuditStamp the {@link AuditStamp} for the new aspect
1579-
* @param version the version for the aspect
1580-
* @param oldTimestamp the timestamp for the old aspect
1581-
* @param isTestMode whether the test mode is enabled or not
1553+
* @param version the version for the aspect
1554+
* @param oldTimestamp the timestamp for the old aspect
1555+
* @param isTestMode whether the test mode is enabled or not
15821556
*/
15831557
protected abstract <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonnull URN urn,
15841558
@Nullable RecordTemplate value, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp newAuditStamp,

dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import com.linkedin.metadata.dao.retention.VersionBasedRetention;
1717
import com.linkedin.metadata.dao.tracking.BaseTrackingManager;
1818
import com.linkedin.metadata.dao.urnpath.EmptyPathExtractor;
19-
import com.linkedin.metadata.events.ChangeType;
2019
import com.linkedin.metadata.events.IngestionMode;
2120
import com.linkedin.metadata.events.IngestionTrackingContext;
2221
import com.linkedin.metadata.internal.IngestionParams;
@@ -35,10 +34,8 @@
3534
import java.sql.Timestamp;
3635
import java.util.ArrayList;
3736
import java.util.Arrays;
38-
import java.util.Collection;
3937
import java.util.Collections;
4038
import java.util.HashMap;
41-
import java.util.HashSet;
4239
import java.util.List;
4340
import java.util.Map;
4441
import java.util.Optional;
@@ -101,21 +98,17 @@ protected <ASPECT extends RecordTemplate> long saveLatest(FooUrn urn, Class<ASPE
10198
}
10299

103100
@Override
104-
protected <ASPECT_UNION extends RecordTemplate> int createNewAspect(@Nonnull FooUrn urn,
101+
protected <ASPECT_UNION extends RecordTemplate> int createNewAssetWithAspects(@Nonnull FooUrn urn,
105102
@Nonnull List<AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas,
106103
@Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp newAuditStamp,
107104
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
108105
return aspectValues.size();
109106
}
110107

111108
@Override
112-
protected Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> permanentDelete(@Nonnull FooUrn urn,
113-
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nullable AuditStamp auditStamp,
114-
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
115-
Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> result = new HashMap<>();
116-
result.put(AspectFoo.class, Optional.of(new AspectFoo().setValue("foo")));
117-
result.put(AspectBar.class, Optional.of(new AspectBar().setValue("bar")));
118-
return result;
109+
protected int permanentDelete(@Nonnull FooUrn urn, boolean isTestMode) {
110+
// 1 aspect is deleted: 1 row in table
111+
return 1;
119112
}
120113

121114
@Override
@@ -747,30 +740,6 @@ public void testCreateAspectWithCallbacks() throws URISyntaxException {
747740
verifyNoMoreInteractions(_mockEventProducer);
748741
}
749742

750-
@Test
751-
public void testDeleteAll() throws URISyntaxException {
752-
// Set-up test data
753-
FooUrn urn = new FooUrn(1);
754-
RecordTemplate foo = new AspectFoo().setValue("foo");
755-
RecordTemplate bar = new AspectBar().setValue("bar");
756-
757-
Set<Class<? extends RecordTemplate>> aspectsClasses = new HashSet<>();
758-
aspectsClasses.add(AspectFoo.class);
759-
aspectsClasses.add(AspectBar.class);
760-
761-
Collection<EntityAspectUnion> results = _dummyLocalDAO.deleteAll(urn, aspectsClasses, _dummyAuditStamp);
762-
assertEquals(results.size(), 2);
763-
results.forEach(result -> {
764-
assertNotNull(result);
765-
assertTrue(result.isAspectBar() || result.isAspectFoo());
766-
});
767-
verify(_mockEventProducer, times(1))
768-
.produceAspectSpecificMetadataAuditEvent(urn, foo, null, AspectFoo.class, _dummyAuditStamp, IngestionMode.LIVE, ChangeType.DELETE_ALL);
769-
verify(_mockEventProducer, times(1))
770-
.produceAspectSpecificMetadataAuditEvent(urn, bar, null, AspectBar.class, _dummyAuditStamp, IngestionMode.LIVE, ChangeType.DELETE_ALL);
771-
verifyNoMoreInteractions(_mockEventProducer);
772-
}
773-
774743
@Test
775744
public void testMAEEmissionForAspectCallbackHelper() throws URISyntaxException {
776745
FooUrn urn = new FooUrn(1);

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ public void ensureSchemaUpToDate() {
100100
@Transactional
101101
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
102102
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode) {
103-
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext, isTestMode);
103+
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext,
104+
isTestMode, true);
104105
}
105106

106107
@Override
@@ -111,7 +112,7 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
111112
@Nonnull AuditStamp auditStamp,
112113
@Nullable Timestamp oldTimestamp,
113114
@Nullable IngestionTrackingContext ingestionTrackingContext,
114-
boolean isTestMode) {
115+
boolean isTestMode, boolean softDeleteOverwrite) {
115116

116117
final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
117118
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
@@ -121,7 +122,7 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
121122
final SqlUpdate sqlUpdate;
122123
if (oldTimestamp != null) {
123124
sqlUpdate = _server.createSqlUpdate(
124-
SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction, isTestMode));
125+
SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction, isTestMode, softDeleteOverwrite));
125126
sqlUpdate.setParameter("oldTimestamp", oldTimestamp.toString());
126127
} else {
127128
sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction, isTestMode));
@@ -223,11 +224,26 @@ public <ASPECT_UNION extends RecordTemplate> int create(
223224
}
224225
}
225226
insertIntoSql.append(CLOSING_BRACKET);
226-
insertSqlValues.append(CLOSING_BRACKET_WITH_SEMICOLON);
227+
insertSqlValues.append(CLOSING_BRACKET);
228+
229+
// Construct DELETED_TS_CHECK_FOR_CREATE String
230+
StringBuilder deletedTsCheckForCreate = new StringBuilder();
231+
deletedTsCheckForCreate.append(DELETED_TS_DUPLICATE_KEY_CHECK);
232+
for (int i = 0; i < classNames.size(); i++) {
233+
deletedTsCheckForCreate.append(getAspectColumnName(urn.getEntityType(), classNames.get(i)));
234+
deletedTsCheckForCreate.append(" = :aspect").append(i);
235+
if (i != classNames.size() - 1) {
236+
deletedTsCheckForCreate.append(", ");
237+
}
238+
}
239+
deletedTsCheckForCreate.append(DELETED_TS_SET_VALUE_CONDITIONALLY);
240+
241+
// Build the final insert statement as follows:
242+
// INSERT INTO <table_name> (<columns>) VALUES (<values>)
243+
// ON DUPLICATE KEY UPDATE aspectclass1 = aspect1, ...,
244+
// deleted_ts = IF(deleted_ts IS NULL, CAST('DuplicateKeyException' AS UNSIGNED), NULL);
245+
String insertStatement = insertIntoSql.toString() + insertSqlValues.toString() + deletedTsCheckForCreate.toString();
227246

228-
// Build the final insert statement
229-
// For example: INSERT INTO <table_name> (<columns>) VALUES (<values>);
230-
String insertStatement = insertIntoSql.toString() + insertSqlValues.toString();
231247
insertStatement = String.format(insertStatement, getTableName(urn));
232248

233249
sqlUpdate = _server.createSqlUpdate(insertStatement);
@@ -247,6 +263,7 @@ public <ASPECT_UNION extends RecordTemplate> int create(
247263
sqlUpdate.setParameter("aspect" + i, toJsonString(auditedAspect));
248264
}
249265

266+
250267
// If a non-default UrnPathExtractor is provided, the user MUST specify in their schema generation scripts
251268
// 'ALTER TABLE <table> ADD COLUMN a_urn JSON'.
252269
if (urnExtraction) {
@@ -303,16 +320,16 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(
303320
}
304321

305322
/**
306-
* Delete all aspects + urn for the given urn.
323+
* Soft delete all aspects + urn for the given urn by setting deleted_ts=NOW().
307324
* By this time pre-deletion hooks should be processed.
308-
* Old values are not needed for delete, But should be retrieved and used for in post-update hooks if needed.
309-
* @param urn {@link Urn} for the entity
325+
* @param urn {@link Urn} for the entity
310326
* @param isTestMode whether the operation is in test mode or not
311327
* @return number of rows deleted.
312328
*/
313329
@Override
314-
public int deleteAll(@Nonnull URN urn, boolean isTestMode) {
315-
final String deleteSqlStatement = SQLStatementUtils.createDeleteAssetSql(urn, isTestMode);
330+
public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) {
331+
// Update this to mark deleted_TS to NOW based on URN
332+
final String deleteSqlStatement = SQLStatementUtils.createSoftDeleteAssetSql(urn, isTestMode);
316333
return _server.createSqlUpdate(deleteSqlStatement).execute();
317334
}
318335

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -720,31 +720,24 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non
720720
* @return the number of rows inserted
721721
*/
722722
@Override
723-
protected <ASPECT_UNION extends RecordTemplate> int createNewAspect(@Nonnull URN urn,
723+
protected <ASPECT_UNION extends RecordTemplate> int createNewAssetWithAspects(@Nonnull URN urn,
724724
@Nonnull List<AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas,
725725
@Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp newAuditStamp,
726726
@Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
727727
return runInTransactionWithRetry(() ->
728+
// behavior of create: do a get to ensure the urn does not already exist
729+
// if exists and deletedTs is null, then throw an exception
730+
// if exists and deletedTs is not null, then update the deletedTs to null and create records
728731
_localAccess.create(urn, aspectValues, aspectCreateLambdas, newAuditStamp, trackingContext, isTestMode), 1);
729732
}
730733

731734
@Override
732-
protected Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> permanentDelete(@Nonnull URN urn,
733-
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nullable AuditStamp auditStamp,
734-
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) {
735+
protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) {
735736
// If the table does not have the URN, return empty map. Nothing to delete here.
736737
if (!exists(urn)) {
737-
return Collections.emptyMap();
738+
return 0;
738739
}
739-
// If the table has the URN, get the asset record, including all the aspects.
740-
// This will be used to delete to return deleted record info in the API.
741-
Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> deletedAspects = new HashMap<>();
742-
aspectClasses.forEach(aspectClass -> deletedAspects.put(aspectClass, Optional.ofNullable(getLatest(urn, aspectClass, isTestMode).getAspect())));
743-
// Perform deletion using urn and return the previously retrieved record.
744-
return runInTransactionWithRetry(() -> {
745-
_localAccess.deleteAll(urn, isTestMode);
746-
return deletedAspects;
747-
}, maxTransactionRetry);
740+
return _localAccess.softDeleteAsset(urn, isTestMode);
748741
}
749742

750743
@Override
@@ -943,7 +936,7 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
943936
// Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called.
944937
_server.execute(oldSchemaSqlUpdate);
945938
return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp,
946-
trackingContext, isTestMode);
939+
trackingContext, isTestMode, true);
947940
}, 1);
948941
} else {
949942
// In OLD_SCHEMA and DUAL_SCHEMA mode, the aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table.
@@ -953,7 +946,7 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
953946
// Additionally, in DUAL_SCHEMA mode: apply a regular update (no optimistic locking) to the entity table
954947
if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) {
955948
_localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, null,
956-
trackingContext, isTestMode);
949+
trackingContext, isTestMode, false);
957950
}
958951
return _server.execute(oldSchemaSqlUpdate);
959952
}, 1);

0 commit comments

Comments
 (0)