Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit b754134

Browse files
added a test to verify the non-starvation feature
1 parent f959dcb commit b754134

File tree

4 files changed

+167
-4
lines changed

4 files changed

+167
-4
lines changed

extensions/transfer/transfer-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ plugins {
1111
dependencies {
1212
api(project(":spi"))
1313

14+
testImplementation(project(":extensions:transfer:transfer-store-memory"))
1415

1516
}
1617

extensions/transfer/transfer-core/src/test/java/com/microsoft/dagx/transfer/core/transfer/TransferProcessManagerImplConsumerTest.java

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
import com.microsoft.dagx.spi.transfer.provision.ResourceManifestGenerator;
1414
import com.microsoft.dagx.spi.transfer.store.TransferProcessStore;
1515
import com.microsoft.dagx.spi.types.domain.transfer.*;
16+
import com.microsoft.dagx.transfer.store.memory.InMemoryTransferProcessStore;
1617
import org.junit.jupiter.api.BeforeEach;
1718
import org.junit.jupiter.api.DisplayName;
1819
import org.junit.jupiter.api.Test;
1920

21+
import java.util.ArrayList;
2022
import java.util.Collections;
23+
import java.util.UUID;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.TimeUnit;
2326

@@ -27,10 +30,12 @@
2730
class TransferProcessManagerImplConsumerTest {
2831

2932
private static final long TIMEOUT = 5;
33+
private final static int TRANSFER_MANAGER_BATCHSIZE = 10;
3034
private TransferProcessManagerImpl transferProcessManager;
3135
private ProvisionManager provisionManager;
3236
private RemoteMessageDispatcherRegistry dispatcherRegistry;
3337
private StatusCheckerRegistry statusCheckerRegistry;
38+
private ExponentialWaitStrategy waitStrategyMock;
3439

3540
@BeforeEach
3641
void setup() {
@@ -40,11 +45,17 @@ void setup() {
4045
ResourceManifestGenerator manifestGenerator = mock(ResourceManifestGenerator.class);
4146

4247
statusCheckerRegistry = mock(StatusCheckerRegistry.class);
48+
49+
waitStrategyMock = partialMockBuilder(ExponentialWaitStrategy.class)
50+
.withConstructor(1000L)
51+
.addMockedMethod("success").strictMock();
52+
53+
4354
transferProcessManager = TransferProcessManagerImpl.Builder.newInstance()
4455
.provisionManager(provisionManager)
4556
.dataFlowManager(dataFlowManager)
46-
.waitStrategy(new ExponentialWaitStrategy(1000))
47-
.batchSize(10)
57+
.waitStrategy(waitStrategyMock)
58+
.batchSize(TRANSFER_MANAGER_BATCHSIZE)
4859
.dispatcherRegistry(dispatcherRegistry)
4960
.manifestGenerator(manifestGenerator)
5061
.monitor(mock(Monitor.class))
@@ -303,17 +314,68 @@ void verifyCompleted_noCheckerForSomeResources() throws InterruptedException {
303314
assertThat(process.getState()).describedAs("State should be COMPLETED").isEqualTo(TransferProcessStates.COMPLETED.code());
304315
}
305316

317+
@Test
318+
@DisplayName("Verify that no process 'starves' during two consecutive runs, when the batch size > number of processes")
319+
void verifyProvision_shouldNotStarve() throws InterruptedException {
320+
var numProcesses = TRANSFER_MANAGER_BATCHSIZE * 2;
321+
322+
//prepare process store
323+
final TransferProcessStore inMemoryProcessStore = new InMemoryTransferProcessStore();
324+
325+
//create a few processes
326+
var processes = new ArrayList<TransferProcess>();
327+
for (int i = 0; i < numProcesses; i++) {
328+
final TransferProcess process = createTransferProcess(TransferProcessStates.UNSAVED);
329+
processes.add(process);
330+
inMemoryProcessStore.create(process);
331+
}
332+
333+
var processesToProvision = new CountDownLatch(numProcesses); //all processes should be provisioned
334+
335+
//prepare provision manager
336+
provisionManager.provision(anyObject(TransferProcess.class));
337+
expectLastCall().andAnswer(() -> {
338+
processesToProvision.countDown();
339+
return null;
340+
}).anyTimes();
341+
replay(provisionManager);
342+
343+
// use the waitstrategy to count the number of iterations by making sure "success" was called exactly twice
344+
waitStrategyMock.success();
345+
expectLastCall().times(2);
346+
replay(waitStrategyMock);
347+
348+
349+
//act
350+
transferProcessManager.start(inMemoryProcessStore);
351+
352+
//assert
353+
assertThat(processesToProvision.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
354+
verify(provisionManager);
355+
verify(waitStrategyMock);
356+
assertThat(processes).describedAs("All transfer processes should be in PROVISIONING state").allSatisfy(p -> {
357+
var id = p.getId();
358+
var storedProcess = inMemoryProcessStore.find(id);
359+
assertThat(storedProcess).describedAs("Should exist in the TransferProcessStore").isNotNull();
360+
assertThat(storedProcess.getState()).isEqualTo(TransferProcessStates.PROVISIONING.code());
361+
});
362+
}
363+
306364
private TransferProcess createTransferProcess(TransferProcessStates inState) {
307365
return createTransferProcess(inState, new TransferType());
308366
}
309367

310368
private TransferProcess createTransferProcess(TransferProcessStates inState, TransferType type) {
369+
370+
String processId = UUID.randomUUID().toString();
371+
311372
final DataRequest mock = niceMock(DataRequest.class);
312373
expect(mock.getTransferType()).andReturn(type).anyTimes();
374+
expect(mock.getId()).andReturn(processId).anyTimes();
313375
replay(mock);
314376
return TransferProcess.Builder.newInstance()
315377
.state(inState.code())
316-
.id("test-process-id")
378+
.id("test-process-" + processId)
317379
.provisionedResourceSet(new ProvisionedResourceSet())
318380
.type(TransferProcess.Type.CLIENT)
319381
.dataRequest(mock)
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.microsoft.dagx.transfer.demo.protocols;
2+
3+
import com.microsoft.dagx.junit.DagxExtension;
4+
import com.microsoft.dagx.spi.monitor.Monitor;
5+
import com.microsoft.dagx.spi.security.Vault;
6+
import com.microsoft.dagx.spi.security.VaultResponse;
7+
import com.microsoft.dagx.spi.transfer.TransferProcessManager;
8+
import com.microsoft.dagx.spi.transfer.TransferWaitStrategy;
9+
import com.microsoft.dagx.spi.types.domain.metadata.DataEntry;
10+
import com.microsoft.dagx.spi.types.domain.transfer.DataAddress;
11+
import com.microsoft.dagx.spi.types.domain.transfer.DataRequest;
12+
import com.microsoft.dagx.transfer.demo.protocols.spi.DemoProtocols;
13+
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.DestinationManager;
14+
import org.jetbrains.annotations.Nullable;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Disabled;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.ExtendWith;
19+
20+
import java.util.Map;
21+
import java.util.UUID;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
/**
27+
*
28+
*/
29+
@ExtendWith(DagxExtension.class)
30+
@Disabled
31+
class DemoProtocolsTransferExtensionTest {
32+
33+
/**
34+
* Perform a push stream flow using the loopback protocol.
35+
*
36+
* @param processManager the injected process manager
37+
* @param destinationManager the injected destination manager
38+
* @param monitor the injected runtime monitor
39+
*/
40+
@Test
41+
void verifyPushStreamFlow(TransferProcessManager processManager, DestinationManager destinationManager, Monitor monitor) throws InterruptedException {
42+
var latch = new CountDownLatch(1);
43+
44+
var destinationName = UUID.randomUUID().toString();
45+
destinationManager.registerObserver((name, payload) -> {
46+
monitor.info("Message: " + new String(payload));
47+
latch.countDown();
48+
});
49+
50+
var dataEntry = DataEntry.Builder.newInstance().id("test123").build();
51+
52+
var dataRequest = DataRequest.Builder.newInstance()
53+
.id(UUID.randomUUID().toString())
54+
.protocol("loopback")
55+
.destinationType(DemoProtocols.PUSH_STREAM)
56+
.dataEntry(dataEntry)
57+
.dataDestination(DataAddress.Builder.newInstance().type(DemoProtocols.PUSH_STREAM)
58+
.property(DemoProtocols.DESTINATION_NAME, destinationName).build())
59+
.connectorId("test").build();
60+
61+
processManager.initiateClientRequest(dataRequest);
62+
63+
latch.await(1, TimeUnit.MINUTES);
64+
}
65+
66+
/**
67+
* Fixture that obtains a reference to the runtime.
68+
*
69+
* @param extension the injected runtime instance
70+
*/
71+
@BeforeEach
72+
void before(DagxExtension extension) {
73+
// register a mock Vault
74+
extension.registerServiceMock(Vault.class, new MockVault());
75+
76+
// register a wait strategy of 1ms to speed up the interval between transfer manager iterations
77+
extension.registerServiceMock(TransferWaitStrategy.class, () -> 1);
78+
}
79+
80+
private static class MockVault implements Vault {
81+
private final Map<String, String> secrets = new ConcurrentHashMap<>();
82+
83+
@Override
84+
public @Nullable String resolveSecret(String key) {
85+
return secrets.get(key);
86+
}
87+
88+
@Override
89+
public VaultResponse storeSecret(String key, String value) {
90+
secrets.put(key, value);
91+
return VaultResponse.OK;
92+
}
93+
94+
@Override
95+
public VaultResponse deleteSecret(String key) {
96+
secrets.remove(key);
97+
return VaultResponse.OK;
98+
}
99+
}
100+
}

spi/src/main/java/com/microsoft/dagx/spi/types/domain/transfer/TransferProcess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public void updateStateTimestamp() {
250250
public String toString() {
251251
return "TransferProcess{" +
252252
"id='" + id + '\'' +
253-
", state=" + state +
253+
", state=" + TransferProcessStates.from(state) +
254254
", stateTimestamp=" + Instant.ofEpochMilli(stateTimestamp) +
255255
'}';
256256
}

0 commit comments

Comments
 (0)