44import com .linkedin .datahub .upgrade .UpgradeStep ;
55import com .linkedin .datahub .upgrade .UpgradeStepResult ;
66import com .linkedin .datahub .upgrade .impl .DefaultUpgradeStepResult ;
7+ import com .linkedin .datahub .upgrade .system .elasticsearch .util .IndexUtils ;
78import com .linkedin .datahub .upgrade .system .elasticsearch .util .UsageEventIndexUtils ;
89import com .linkedin .gms .factory .config .ConfigurationProvider ;
910import com .linkedin .gms .factory .search .BaseElasticSearchComponentsFactory ;
1011import com .linkedin .upgrade .DataHubUpgradeState ;
12+ import io .datahubproject .metadata .context .OperationContext ;
1113import java .util .function .Function ;
1214import lombok .RequiredArgsConstructor ;
1315import lombok .extern .slf4j .Slf4j ;
@@ -28,28 +30,32 @@ public int retryCount() {
2830 return 3 ;
2931 }
3032
33+ @ Override
34+ public boolean skip (UpgradeContext context ) {
35+ boolean analyticsEnabled = configurationProvider .getPlatformAnalytics ().isEnabled ();
36+ if (!analyticsEnabled ) {
37+ log .info ("DataHub analytics is disabled, skipping usage event index setup" );
38+ }
39+ return !analyticsEnabled ;
40+ }
41+
3142 @ Override
3243 public Function <UpgradeContext , UpgradeStepResult > executable () {
3344 return (context ) -> {
3445 try {
35- boolean analyticsEnabled = configurationProvider . getPlatformAnalytics (). isEnabled ();
46+
3647 String indexPrefix = configurationProvider .getElasticSearch ().getIndex ().getPrefix ();
3748 // Handle null prefix by converting to empty string
3849 if (indexPrefix == null ) {
3950 indexPrefix = "" ;
4051 }
4152
42- if (!analyticsEnabled ) {
43- log .info ("DataHub analytics is disabled, skipping usage event index setup" );
44- return new DefaultUpgradeStepResult (id (), DataHubUpgradeState .SUCCEEDED );
45- }
46-
4753 boolean useOpenSearch = esComponents .getSearchClient ().getEngineType ().isOpenSearch ();
4854 int numShards = esComponents .getIndexBuilder ().getNumShards ();
4955 int numReplicas = esComponents .getIndexBuilder ().getNumReplicas ();
5056
5157 if (useOpenSearch ) {
52- setupOpenSearchUsageEvents (indexPrefix , numShards , numReplicas );
58+ setupOpenSearchUsageEvents (indexPrefix , numShards , numReplicas , context . opContext () );
5359 } else {
5460 setupElasticsearchUsageEvents (indexPrefix , numShards , numReplicas );
5561 }
@@ -64,10 +70,10 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
6470
6571 private void setupElasticsearchUsageEvents (String prefix , int numShards , int numReplicas )
6672 throws Exception {
67- String separator = prefix . isEmpty () ? "" : "_" ;
68- String prefixedPolicy = prefix + separator + "datahub_usage_event_policy" ;
69- String prefixedTemplate = prefix + separator + "datahub_usage_event_index_template" ;
70- String prefixedDataStream = prefix + separator + "datahub_usage_event" ;
73+ String prefixedPolicy = IndexUtils . createPrefixedName ( prefix , "datahub_usage_event_policy" ) ;
74+ String prefixedTemplate =
75+ IndexUtils . createPrefixedName ( prefix , "datahub_usage_event_index_template" ) ;
76+ String prefixedDataStream = IndexUtils . createPrefixedName ( prefix , "datahub_usage_event" ) ;
7177
7278 // Create ILM policy
7379 UsageEventIndexUtils .createIlmPolicy (esComponents , prefixedPolicy );
@@ -80,21 +86,36 @@ private void setupElasticsearchUsageEvents(String prefix, int numShards, int num
8086 UsageEventIndexUtils .createDataStream (esComponents , prefixedDataStream );
8187 }
8288
83- private void setupOpenSearchUsageEvents (String prefix , int numShards , int numReplicas )
89+ private void setupOpenSearchUsageEvents (
90+ String prefix , int numShards , int numReplicas , OperationContext operationContext )
8491 throws Exception {
85- String separator = prefix .isEmpty () ? "" : "_" ;
86- String prefixedPolicy = prefix + separator + "datahub_usage_event_policy" ;
87- String prefixedTemplate = prefix + separator + "datahub_usage_event_index_template" ;
88- String prefixedIndex = prefix + separator + "datahub_usage_event-000001" ;
89-
90- // Create ISM policy
91- UsageEventIndexUtils .createIsmPolicy (esComponents , prefixedPolicy , prefix );
92-
93- // Create index template
94- UsageEventIndexUtils .createOpenSearchIndexTemplate (
95- esComponents , prefixedTemplate , numShards , numReplicas , prefix );
96-
97- // Create initial index
98- UsageEventIndexUtils .createOpenSearchIndex (esComponents , prefixedIndex , prefix );
92+ String prefixedPolicy = IndexUtils .createPrefixedName (prefix , "datahub_usage_event_policy" );
93+ String prefixedTemplate =
94+ IndexUtils .createPrefixedName (prefix , "datahub_usage_event_index_template" );
95+ String prefixedIndex = IndexUtils .createPrefixedName (prefix , "datahub_usage_event-000001" );
96+
97+ // Create ISM policy (both AWS and self-hosted OpenSearch use the same format)
98+ boolean policyCreated =
99+ UsageEventIndexUtils .createIsmPolicy (
100+ esComponents , prefixedPolicy , prefix , operationContext );
101+ log .info ("ISM policy creation result: {}" , policyCreated );
102+
103+ if (policyCreated ) {
104+ log .info ("ISM policy created successfully, proceeding with template and index creation" );
105+
106+ // Create index template (both AWS and self-hosted OpenSearch use the same format and
107+ // endpoint)
108+ log .info ("Creating index template: {}" , prefixedTemplate );
109+ UsageEventIndexUtils .createOpenSearchIndexTemplate (
110+ esComponents , prefixedTemplate , numShards , numReplicas , prefix );
111+
112+ // Create initial numbered index (both AWS and self-hosted OpenSearch use the same approach)
113+ log .info ("Creating initial index: {}" , prefixedIndex );
114+ UsageEventIndexUtils .createOpenSearchIndex (esComponents , prefixedIndex , prefix );
115+ } else {
116+ log .warn (
117+ "ISM policy creation failed or is not supported. Skipping template and index creation to avoid configuration issues." );
118+ log .info ("Usage event tracking will not be available without proper policy configuration." );
119+ }
99120 }
100121}
0 commit comments