Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit 74eaa58

Browse files
restricted the StatusChecker to ProvisionedDataDestinationResource
1 parent b754134 commit 74eaa58

File tree

5 files changed

+49
-34
lines changed

5 files changed

+49
-34
lines changed

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/StatusCheckerRegistryImpl.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,32 @@
66

77
package com.microsoft.dagx.transfer.core;
88

9-
import com.microsoft.dagx.spi.types.domain.transfer.ProvisionedResource;
9+
import com.microsoft.dagx.spi.types.domain.transfer.ProvisionedDataDestinationResource;
1010
import com.microsoft.dagx.spi.types.domain.transfer.StatusChecker;
1111
import com.microsoft.dagx.spi.types.domain.transfer.StatusCheckerRegistry;
1212

1313
import java.util.HashMap;
1414
import java.util.Map;
1515

1616
public class StatusCheckerRegistryImpl implements StatusCheckerRegistry {
17-
private final Map<Class<? extends ProvisionedResource>, StatusChecker<? extends ProvisionedResource>> inMemoryMap;
17+
private final Map<Class<? extends ProvisionedDataDestinationResource>, StatusChecker<? extends ProvisionedDataDestinationResource>> inMemoryMap;
1818

1919
public StatusCheckerRegistryImpl() {
2020
inMemoryMap = new HashMap<>();
2121
}
2222

2323
@Override
24-
public void register(Class<? extends ProvisionedResource> provisionedResourceClass, StatusChecker<? extends ProvisionedResource> statusChecker) {
24+
public void register(Class<? extends ProvisionedDataDestinationResource> provisionedResourceClass, StatusChecker<? extends ProvisionedDataDestinationResource> statusChecker) {
2525
inMemoryMap.put(provisionedResourceClass, statusChecker);
2626
}
2727

28-
@SuppressWarnings("unchecked")
2928
@Override
30-
public <T extends ProvisionedResource> StatusChecker<T> resolve(Class<? extends ProvisionedResource> provisionedResourceClass) {
29+
public <T extends ProvisionedDataDestinationResource> StatusChecker<T> resolve(Class<? extends ProvisionedDataDestinationResource> provisionedResourceClass) {
3130
return (StatusChecker<T>) inMemoryMap.get(provisionedResourceClass);
3231
}
3332

3433
@Override
35-
public <T extends ProvisionedResource> StatusChecker<T> resolve(T resource) {
34+
public <T extends ProvisionedDataDestinationResource> StatusChecker<T> resolve(T resource) {
3635
return resolve(resource.getClass());
3736
}
3837
}

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/transfer/TransferProcessManagerImpl.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ private TransferInitiateResponse initiateRequest(TransferProcess.Type type, Data
8686
return TransferInitiateResponse.Builder.newInstance().id(process.getId()).status(ResponseStatus.OK).build();
8787
}
8888

89-
@SuppressWarnings("BusyWait")
9089
private void run() {
9190
while (active.get()) {
9291
try {
@@ -162,7 +161,7 @@ private int checkCompleted() {
162161

163162
//only check resources for which a checker was registered.
164163
// todo: maybe error out processes with uncheckable resources??
165-
final List<ProvisionedResource> resources = process.getProvisionedResourceSet().getResources().stream().filter(this::hasChecker).collect(Collectors.toList());
164+
List<ProvisionedResource> resources = process.getProvisionedResourceSet().getResources().stream().filter(this::hasChecker).collect(Collectors.toList());
166165

167166
//todo: comment this in if we want to error out uncheckable resources
168167
// var resourcesWithNoChecker = resources.stream().filter(resource -> statusCheckerRegistry.resolve(resource) == null).collect(Collectors.toList());
@@ -185,15 +184,19 @@ private int checkCompleted() {
185184
}
186185

187186
private boolean hasChecker(ProvisionedResource provisionedResource) {
188-
return statusCheckerRegistry.resolve(provisionedResource) != null;
187+
return provisionedResource instanceof ProvisionedDataDestinationResource && statusCheckerRegistry.resolve((ProvisionedDataDestinationResource) provisionedResource) != null;
189188
}
190189

191190
private boolean isComplete(ProvisionedResource resource) {
192-
var checker = statusCheckerRegistry.resolve(resource);
191+
if (!(resource instanceof ProvisionedDataDestinationResource)) {
192+
return false;
193+
}
194+
ProvisionedDataDestinationResource dataResource = (ProvisionedDataDestinationResource) resource;
195+
var checker = statusCheckerRegistry.resolve(dataResource);
193196
if (checker == null) {
194197
return true;
195198
}
196-
return checker.isComplete(resource);
199+
return checker.isComplete(dataResource);
197200
}
198201

199202
/**

extensions/transfer/transfer-core/src/test/java/com/microsoft/dagx/transfer/core/transfer/TransferProcessManagerImplConsumerTest.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ void setup() {
6666
@Test
6767
void run_shouldProvision() throws InterruptedException {
6868
//arrange
69-
final TransferProcess process = createTransferProcess(TransferProcessStates.INITIAL);
69+
TransferProcess process = createTransferProcess(TransferProcessStates.INITIAL);
7070
var cdl = new CountDownLatch(1);
7171
//prepare provision manager
7272
provisionManager.provision(anyObject(TransferProcess.class));
@@ -77,7 +77,7 @@ void run_shouldProvision() throws InterruptedException {
7777
replay(provisionManager);
7878

7979
//prepare process store
80-
final TransferProcessStore processStoreMock = niceMock(TransferProcessStore.class);
80+
TransferProcessStore processStoreMock = niceMock(TransferProcessStore.class);
8181
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.singletonList(process));
8282
processStoreMock.update(process);
8383
expectLastCall().times(1);
@@ -97,7 +97,7 @@ void run_shouldProvision() throws InterruptedException {
9797
@DisplayName("verifySend: check that the process is in REQUESTED state")
9898
void verifySend() throws InterruptedException {
9999
//arrange
100-
final TransferProcess process = createTransferProcess(TransferProcessStates.PROVISIONED);
100+
TransferProcess process = createTransferProcess(TransferProcessStates.PROVISIONED);
101101
var cdl = new CountDownLatch(1);
102102
//prepare provision manager
103103
expect(dispatcherRegistry.send(eq(Void.class), anyObject(), anyObject())).andAnswer(() -> {
@@ -107,7 +107,7 @@ void verifySend() throws InterruptedException {
107107
replay(dispatcherRegistry);
108108

109109
//prepare process store
110-
final TransferProcessStore processStoreMock = niceMock(TransferProcessStore.class);
110+
TransferProcessStore processStoreMock = niceMock(TransferProcessStore.class);
111111
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.emptyList());
112112
expect(processStoreMock.nextForState(eq(TransferProcessStates.PROVISIONED.code()), anyInt())).andReturn(Collections.singletonList(process));
113113
processStoreMock.update(process);
@@ -129,13 +129,13 @@ void verifySend() throws InterruptedException {
129129
@DisplayName("checkProvisioned: all resources belong to finite processes")
130130
void verifyCheckProvisioned_allAreFinite() throws InterruptedException {
131131
//arrange
132-
final TransferProcess process = createTransferProcess(TransferProcessStates.REQUESTED_ACK);
132+
TransferProcess process = createTransferProcess(TransferProcessStates.REQUESTED_ACK);
133133
process.getProvisionedResourceSet().addResource(new TestResource());
134134

135135
var cdl = new CountDownLatch(1);
136136

137137
//prepare process store
138-
final TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
138+
TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
139139
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.emptyList());
140140
expect(processStoreMock.nextForState(eq(TransferProcessStates.PROVISIONED.code()), anyInt())).andReturn(Collections.emptyList());
141141
expect(processStoreMock.nextForState(eq(TransferProcessStates.REQUESTED_ACK.code()), anyInt())).andReturn(Collections.singletonList(process));
@@ -162,16 +162,16 @@ void verifyCheckProvisioned_allAreFinite() throws InterruptedException {
162162
@DisplayName("checkProvisioned: all resources belong to non-finite processes")
163163
void verifyCheckProvisioned_allAreNonFinite() throws InterruptedException {
164164
//arrange
165-
final TransferType type = TransferType.Builder.transferType()
165+
TransferType type = TransferType.Builder.transferType()
166166
.isFinite(false).build();
167167

168-
final TransferProcess process = createTransferProcess(TransferProcessStates.REQUESTED_ACK, type);
168+
TransferProcess process = createTransferProcess(TransferProcessStates.REQUESTED_ACK, type);
169169
process.getProvisionedResourceSet().addResource(new TestResource());
170170

171171
var cdl = new CountDownLatch(1);
172172

173173
//prepare process store
174-
final TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
174+
TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
175175
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.emptyList());
176176
expect(processStoreMock.nextForState(eq(TransferProcessStates.PROVISIONED.code()), anyInt())).andReturn(Collections.emptyList());
177177
expect(processStoreMock.nextForState(eq(TransferProcessStates.REQUESTED_ACK.code()), anyInt())).andReturn(Collections.singletonList(process));
@@ -198,14 +198,14 @@ void verifyCheckProvisioned_allAreNonFinite() throws InterruptedException {
198198
@DisplayName("checkComplete: all ProvisionedResources are complete")
199199
void verifyCompleted_allCompleted() throws InterruptedException {
200200
//arrange
201-
final TransferProcess process = createTransferProcess(TransferProcessStates.REQUESTED_ACK);
201+
TransferProcess process = createTransferProcess(TransferProcessStates.REQUESTED_ACK);
202202
process.getProvisionedResourceSet().addResource(new TestResource());
203203
process.getProvisionedResourceSet().addResource(new TestResource());
204204

205205
var cdl = new CountDownLatch(1);
206206

207207
//prepare process store
208-
final TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
208+
TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
209209
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.emptyList());
210210
expect(processStoreMock.nextForState(eq(TransferProcessStates.PROVISIONED.code()), anyInt())).andReturn(Collections.emptyList());
211211
expect(processStoreMock.nextForState(eq(TransferProcessStates.REQUESTED_ACK.code()), anyInt())).andReturn(Collections.emptyList());
@@ -237,14 +237,14 @@ void verifyCompleted_allCompleted() throws InterruptedException {
237237
@DisplayName("checkComplete: not all ProvisionedResources are yet completed")
238238
void verifyCompleted_notAllYetCompleted() throws InterruptedException {
239239
//arrange
240-
final TransferProcess process = createTransferProcess(TransferProcessStates.IN_PROGRESS);
240+
TransferProcess process = createTransferProcess(TransferProcessStates.IN_PROGRESS);
241241
process.getProvisionedResourceSet().addResource(new TestResource());
242242
process.getProvisionedResourceSet().addResource(new TestResource());
243243

244244
var cdl = new CountDownLatch(1);
245245

246246
//prepare process store
247-
final TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
247+
TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
248248
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.emptyList());
249249
expect(processStoreMock.nextForState(eq(TransferProcessStates.PROVISIONED.code()), anyInt())).andReturn(Collections.emptyList());
250250
expect(processStoreMock.nextForState(eq(TransferProcessStates.REQUESTED_ACK.code()), anyInt())).andReturn(Collections.emptyList());
@@ -277,14 +277,14 @@ void verifyCompleted_notAllYetCompleted() throws InterruptedException {
277277
@DisplayName("checkComplete: Should ignore resources without StatusCheckers")
278278
void verifyCompleted_noCheckerForSomeResources() throws InterruptedException {
279279
//arrange
280-
final TransferProcess process = createTransferProcess(TransferProcessStates.IN_PROGRESS);
280+
TransferProcess process = createTransferProcess(TransferProcessStates.IN_PROGRESS);
281281
process.getProvisionedResourceSet().addResource(new TestResource());
282282
process.getProvisionedResourceSet().addResource(new TestResource());
283283

284284
var cdl = new CountDownLatch(1);
285285

286286
//prepare process store
287-
final TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
287+
TransferProcessStore processStoreMock = mock(TransferProcessStore.class);
288288
expect(processStoreMock.nextForState(eq(TransferProcessStates.INITIAL.code()), anyInt())).andReturn(Collections.emptyList());
289289
expect(processStoreMock.nextForState(eq(TransferProcessStates.PROVISIONED.code()), anyInt())).andReturn(Collections.emptyList());
290290
expect(processStoreMock.nextForState(eq(TransferProcessStates.REQUESTED_ACK.code()), anyInt())).andReturn(Collections.emptyList());
@@ -320,12 +320,12 @@ void verifyProvision_shouldNotStarve() throws InterruptedException {
320320
var numProcesses = TRANSFER_MANAGER_BATCHSIZE * 2;
321321

322322
//prepare process store
323-
final TransferProcessStore inMemoryProcessStore = new InMemoryTransferProcessStore();
323+
TransferProcessStore inMemoryProcessStore = new InMemoryTransferProcessStore();
324324

325325
//create a few processes
326326
var processes = new ArrayList<TransferProcess>();
327327
for (int i = 0; i < numProcesses; i++) {
328-
final TransferProcess process = createTransferProcess(TransferProcessStates.UNSAVED);
328+
TransferProcess process = createTransferProcess(TransferProcessStates.UNSAVED);
329329
processes.add(process);
330330
inMemoryProcessStore.create(process);
331331
}
@@ -369,7 +369,7 @@ private TransferProcess createTransferProcess(TransferProcessStates inState, Tra
369369

370370
String processId = UUID.randomUUID().toString();
371371

372-
final DataRequest mock = niceMock(DataRequest.class);
372+
DataRequest mock = niceMock(DataRequest.class);
373373
expect(mock.getTransferType()).andReturn(type).anyTimes();
374374
expect(mock.getId()).andReturn(processId).anyTimes();
375375
replay(mock);
@@ -382,6 +382,19 @@ private TransferProcess createTransferProcess(TransferProcessStates inState, Tra
382382
.build();
383383
}
384384

385-
private static class TestResource extends ProvisionedResource {
385+
private static class TestResource extends ProvisionedDataDestinationResource {
386+
protected TestResource() {
387+
super();
388+
}
389+
390+
@Override
391+
public String getResourceName() {
392+
return "test-resource";
393+
}
394+
395+
@Override
396+
public DataAddress createDataDestination() {
397+
return null;
398+
}
386399
}
387400
}

spi/src/main/java/com/microsoft/dagx/spi/types/domain/transfer/StatusChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* @param <T> the concrete type of ProvisionedResource
1515
*/
1616
@FunctionalInterface
17-
public interface StatusChecker<T extends ProvisionedResource> {
17+
public interface StatusChecker<T extends ProvisionedDataDestinationResource> {
1818
/**
1919
* checks whether a particular ProvisionedResource is "complete", i.e. whether the data transfer is finished.
2020
*

spi/src/main/java/com/microsoft/dagx/spi/types/domain/transfer/StatusCheckerRegistry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
package com.microsoft.dagx.spi.types.domain.transfer;
88

99
public interface StatusCheckerRegistry {
10-
void register(Class<? extends ProvisionedResource> provisionedResourceClass, StatusChecker<? extends ProvisionedResource> statusChecker);
10+
void register(Class<? extends ProvisionedDataDestinationResource> provisionedResourceClass, StatusChecker<? extends ProvisionedDataDestinationResource> statusChecker);
1111

12-
<T extends ProvisionedResource> StatusChecker<T> resolve(Class<? extends ProvisionedResource> provisionedResourceClass);
12+
<T extends ProvisionedDataDestinationResource> StatusChecker<T> resolve(Class<? extends ProvisionedDataDestinationResource> provisionedResourceClass);
1313

14-
<T extends ProvisionedResource> StatusChecker<T> resolve(T resource);
14+
<T extends ProvisionedDataDestinationResource> StatusChecker<T> resolve(T resource);
1515
}

0 commit comments

Comments
 (0)