2323import org .apache .atlas .glossary .GlossaryService ;
2424import org .apache .atlas .model .impexp .AtlasExportRequest ;
2525import org .apache .atlas .model .impexp .AtlasExportResult ;
26+ import org .apache .atlas .model .instance .AtlasClassification ;
2627import org .apache .atlas .model .instance .AtlasEntity ;
2728import org .apache .atlas .model .instance .AtlasEntity .AtlasEntityWithExtInfo ;
2829import org .apache .atlas .model .instance .AtlasObjectId ;
3435import org .apache .atlas .model .typedef .AtlasStructDef ;
3536import org .apache .atlas .model .typedef .AtlasTypesDef ;
3637import org .apache .atlas .repository .graph .GraphHelper ;
38+ import org .apache .atlas .repository .graphdb .AtlasEdge ;
39+ import org .apache .atlas .repository .graphdb .AtlasEdgeDirection ;
3740import org .apache .atlas .repository .graphdb .AtlasGraph ;
3841import org .apache .atlas .repository .graphdb .AtlasVertex ;
3942import org .apache .atlas .repository .store .graph .v2 .AtlasGraphUtilsV2 ;
4346import org .apache .atlas .util .AtlasGremlinQueryProvider ;
4447import org .apache .commons .collections .CollectionUtils ;
4548import org .apache .commons .collections .MapUtils ;
49+ import org .apache .commons .lang3 .StringUtils ;
4650import org .slf4j .Logger ;
4751import org .slf4j .LoggerFactory ;
4852import org .springframework .stereotype .Component ;
5256import java .util .ArrayList ;
5357import java .util .HashMap ;
5458import java .util .HashSet ;
59+ import java .util .Iterator ;
5560import java .util .List ;
5661import java .util .Map ;
5762import java .util .Set ;
63+ import java .util .stream .StreamSupport ;
5864
5965import static org .apache .atlas .model .impexp .AtlasExportRequest .FETCH_TYPE_CONNECTED ;
6066import static org .apache .atlas .model .impexp .AtlasExportRequest .FETCH_TYPE_FULL ;
6167import static org .apache .atlas .model .impexp .AtlasExportRequest .FETCH_TYPE_INCREMENTAL ;
68+ import static org .apache .atlas .repository .Constants .CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY ;
69+ import static org .apache .atlas .repository .Constants .CLASSIFICATION_EDGE_NAME_PROPERTY_KEY ;
70+ import static org .apache .atlas .repository .Constants .CLASSIFICATION_LABEL ;
6271import static org .apache .atlas .repository .Constants .GUID_PROPERTY_KEY ;
6372import static org .apache .atlas .repository .Constants .MODIFICATION_TIMESTAMP_PROPERTY_KEY ;
73+ import static org .apache .atlas .repository .graph .GraphHelper .getGuid ;
74+ import static org .apache .atlas .repository .graph .GraphHelper .getPropagatedClassificationEdge ;
75+ import static org .apache .atlas .repository .graph .GraphHelper .getTypeName ;
6476
6577@ Component
6678public class ExportService {
@@ -91,13 +103,14 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
91103 long startTime = System .currentTimeMillis ();
92104 AtlasExportResult result = new AtlasExportResult (request , userName , requestingIP , hostName , startTime , getCurrentChangeMarker ());
93105 ExportContext context = new ExportContext (result , exportSink );
106+ RelationshipAttributesExtractor relationshipAttributesExtractor = new RelationshipAttributesExtractor (typeRegistry );
94107
95108 exportTypeProcessor = new ExportTypeProcessor (typeRegistry , glossaryService );
96109
97110 try {
98111 LOG .info ("==> export(user={}, from={})" , userName , requestingIP );
99112
100- AtlasExportResult .OperationStatus [] statuses = processItems (request , context );
113+ AtlasExportResult .OperationStatus [] statuses = processItems (request , context , relationshipAttributesExtractor );
101114
102115 processTypesDef (context );
103116
@@ -118,7 +131,7 @@ public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, Str
118131 return context .result ;
119132 }
120133
121- public void processEntity (AtlasEntityWithExtInfo entityWithExtInfo , ExportContext context ) throws AtlasBaseException {
134+ public void processEntity (AtlasEntityWithExtInfo entityWithExtInfo , ExportContext context , RelationshipAttributesExtractor relationshipAttributesExtractor ) throws AtlasBaseException {
122135 exportTypeProcessor .addTypes (entityWithExtInfo .getEntity (), context );
123136
124137 if (MapUtils .isNotEmpty (context .termsGlossary )) {
@@ -139,6 +152,10 @@ public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContex
139152
140153 context .guidsProcessed .addAll (entityWithExtInfo .getReferredEntities ().keySet ());
141154 }
155+
156+ if (context .isHiveTableIncremental () && !context .visitedVertices .contains (entityWithExtInfo .getEntity ().getGuid ())) {
157+ getEntityGuids (entityWithExtInfo , context , relationshipAttributesExtractor );
158+ }
142159 }
143160
144161 @ VisibleForTesting
@@ -219,20 +236,20 @@ private void processTypesDef(ExportContext context) {
219236 }
220237 }
221238
222- private AtlasExportResult .OperationStatus [] processItems (AtlasExportRequest request , ExportContext context ) {
239+ private AtlasExportResult .OperationStatus [] processItems (AtlasExportRequest request , ExportContext context , RelationshipAttributesExtractor relationshipAttributesExtractor ) {
223240 AtlasExportResult .OperationStatus [] statuses = new AtlasExportResult .OperationStatus [request .getItemsToExport ().size ()];
224241 List <AtlasObjectId > itemsToExport = request .getItemsToExport ();
225242
226243 for (int i = 0 ; i < itemsToExport .size (); i ++) {
227244 AtlasObjectId item = itemsToExport .get (i );
228245
229- statuses [i ] = processObjectId (item , context );
246+ statuses [i ] = processObjectId (item , context , relationshipAttributesExtractor );
230247 }
231248
232249 return statuses ;
233250 }
234251
235- private AtlasExportResult .OperationStatus processObjectId (AtlasObjectId item , ExportContext context ) {
252+ private AtlasExportResult .OperationStatus processObjectId (AtlasObjectId item , ExportContext context , RelationshipAttributesExtractor relationshipAttributesExtractor ) {
236253 LOG .debug ("==> processObjectId({})" , item );
237254
238255 try {
@@ -248,14 +265,14 @@ private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, Ex
248265 AtlasVertex vertex = AtlasGraphUtilsV2 .findByGuid (guid );
249266 String typeName = GraphHelper .getTypeName (vertex );
250267 context .startingEntityType = typeName ;
251- processEntityGuid (guid , context );
268+ processEntityGuid (guid , context , relationshipAttributesExtractor );
252269 }
253270
254271 while (!context .guidsToProcess .isEmpty ()) {
255272 while (!context .guidsToProcess .isEmpty ()) {
256273 String guid = context .guidsToProcess .remove (0 );
257274
258- processEntityGuid (guid , context );
275+ processEntityGuid (guid , context , relationshipAttributesExtractor );
259276 }
260277
261278 if (!context .lineageToProcess .isEmpty ()) {
@@ -285,7 +302,7 @@ private List<String> getStartingEntity(AtlasObjectId item, ExportContext context
285302 return startEntityFetchByExportRequest .get (context .result .getRequest (), item );
286303 }
287304
288- private void processEntityGuid (String guid , ExportContext context ) throws AtlasBaseException {
305+ private void processEntityGuid (String guid , ExportContext context , RelationshipAttributesExtractor relationshipAttributesExtractor ) throws AtlasBaseException {
289306 LOG .debug ("==> processEntityGuid({})" , guid );
290307
291308 if (context .guidsProcessed .contains (guid )) {
@@ -299,7 +316,7 @@ private void processEntityGuid(String guid, ExportContext context) throws AtlasB
299316 } else {
300317 AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever .toAtlasEntityWithExtInfo (guid );
301318
302- processEntity (entityWithExtInfo , context );
319+ processEntity (entityWithExtInfo , context , relationshipAttributesExtractor );
303320 }
304321
305322 LOG .debug ("<== processEntityGuid({})" , guid );
@@ -413,6 +430,136 @@ private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext c
413430 context .reportProgress ();
414431 }
415432
433+ public void getEntityGuids (AtlasEntityWithExtInfo entityWithExtInfo , ExportContext context , RelationshipAttributesExtractor relationshipAttributesExtractor ) throws AtlasBaseException {
434+ if (!context .classificationEntity .containsKey (entityWithExtInfo .getEntity ().getGuid ())) {
435+ if (CollectionUtils .isNotEmpty (entityWithExtInfo .getEntity ().getClassifications ())) {
436+ for (AtlasClassification c : entityWithExtInfo .getEntity ().getClassifications ()) {
437+ context .classificationEntity
438+ .computeIfAbsent (c .getEntityGuid (), key -> new UniqueList <>())
439+ .add (c .getTypeName ());
440+ }
441+ }
442+ }
443+ String entityGuid = getClassificationLineage (entityWithExtInfo , context , relationshipAttributesExtractor );
444+ if (entityGuid != null ) {
445+ if (!context .visitedVertices .contains (entityGuid )) {
446+ context .visitedVertices .add (entityGuid );
447+ getEntityGuids (new AtlasEntityWithExtInfo (entityGraphRetriever .toAtlasEntity (entityGuid )), context , relationshipAttributesExtractor );
448+ }
449+ }
450+ }
451+
452+ public String getClassificationLineage (AtlasEntityWithExtInfo entityWithExtInfo , ExportContext context , RelationshipAttributesExtractor relationshipAttributesExtractor ) throws AtlasBaseException {
453+ Iterable tagPropagationEdge ;
454+ Boolean isParent = false ;
455+ AtlasVertex adjacentVertex = null ;
456+ AtlasVertex entityVertexStart = entityGraphRetriever .getEntityVertex (entityWithExtInfo .getEntity ().getGuid ());
457+ String entityGuid = getGuid (entityVertexStart );
458+ String entityTypeName = getTypeName (entityVertexStart );
459+
460+ if (CollectionUtils .isNotEmpty (entityWithExtInfo .getEntity ().getClassifications ())) {
461+ for (AtlasClassification classification : entityWithExtInfo .getEntity ().getClassifications ()) {
462+ String classificationName = classification .getTypeName ();
463+
464+ if (context .propagatedEdgeMap .containsKey (entityGuid ) &&
465+ context .propagatedEdgeMap .get (entityGuid ).contains (classificationName )) {
466+ continue ;
467+ }
468+
469+ context .propagatedEdgeMap .computeIfAbsent (entityGuid , key -> new UniqueList <>())
470+ .add (classificationName );
471+ //We are iterating on classifications here again because while traversing
472+ // there could be more classifications on entities where classifications are blocked on edges
473+ String direction = relationshipAttributesExtractor .isLineageType (entityTypeName )
474+ ? "__Process.inputs"
475+ : "__Process.outputs" ;
476+
477+ tagPropagationEdge = entityVertexStart .query ()// found 2 edges
478+ .direction (direction .equals ("__Process.inputs" ) ? AtlasEdgeDirection .OUT : AtlasEdgeDirection .IN )
479+ .label (direction )
480+ .edges ();
481+ Iterator <AtlasEdge > iterator = tagPropagationEdge .iterator ().hasNext () ? tagPropagationEdge .iterator () : null ;
482+
483+ if (iterator != null ) {
484+ while (iterator .hasNext ()) {
485+ AtlasEdge edge = null ;
486+ AtlasEdge propagationEdge = iterator .next ();
487+ if (relationshipAttributesExtractor .isLineageType (entityTypeName )) {
488+ edge = checkPropagatedEdge (propagationEdge .getInVertex (), classificationName , classification .getEntityGuid (), edge );
489+ //this is getting executed for hive process for incoming edge from table
490+ }
491+ else {
492+ edge = checkPropagatedEdge (propagationEdge .getOutVertex (), classificationName , classification .getEntityGuid (), edge );
493+ //this is checked for outvertex in case of table or other propagating entity which is not a process
494+ }
495+ if (edge == null ) {
496+ isParent = checkIfParentVertex (propagationEdge .getInVertex (), classificationName );
497+ //This is checking if propagated edge is not found then entity must be a parent of a classification
498+ }
499+
500+ if (edge != null || isParent ) {
501+ entityLineageProcess (entityVertexStart , context , propagationEdge ,
502+ iterator , entityGraphRetriever , relationshipAttributesExtractor , entityGuid , classificationName );
503+ }
504+ }
505+ }
506+ }
507+ }
508+ return (adjacentVertex == null ) ? null : getGuid (adjacentVertex );
509+ }
510+
511+ private void entityLineageProcess (AtlasVertex entityVertexStart , ExportContext context ,
512+ AtlasEdge propagationEdge , Iterator <AtlasEdge > iterator ,
513+ EntityGraphRetriever entityGraphRetriever ,
514+ RelationshipAttributesExtractor relationshipAttributesExtractor ,
515+ String entityGuid , String classificationName ) throws AtlasBaseException {
516+
517+ if (context .classificationEntity .containsKey (entityGuid ) &&
518+ context .classificationEntity .get (entityGuid ).contains (classificationName )) {
519+ return ;
520+ }
521+ AtlasVertex outVertex = propagationEdge .getOutVertex ();
522+ AtlasVertex inVertex = propagationEdge .getInVertex ();
523+
524+ AtlasVertex adjacentVertex = StringUtils .equals (outVertex .getIdForDisplay (), entityVertexStart .getIdForDisplay ()) ? inVertex : outVertex ;
525+ String adjacentEntityGuid = getGuid (adjacentVertex );
526+
527+ AtlasEntity entity = entityGraphRetriever .toAtlasEntity (adjacentEntityGuid );
528+ //Here we check if adjacent entity is parent of ongoing classification
529+ //Here we check for iterator.
530+ boolean adjacentHasClassification = context .classificationEntity .containsKey (adjacentEntityGuid ) &&
531+ context .classificationEntity .get (adjacentEntityGuid ).contains (classificationName );
532+
533+ boolean shouldRecurse = adjacentHasClassification || iterator .hasNext ();
534+ if (shouldRecurse ) {
535+ if (!context .guidsToProcess .contains (adjacentEntityGuid )) {
536+ context .guidsToProcess .add (adjacentEntityGuid );
537+ }
538+ getEntityGuids (new AtlasEntityWithExtInfo (entity ), context , relationshipAttributesExtractor );
539+ }
540+ if (!context .guidsToProcess .contains (adjacentEntityGuid )) {
541+ context .guidsToProcess .add (adjacentEntityGuid );
542+ }
543+ }
544+
545+ //This method executes when a process or a table has propagated classification edge
546+ private AtlasEdge checkPropagatedEdge (AtlasVertex vertex , String classification , String guid , AtlasEdge edgeID ) {
547+ AtlasEdge edge = getPropagatedClassificationEdge (vertex , classification , guid );
548+ return edge ;
549+ }
550+
551+ //This method gets executed when a table/Process is a parent to a classification
552+ public Boolean checkIfParentVertex (AtlasVertex vertex , String classification ) {
553+ Iterable <AtlasEdge > appliedClassifications = vertex .query ()
554+ .direction (AtlasEdgeDirection .OUT )
555+ .label (CLASSIFICATION_LABEL )
556+ .has (CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY , false )
557+ .has (CLASSIFICATION_EDGE_NAME_PROPERTY_KEY , classification )
558+ .edges ();
559+
560+ return StreamSupport .stream (appliedClassifications .spliterator (), false ).findAny ().isPresent ();
561+ }
562+
416563 public enum TraversalDirection {
417564 UNKNOWN ,
418565 INWARD ,
@@ -460,6 +607,10 @@ static class ExportContext {
460607 final Set <String > relationshipTypes = new HashSet <>();
461608 final Set <String > businessMetadataTypes = new HashSet <>();
462609 final Map <String , String > termsGlossary = new HashMap <>();
610+ final Map <String , UniqueList <String >> classificationEntity = new HashMap <>();
611+ final Map <String , UniqueList <String >> propagatedEdgeMap = new HashMap <>();
612+ final Set <String > visitedVertices = new HashSet <>();
613+
463614 final AtlasExportResult result ;
464615 final ExportFetchType fetchType ;
465616 final boolean skipLineage ;
@@ -511,6 +662,9 @@ public void clear() {
511662 guidsToProcess .clear ();
512663 guidsProcessed .clear ();
513664 guidDirection .clear ();
665+ visitedVertices .clear ();
666+ classificationEntity .clear ();
667+ propagatedEdgeMap .clear ();
514668 startingEntityType = null ;
515669 }
516670
0 commit comments