Skip to content

Commit f9db3da

Browse files
san81wandna-amazon
authored andcommitted
Confluence and CloudWatch and multiple other failing tests fix (opensearch-project#6348)
Making the tests less flaky. More reliable. Avoiding possible Out of memory issue with large pay load generation. Signed-off-by: Nathan Wand <wandna@amazon.com>
1 parent 4631ceb commit f9db3da

File tree

11 files changed

+78
-39
lines changed

11 files changed

+78
-39
lines changed

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorValidationIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ private void verifyProcessingResults(String pipelineType, int expectedTotalEvent
211211
}
212212

213213
private static void verifySingleThreadUsage() {
214+
// Wait for all processor instances to be registered (one per worker)
215+
await().atMost(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
216+
.untilAsserted(() -> assertThat(
217+
SingleThreadEventsTrackingTestProcessor.getProcessors().size(),
218+
equalTo(4)));
219+
214220
List<SingleThreadEventsTrackingTestProcessor> singleThreadProcessors = SingleThreadEventsTrackingTestProcessor.getProcessors();
215221
assertThat(singleThreadProcessors.size(), equalTo(4));
216222
assertAll(

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsClientFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1313
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
14+
import software.amazon.awssdk.core.retry.RetryPolicy;
1415
import software.amazon.awssdk.regions.Region;
1516
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
1617
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder;
@@ -64,8 +65,12 @@ public static CloudWatchLogsClient createCwlClient(final AwsConfig awsConfig,
6465
}
6566

6667
private static ClientOverrideConfiguration createOverrideConfiguration(final Map<String, String> customHeaders) {
68+
final RetryPolicy retryPolicy = RetryPolicy.builder()
69+
.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS)
70+
.build();
71+
6772
final ClientOverrideConfiguration.Builder configBuilder = ClientOverrideConfiguration.builder()
68-
.retryPolicy(r -> r.numRetries(AwsConfig.DEFAULT_CONNECTION_ATTEMPTS));
73+
.retryPolicy(retryPolicy);
6974

7075
customHeaders.forEach(configBuilder::putHeader);
7176

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsServiceTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@
1212
import org.opensearch.dataprepper.model.event.Event;
1313
import org.opensearch.dataprepper.model.event.EventHandle;
1414
import org.opensearch.dataprepper.model.event.JacksonEvent;
15+
import org.opensearch.dataprepper.model.log.JacksonLog;
1516
import org.opensearch.dataprepper.model.record.Record;
17+
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
1618
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.Buffer;
1719
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBuffer;
1820
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.buffer.InMemoryBufferFactory;
1921
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
2022
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
2123
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
2224
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
23-
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
24-
import org.opensearch.dataprepper.model.log.JacksonLog;
2525

2626
import java.util.ArrayList;
2727
import java.util.Collection;
@@ -31,13 +31,13 @@
3131
import static org.mockito.ArgumentMatchers.any;
3232
import static org.mockito.ArgumentMatchers.eq;
3333
import static org.mockito.Mockito.atLeast;
34+
import static org.mockito.Mockito.doAnswer;
3435
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.never;
3537
import static org.mockito.Mockito.spy;
36-
import static org.mockito.Mockito.verify;
3738
import static org.mockito.Mockito.times;
38-
import static org.mockito.Mockito.never;
39+
import static org.mockito.Mockito.verify;
3940
import static org.mockito.Mockito.when;
40-
import static org.mockito.Mockito.doAnswer;
4141

4242
class CloudWatchLogsServiceTest {
4343
private static final int LARGE_THREAD_COUNT = 1000;
@@ -95,8 +95,10 @@ Collection<Record<Event>> getSampleRecordsCollection() {
9595

9696
Collection<Record<Event>> getSampleRecordsOfLargerSize() {
9797
final ArrayList<Record<Event>> returnCollection = new ArrayList<>();
98+
int messageSize = (int) (thresholdConfig.getMaxRequestSizeBytes() / 24);
9899
for (int i = 0; i < thresholdConfig.getBatchSize() * 2; i++) {
99-
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("a".repeat((int) (thresholdConfig.getMaxRequestSizeBytes()/24)));
100+
JacksonEvent mockJacksonEvent =
101+
(JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize));
100102
returnCollection.add(new Record<>(mockJacksonEvent));
101103
}
102104

@@ -105,8 +107,10 @@ Collection<Record<Event>> getSampleRecordsOfLargerSize() {
105107

106108
Collection<Record<Event>> getSampleRecordsOfLimitSize() {
107109
final ArrayList<Record<Event>> returnCollection = new ArrayList<>();
110+
int messageSize = (int) thresholdConfig.getMaxEventSizeBytes();
108111
for (int i = 0; i < thresholdConfig.getBatchSize(); i++) {
109-
JacksonEvent mockJacksonEvent = (JacksonEvent) JacksonEvent.fromMessage("testMessage".repeat((int) thresholdConfig.getMaxEventSizeBytes()));
112+
JacksonEvent mockJacksonEvent =
113+
(JacksonEvent) JacksonEvent.fromMessage(RandomStringUtils.insecure().nextAlphabetic(messageSize));
110114
returnCollection.add(new Record<>(mockJacksonEvent));
111115
}
112116

@@ -248,8 +252,8 @@ void GIVEN_large_thread_count_WHEN_processing_log_events_THEN_dispatcher_should_
248252
}
249253

250254
private Record<Event> getLargeRecord(long size) {
251-
final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.randomAlphabetic((int)size))).withEventHandle(eventHandle).build();
255+
final Event event = JacksonLog.builder().withData(Map.of("key", RandomStringUtils.insecure().nextAlphabetic((int)size))).withEventHandle(eventHandle).build();
252256

253257
return new Record<>(event);
254-
}
258+
}
255259
}

data-prepper-plugins/date-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,18 @@ public boolean isValidPatterns() {
108108
}
109109

110110
public static boolean isValidPattern(final String pattern) {
111+
// Check for valid epoch patterns first
111112
if (pattern.equals("epoch_second") ||
112113
pattern.equals("epoch_milli") ||
113114
pattern.equals("epoch_micro") ||
114115
pattern.equals("epoch_nano")) {
115116
return true;
116117
}
118+
// Reject any other pattern starting with "epoch_" as invalid
119+
if (pattern.startsWith("epoch_")) {
120+
return false;
121+
}
122+
// Validate as DateTimeFormatter pattern
117123
try {
118124
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
119125
return true;

data-prepper-plugins/date-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/date/DateProcessorConfigTest.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,23 +69,28 @@ void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_recei
6969
assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false));
7070
}
7171

72-
@Test
73-
void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAccessException {
74-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", random);
75-
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));
76-
77-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_second");
78-
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
79-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_milli");
80-
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
81-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano");
82-
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
83-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_micro");
72+
@ParameterizedTest
73+
@ValueSource(strings = {
74+
"epoch_second",
75+
"epoch_milli",
76+
"epoch_nano",
77+
"epoch_micro",
78+
"yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX"
79+
})
80+
void testValidOutputFormats(String outputFormat) throws NoSuchFieldException, IllegalAccessException {
81+
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", outputFormat);
8482
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
85-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz");
83+
}
84+
85+
@ParameterizedTest
86+
@ValueSource(strings = {
87+
"invalid[pattern]format",
88+
"epoch_xyz",
89+
"epoch_invalid"
90+
})
91+
void testInvalidOutputFormats(String outputFormat) throws NoSuchFieldException, IllegalAccessException {
92+
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", outputFormat);
8693
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));
87-
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX");
88-
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
8994
}
9095

