Skip to content

Commit 8079e1c

Browse files
fix(system-update): fix index prefix in new setup for es/os (#15087)
1 parent 695fb49 commit 8079e1c

File tree

2 files changed

+180
-8
lines changed

2 files changed

+180
-8
lines changed

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/CreateUsageEventIndicesStep.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
6464

6565
private void setupElasticsearchUsageEvents(String prefix, int numShards, int numReplicas)
6666
throws Exception {
67-
String prefixedPolicy = prefix + "datahub_usage_event_policy";
68-
String prefixedTemplate = prefix + "datahub_usage_event_index_template";
69-
String prefixedDataStream = prefix + "datahub_usage_event";
67+
String separator = prefix.isEmpty() ? "" : "_";
68+
String prefixedPolicy = prefix + separator + "datahub_usage_event_policy";
69+
String prefixedTemplate = prefix + separator + "datahub_usage_event_index_template";
70+
String prefixedDataStream = prefix + separator + "datahub_usage_event";
7071

7172
// Create ILM policy
7273
UsageEventIndexUtils.createIlmPolicy(esComponents, prefixedPolicy);
@@ -81,9 +82,10 @@ private void setupElasticsearchUsageEvents(String prefix, int numShards, int num
8182

8283
private void setupOpenSearchUsageEvents(String prefix, int numShards, int numReplicas)
8384
throws Exception {
84-
String prefixedPolicy = prefix + "datahub_usage_event_policy";
85-
String prefixedTemplate = prefix + "datahub_usage_event_index_template";
86-
String prefixedIndex = prefix + "datahub_usage_event-000001";
85+
String separator = prefix.isEmpty() ? "" : "_";
86+
String prefixedPolicy = prefix + separator + "datahub_usage_event_policy";
87+
String prefixedTemplate = prefix + separator + "datahub_usage_event_index_template";
88+
String prefixedIndex = prefix + separator + "datahub_usage_event-000001";
8789

8890
// Create ISM policy
8991
UsageEventIndexUtils.createIsmPolicy(esComponents, prefixedPolicy, prefix);

datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/elasticsearch/steps/CreateUsageEventIndicesStepTest.java

Lines changed: 172 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,21 @@ public void testExecutable_WithEmptyPrefix() throws Exception {
267267
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
268268
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
269269

270-
// Verify empty prefix was used
270+
// Verify empty prefix was used and no underscore separator was added
271271
Mockito.verify(index).getPrefix();
272+
273+
// Verify that the low-level requests were made with correct names (no underscore prefix)
274+
Mockito.verify(searchClient)
275+
.performLowLevelRequest(
276+
Mockito.argThat(
277+
request -> request.getEndpoint().equals("_ilm/policy/datahub_usage_event_policy")));
278+
Mockito.verify(searchClient)
279+
.performLowLevelRequest(
280+
Mockito.argThat(
281+
request ->
282+
request
283+
.getEndpoint()
284+
.equals("_index_template/datahub_usage_event_index_template")));
272285
}
273286

274287
@Test
@@ -287,8 +300,165 @@ public void testExecutable_WithNullPrefix() throws Exception {
287300
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
288301
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
289302

290-
// Verify null prefix was handled
303+
// Verify null prefix was handled and no underscore separator was added
304+
Mockito.verify(index).getPrefix();
305+
306+
// Verify that the low-level requests were made with correct names (no underscore prefix)
307+
Mockito.verify(searchClient)
308+
.performLowLevelRequest(
309+
Mockito.argThat(
310+
request -> request.getEndpoint().equals("_ilm/policy/datahub_usage_event_policy")));
311+
Mockito.verify(searchClient)
312+
.performLowLevelRequest(
313+
Mockito.argThat(
314+
request ->
315+
request
316+
.getEndpoint()
317+
.equals("_index_template/datahub_usage_event_index_template")));
318+
}
319+
320+
@Test
321+
public void testExecutable_WithNonEmptyPrefix() throws Exception {
322+
// Arrange
323+
Mockito.when(platformAnalytics.isEnabled()).thenReturn(true);
324+
Mockito.when(searchEngineType.isOpenSearch()).thenReturn(false);
325+
Mockito.when(index.getPrefix()).thenReturn("prod"); // Non-empty prefix
326+
327+
// Act
328+
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
329+
UpgradeStepResult result = executable.apply(upgradeContext);
330+
331+
// Assert
332+
Assert.assertNotNull(result);
333+
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
334+
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
335+
336+
// Verify non-empty prefix was used and underscore separator was added
291337
Mockito.verify(index).getPrefix();
338+
339+
// Verify that the low-level requests were made with correct names (with underscore prefix)
340+
Mockito.verify(searchClient)
341+
.performLowLevelRequest(
342+
Mockito.argThat(
343+
request ->
344+
request.getEndpoint().equals("_ilm/policy/prod_datahub_usage_event_policy")));
345+
Mockito.verify(searchClient)
346+
.performLowLevelRequest(
347+
Mockito.argThat(
348+
request ->
349+
request
350+
.getEndpoint()
351+
.equals("_index_template/prod_datahub_usage_event_index_template")));
352+
}
353+
354+
@Test
355+
public void testExecutable_WithSpecificPrefix() throws Exception {
356+
// Arrange
357+
Mockito.when(platformAnalytics.isEnabled()).thenReturn(true);
358+
Mockito.when(searchEngineType.isOpenSearch()).thenReturn(false);
359+
Mockito.when(index.getPrefix())
360+
.thenReturn("kbcpyv7ss3-staging-test"); // Specific prefix from issue
361+
362+
// Act
363+
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
364+
UpgradeStepResult result = executable.apply(upgradeContext);
365+
366+
// Assert
367+
Assert.assertNotNull(result);
368+
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
369+
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
370+
371+
// Verify specific prefix was used and underscore separator was added
372+
Mockito.verify(index).getPrefix();
373+
374+
// Verify that the low-level requests were made with correct names (with underscore prefix)
375+
Mockito.verify(searchClient)
376+
.performLowLevelRequest(
377+
Mockito.argThat(
378+
request ->
379+
request
380+
.getEndpoint()
381+
.equals("_ilm/policy/kbcpyv7ss3-staging-test_datahub_usage_event_policy")));
382+
Mockito.verify(searchClient)
383+
.performLowLevelRequest(
384+
Mockito.argThat(
385+
request ->
386+
request
387+
.getEndpoint()
388+
.equals(
389+
"_index_template/kbcpyv7ss3-staging-test_datahub_usage_event_index_template")));
390+
}
391+
392+
@Test
393+
public void testExecutable_OpenSearchPath_WithEmptyPrefix() throws Exception {
394+
// Arrange
395+
Mockito.when(platformAnalytics.isEnabled()).thenReturn(true);
396+
Mockito.when(searchEngineType.isOpenSearch()).thenReturn(true);
397+
Mockito.when(index.getPrefix()).thenReturn(""); // Empty prefix
398+
399+
// Act
400+
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
401+
UpgradeStepResult result = executable.apply(upgradeContext);
402+
403+
// Assert
404+
Assert.assertNotNull(result);
405+
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
406+
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
407+
408+
// Verify OpenSearch path was taken and empty prefix was used
409+
Mockito.verify(searchEngineType).isOpenSearch();
410+
Mockito.verify(index).getPrefix();
411+
412+
// Verify that the low-level requests were made with correct names (no underscore prefix)
413+
Mockito.verify(searchClient)
414+
.performLowLevelRequest(
415+
Mockito.argThat(
416+
request ->
417+
request
418+
.getEndpoint()
419+
.equals("_plugins/_ism/policies/datahub_usage_event_policy")));
420+
Mockito.verify(searchClient)
421+
.performLowLevelRequest(
422+
Mockito.argThat(
423+
request ->
424+
request.getEndpoint().equals("_template/datahub_usage_event_index_template")));
425+
}
426+
427+
@Test
428+
public void testExecutable_OpenSearchPath_WithNonEmptyPrefix() throws Exception {
429+
// Arrange
430+
Mockito.when(platformAnalytics.isEnabled()).thenReturn(true);
431+
Mockito.when(searchEngineType.isOpenSearch()).thenReturn(true);
432+
Mockito.when(index.getPrefix()).thenReturn("prod"); // Non-empty prefix
433+
434+
// Act
435+
Function<UpgradeContext, UpgradeStepResult> executable = step.executable();
436+
UpgradeStepResult result = executable.apply(upgradeContext);
437+
438+
// Assert
439+
Assert.assertNotNull(result);
440+
Assert.assertEquals(result.stepId(), "CreateUsageEventIndicesStep");
441+
Assert.assertEquals(result.result(), DataHubUpgradeState.SUCCEEDED);
442+
443+
// Verify OpenSearch path was taken and non-empty prefix was used
444+
Mockito.verify(searchEngineType).isOpenSearch();
445+
Mockito.verify(index).getPrefix();
446+
447+
// Verify that the low-level requests were made with correct names (with underscore prefix)
448+
Mockito.verify(searchClient)
449+
.performLowLevelRequest(
450+
Mockito.argThat(
451+
request ->
452+
request
453+
.getEndpoint()
454+
.equals("_plugins/_ism/policies/prod_datahub_usage_event_policy")));
455+
Mockito.verify(searchClient)
456+
.performLowLevelRequest(
457+
Mockito.argThat(
458+
request ->
459+
request
460+
.getEndpoint()
461+
.equals("_template/prod_datahub_usage_event_index_template")));
292462
}
293463

294464
@Test

0 commit comments

Comments
 (0)