Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit 98272bf

Browse files
Merge pull request #89 from microsoft/feature/62-poll-for-completion
Consumer: poll the data destination for completion
2 parents ddd726f + be9b84b commit 98272bf

File tree

55 files changed

+1345
-200
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1345
-200
lines changed

.github/workflows/verify.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ jobs:
1616
- name: Gradle Test Core
1717
env:
1818
AZ_STORAGE_SAS: ${{ secrets.AZ_STORAGE_SAS }}
19+
AZ_STORAGE_KEY: ${{ secrets.AZ_STORAGE_KEY }}
1920
NIFI_API_AUTH: ${{ secrets.NIFI_API_AUTH }}
2021
S3_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_ACCESS_KEY }}
2122
S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}

common/build.gradle.kts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,19 @@
55

66
plugins {
77
`java-library`
8+
`java-test-fixtures`
89
}
910

11+
val storageBlobVersion: String by project;
12+
1013
dependencies {
1114
api(project(":spi"))
12-
15+
16+
testFixturesApi(platform("com.amazonaws:aws-java-sdk-bom:1.11.1018"))
17+
testFixturesApi("com.amazonaws:aws-java-sdk-s3")
18+
testFixturesApi("com.azure:azure-storage-blob:${storageBlobVersion}")
19+
20+
testFixturesImplementation("org.junit.jupiter:junit-jupiter-api:5.5.2")
21+
testFixturesRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.5.2")
1322
}
1423

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*
5+
*/
6+
7+
package com.microsoft.dagx.common.testfixtures;
8+
9+
import com.azure.core.credential.AzureSasCredential;
10+
import com.azure.storage.blob.BlobContainerClient;
11+
import com.azure.storage.blob.BlobServiceClient;
12+
import com.azure.storage.blob.BlobServiceClientBuilder;
13+
import org.jetbrains.annotations.NotNull;
14+
import org.junit.jupiter.api.AfterEach;
15+
import org.junit.jupiter.api.BeforeEach;
16+
17+
import java.io.File;
18+
import java.util.Objects;
19+
import java.util.UUID;
20+
21+
import static com.microsoft.dagx.common.ConfigurationFunctions.propOrEnv;
22+
import static org.junit.jupiter.api.Assertions.fail;
23+
24+
public class AbstractAzureBlobTest {
25+
26+
protected static final String accountName = "dagxblobstoreitest";
27+
protected static BlobServiceClient blobServiceClient;
28+
protected String containerName;
29+
protected boolean reuseClient = true;
30+
protected String testRunId;
31+
32+
@BeforeEach
33+
public void setupClient() {
34+
35+
testRunId = UUID.randomUUID().toString();
36+
containerName = "fetch-azure-processor-" + testRunId;
37+
38+
if (blobServiceClient == null || !reuseClient) {
39+
final String accountSas = getSasToken();
40+
blobServiceClient = new BlobServiceClientBuilder().credential(new AzureSasCredential(accountSas)).endpoint("https://" + accountName + ".blob.core.windows.net").buildClient();
41+
}
42+
43+
if (blobServiceClient.getBlobContainerClient(containerName).exists()) {
44+
fail("Container " + containerName + " already exists - tests will fail!");
45+
}
46+
47+
//create container
48+
BlobContainerClient blobContainerClient = blobServiceClient.createBlobContainer(containerName);
49+
if (!blobContainerClient.exists()) {
50+
fail("Setup incomplete, tests will fail");
51+
52+
}
53+
}
54+
55+
@AfterEach
56+
public void teardown() {
57+
try {
58+
blobServiceClient.deleteBlobContainer(containerName);
59+
} catch (Exception ex) {
60+
fail("teardown failed, subsequent tests might fail as well!");
61+
}
62+
}
63+
64+
@NotNull
65+
protected String getSasToken() {
66+
return Objects.requireNonNull(propOrEnv("AZ_STORAGE_SAS", null), "AZ_STORAGE_SAS");
67+
}
68+
69+
protected void putBlob(String name, File file) {
70+
blobServiceClient.getBlobContainerClient(containerName)
71+
.getBlobClient(name)
72+
.uploadFromFile(file.getAbsolutePath(), true);
73+
}
74+
}
Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
*
55
*/
66

7-
package microsoft.dagx.transfer.nifi.processors;
7+
package com.microsoft.dagx.common.testfixtures;
88

99
import com.amazonaws.auth.AWSCredentials;
1010
import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -22,9 +22,12 @@
2222
import static com.microsoft.dagx.common.ConfigurationFunctions.propOrEnv;
2323
import static org.junit.jupiter.api.Assertions.fail;
2424

