Skip to content

Commit eeb2c88

Browse files
feat(search): unified entity index (#14966)
1 parent a6222ca commit eeb2c88

File tree

231 files changed

+21643
-2317
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

231 files changed

+21643
-2317
lines changed

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/types/mappers/MapperUtilsTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
11
package com.linkedin.datahub.graphql.types.mappers;
22

3-
import static org.mockito.Mockito.mock;
4-
import static org.mockito.Mockito.when;
53
import static org.testng.Assert.assertEquals;
64
import static org.testng.Assert.assertThrows;
75

86
import com.linkedin.common.urn.Urn;
97
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
108
import com.linkedin.datahub.graphql.QueryContext;
9+
import com.linkedin.datahub.graphql.TestUtils;
1110
import com.linkedin.datahub.graphql.generated.MatchedField;
1211
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
1312
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
1413
import com.linkedin.metadata.models.registry.EntityRegistry;
1514
import com.linkedin.metadata.snapshot.Snapshot;
16-
import io.datahubproject.test.metadata.context.TestOperationContexts;
1715
import java.net.URISyntaxException;
1816
import java.util.List;
1917
import org.testng.annotations.BeforeTest;
@@ -44,9 +42,7 @@ public void testMatchedFieldValidation() throws URISyntaxException {
4442
IllegalArgumentException.class,
4543
() -> ValidationApiUtils.validateUrn(entityRegistry, invalidUrn));
4644

47-
QueryContext mockContext = mock(QueryContext.class);
48-
when(mockContext.getOperationContext())
49-
.thenReturn(TestOperationContexts.systemContextNoSearchAuthorization(entityRegistry));
45+
QueryContext mockContext = TestUtils.getMockAllowContext();
5046

5147
List<MatchedField> actualMatched =
5248
MapperUtils.getMatchedFieldEntry(

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/loadindices/LoadIndicesIndexManager.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,21 +59,25 @@ public LoadIndicesIndexManager(
5959
public List<ReindexConfig> discoverDataHubIndexConfigs() throws IOException {
6060
List<ReindexConfig> configs = new ArrayList<>();
6161

62-
// Get entity indices using IndexConvention pattern
63-
String entityPattern = indexConvention.getAllEntityIndicesPattern();
64-
log.debug("Querying entity indices with pattern: {}", entityPattern);
65-
GetIndexRequest entityRequest = new GetIndexRequest(entityPattern);
66-
GetIndexResponse entityResponse = searchClient.getIndex(entityRequest, RequestOptions.DEFAULT);
67-
String[] entityIndices = entityResponse.getIndices();
68-
69-
for (String indexName : entityIndices) {
70-
try {
71-
ReindexConfig config = indexBuilder.buildReindexState(indexName, Map.of(), Map.of());
72-
configs.add(config);
73-
log.debug("Added entity index config: {}", indexName);
74-
} catch (IOException e) {
75-
log.warn(
76-
"Failed to build reindex config for entity index {}: {}", indexName, e.getMessage());
62+
// Get entity indices using IndexConvention patterns
63+
List<String> entityPatterns = indexConvention.getAllEntityIndicesPatterns();
64+
log.debug("Querying entity indices with patterns: {}", entityPatterns);
65+
66+
for (String entityPattern : entityPatterns) {
67+
GetIndexRequest entityRequest = new GetIndexRequest(entityPattern);
68+
GetIndexResponse entityResponse =
69+
searchClient.getIndex(entityRequest, RequestOptions.DEFAULT);
70+
String[] entityIndices = entityResponse.getIndices();
71+
72+
for (String indexName : entityIndices) {
73+
try {
74+
ReindexConfig config = indexBuilder.buildReindexState(indexName, Map.of(), Map.of());
75+
configs.add(config);
76+
log.debug("Added entity index config: {}", indexName);
77+
} catch (IOException e) {
78+
log.warn(
79+
"Failed to build reindex config for entity index {}: {}", indexName, e.getMessage());
80+
}
7781
}
7882
}
7983

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/cron/steps/TweakReplicasStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
5757
CronArgs args = createArgs(context);
5858
try {
5959
for (ElasticSearchIndexed service : services) {
60-
service.tweakReplicasAll(structuredProperties, args.dryRun);
60+
service.tweakReplicasAll(context.opContext(), structuredProperties, args.dryRun);
6161
}
6262
} catch (Exception e) {
6363
log.error("TweakReplicasStep failed.", e);

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/BuildIndicesPostStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
5050
try {
5151

5252
List<ReindexConfig> indexConfigs =
53-
getAllReindexConfigs(services, structuredProperties).stream()
53+
getAllReindexConfigs(context.opContext(), services, structuredProperties).stream()
5454
.filter(ReindexConfig::requiresReindex)
5555
.collect(Collectors.toList());
5656

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/BuildIndicesPreStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
5353
return (context) -> {
5454
try {
5555
final List<ReindexConfig> reindexConfigs =
56-
getAllReindexConfigs(services, structuredProperties);
56+
getAllReindexConfigs(context.opContext(), services, structuredProperties);
5757

5858
// Get indices to update
5959
List<ReindexConfig> indexConfigs =

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/BuildIndicesStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
3737
return (context) -> {
3838
try {
3939
for (ElasticSearchIndexed service : services) {
40-
service.reindexAll(structuredProperties);
40+
service.reindexAll(context.opContext(), structuredProperties);
4141
}
4242
} catch (Exception e) {
4343
log.error("BuildIndicesStep failed.", e);

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/CleanIndicesStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
5151
return (context) -> {
5252
try {
5353
// Each service enumerates and cleans up their own indices
54-
IndexUtils.getAllReindexConfigs(indexedServices, structuredProperties)
54+
IndexUtils.getAllReindexConfigs(context.opContext(), indexedServices, structuredProperties)
5555
.forEach(
5656
reindexConfig ->
5757
ESIndexBuilder.cleanOrphanedIndices(searchClient, esConfig, reindexConfig));

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/CreateUsageEventIndicesStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
4747
configurationProvider.getElasticSearch().getIndex().getFinalPrefix();
4848

4949
boolean useOpenSearch = esComponents.getSearchClient().getEngineType().isOpenSearch();
50-
int numShards = esComponents.getIndexBuilder().getNumShards();
51-
int numReplicas = esComponents.getIndexBuilder().getNumReplicas();
50+
int numShards = configurationProvider.getElasticSearch().getIndex().getNumShards();
51+
int numReplicas = configurationProvider.getElasticSearch().getIndex().getNumReplicas();
5252

5353
if (useOpenSearch) {
5454
setupOpenSearchUsageEvents(indexPrefix, numShards, numReplicas, context.opContext());

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/ReindexDebugStep.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.linkedin.structured.StructuredPropertyDefinition;
1313
import com.linkedin.upgrade.DataHubUpgradeState;
1414
import com.linkedin.util.Pair;
15+
import io.datahubproject.metadata.context.OperationContext;
1516
import java.io.IOException;
1617
import java.util.List;
1718
import java.util.Map;
@@ -80,7 +81,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
8081
log.error("ReindexDebugStep failed: No ElasticSearchService found");
8182
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
8283
}
83-
setConfig(args.index);
84+
setConfig(context.opContext(), args.index);
8485
if (config == null) {
8586
log.error("ReindexDebugStep failed: No matching config found for index: {}", args.index);
8687
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
@@ -99,10 +100,11 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
99100
};
100101
}
101102

102-
void setConfig(String targetIndex) throws IOException, IllegalAccessException {
103+
void setConfig(OperationContext opContext, String targetIndex)
104+
throws IOException, IllegalAccessException {
103105
// datahubpolicyindex_v2 has some docs upon starting quickdebug...
104106
// String targetIndex = "datahubpolicyindex_v2";
105-
List<ReindexConfig> configs = service.buildReindexConfigs(structuredProperties);
107+
List<ReindexConfig> configs = service.buildReindexConfigs(opContext, structuredProperties);
106108
config = null; // Reset config to null
107109
for (ReindexConfig cfg : configs) {
108110
String cfgname = cfg.name();

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/util/IndexUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,16 @@ private IndexUtils() {}
5454
private static List<ReindexConfig> _reindexConfigs = new ArrayList<>();
5555

5656
public static List<ReindexConfig> getAllReindexConfigs(
57+
OperationContext opContext,
5758
List<ElasticSearchIndexed> elasticSearchIndexedList,
5859
Collection<Pair<Urn, StructuredPropertyDefinition>> structuredProperties)
5960
throws IOException {
6061
// Avoid locking & reprocessing
6162
List<ReindexConfig> reindexConfigs = new ArrayList<>(_reindexConfigs);
6263
if (reindexConfigs.isEmpty()) {
6364
for (ElasticSearchIndexed elasticSearchIndexed : elasticSearchIndexedList) {
64-
reindexConfigs.addAll(elasticSearchIndexed.buildReindexConfigs(structuredProperties));
65+
reindexConfigs.addAll(
66+
elasticSearchIndexed.buildReindexConfigs(opContext, structuredProperties));
6567
}
6668
_reindexConfigs = new ArrayList<>(reindexConfigs);
6769
}

0 commit comments

Comments
 (0)