9196
@Test

data-prepper-plugins/dynamodb-source-coordination-store/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies {
1717
implementation 'software.amazon.awssdk:sts'
1818
implementation 'javax.inject:javax.inject:1'
1919
testImplementation 'com.amazonaws:DynamoDBLocal:2.2.1'
20+
testImplementation 'org.awaitility:awaitility:4.2.0'
2021
}
2122

2223
configurations {

data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreIT.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import java.util.Optional;
3737
import java.util.Random;
3838
import java.util.UUID;
39+
import java.util.concurrent.TimeUnit;
3940

41+
import static org.awaitility.Awaitility.await;
4042
import static org.hamcrest.MatcherAssert.assertThat;
4143
import static org.hamcrest.Matchers.equalTo;
4244
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -265,8 +267,16 @@ void tryAcquireAvailablePartition_gets_first_unassigned_partition() {
265267
objectUnderTest.tryCreatePartitionItem(sourceIdentifier,
266268
unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false);
267269

268-
final Optional<SourcePartitionStoreItem> maybeAcquired =
269-
objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20));
270+
// Wait for partition to be available in DynamoDB Local before attempting to acquire
271+
final Optional<SourcePartitionStoreItem>[] maybeAcquiredHolder = new Optional[]{Optional.empty()};
272+
await().atMost(5, TimeUnit.SECONDS)
273+
.pollInterval(100, TimeUnit.MILLISECONDS)
274+
.untilAsserted(() -> {
275+
maybeAcquiredHolder[0] = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20));
276+
assertThat(maybeAcquiredHolder[0].isPresent(), equalTo(true));
277+
});
278+
279+
final Optional<SourcePartitionStoreItem> maybeAcquired = maybeAcquiredHolder[0];
270280