25-
public class AbstractS3Test {
25+
/**
26+
* Base class for tests that need an S3 bucket created and deleted on every test run.
27+
*/
28+
public abstract class AbstractS3Test {
2629

27-
protected final static String REGION = System.getProperty("it.aws.region", Regions.US_EAST_1.getName());
30+
protected final static String region = System.getProperty("it.aws.region", Regions.US_EAST_1.getName());
2831
// Adding REGION to bucket prevents errors of
2932
// "A conflicting conditional operation is currently in progress against this resource."
3033
// when bucket is rapidly added/deleted and consistency propagation causes this error.
@@ -36,12 +39,17 @@ public class AbstractS3Test {
3639

3740
@BeforeEach
3841
public void setupClient() {
39-
bucketName = "test-bucket-" + System.currentTimeMillis() + "-" + REGION;
42+
bucketName = createBucketName();
4043
credentials = getCredentials();
41-
client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).withRegion(REGION).build();
44+
client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).withRegion(region).build();
4245
createBucket(bucketName);
4346
}
4447

48+
@NotNull
49+
protected String createBucketName() {
50+
return "test-bucket-" + System.currentTimeMillis() + "-" + region;
51+
}
52+
4553
@AfterEach
4654
void cleanup() {
4755
deleteBucket(bucketName);
@@ -59,9 +67,9 @@ protected void createBucket(String bucketName) {
5967
fail("Bucket " + bucketName + " exists. Choose a different bucket name to continue test");
6068
}
6169

62-
CreateBucketRequest request = AbstractS3Test.REGION.contains("east")
70+
CreateBucketRequest request = AbstractS3Test.region.contains("east")
6371
? new CreateBucketRequest(bucketName) // See https://github.com/boto/boto3/issues/125
64-
: new CreateBucketRequest(bucketName, AbstractS3Test.REGION);
72+
: new CreateBucketRequest(bucketName, AbstractS3Test.region);
6573
client.createBucket(request);
6674

6775
if (!client.doesBucketExistV2(bucketName)) {
@@ -84,7 +92,7 @@ protected void deleteBucket(String bucketName) {
8492
}
8593

8694
if (objectListing.isTruncated()) {
87-
objectListing = client.listNextBatchOfObjects(objectListing);
95+
objectListing = client.listNextBatchOfObjects(objectListing);/**/
8896
} else {
8997
break;
9098
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
*
55
*/
66

7-
package microsoft.dagx.transfer.nifi.processors;
7+
package com.microsoft.dagx.common.testfixtures;
88

99
import java.io.File;
1010
import java.net.URI;
@@ -15,7 +15,7 @@
1515
import static org.junit.jupiter.api.Assertions.fail;
1616

1717
public class TestUtils {
18-
protected final static String SAMPLE_FILE_RESOURCE_NAME = "hello.txt";
18+
public final static String SAMPLE_FILE_RESOURCE_NAME = "hello.txt";
1919

2020
public static Path getResourcePath(String resourceName) {
2121
Path path = null;

extensions/transfer/transfer-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ plugins {
1111
dependencies {
1212
api(project(":spi"))
1313

14+
testImplementation(project(":extensions:transfer:transfer-store-memory"))
1415

1516
}
1617

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/CoreTransferExtension.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818
import com.microsoft.dagx.spi.transfer.store.TransferProcessStore;
1919
import com.microsoft.dagx.spi.types.TypeManager;
2020
import com.microsoft.dagx.spi.types.domain.transfer.DataRequest;
21+
import com.microsoft.dagx.spi.types.domain.transfer.StatusCheckerRegistry;
2122
import com.microsoft.dagx.transfer.core.flow.DataFlowManagerImpl;
2223
import com.microsoft.dagx.transfer.core.protocol.provider.RemoteMessageDispatcherRegistryImpl;
2324
import com.microsoft.dagx.transfer.core.provision.ProvisionManagerImpl;
2425
import com.microsoft.dagx.transfer.core.provision.ResourceManifestGeneratorImpl;
2526
import com.microsoft.dagx.transfer.core.transfer.ExponentialWaitStrategy;
2627
import com.microsoft.dagx.transfer.core.transfer.TransferProcessManagerImpl;
2728

29+
import java.util.Set;
30+
2831
/**
2932
* Provides core data transfer services to the system.
3033
*/
@@ -44,7 +47,7 @@ public LoadPhase phase() {
4447

4548
@Override
4649
public void initialize(ServiceExtensionContext context) {
47-
this.monitor = context.getMonitor();
50+
monitor = context.getMonitor();
4851
this.context = context;
4952

5053
var typeManager = context.getTypeManager();
@@ -60,19 +63,23 @@ public void initialize(ServiceExtensionContext context) {
6063
var manifestGenerator = new ResourceManifestGeneratorImpl();
6164
context.registerService(ResourceManifestGenerator.class, manifestGenerator);
6265

66+
var statusCheckerRegistry = new StatusCheckerRegistryImpl();
67+
context.registerService(StatusCheckerRegistry.class, statusCheckerRegistry);
68+
6369
var vault = context.getService(Vault.class);
6470

6571
provisionManager = new ProvisionManagerImpl(vault, typeManager, monitor);
6672
context.registerService(ProvisionManager.class, provisionManager);
6773

68-
var waitStrategy = context.hasService(TransferWaitStrategy.class) ? context.getService(TransferWaitStrategy.class) :new ExponentialWaitStrategy(DEFAULT_ITERATION_WAIT);
69-
74+
var waitStrategy = context.hasService(TransferWaitStrategy.class) ? context.getService(TransferWaitStrategy.class) : new ExponentialWaitStrategy(DEFAULT_ITERATION_WAIT);
75+
7076
processManager = TransferProcessManagerImpl.Builder.newInstance()
7177
.waitStrategy(waitStrategy)
7278
.manifestGenerator(manifestGenerator)
7379
.dataFlowManager(dataFlowManager)
7480
.provisionManager(provisionManager)
7581
.dispatcherRegistry(dispatcherRegistry)
82+
.statusCheckerRegistry(statusCheckerRegistry)
7683
.monitor(monitor)
7784
.build();
7885

@@ -81,6 +88,11 @@ public void initialize(ServiceExtensionContext context) {
8188
monitor.info("Initialized Core Transfer extension");
8289
}
8390

91+
@Override
92+
public Set<String> provides() {
93+
return Set.of("dagx:statuschecker", "dagx:dispatcher", "dagx:manifestgenerator");
94+
}
95+
8496
@Override
8597
public void start() {
8698
var transferProcessStore = context.getService(TransferProcessStore.class);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*
5+
*/
6+
7+
package com.microsoft.dagx.transfer.core;
8+
9+
import com.microsoft.dagx.spi.types.domain.transfer.ProvisionedDataDestinationResource;
10+
import com.microsoft.dagx.spi.types.domain.transfer.StatusChecker;
11+
import com.microsoft.dagx.spi.types.domain.transfer.StatusCheckerRegistry;
12+
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
16+
public class StatusCheckerRegistryImpl implements StatusCheckerRegistry {
17+
private final Map<Class<? extends ProvisionedDataDestinationResource>, StatusChecker<? extends ProvisionedDataDestinationResource>> inMemoryMap;
18+
19+
public StatusCheckerRegistryImpl() {
20+
inMemoryMap = new HashMap<>();
21+
}
22+
23+
@Override
24+
public void register(Class<? extends ProvisionedDataDestinationResource> provisionedResourceClass, StatusChecker<? extends ProvisionedDataDestinationResource> statusChecker) {
25+
inMemoryMap.put(provisionedResourceClass, statusChecker);
26+
}
27+
28+
@Override
29+
public <T extends ProvisionedDataDestinationResource> StatusChecker<T> resolve(Class<? extends ProvisionedDataDestinationResource> provisionedResourceClass) {
30+
return (StatusChecker<T>) inMemoryMap.get(provisionedResourceClass);
31+
}
32+
33+
@Override
34+
public <T extends ProvisionedDataDestinationResource> StatusChecker<T> resolve(T resource) {
35+
return resolve(resource.getClass());
36+
}
37+
}

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/provision/ProvisionManagerImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ void onDestinationResource(ProvisionedDataDestinationResource destinationResourc
7676
return;
7777
}
7878

79-
80-
transferProcess.getDataRequest().updateDestination(destinationResource.createDataDestination());
79+
if (!destinationResource.isError()) {
80+
transferProcess.getDataRequest().updateDestination(destinationResource.createDataDestination());
81+
}
8182

8283
if (secretToken != null) {
8384
String keyName = destinationResource.getResourceName();

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/transfer/ExponentialWaitStrategy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@
1111
* Implements an exponential backoff strategy for failed iterations.
1212
*/
1313
public class ExponentialWaitStrategy implements TransferWaitStrategy {
14+
private final long successWaitPeriodMillis;
1415
private int errorCount = 0;
15-
private long successWaitPeriod;
1616

1717

18-
public ExponentialWaitStrategy(long successWaitPeriod) {
19-
this.successWaitPeriod = successWaitPeriod;
18+
public ExponentialWaitStrategy(long successWaitPeriodMillis) {
19+
this.successWaitPeriodMillis = successWaitPeriodMillis;
2020
}
2121

2222
@Override
2323
public long waitForMillis() {
24-
return successWaitPeriod;
24+
return successWaitPeriodMillis;
2525
}
2626

2727
@Override
@@ -33,7 +33,7 @@ public void success() {
3333
public long retryInMillis() {
3434
errorCount++;
3535
double exponentialMultiplier = Math.pow(2.0, errorCount - 1);
36-
double result = exponentialMultiplier * successWaitPeriod;
36+
double result = exponentialMultiplier * successWaitPeriodMillis;
3737
return (long) Math.min(result, Long.MAX_VALUE);
3838
}
3939
}

0 commit comments

Comments
 (0)