Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit 042339a

Browse files
Feature/63 implement deprovisioning (#98)
* added deprovisioning for S3 and Azure Blob * fixed overwriting destination containers * do not allow to transition from ERROR to DEPROVISION_REQ * deactivated terraform-plan due to problems with the Azure AD SP
1 parent b4dbb86 commit 042339a

File tree

20 files changed

+465
-58
lines changed

20 files changed

+465
-58
lines changed

.github/workflows/terraform.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ jobs:
8686
id: validate
8787
run: terraform -chdir=scripts validate -no-color
8888

89-
- name: Terraform Plan
90-
id: plan
91-
if: github.event_name == 'pull_request'
92-
run: terraform -chdir=scripts plan -no-color -var "resourcesuffix=dev" -var "backend_account_key=${{ secrets.TF_BACKEND_KEY }}"
89+
# deactivated until the GithubActions-User has correct credentials
90+
# - name: Terraform Plan
91+
# id: plan
92+
# if: github.event_name == 'pull_request'
93+
# run: terraform -chdir=scripts plan -no-color -var "resourcesuffix=dev" -var "backend_account_key=${{ secrets.TF_BACKEND_KEY }}"
94+
9395

94-
# deactivated until the GithubActions-User has correct credentials
9596

9697
# - name: Taint the connector instance
9798
# if: github.ref == 'refs/heads/main' && github.event_name == 'push'

distributions/demo-e2e/dagx-config-docker.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ dagx.vault.clientid=<YOUR_SERVICEPRINCIPAL_CLIENTID>
77
dagx.vault.tenantid=<YOUR_SUBSCRIPTION_TENANTID>
88
# that must be the same certificate that was used during "terraform apply" for the primary service principal!
99
dagx.vault.certificate=./cert.pfx
10-
dagx.vault.name=dagx-demo-vault
10+
dagx.vault.name=dagx-dev-vault
1111
dagx.atlas.url=https://dev-dagx-atlas.westeurope.cloudapp.azure.com
1212
dagx.nifi.url=http://dev-dagx-nifi.westeurope.azurecontainer.io:8080/
1313
dagx.nifi.flow.url=http://dev-dagx-nifi.westeurope.azurecontainer.io:8888/

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/CoreTransferExtension.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.microsoft.dagx.spi.system.ServiceExtension;
1212
import com.microsoft.dagx.spi.system.ServiceExtensionContext;
1313
import com.microsoft.dagx.spi.transfer.TransferProcessManager;
14+
import com.microsoft.dagx.spi.transfer.TransferProcessObservable;
1415
import com.microsoft.dagx.spi.transfer.TransferWaitStrategy;
1516
import com.microsoft.dagx.spi.transfer.flow.DataFlowManager;
1617
import com.microsoft.dagx.spi.transfer.provision.ProvisionManager;
@@ -84,6 +85,7 @@ public void initialize(ServiceExtensionContext context) {
8485
.build();
8586

8687
context.registerService(TransferProcessManager.class, processManager);
88+
context.registerService(TransferProcessObservable.class, processManager);
8789

8890
monitor.info("Initialized Core Transfer extension");
8991
}

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/provision/ProvisionContextImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121
public class ProvisionContextImpl implements ProvisionContext {
2222
private final Consumer<ProvisionedResource> resourceCallback;
2323
private final BiConsumer<ProvisionedDataDestinationResource, SecretToken> destinationCallback;
24+
private final BiConsumer<ProvisionedDataDestinationResource, Throwable> deprovisionCallback;
2425
private final TransferProcessStore processStore;
2526

2627
public ProvisionContextImpl(TransferProcessStore processStore,
2728
Consumer<ProvisionedResource> resourceCallback,
28-
BiConsumer<ProvisionedDataDestinationResource, SecretToken> destinationCallback) {
29+
BiConsumer<ProvisionedDataDestinationResource, SecretToken> destinationCallback,
30+
BiConsumer<ProvisionedDataDestinationResource, Throwable> deprovisionCallback) {
2931
this.processStore = processStore;
3032
this.resourceCallback = resourceCallback;
3133
this.destinationCallback = destinationCallback;
34+
this.deprovisionCallback = deprovisionCallback;
3235
}
3336

3437
@Override
@@ -41,6 +44,11 @@ public void callback(ProvisionedDataDestinationResource resource, SecretToken se
4144
destinationCallback.accept(resource, secretToken);
4245
}
4346

47+
@Override
48+
public void deprovisioned(ProvisionedDataDestinationResource resource, Throwable error) {
49+
deprovisionCallback.accept(resource, error);
50+
}
51+
4452
@Override
4553
public void create(String processId, String resourceDefinitionId, Object data) {
4654
processStore.createData(processId, resourceDefinitionId, data);

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/provision/ProvisionManagerImpl.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.microsoft.dagx.spi.security.Vault;
1111
import com.microsoft.dagx.spi.transfer.provision.ProvisionManager;
1212
import com.microsoft.dagx.spi.transfer.provision.Provisioner;
13+
import com.microsoft.dagx.spi.transfer.response.ResponseStatus;
1314
import com.microsoft.dagx.spi.transfer.store.TransferProcessStore;
1415
import com.microsoft.dagx.spi.types.TypeManager;
1516
import com.microsoft.dagx.spi.types.domain.transfer.*;
@@ -38,7 +39,7 @@ public ProvisionManagerImpl(Vault vault, TypeManager typeManager, Monitor monito
3839

3940
public void start(TransferProcessStore processStore) {
4041
this.processStore = processStore;
41-
var context = new ProvisionContextImpl(this.processStore, this::onResource, this::onDestinationResource);
42+
var context = new ProvisionContextImpl(this.processStore, this::onResource, this::onDestinationResource, this::onDeprovisionComplete);
4243
provisioners.forEach(provisioner -> provisioner.initialize(context));
4344
}
4445

@@ -64,7 +65,29 @@ public void provision(TransferProcess process) {
6465
public void deprovision(TransferProcess process) {
6566
for (ProvisionedResource definition : process.getProvisionedResourceSet().getResources()) {
6667
Provisioner<?, ProvisionedResource> chosenProvisioner = getProvisioner(definition);
67-
chosenProvisioner.deprovision(definition);
68+
final ResponseStatus status = chosenProvisioner.deprovision(definition);
69+
if (status != ResponseStatus.OK) {
70+
process.transitionError("Error during deprovisioning");
71+
processStore.update(process);
72+
}
73+
}
74+
}
75+
76+
void onDeprovisionComplete(ProvisionedDataDestinationResource resource, Throwable deprovisionError) {
77+
if (deprovisionError != null) {
78+
monitor.severe("Deprovisioning error: ", deprovisionError);
79+
} else {
80+
monitor.info("Deprovisioning successfully completed.");
81+
82+
final TransferProcess transferProcess = processStore.find(resource.getTransferProcessId());
83+
if (transferProcess != null) {
84+
transferProcess.transitionDeprovisioned();
85+
processStore.update(transferProcess);
86+
monitor.debug("Process " + transferProcess.getId() + " is now " + TransferProcessStates.from(transferProcess.getState()));
87+
} else {
88+
monitor.severe("ProvisionManager: no TransferProcess found for deprovisioned resource");
89+
}
90+
6891
}
6992
}
7093

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/transfer/TransferProcessManagerImpl.java

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,32 @@
77

88
import com.microsoft.dagx.spi.message.RemoteMessageDispatcherRegistry;
99
import com.microsoft.dagx.spi.monitor.Monitor;
10-
import com.microsoft.dagx.spi.transfer.TransferInitiateResponse;
11-
import com.microsoft.dagx.spi.transfer.TransferProcessManager;
12-
import com.microsoft.dagx.spi.transfer.TransferWaitStrategy;
10+
import com.microsoft.dagx.spi.transfer.*;
1311
import com.microsoft.dagx.spi.transfer.flow.DataFlowManager;
1412
import com.microsoft.dagx.spi.transfer.provision.ProvisionManager;
1513
import com.microsoft.dagx.spi.transfer.provision.ResourceManifestGenerator;
1614
import com.microsoft.dagx.spi.transfer.response.ResponseStatus;
1715
import com.microsoft.dagx.spi.transfer.store.TransferProcessStore;
1816
import com.microsoft.dagx.spi.types.domain.transfer.*;
1917

20-
import java.util.List;
21-
import java.util.Objects;
18+
import java.util.*;
2219
import java.util.concurrent.ExecutorService;
2320
import java.util.concurrent.Executors;
2421
import java.util.concurrent.atomic.AtomicBoolean;
2522
import java.util.stream.Collectors;
2623

2724
import static com.microsoft.dagx.spi.types.domain.transfer.TransferProcess.Type.CLIENT;
2825
import static com.microsoft.dagx.spi.types.domain.transfer.TransferProcess.Type.PROVIDER;
29-
import static com.microsoft.dagx.spi.types.domain.transfer.TransferProcessStates.INITIAL;
30-
import static com.microsoft.dagx.spi.types.domain.transfer.TransferProcessStates.PROVISIONED;
26+
import static com.microsoft.dagx.spi.types.domain.transfer.TransferProcessStates.*;
3127
import static java.lang.String.format;
3228
import static java.util.UUID.randomUUID;
3329

3430
/**
3531
*
3632
*/
37-
public class TransferProcessManagerImpl implements TransferProcessManager {
33+
public class TransferProcessManagerImpl implements TransferProcessManager, TransferProcessObservable {
3834
private final AtomicBoolean active = new AtomicBoolean();
35+
private final Map<String, List<TransferProcessListener>> listenerMap;
3936
private int batchSize = 5;
4037
private TransferWaitStrategy waitStrategy = () -> 5000L; // default wait five seconds
4138
private ResourceManifestGenerator manifestGenerator;
@@ -48,6 +45,11 @@ public class TransferProcessManagerImpl implements TransferProcessManager {
4845
private StatusCheckerRegistry statusCheckerRegistry;
4946

5047
private TransferProcessManagerImpl() {
48+
listenerMap = new HashMap<>();
49+
}
50+
51+
public Map<String, List<TransferProcessListener>> getListeners() {
52+
return listenerMap;
5153
}
5254

5355
public void start(TransferProcessStore processStore) {
@@ -74,6 +76,28 @@ public TransferInitiateResponse initiateProviderRequest(DataRequest dataRequest)
7476
return initiateRequest(PROVIDER, dataRequest);
7577
}
7678

79+
@Override
80+
public void registerListener(String processId, TransferProcessListener listener) {
81+
if (listenerMap.containsKey(processId)) {
82+
final List<TransferProcessListener> list = listenerMap.get(processId);
83+
if (!list.contains(listener)) {
84+
list.add(listener);
85+
}
86+
} else {
87+
final ArrayList<TransferProcessListener> list = new ArrayList<>();
88+
list.add(listener);
89+
listenerMap.put(processId, list);
90+
}
91+
}
92+
93+
@Override
94+
public void unregister(TransferProcessListener listener) {
95+
// unregister from all processes
96+
listenerMap.forEach((key, value) -> value.remove(listener));
97+
// clear the registration if no more listeners
98+
listenerMap.entrySet().removeIf(e -> e.getValue().isEmpty());
99+
}
100+
77101
private TransferInitiateResponse initiateRequest(TransferProcess.Type type, DataRequest dataRequest) {
78102
// make the request idempotent: if the process exists, return
79103
var processId = transferProcessStore.processIdForTransferId(dataRequest.getId());
@@ -99,7 +123,9 @@ private void run() {
99123

100124
int finished = checkCompleted();
101125

102-
if (provisioning + provisioned + sent + finished == 0) {
126+
int deprovisioning = checkDeprovisioningRequested();
127+
128+
if (provisioning + provisioned + sent + finished + deprovisioning == 0) {
103129
Thread.sleep(waitStrategy.waitForMillis());
104130
}
105131
waitStrategy.success();
@@ -122,6 +148,27 @@ private void run() {
122148
}
123149
}
124150

151+
/**
152+
* Transitions all processes that are in state DEPROVISIONING_REQ and deprovisions their associated
153+
* resources. Then they are moved to DEPROVISIONING
154+
*
155+
* @return the number of transfer processes in DEPROVISIONING_REQ
156+
*/
157+
private int checkDeprovisioningRequested() {
158+
List<TransferProcess> processesDeprovisioning = transferProcessStore.nextForState(DEPROVISIONING_REQ.code(), batchSize);
159+
160+
for (var process : processesDeprovisioning) {
161+
process.transitionDeprovisioning();
162+
transferProcessStore.update(process);
163+
monitor.debug("Process " + process.getId() + " is now " + TransferProcessStates.from(process.getState()));
164+
provisionManager.deprovision(process);
165+
publishDeprovisioned(process);
166+
}
167+
168+
return processesDeprovisioning.size();
169+
}
170+
171+
125172
/**
126173
* Transition all processes, who have provisioned resources, into the IN_PROCRESS or STREAMING status, depending on
127174
* whether they're finite or not.
@@ -160,30 +207,34 @@ private int checkCompleted() {
160207

161208
for (var process : processesInProgress.stream().filter(p -> p.getType() == CLIENT).collect(Collectors.toList())) {
162209

163-
//only check resources for which a checker was registered.
164-
// todo: maybe error out processes with uncheckable resources??
165210
List<ProvisionedResource> resources = process.getProvisionedResourceSet().getResources().stream().filter(this::hasChecker).collect(Collectors.toList());
166211

167-
//todo: comment this in if we want to error out uncheckable resources
168-
// var resourcesWithNoChecker = resources.stream().filter(resource -> statusCheckerRegistry.resolve(resource) == null).collect(Collectors.toList());
169-
// if (!resourcesWithNoChecker.isEmpty()) {
170-
// final String violatingResourceClasses = resourcesWithNoChecker.stream().map(r -> r.getClass().getName()).collect(Collectors.joining(", "));
171-
// monitor.severe("There is no StatusChecker for resource type " + violatingResourceClasses);
172-
// process.transitionError("No StatusChecker found for a provisioned resource of type(s) " + violatingResourceClasses + ". The violating resource is part of transfer process " + process.getId());
173-
// transferProcessStore.update(process);
174-
// continue;
175-
// }
176-
177212
// update the process once ALL resources are completed
178213
if (resources.stream().allMatch(this::isComplete)) {
179214
process.transitionCompleted();
180215
monitor.debug("Process " + process.getId() + " is now " + TransferProcessStates.COMPLETED);
216+
publishCompleted(process);
217+
181218
}
182219
transferProcessStore.update(process);
183220
}
184221
return processesInProgress.size();
185222
}
186223

224+
private void publishCompleted(TransferProcess process) {
225+
final List<TransferProcessListener> transferProcessListeners = listenerMap.get(process.getId());
226+
if (transferProcessListeners != null) {
227+
transferProcessListeners.forEach(l -> l.completed(process));
228+
}
229+
}
230+
231+
private void publishDeprovisioned(TransferProcess process) {
232+
final List<TransferProcessListener> transferProcessListeners = listenerMap.get(process.getId());
233+
if (transferProcessListeners != null) {
234+
transferProcessListeners.forEach(l -> l.deprovisioned(process));
235+
}
236+
}
237+
187238
private boolean hasChecker(ProvisionedResource provisionedResource) {
188239
return provisionedResource instanceof ProvisionedDataDestinationResource && statusCheckerRegistry.resolve((ProvisionedDataDestinationResource) provisionedResource) != null;
189240
}

0 commit comments

Comments
 (0)