Skip to content

Commit 09d02b9

Browse files
chaitalicodchaitalithombare
andauthored
ATLAS-4984: Option to ignore spark_process attributes details and sparkPlanDescription (#331)
Co-authored-by: chaitalithombare <chaitalithombare@apache.org>
1 parent be1fb84 commit 09d02b9

File tree

4 files changed

+180
-2
lines changed

4 files changed

+180
-2
lines changed

webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
163163
public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
164164
public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
165165
public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix";
166+
public static final String CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES = "atlas.notification.consumer.preprocess.spark_process.attributes";
166167
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user";
167168
public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds";
168169
public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -209,6 +210,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
209210
private final boolean preprocessEnabled;
210211
private final boolean createShellEntityForNonExistingReference;
211212
private final boolean authorizeUsingMessageUser;
213+
private final boolean sparkProcessAttributes;
214+
212215
private final Map<String, Authentication> authnCache;
213216
private final NotificationInterface notificationInterface;
214217
private final Configuration applicationProperties;
@@ -367,8 +370,8 @@ public NotificationHookConsumer(NotificationInterface notificationInterface, Atl
367370
hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
368371
rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
369372
s3V2DirectoryPruneObjectPrefix = applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true);
370-
371-
preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
373+
sparkProcessAttributes = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_SPARK_PROCESS_ATTRIBUTES, false);
374+
preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty() || sparkProcessAttributes;
372375
entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore);
373376

374377
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633);
@@ -680,6 +683,10 @@ private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<Hook
680683
pruneObjectPrefixForS3V2Directory(context);
681684
}
682685

686+
if (sparkProcessAttributes) {
687+
preprocessSparkProcessAttributes(context);
688+
}
689+
683690
context.moveRegisteredReferredEntities();
684691

685692
if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) {
@@ -789,6 +796,21 @@ private void preprocessHiveTypes(PreprocessorContext context) {
789796
}
790797
}
791798

