Skip to content

Commit 9f791a3

Browse files
feat(browseV2): add browseV2 logic to system update (#8506)
Co-authored-by: david-leifker <[email protected]>
1 parent 2e2a674 commit 9f791a3

File tree

12 files changed

+227
-5
lines changed

12 files changed

+227
-5
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.linkedin.datahub.upgrade.config;
2+
3+
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
4+
import com.linkedin.metadata.entity.EntityService;
5+
import com.linkedin.metadata.search.SearchService;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
9+
10+
@Configuration
11+
public class BackfillBrowsePathsV2Config {
12+
13+
@Bean
14+
public BackfillBrowsePathsV2 backfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
15+
return new BackfillBrowsePathsV2(entityService, searchService);
16+
}
17+
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.linkedin.datahub.upgrade.system.SystemUpdate;
44
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
55
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
6+
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
67
import com.linkedin.gms.factory.common.TopicConventionFactory;
78
import com.linkedin.gms.factory.config.ConfigurationProvider;
89
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
@@ -30,10 +31,11 @@ public class SystemUpdateConfig {
3031
@Bean(name = "systemUpdate")
3132
public SystemUpdate systemUpdate(final BuildIndices buildIndices, final CleanIndices cleanIndices,
3233
@Qualifier("duheKafkaEventProducer") final KafkaEventProducer kafkaEventProducer,
33-
final GitVersion gitVersion, @Qualifier("revision") String revision) {
34+
final GitVersion gitVersion, @Qualifier("revision") String revision,
35+
final BackfillBrowsePathsV2 backfillBrowsePathsV2) {
3436

3537
String version = String.format("%s-%s", gitVersion.getVersion(), revision);
36-
return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version);
38+
return new SystemUpdate(buildIndices, cleanIndices, kafkaEventProducer, version, backfillBrowsePathsV2);
3739
}
3840

3941
@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/SystemUpdate.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
77
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
88
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
9+
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
910
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
1011
import lombok.extern.slf4j.Slf4j;
1112

@@ -21,11 +22,12 @@ public class SystemUpdate implements Upgrade {
2122
private final List<UpgradeStep> _steps;
2223

2324
public SystemUpdate(final BuildIndices buildIndicesJob, final CleanIndices cleanIndicesJob,
24-
final KafkaEventProducer kafkaEventProducer, final String version) {
25+
final KafkaEventProducer kafkaEventProducer, final String version,
26+
final BackfillBrowsePathsV2 backfillBrowsePathsV2) {
2527

2628
_preStartupUpgrades = List.of(buildIndicesJob);
2729
_steps = List.of(new DataHubStartupStep(kafkaEventProducer, version));
28-
_postStartupUpgrades = List.of(cleanIndicesJob);
30+
_postStartupUpgrades = List.of(cleanIndicesJob, backfillBrowsePathsV2);
2931
}
3032

3133
@Override
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.linkedin.datahub.upgrade.system.entity.steps;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.linkedin.datahub.upgrade.Upgrade;
5+
import com.linkedin.datahub.upgrade.UpgradeStep;
6+
import com.linkedin.metadata.entity.EntityService;
7+
import com.linkedin.metadata.search.SearchService;
8+
import java.util.List;
9+
10+
11+
public class BackfillBrowsePathsV2 implements Upgrade {
12+
13+
private final List<UpgradeStep> _steps;
14+
15+
public BackfillBrowsePathsV2(EntityService entityService, SearchService searchService) {
16+
_steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService));
17+
}
18+
19+
@Override
20+
public String id() {
21+
return "BackfillBrowsePathsV2";
22+
}
23+
24+
@Override
25+
public List<UpgradeStep> steps() {
26+
return _steps;
27+
}
28+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package com.linkedin.datahub.upgrade.system.entity.steps;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.google.common.collect.ImmutableSet;
5+
import com.linkedin.common.AuditStamp;
6+
import com.linkedin.common.BrowsePathsV2;
7+
import com.linkedin.common.urn.Urn;
8+
import com.linkedin.common.urn.UrnUtils;
9+
import com.linkedin.datahub.upgrade.UpgradeContext;
10+
import com.linkedin.datahub.upgrade.UpgradeStep;
11+
import com.linkedin.datahub.upgrade.UpgradeStepResult;
12+
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
13+
import com.linkedin.events.metadata.ChangeType;
14+
import com.linkedin.metadata.Constants;
15+
import com.linkedin.metadata.entity.EntityService;
16+
import com.linkedin.metadata.query.filter.Condition;
17+
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
18+
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
19+
import com.linkedin.metadata.query.filter.Criterion;
20+
import com.linkedin.metadata.query.filter.CriterionArray;
21+
import com.linkedin.metadata.query.filter.Filter;
22+
import com.linkedin.metadata.search.ScrollResult;
23+
import com.linkedin.metadata.search.SearchEntity;
24+
import com.linkedin.metadata.search.SearchService;
25+
import com.linkedin.metadata.utils.GenericRecordUtils;
26+
import com.linkedin.mxe.MetadataChangeProposal;
27+
import com.linkedin.mxe.SystemMetadata;
28+
import java.util.function.Function;
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
import java.util.Set;
32+
33+
import static com.linkedin.metadata.Constants.*;
34+
35+
36+
@Slf4j
37+
public class BackfillBrowsePathsV2Step implements UpgradeStep {
38+
39+
public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
40+
41+
private static final Set<String> ENTITY_TYPES_TO_MIGRATE = ImmutableSet.of(
42+
Constants.DATASET_ENTITY_NAME,
43+
Constants.DASHBOARD_ENTITY_NAME,
44+
Constants.CHART_ENTITY_NAME,
45+
Constants.DATA_JOB_ENTITY_NAME,
46+
Constants.DATA_FLOW_ENTITY_NAME,
47+
Constants.ML_MODEL_ENTITY_NAME,
48+
Constants.ML_MODEL_GROUP_ENTITY_NAME,
49+
Constants.ML_FEATURE_TABLE_ENTITY_NAME,
50+
Constants.ML_FEATURE_ENTITY_NAME
51+
);
52+
private static final Integer BATCH_SIZE = 5000;
53+
54+
private final EntityService _entityService;
55+
private final SearchService _searchService;
56+
57+
public BackfillBrowsePathsV2Step(EntityService entityService, SearchService searchService) {
58+
_searchService = searchService;
59+
_entityService = entityService;
60+
}
61+
62+
@Override
63+
public Function<UpgradeContext, UpgradeStepResult> executable() {
64+
return (context) -> {
65+
final AuditStamp auditStamp =
66+
new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
67+
68+
String scrollId = null;
69+
for (String entityType : ENTITY_TYPES_TO_MIGRATE) {
70+
int migratedCount = 0;
71+
do {
72+
log.info(String.format("Upgrading batch %s-%s of browse paths for entity type %s", migratedCount,
73+
migratedCount + BATCH_SIZE, entityType));
74+
scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
75+
migratedCount += BATCH_SIZE;
76+
} while (scrollId != null);
77+
}
78+
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
79+
};
80+
}
81+
82+
private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId) {
83+
84+
// Condition: has `browsePaths` AND does NOT have `browsePathV2`
85+
Criterion missingBrowsePathV2 = new Criterion();
86+
missingBrowsePathV2.setCondition(Condition.IS_NULL);
87+
missingBrowsePathV2.setField("browsePathV2");
88+
// Excludes entities without browsePaths
89+
Criterion hasBrowsePathV1 = new Criterion();
90+
hasBrowsePathV1.setCondition(Condition.EXISTS);
91+
hasBrowsePathV1.setField("browsePaths");
92+
93+
CriterionArray criterionArray = new CriterionArray();
94+
criterionArray.add(missingBrowsePathV2);
95+
criterionArray.add(hasBrowsePathV1);
96+
97+
ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
98+
conjunctiveCriterion.setAnd(criterionArray);
99+
100+
ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
101+
conjunctiveCriterionArray.add(conjunctiveCriterion);
102+
103+
Filter filter = new Filter();
104+
filter.setOr(conjunctiveCriterionArray);
105+
106+
final ScrollResult scrollResult = _searchService.scrollAcrossEntities(
107+
ImmutableList.of(entityType),
108+
"*",
109+
filter,
110+
null,
111+
scrollId,
112+
"5m",
113+
BATCH_SIZE,
114+
null
115+
);
116+
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
117+
return null;
118+
}
119+
120+
for (SearchEntity searchEntity : scrollResult.getEntities()) {
121+
try {
122+
ingestBrowsePathsV2(searchEntity.getEntity(), auditStamp);
123+
} catch (Exception e) {
124+
// don't stop the whole step because of one bad urn or one bad ingestion
125+
log.error(String.format("Error ingesting default browsePathsV2 aspect for urn %s", searchEntity.getEntity()), e);
126+
}
127+
}
128+
129+
return scrollResult.getScrollId();
130+
}
131+
132+
private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
133+
BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true);
134+
log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
135+
MetadataChangeProposal proposal = new MetadataChangeProposal();
136+
proposal.setEntityUrn(urn);
137+
proposal.setEntityType(urn.getEntityType());
138+
proposal.setAspectName(Constants.BROWSE_PATHS_V2_ASPECT_NAME);
139+
proposal.setChangeType(ChangeType.UPSERT);
140+
proposal.setSystemMetadata(new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
141+
proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
142+
_entityService.ingestProposal(
143+
proposal,
144+
auditStamp,
145+
false
146+
);
147+
}
148+
149+
@Override
150+
public String id() {
151+
return "BackfillBrowsePathsV2Step";
152+
}
153+
154+
/**
155+
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum retries.
156+
*/
157+
@Override
158+
public boolean isOptional() {
159+
return true;
160+
}
161+
162+
@Override
163+
public boolean skip(UpgradeContext context) {
164+
return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
165+
}
166+
}
167+

