-
Notifications
You must be signed in to change notification settings - Fork 59
Updating method for create(), update() and delete() to handle soft delete by setting deleted_ts #549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updating method for create(), update() and delete() to handle soft delete by setting deleted_ts #549
Changes from 1 commit
eec50f8
746c801
798c7af
01f8f52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -757,7 +757,7 @@ <ASPECT_UNION extends RecordTemplate> URN createAspectsWithCallbacks(@Nonnull UR | |
| boolean isTestModeFalseForAll = aspectCreateLambdas.stream().filter(aspectCreateLambda -> aspectCreateLambda.getIngestionParams().isTestMode()).collect( | ||
| Collectors.toList()).isEmpty(); | ||
|
|
||
| int numRows = createNewAspect(urn, aspectCreateLambdas, aspectValues, auditStamp, trackingContext, isTestModeFalseForAll); | ||
| int numRows = createNewAssetWithAspects(urn, aspectCreateLambdas, aspectValues, auditStamp, trackingContext, isTestModeFalseForAll); | ||
| for (RecordTemplate aspectValue : aspectValues) { | ||
| // For each aspect, we need to trigger emit MAE | ||
| // In new asset creation, old value is null | ||
|
|
@@ -979,54 +979,6 @@ public <ASPECT extends RecordTemplate> URN create(@Nonnull URN urn, | |
| ); | ||
| } | ||
|
|
||
| /** | ||
| * The common method that can be used for both: deletion of aspects and entity. | ||
| * | ||
| * @param urn the URN for the entity the aspects are attached to | ||
| * @param aspectClasses Aspect Classes of the aspects being deleted, must be supported aspect types in | ||
| * {@code ASPECT_UNION} | ||
| * @param auditStamp the audit stamp of this action | ||
| * @param trackingContext the tracking context for the operation | ||
| * @param ingestionParams ingestion parameters | ||
| * @param deleteAll if true, delete the entire asset, else mark aspects as deleted iteratively in a | ||
| * transaction | ||
| * @param maxTransactionRetry maximum number of transaction retries before throwing an exception | ||
| * @return a collection of the deleted aspects (their value before deletion), each wrapped in an instance of | ||
| * {@link ASPECT_UNION} | ||
| */ | ||
| protected Collection<ASPECT_UNION> deleteCommon(@Nonnull URN urn, | ||
| @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, | ||
| @Nonnull AuditStamp auditStamp, int maxTransactionRetry, | ||
| @Nullable IngestionTrackingContext trackingContext, | ||
| @Nullable IngestionParams ingestionParams, | ||
| boolean deleteAll) { | ||
|
|
||
| // TODO: Handle pre-deletion callbacks if any | ||
|
|
||
| // If deleteAll is true, delete entire asset, else mark aspects as deleted iteratively in a transaction | ||
| if (deleteAll) { | ||
| Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> | ||
| results = permanentDelete(urn, aspectClasses, auditStamp, maxTransactionRetry, trackingContext, ingestionParams.isTestMode()); | ||
| Collection<RecordTemplate> deletedAspects = new ArrayList<>(); | ||
| results.forEach((key, value) -> { | ||
| // Check if aspect value present to avoid null pointer exception | ||
| if (value.isPresent()) { | ||
| DeleteResult deleteResult = new DeleteResult(value.get(), key); | ||
| deletedAspects.add(unwrapDeleteResult(urn, deleteResult, auditStamp, trackingContext, ChangeType.DELETE_ALL)); | ||
| } | ||
| }); | ||
|
|
||
| return deletedAspects.stream() | ||
| .filter(Objects::nonNull) | ||
| .map(x -> ModelUtils.newEntityUnion(_aspectUnionClass, x)).collect(Collectors.toList()); | ||
| } else { | ||
| // TODO: delete aspects implementation can be moved here instead of in addCommon() | ||
| // Add common method should be used only for create and update | ||
| return Collections.emptyList(); | ||
| } | ||
|
|
||
| //TODO: Handle post-ingestion callbacks if any | ||
| } | ||
|
|
||
| /** | ||
| * Delete asset and all its aspects atomically. | ||
|
|
@@ -1093,9 +1045,37 @@ public Collection<ASPECT_UNION> deleteAll(@Nonnull URN urn, | |
| int maxTransactionRetry, | ||
| @Nullable IngestionTrackingContext trackingContext, | ||
| @Nullable IngestionParams ingestionParams) { | ||
|
|
||
| IngestionParams nonNullIngestionParams = ingestionParams == null | ||
| ? new IngestionParams().setIngestionMode(IngestionMode.LIVE).setTestMode(false) : ingestionParams; | ||
| return deleteCommon(urn, aspectClasses, auditStamp, maxTransactionRetry, trackingContext, nonNullIngestionParams, true); | ||
|
|
||
| final Map<Class<?>, RecordTemplate> results = new HashMap<>(); | ||
| runInTransactionWithRetry(() -> { | ||
| Map<Class<?>, RecordTemplate> deletedAspects = new HashMap<>(); | ||
| aspectClasses.forEach(aspectClass -> { | ||
| try { | ||
| RecordTemplate deletedAspect = delete(urn, aspectClass, auditStamp, maxTransactionRetry, trackingContext); | ||
| results.put(aspectClass, deletedAspect); | ||
| } catch (NullPointerException e) { | ||
| log.warn("Aspect {} for urn {} does not exist", aspectClass.getName(), urn); | ||
| } | ||
|
Comment on lines
+1058
to
+1060
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this throw on both:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| }); | ||
|
|
||
| permanentDelete(urn, nonNullIngestionParams.isTestMode()); | ||
| return results; | ||
| }, maxTransactionRetry); | ||
|
|
||
|
|
||
| Collection<RecordTemplate> deletedAspects = new ArrayList<>(); | ||
| results.forEach((key, value) -> { | ||
| // Check if aspect value present to avoid null pointer exception | ||
| DeleteResult deleteResult = new DeleteResult(value, key); | ||
| deletedAspects.add(unwrapDeleteResult(urn, deleteResult, auditStamp, trackingContext, ChangeType.DELETE_ALL)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Few questions / checks:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| }); | ||
|
|
||
| return deletedAspects.stream() | ||
| .filter(Objects::nonNull) | ||
| .map(x -> ModelUtils.newEntityUnion(_aspectUnionClass, x)).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1353,24 +1333,20 @@ protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN | |
| @Nullable ASPECT newEntry, @Nonnull AuditStamp newAuditStamp, boolean isSoftDeleted, | ||
| @Nullable IngestionTrackingContext trackingContext, boolean isTestMode); | ||
|
|
||
| protected abstract <ASPECT_UNION extends RecordTemplate> int createNewAspect(@NonNull URN urn, | ||
| protected abstract <ASPECT_UNION extends RecordTemplate> int createNewAssetWithAspects(@NonNull URN urn, | ||
| @Nonnull List<AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas, | ||
| @Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp newAuditStamp, | ||
| @Nullable IngestionTrackingContext trackingContext, boolean isTestMode); | ||
|
|
||
| /** | ||
| * Permanently deletes the entity from the table. | ||
| * @param urn the URN for the entity the aspect is attached to | ||
| * @param aspectClasses Aspect Classes of the aspects being deleted, must be supported aspect types in {@code ASPECT_UNION} | ||
| * @param auditStamp the audit stamp of this action | ||
| * @param maxTransactionRetry maximum number of transaction retries before throwing an exception | ||
| * @param trackingContext the tracking context for the operation | ||
| * | ||
| * @param urn the URN for the entity the aspect is attached to | ||
| * @param isTestMode whether the test mode is enabled or not | ||
| * @return a map of the deleted aspects (their value before deletion), each wrapped in an instance of {@link ASPECT_UNION} | ||
| * @return a map of the deleted aspects (their value before deletion), each wrapped in an instance of | ||
| * {@link ASPECT_UNION} | ||
| */ | ||
| protected abstract Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> permanentDelete(@Nonnull URN urn, | ||
| @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nullable AuditStamp auditStamp, | ||
| int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode); | ||
| protected abstract int permanentDelete(@Nonnull URN urn, boolean isTestMode); | ||
|
||
|
|
||
| /** | ||
| * Saves the new value of an aspect to entity tables. This is used when backfilling metadata from the old schema to | ||
|
|
@@ -1585,13 +1561,13 @@ protected abstract <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, | |
| /** | ||
| * Update an aspect for an entity with specific version and {@link AuditStamp} with optimistic locking. | ||
| * | ||
| * @param urn {@link Urn} for the entity | ||
| * @param value the aspect to update | ||
| * @param aspectClass the type of aspect to update | ||
| * @param urn {@link Urn} for the entity | ||
| * @param value the aspect to update | ||
| * @param aspectClass the type of aspect to update | ||
| * @param newAuditStamp the {@link AuditStamp} for the new aspect | ||
| * @param version the version for the aspect | ||
| * @param oldTimestamp the timestamp for the old aspect | ||
| * @param isTestMode whether the test mode is enabled or not | ||
| * @param version the version for the aspect | ||
| * @param oldTimestamp the timestamp for the old aspect | ||
| * @param isTestMode whether the test mode is enabled or not | ||
| */ | ||
| protected abstract <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonnull URN urn, | ||
| @Nullable RecordTemplate value, @Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp newAuditStamp, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,7 +100,8 @@ public void ensureSchemaUpToDate() { | |
| @Transactional | ||
| public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass, | ||
| @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode) { | ||
| return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext, isTestMode); | ||
| return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext, | ||
| isTestMode, true); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -111,7 +112,7 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking( | |
| @Nonnull AuditStamp auditStamp, | ||
| @Nullable Timestamp oldTimestamp, | ||
| @Nullable IngestionTrackingContext ingestionTrackingContext, | ||
| boolean isTestMode) { | ||
| boolean isTestMode, boolean softDeleteOverwrite) { | ||
|
|
||
| final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis(); | ||
| final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR; | ||
|
|
@@ -121,7 +122,7 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking( | |
| final SqlUpdate sqlUpdate; | ||
| if (oldTimestamp != null) { | ||
| sqlUpdate = _server.createSqlUpdate( | ||
| SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction, isTestMode)); | ||
| SQLStatementUtils.createAspectUpdateWithOptimisticLockSql(urn, aspectClass, urnExtraction, isTestMode, softDeleteOverwrite)); | ||
| sqlUpdate.setParameter("oldTimestamp", oldTimestamp.toString()); | ||
| } else { | ||
| sqlUpdate = _server.createSqlUpdate(SQLStatementUtils.createAspectUpsertSql(urn, aspectClass, urnExtraction, isTestMode)); | ||
|
|
@@ -223,11 +224,25 @@ public <ASPECT_UNION extends RecordTemplate> int create( | |
| } | ||
| } | ||
| insertIntoSql.append(CLOSING_BRACKET); | ||
| insertSqlValues.append(CLOSING_BRACKET_WITH_SEMICOLON); | ||
| insertSqlValues.append(CLOSING_BRACKET); | ||
|
|
||
| // Construct DELETED_TS_CHECK_FOR_CREATE String | ||
| StringBuilder deletedTsCheckForCreate = new StringBuilder(); | ||
| deletedTsCheckForCreate.append(DELETED_TS_DUPLICATE_KEY_CHECK); | ||
| for (int i = 0; i < classNames.size(); i++) { | ||
| deletedTsCheckForCreate.append(getAspectColumnName(urn.getEntityType(), classNames.get(i))); | ||
| deletedTsCheckForCreate.append(" = :aspect").append(i); | ||
| if (i != classNames.size() - 1) { | ||
| deletedTsCheckForCreate.append(", "); | ||
| } | ||
| } | ||
|
Comment on lines
+234
to
+238
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this for loop tested in unit testing at all? (is it easy to test?)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good callout. this code will be used when more than asset with more than 1 aspect is being in created. sample vale from unit test: |
||
| deletedTsCheckForCreate.append(DELETED_TS_CONDITIONAL_VALUE_SET); | ||
|
|
||
| // Build the final insert statement | ||
| // For example: INSERT INTO <table_name> (<columns>) VALUES (<values>); | ||
| String insertStatement = insertIntoSql.toString() + insertSqlValues.toString(); | ||
| String insertStatement = insertIntoSql.toString() + insertSqlValues.toString() + deletedTsCheckForCreate.toString(); | ||
|
|
||
|
|
||
| insertStatement = String.format(insertStatement, getTableName(urn)); | ||
|
|
||
| sqlUpdate = _server.createSqlUpdate(insertStatement); | ||
|
|
@@ -247,6 +262,7 @@ public <ASPECT_UNION extends RecordTemplate> int create( | |
| sqlUpdate.setParameter("aspect" + i, toJsonString(auditedAspect)); | ||
| } | ||
|
|
||
|
|
||
| // If a non-default UrnPathExtractor is provided, the user MUST specify in their schema generation scripts | ||
| // 'ALTER TABLE <table> ADD COLUMN a_urn JSON'. | ||
| if (urnExtraction) { | ||
|
|
@@ -303,16 +319,17 @@ public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion( | |
| } | ||
|
|
||
| /** | ||
| * Delete all aspects + urn for the given urn. | ||
| * By this time pre-deletion hooks should be processed. | ||
| * Old values are not needed for delete, But should be retrieved and used for in post-update hooks if needed. | ||
| * @param urn {@link Urn} for the entity | ||
| * Delete all aspects + urn for the given urn. By this time pre-deletion hooks should be processed. Old values are not | ||
| * needed for delete, But should be retrieved and used for in post-update hooks if needed. | ||
| * | ||
| * @param urn {@link Urn} for the entity | ||
| * @param isTestMode whether the operation is in test mode or not | ||
| * @return number of rows deleted. | ||
| */ | ||
| @Override | ||
| public int deleteAll(@Nonnull URN urn, boolean isTestMode) { | ||
| final String deleteSqlStatement = SQLStatementUtils.createDeleteAssetSql(urn, isTestMode); | ||
| public int softDeleteAsset(@Nonnull URN urn, boolean isTestMode) { | ||
| // Update this to mark deleted_TS to NOW based on URN | ||
| final String deleteSqlStatement = SQLStatementUtils.createSoftDeleteAssetSql(urn, isTestMode); | ||
| return _server.createSqlUpdate(deleteSqlStatement).execute(); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -656,31 +656,24 @@ protected <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN urn, @Non | |
| * @return the number of rows inserted | ||
| */ | ||
| @Override | ||
| protected <ASPECT_UNION extends RecordTemplate> int createNewAspect(@Nonnull URN urn, | ||
| protected <ASPECT_UNION extends RecordTemplate> int createNewAssetWithAspects(@Nonnull URN urn, | ||
| @Nonnull List<AspectCreateLambda<? extends RecordTemplate>> aspectCreateLambdas, | ||
| @Nonnull List<? extends RecordTemplate> aspectValues, @Nonnull AuditStamp newAuditStamp, | ||
| @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) { | ||
| return runInTransactionWithRetry(() -> | ||
| // do a get to ensure the urn does not already exist | ||
| // if exists and deletedTs is null, then throw an exception | ||
| // if exists and deletedTs is not null, then update the deletedTs to null and create records | ||
|
||
| _localAccess.create(urn, aspectValues, aspectCreateLambdas, newAuditStamp, trackingContext, isTestMode), 1); | ||
| } | ||
|
|
||
| @Override | ||
| protected Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> permanentDelete(@Nonnull URN urn, | ||
| @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nullable AuditStamp auditStamp, | ||
| int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isTestMode) { | ||
| protected int permanentDelete(@Nonnull URN urn, boolean isTestMode) { | ||
| // If the table does not have the URN, return empty map. Nothing to delete here. | ||
| if (!exists(urn)) { | ||
| return Collections.emptyMap(); | ||
| return 0; | ||
| } | ||
| // If the table has the URN, get the asset record, including all the aspects. | ||
| // This will be used to delete to return deleted record info in the API. | ||
| Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>> deletedAspects = new HashMap<>(); | ||
| aspectClasses.forEach(aspectClass -> deletedAspects.put(aspectClass, Optional.ofNullable(getLatest(urn, aspectClass, isTestMode).getAspect()))); | ||
| // Perform deletion using urn and return the previously retrieved record. | ||
| return runInTransactionWithRetry(() -> { | ||
| _localAccess.deleteAll(urn, isTestMode); | ||
| return deletedAspects; | ||
| }, maxTransactionRetry); | ||
| return _localAccess.softDeleteAsset(urn, isTestMode); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -879,7 +872,7 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn | |
| // Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called. | ||
| _server.execute(oldSchemaSqlUpdate); | ||
| return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp, | ||
| trackingContext, isTestMode); | ||
| trackingContext, isTestMode, true); | ||
| }, 1); | ||
| } else { | ||
| // In OLD_SCHEMA and DUAL_SCHEMA mode, the aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table. | ||
|
|
@@ -889,7 +882,7 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn | |
| // Additionally, in DUAL_SCHEMA mode: apply a regular update (no optimistic locking) to the entity table | ||
| if (_schemaConfig == SchemaConfig.DUAL_SCHEMA) { | ||
| _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, null, | ||
| trackingContext, isTestMode); | ||
| trackingContext, isTestMode, false); | ||
| } | ||
| return _server.execute(oldSchemaSqlUpdate); | ||
| }, 1); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleting this, because reusing the delete() implementation for deleting all aspects (this)
instead of doing Hard Asset deletion (deleting entire row)