799+
private void preprocessSparkProcessAttributes(PreprocessorContext context) {
800+
List<AtlasEntity> entities = context.getEntities();
801+
802+
if (entities != null) {
803+
for (int i = 0; i < entities.size(); i++) {
804+
AtlasEntity entity = entities.get(i);
805+
EntityPreprocessor preprocessor = EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
806+
807+
if (preprocessor != null) {
808+
preprocessor.preprocess(entity, context);
809+
}
810+
}
811+
}
812+
}
813+
792814
private void skipHiveColumnLineage(PreprocessorContext context) {
793815
List<AtlasEntity> entities = context.getEntities();
794816

webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public abstract class EntityPreprocessor {
3434
public static final String TYPE_HIVE_DB_DDL = "hive_db_ddl";
3535
public static final String TYPE_HIVE_TABLE_DDL = "hive_table_ddl";
3636
public static final String TYPE_HIVE_TABLE = "hive_table";
37+
public static final String TYPE_SPARK_PROCESS = "spark_process";
3738
public static final String TYPE_RDBMS_INSTANCE = "rdbms_instance";
3839
public static final String TYPE_RDBMS_DB = "rdbms_db";
3940
public static final String TYPE_RDBMS_TABLE = "rdbms_table";
@@ -42,6 +43,8 @@ public abstract class EntityPreprocessor {
4243
public static final String TYPE_RDBMS_FOREIGN_KEY = "rdbms_foreign_key";
4344

4445
public static final String ATTRIBUTE_COLUMNS = "columns";
46+
public static final String ATTRIBUTE_DETAILS = "details";
47+
public static final String ATTRIBUTE_SPARKPLANDESCRIPTION = "sparkPlanDescription";
4548
public static final String ATTRIBUTE_INPUTS = "inputs";
4649
public static final String ATTRIBUTE_OUTPUTS = "outputs";
4750
public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
@@ -66,6 +69,7 @@ public abstract class EntityPreprocessor {
6669
private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>();
6770
private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>();
6871
private static final Map<String, EntityPreprocessor> AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>();
72+
private static final Map<String, EntityPreprocessor> SPARK_PREPROCESSOR_MAP = new HashMap<>();
6973

7074
private final String typeName;
7175

@@ -85,6 +89,10 @@ public static EntityPreprocessor getS3V2Preprocessor(String typeName) {
8589
return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) : null;
8690
}
8791

92+
public static EntityPreprocessor getSparkPreprocessor(String typeName) {
93+
return typeName != null ? SPARK_PREPROCESSOR_MAP.get(typeName) : null;
94+
}
95+
8896
public static String getQualifiedName(AtlasEntity entity) {
8997
Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null;
9098

@@ -175,6 +183,10 @@ protected boolean isEmpty(Object obj) {
175183
new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor()
176184
};
177185

186+
EntityPreprocessor[] sparkPreprocessors = new EntityPreprocessor[]{
187+
new SparkPreprocessor.SparkProcessPreprocessor()
188+
};
189+
178190
for (EntityPreprocessor preprocessor : hivePreprocessors) {
179191
HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
180192
}
@@ -186,5 +198,9 @@ protected boolean isEmpty(Object obj) {
186198
for (EntityPreprocessor preprocessor : s3V2Preprocessors) {
187199
AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
188200
}
201+
202+
for (EntityPreprocessor preprocessor : sparkPreprocessors) {
203+
SPARK_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
204+
}
189205
}
190206
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.atlas.notification.preprocessor;
20+
21+
import org.apache.atlas.model.instance.AtlasEntity;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
public class SparkPreprocessor {
26+
private static final Logger LOG = LoggerFactory.getLogger(SparkPreprocessor.class);
27+
static class SparkProcessPreprocessor extends EntityPreprocessor {
28+
public SparkProcessPreprocessor() {
29+
super(TYPE_SPARK_PROCESS);
30+
}
31+
32+
@Override
33+
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
34+
entity.removeAttribute(ATTRIBUTE_DETAILS);
35+
entity.removeAttribute(ATTRIBUTE_SPARKPLANDESCRIPTION);
36+
}
37+
}
38+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.atlas.notification.preprocessor;
20+
21+
import org.apache.atlas.kafka.AtlasKafkaMessage;
22+
import org.apache.atlas.kafka.KafkaNotification;
23+
import org.apache.atlas.model.instance.AtlasEntity;
24+
import org.apache.atlas.model.notification.HookNotification;
25+
import org.apache.atlas.notification.hook.HookMessageDeserializer;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import org.testng.annotations.Test;
29+
30+
import java.util.Map;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.ArrayList;
34+
35+
import java.util.regex.Pattern;
36+
37+
import static org.testng.Assert.assertEquals;
38+
39+
public class SparkPreprocessorTest {
40+
private static final Logger LOG = LoggerFactory.getLogger(SparkPreprocessorTest.class);
41+
private final HookMessageDeserializer deserializer = new HookMessageDeserializer();
42+
public static final String TYPE_SPARK_PROCESS = "spark_process";
43+
public static final String ATTRIBUTE_DETAILS = "details";
44+
public static final String ATTRIBUTE_SPARKPLANDESCRIPTION = "sparkPlanDescription";
45+
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
46+
public static final String ATTRIBUTE_NAME = "name";
47+
public static final String ATTRIBUTE_ISINCOMPLETE = "isIncomplete";
48+
public static final String ATTRIBUTE_REMOTEUSER = "remoteUser";
49+
public static final String ATTRIBUTE_EXECUTIONID = "executionId";
50+
public static final String ATTRIBUTE_QUERYTEXT = "queryText";
51+
public static final String ATTRIBUTE_CURRUSER = "currUser";
52+
public static final String ATTRIBUTE_GUID = "guid";
53+
private static final List<String> EMPTY_STR_LIST = new ArrayList<>();
54+
private static final List<Pattern> EMPTY_PATTERN_LIST = new ArrayList<>();
55+
56+
private void getPreprocessorContext(AtlasEntity entity) {
57+
EntityPreprocessor preprocessor = EntityPreprocessor.getSparkPreprocessor(entity.getTypeName());
58+
HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity));
59+
60+
AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
61+
62+
PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null,
63+
EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false,
64+
false, true, false, null);
65+
if (preprocessor != null) {
66+
preprocessor.preprocess(entity, context);
67+
}
68+
}
69+
70+
public Object[][] provideSparkProcessData() {
71+
Map<String, Object> attributes = new HashMap<>();
72+
attributes.put(ATTRIBUTE_NAME, "execution-1");
73+
attributes.put(ATTRIBUTE_QUALIFIED_NAME, "application_1740993925593_0006-execution-1sparkTab1");
74+
attributes.put(ATTRIBUTE_DETAILS, "== Parsed Logical Plan ==\\nCreateHiveTableAsSelectCommand ...");
75+
attributes.put(ATTRIBUTE_SPARKPLANDESCRIPTION, "Execute CreateHiveTableAsSelectCommand ...");
76+
attributes.put(ATTRIBUTE_GUID, "-32055574130361399");
77+
attributes.put(ATTRIBUTE_ISINCOMPLETE, "false");
78+
attributes.put(ATTRIBUTE_REMOTEUSER, "spark");
79+
attributes.put(ATTRIBUTE_EXECUTIONID, 1);
80+
attributes.put(ATTRIBUTE_QUERYTEXT, null);
81+
attributes.put(ATTRIBUTE_CURRUSER, "spark");
82+
83+
return new Object[][] { { attributes } };
84+
}
85+
86+
@Test
87+
public void replaceAttributesInSparkProcess() {
88+
Object[][] testData = provideSparkProcessData();
89+
90+
for (Object[] data : testData) {
91+
Map<String, Object> attributes = (Map<String, Object>) data[0]; // Extract attributes
92+
93+
AtlasEntity atlasEntity = new AtlasEntity();
94+
atlasEntity.setTypeName(TYPE_SPARK_PROCESS);
95+
attributes.forEach(atlasEntity::setAttribute);
96+
getPreprocessorContext(atlasEntity);
97+
98+
assertEquals(atlasEntity.getAttribute(ATTRIBUTE_DETAILS), null);
99+
assertEquals(atlasEntity.getAttribute(ATTRIBUTE_SPARKPLANDESCRIPTION), null);
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)