docker/datahub-upgrade/env/docker-without-neo4j.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ DATAHUB_GMS_HOST=datahub-gms
2020
DATAHUB_GMS_PORT=8080
2121

2222
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
23+
BACKFILL_BROWSE_PATHS_V2=true
2324

2425
# Uncomment and set these to support SSL connection to Elasticsearch
2526
# ELASTICSEARCH_USE_SSL=

docker/datahub-upgrade/env/docker.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ DATAHUB_GMS_HOST=datahub-gms
2424
DATAHUB_GMS_PORT=8080
2525

2626
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
27+
BACKFILL_BROWSE_PATHS_V2=true
2728

2829
# Uncomment and set these to support SSL connection to Elasticsearch
2930
# ELASTICSEARCH_USE_SSL=

docker/quickstart/docker-compose-m1.quickstart.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ services:
146146
- DATAHUB_GMS_HOST=datahub-gms
147147
- DATAHUB_GMS_PORT=8080
148148
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
149+
- BACKFILL_BROWSE_PATHS_V2=true
149150
hostname: datahub-upgrade
150151
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
151152
labels:

docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ services:
139139
- DATAHUB_GMS_HOST=datahub-gms
140140
- DATAHUB_GMS_PORT=8080
141141
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
142+
- BACKFILL_BROWSE_PATHS_V2=true
142143
hostname: datahub-upgrade
143144
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
144145
labels:

docker/quickstart/docker-compose-without-neo4j.quickstart.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ services:
139139
- DATAHUB_GMS_HOST=datahub-gms
140140
- DATAHUB_GMS_PORT=8080
141141
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
142+
- BACKFILL_BROWSE_PATHS_V2=true
142143
hostname: datahub-upgrade
143144
image: ${DATAHUB_UPGRADE_IMAGE:-acryldata/datahub-upgrade}:${DATAHUB_VERSION:-head}
144145
labels:

0 commit comments

Comments
 (0)