271281
assertThat(maybeAcquired, notNullValue());
272282
assertThat(maybeAcquired.isPresent(), equalTo(true));

data-prepper-plugins/saas-source-plugins/confluence-source/src/test/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceConfigHelperTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void testValidateConfig() {
118118

119119
@Test
120120
void testValidateConfigBasic() {
121-
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://test.com");
121+
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
122122
when(confluenceSourceConfig.getAuthType()).thenReturn(BASIC);
123123
when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
124124
when(authenticationConfig.getBasicConfig()).thenReturn(basicConfig);
@@ -137,7 +137,7 @@ void testValidateConfigBasic() {
137137

138138
@Test
139139
void testValidateConfigOauth2() {
140-
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://test.com");
140+
when(confluenceSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
141141
when(confluenceSourceConfig.getAuthType()).thenReturn(OAUTH2);
142142
when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
143143
when(authenticationConfig.getOauth2Config()).thenReturn(oauth2Config);

data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraConfigHelperTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ void testGetProjectNameFilter() {
126126
void testValidateConfig() {
127127
assertThrows(RuntimeException.class, () -> JiraConfigHelper.validateConfig(jiraSourceConfig));
128128

129-
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com");
129+
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
130130
assertThrows(RuntimeException.class, () -> JiraConfigHelper.validateConfig(jiraSourceConfig));
131131

132132
when(jiraSourceConfig.getAuthType()).thenReturn("fakeType");
@@ -135,7 +135,7 @@ void testValidateConfig() {
135135

136136
@Test
137137
void testValidateConfigBasic() {
138-
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com");
138+
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
139139
when(jiraSourceConfig.getAuthType()).thenReturn(BASIC);
140140
when(jiraSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
141141
when(authenticationConfig.getBasicConfig()).thenReturn(basicConfig);
@@ -154,7 +154,7 @@ void testValidateConfigBasic() {
154154

155155
@Test
156156
void testValidateConfigOauth2() {
157-
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://test.com");
157+
when(jiraSourceConfig.getAccountUrl()).thenReturn("https://somedomain.atlassian.net");
158158
when(jiraSourceConfig.getAuthType()).thenReturn(OAUTH2);
159159
when(jiraSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
160160
when(authenticationConfig.getOauth2Config()).thenReturn(oauth2Config);

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/test/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClientTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.opensearch.dataprepper.plugins.source.microsoft_office365;
1111

1212
import io.micrometer.core.instrument.Counter;
13-
import io.micrometer.core.instrument.DistributionSummary;
1413
import io.micrometer.core.instrument.Timer;
1514
import org.junit.jupiter.api.BeforeEach;
1615
import org.junit.jupiter.api.Test;
@@ -41,9 +40,6 @@
4140
import java.util.Map;
4241
import java.util.ArrayList;
4342

44-
import org.junit.jupiter.params.ParameterizedTest;
45-
import org.junit.jupiter.params.provider.CsvSource;
46-
4743
import static org.junit.jupiter.api.Assertions.assertEquals;
4844
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4945
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -53,7 +49,6 @@
5349
import static org.mockito.ArgumentMatchers.eq;
5450
import static org.mockito.ArgumentMatchers.argThat;
5551
import static org.mockito.Mockito.times;
56-
import static org.mockito.Mockito.never;
5752
import static org.mockito.Mockito.verify;
5853
import static org.mockito.Mockito.when;
5954
import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES;

0 commit comments

Comments
 (0)