|
33 | 33 | import org.apache.atlas.model.instance.EntityMutationResponse; |
34 | 34 | import org.apache.atlas.model.instance.EntityMutations.EntityOperation; |
35 | 35 | import org.apache.atlas.model.notification.EntityNotification; |
36 | | -import org.apache.atlas.repository.audit.AtlasAuditService; |
37 | 36 | import org.apache.atlas.repository.converters.AtlasInstanceConverter; |
38 | 37 | import org.apache.atlas.repository.graph.FullTextMapperV2; |
39 | 38 | import org.apache.atlas.repository.graph.GraphHelper; |
|
59 | 58 | import java.util.ListIterator; |
60 | 59 | import java.util.Map; |
61 | 60 | import java.util.Set; |
62 | | -import java.util.function.Predicate; |
| 61 | +import java.util.stream.Collectors; |
63 | 62 |
|
64 | 63 | import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; |
65 | 64 | import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; |
|
69 | 68 | public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier { |
70 | 69 | private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); |
71 | 70 |
|
72 | | - private static final Predicate<AtlasEntityHeader> PRED_IS_NOT_TYPE_AUDIT_ENTITY = obj -> !obj.getTypeName().equals(AtlasAuditService.ENTITY_TYPE_AUDIT_ENTRY); |
73 | | - |
74 | 71 | private final Set<EntityChangeListener> entityChangeListeners; |
75 | 72 | private final Set<EntityChangeListenerV2> entityChangeListenersV2; |
76 | 73 | private final AtlasInstanceConverter instanceConverter; |
@@ -400,24 +397,32 @@ private String getListenerName(EntityChangeListener listener) { |
400 | 397 | return listener.getClass().getSimpleName(); |
401 | 398 | } |
402 | 399 |
|
403 | | - private boolean skipAuditEntries(List<AtlasEntityHeader> entityHeaders) { |
404 | | - return CollectionUtils.isEmpty(entityHeaders) || entityHeaders.stream().noneMatch(PRED_IS_NOT_TYPE_AUDIT_ENTITY); |
| 400 | + private List<AtlasEntityHeader> filterNonInternalEntities(List<AtlasEntityHeader> entityHeaders) { |
| 401 | + if (CollectionUtils.isEmpty(entityHeaders)) { |
| 402 | + return Collections.emptyList(); |
| 403 | + } |
| 404 | + |
| 405 | + return entityHeaders.stream().filter(atlasEntityHeader -> !GraphHelper.isInternalType(atlasEntityHeader.getTypeName())).collect(Collectors.toList()); |
405 | 406 | } |
406 | 407 |
|
407 | 408 | private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { |
408 | 409 | if (CollectionUtils.isEmpty(entityHeaders)) { |
409 | 410 | return; |
410 | 411 | } |
411 | | - if (skipAuditEntries(entityHeaders)) { |
| 412 | + |
| 413 | + List<AtlasEntityHeader> nonInternalEntities = filterNonInternalEntities(entityHeaders); |
| 414 | + |
| 415 | + if (CollectionUtils.isEmpty(nonInternalEntities)) { |
| 416 | + LOG.info("Skipping notifications: All entities are internal types"); |
412 | 417 | return; |
413 | 418 | } |
414 | 419 |
|
415 | 420 | MetricRecorder metric = RequestContext.get().startMetricRecord("notifyListeners"); |
416 | 421 |
|
417 | 422 | if (isV2EntityNotificationEnabled) { |
418 | | - notifyV2Listeners(entityHeaders, operation, isImport); |
| 423 | + notifyV2Listeners(nonInternalEntities, operation, isImport); |
419 | 424 | } else { |
420 | | - notifyV1Listeners(entityHeaders, operation, isImport); |
| 425 | + notifyV1Listeners(nonInternalEntities, operation, isImport); |
421 | 426 | } |
422 | 427 |
|
423 | 428 | RequestContext.get().endMetricRecord(metric); |
|
0 commit comments