Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -24,19 +25,28 @@
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import software.amazon.awssdk.core.pagination.sync.SdkIterable;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbIndex;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable;
import software.amazon.awssdk.enhanced.dynamodb.Key;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.Page;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -50,6 +60,8 @@
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sourcecoordinator.dynamodb.DynamoDbClientWrapper.SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX;
import static org.opensearch.dataprepper.plugins.sourcecoordinator.dynamodb.DynamoDbSourceCoordinationStore.SOURCE_STATUS_COMBINATION_KEY_FORMAT;

@ExtendWith(MockitoExtension.class)
class DynamoDbSourceCoordinationStoreIT {
Expand Down Expand Up @@ -93,8 +105,8 @@ void setUp() {
when(dynamoStoreSettings.getRegion()).thenReturn(region);
when(dynamoStoreSettings.getStsRoleArn()).thenReturn(stsRoleArn);
when(dynamoStoreSettings.getStsExternalId()).thenReturn(stsExternalId);
when(dynamoStoreSettings.getProvisionedReadCapacityUnits()).thenReturn(1L);
when(dynamoStoreSettings.getProvisionedWriteCapacityUnits()).thenReturn(1L);
when(dynamoStoreSettings.getProvisionedReadCapacityUnits()).thenReturn(10L);
when(dynamoStoreSettings.getProvisionedWriteCapacityUnits()).thenReturn(10L);

when(dynamoDbClientFactory.provideDynamoDbClient(region, stsRoleArn, stsExternalId)).thenReturn(dynamoDbClient);

Expand Down Expand Up @@ -251,45 +263,58 @@ void tryCreatePartitionItem_creates_an_item() {
assertThat(getItem.getExpirationTime(), lessThanOrEqualTo(Instant.now().getEpochSecond()));
}

@Disabled("This test is flaky on the current version of DynamoDB Local. However, newer versions require JDK 17+.")
@Test
void tryAcquireAvailablePartition_gets_first_unassigned_partition() {
void tryAcquireAvailablePartition_gets_first_unassigned_partition() throws InterruptedException {
final DynamoDbSourceCoordinationStore objectUnderTest = createObjectUnderTest();
final String partitionProgressState = UUID.randomUUID().toString();

final String unassignedPartitionKey1 = UUID.randomUUID().toString();
final String unassignedPartitionKey2 = UUID.randomUUID().toString();
final String unassignedPartitionKey3 = UUID.randomUUID().toString();
final List<String> partitionKeys = IntStream.rangeClosed(1, 3)
.mapToObj(i -> UUID.randomUUID() + "_" + i)
.collect(Collectors.toList());

objectUnderTest.tryCreatePartitionItem(sourceIdentifier,
unassignedPartitionKey1, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false);
objectUnderTest.tryCreatePartitionItem(sourceIdentifier,
unassignedPartitionKey2, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false);
objectUnderTest.tryCreatePartitionItem(sourceIdentifier,
unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false);
for (final String partitionKey : partitionKeys) {
final boolean createSuccess = objectUnderTest.tryCreatePartitionItem(sourceIdentifier,
partitionKey, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false);
assertThat(createSuccess, equalTo(true));
Thread.sleep(150);
}

// Wait for partition to be available in DynamoDB Local before attempting to acquire
final Optional<SourcePartitionStoreItem>[] maybeAcquiredHolder = new Optional[]{Optional.empty()};
await().atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
maybeAcquiredHolder[0] = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20));
assertThat(maybeAcquiredHolder[0].isPresent(), equalTo(true));
});
await().atMost(10, TimeUnit.SECONDS).until(() -> {
final String primaryKey = String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED);
final Stream<DynamoDbSourcePartitionItem> items = querySourceStatusIndex(primaryKey);

final Optional<SourcePartitionStoreItem> maybeAcquired = maybeAcquiredHolder[0];
return items.count() == partitionKeys.size();
});

final Optional<SourcePartitionStoreItem> maybeAcquired = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20));

assertThat(maybeAcquired, notNullValue());
assertThat(maybeAcquired.isPresent(), equalTo(true));

final SourcePartitionStoreItem acquiredItem = maybeAcquired.get();

assertThat(acquiredItem, notNullValue());
final String unassignedPartitionKey1 = partitionKeys.get(0);

assertThat(acquiredItem.getSourceIdentifier(), equalTo(sourceIdentifier));
assertThat(acquiredItem.getSourcePartitionKey(), equalTo(unassignedPartitionKey1));
assertThat(acquiredItem.getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED));
assertThat(acquiredItem.getPartitionOwner(), equalTo(ownerId));
}

private Stream<DynamoDbSourcePartitionItem> querySourceStatusIndex(final String partitionKey) {
final DynamoDbIndex<DynamoDbSourcePartitionItem> sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX);
final QueryEnhancedRequest queryEnhancedRequest = QueryEnhancedRequest.builder()
.limit(1)
.queryConditional(QueryConditional.keyEqualTo(Key.builder().partitionValue(partitionKey).build()))
.build();

final SdkIterable<Page<DynamoDbSourcePartitionItem>> pages = sourceStatusIndex.query(queryEnhancedRequest);

return pages.stream()
.flatMap(page -> page.items().stream());
}

private DynamoDbSourcePartitionItem putDynamoDbSourcePartitionItem(final SourcePartitionStatus sourcePartitionStatus) {
final DynamoDbSourcePartitionItem putItem = createUnsavedPartitionItem(sourcePartitionStatus);
table.putItem(putItem);
Expand Down
Loading