|
14 | 14 | import org.junit.jupiter.api.AfterAll; |
15 | 15 | import org.junit.jupiter.api.BeforeAll; |
16 | 16 | import org.junit.jupiter.api.BeforeEach; |
| 17 | +import org.junit.jupiter.api.Disabled; |
17 | 18 | import org.junit.jupiter.api.Test; |
18 | 19 | import org.junit.jupiter.api.extension.ExtendWith; |
19 | 20 | import org.junit.jupiter.params.ParameterizedTest; |
|
24 | 25 | import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; |
25 | 26 | import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; |
26 | 27 | import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; |
| 28 | +import software.amazon.awssdk.core.pagination.sync.SdkIterable; |
27 | 29 | import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient; |
| 30 | +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbIndex; |
28 | 31 | import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable; |
29 | 32 | import software.amazon.awssdk.enhanced.dynamodb.Key; |
30 | 33 | import software.amazon.awssdk.enhanced.dynamodb.TableSchema; |
31 | 34 | import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest; |
| 35 | +import software.amazon.awssdk.enhanced.dynamodb.model.Page; |
| 36 | +import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional; |
| 37 | +import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest; |
32 | 38 | import software.amazon.awssdk.services.dynamodb.DynamoDbClient; |
33 | 39 |
|
34 | 40 | import java.time.Duration; |
35 | 41 | import java.time.Instant; |
| 42 | +import java.util.List; |
36 | 43 | import java.util.Optional; |
37 | 44 | import java.util.Random; |
38 | 45 | import java.util.UUID; |
39 | 46 | import java.util.concurrent.TimeUnit; |
| 47 | +import java.util.stream.Collectors; |
| 48 | +import java.util.stream.IntStream; |
| 49 | +import java.util.stream.Stream; |
40 | 50 |
|
41 | 51 | import static org.awaitility.Awaitility.await; |
42 | 52 | import static org.hamcrest.MatcherAssert.assertThat; |
|
50 | 60 | import static org.hamcrest.Matchers.sameInstance; |
51 | 61 | import static org.junit.jupiter.api.Assertions.assertThrows; |
52 | 62 | import static org.mockito.Mockito.when; |
| 63 | +import static org.opensearch.dataprepper.plugins.sourcecoordinator.dynamodb.DynamoDbClientWrapper.SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX; |
| 64 | +import static org.opensearch.dataprepper.plugins.sourcecoordinator.dynamodb.DynamoDbSourceCoordinationStore.SOURCE_STATUS_COMBINATION_KEY_FORMAT; |
53 | 65 |
|
54 | 66 | @ExtendWith(MockitoExtension.class) |
55 | 67 | class DynamoDbSourceCoordinationStoreIT { |
@@ -93,8 +105,8 @@ void setUp() { |
93 | 105 | when(dynamoStoreSettings.getRegion()).thenReturn(region); |
94 | 106 | when(dynamoStoreSettings.getStsRoleArn()).thenReturn(stsRoleArn); |
95 | 107 | when(dynamoStoreSettings.getStsExternalId()).thenReturn(stsExternalId); |
96 | | - when(dynamoStoreSettings.getProvisionedReadCapacityUnits()).thenReturn(1L); |
97 | | - when(dynamoStoreSettings.getProvisionedWriteCapacityUnits()).thenReturn(1L); |
| 108 | + when(dynamoStoreSettings.getProvisionedReadCapacityUnits()).thenReturn(10L); |
| 109 | + when(dynamoStoreSettings.getProvisionedWriteCapacityUnits()).thenReturn(10L); |
98 | 110 |
|
99 | 111 | when(dynamoDbClientFactory.provideDynamoDbClient(region, stsRoleArn, stsExternalId)).thenReturn(dynamoDbClient); |
100 | 112 |
|
@@ -251,45 +263,58 @@ void tryCreatePartitionItem_creates_an_item() { |
251 | 263 | assertThat(getItem.getExpirationTime(), lessThanOrEqualTo(Instant.now().getEpochSecond())); |
252 | 264 | } |
253 | 265 |
|
| 266 | + @Disabled("This test is flaky on the current version of DynamoDB Local. However, newer versions require JDK 17+.") |
254 | 267 | @Test |
255 | | - void tryAcquireAvailablePartition_gets_first_unassigned_partition() { |
| 268 | + void tryAcquireAvailablePartition_gets_first_unassigned_partition() throws InterruptedException { |
256 | 269 | final DynamoDbSourceCoordinationStore objectUnderTest = createObjectUnderTest(); |
257 | 270 | final String partitionProgressState = UUID.randomUUID().toString(); |
258 | 271 |
|
259 | | - final String unassignedPartitionKey1 = UUID.randomUUID().toString(); |
260 | | - final String unassignedPartitionKey2 = UUID.randomUUID().toString(); |
261 | | - final String unassignedPartitionKey3 = UUID.randomUUID().toString(); |
| 272 | + final List<String> partitionKeys = IntStream.rangeClosed(1, 3) |
| 273 | + .mapToObj(i -> UUID.randomUUID() + "_" + i) |
| 274 | + .collect(Collectors.toList()); |
262 | 275 |
|
263 | | - objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
264 | | - unassignedPartitionKey1, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
265 | | - objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
266 | | - unassignedPartitionKey2, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
267 | | - objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
268 | | - unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
| 276 | + for (final String partitionKey : partitionKeys) { |
| 277 | + final boolean createSuccess = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
| 278 | + partitionKey, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
| 279 | + assertThat(createSuccess, equalTo(true)); |
| 280 | + Thread.sleep(150); |
| 281 | + } |
269 | 282 |
|
270 | | - // Wait for partition to be available in DynamoDB Local before attempting to acquire |
271 | | - final Optional<SourcePartitionStoreItem>[] maybeAcquiredHolder = new Optional[]{Optional.empty()}; |
272 | | - await().atMost(5, TimeUnit.SECONDS) |
273 | | - .pollInterval(100, TimeUnit.MILLISECONDS) |
274 | | - .untilAsserted(() -> { |
275 | | - maybeAcquiredHolder[0] = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); |
276 | | - assertThat(maybeAcquiredHolder[0].isPresent(), equalTo(true)); |
277 | | - }); |
| 283 | + await().atMost(10, TimeUnit.SECONDS).until(() -> { |
| 284 | + final String primaryKey = String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED); |
| 285 | + final Stream<DynamoDbSourcePartitionItem> items = querySourceStatusIndex(primaryKey); |
278 | 286 |
|
279 | | - final Optional<SourcePartitionStoreItem> maybeAcquired = maybeAcquiredHolder[0]; |
| 287 | + return items.count() == partitionKeys.size(); |
| 288 | + }); |
| 289 | + |
| 290 | + final Optional<SourcePartitionStoreItem> maybeAcquired = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); |
280 | 291 |
|
281 | 292 | assertThat(maybeAcquired, notNullValue()); |
282 | 293 | assertThat(maybeAcquired.isPresent(), equalTo(true)); |
283 | | - |
284 | 294 | final SourcePartitionStoreItem acquiredItem = maybeAcquired.get(); |
285 | 295 |
|
286 | 296 | assertThat(acquiredItem, notNullValue()); |
| 297 | + final String unassignedPartitionKey1 = partitionKeys.get(0); |
| 298 | + |
287 | 299 | assertThat(acquiredItem.getSourceIdentifier(), equalTo(sourceIdentifier)); |
288 | 300 | assertThat(acquiredItem.getSourcePartitionKey(), equalTo(unassignedPartitionKey1)); |
289 | 301 | assertThat(acquiredItem.getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); |
290 | 302 | assertThat(acquiredItem.getPartitionOwner(), equalTo(ownerId)); |
291 | 303 | } |
292 | 304 |
|
| 305 | + private Stream<DynamoDbSourcePartitionItem> querySourceStatusIndex(final String partitionKey) { |
| 306 | + final DynamoDbIndex<DynamoDbSourcePartitionItem> sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX); |
| 307 | + final QueryEnhancedRequest queryEnhancedRequest = QueryEnhancedRequest.builder() |
| 308 | + .limit(1) |
| 309 | + .queryConditional(QueryConditional.keyEqualTo(Key.builder().partitionValue(partitionKey).build())) |
| 310 | + .build(); |
| 311 | + |
| 312 | + final SdkIterable<Page<DynamoDbSourcePartitionItem>> pages = sourceStatusIndex.query(queryEnhancedRequest); |
| 313 | + |
| 314 | + return pages.stream() |
| 315 | + .flatMap(page -> page.items().stream()); |
| 316 | + } |
| 317 | + |
293 | 318 | private DynamoDbSourcePartitionItem putDynamoDbSourcePartitionItem(final SourcePartitionStatus sourcePartitionStatus) { |
294 | 319 | final DynamoDbSourcePartitionItem putItem = createUnsavedPartitionItem(sourcePartitionStatus); |
295 | 320 | table.putItem(putItem); |
|
0 commit comments