Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit f959dcb

Browse files
update TransferProcess after every check
1 parent 65f6170 commit f959dcb

File tree

6 files changed

+85
-18
lines changed

6 files changed

+85
-18
lines changed

extensions/transfer/transfer-core/src/main/java/com/microsoft/dagx/transfer/core/transfer/TransferProcessManagerImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ private int checkProvisioned() {
139139
} else {
140140
process.transitionStreaming();
141141
}
142-
transferProcessStore.update(process);
143142
} else {
144143
monitor.debug("Process " + process.getId() + " does not yet have provisioned resources, will stay in " + TransferProcessStates.REQUESTED_ACK);
145144
}
145+
transferProcessStore.update(process);
146146
}
147147

148148
return requestAcked.size();
@@ -164,7 +164,6 @@ private int checkCompleted() {
164164
// todo: maybe error out processes with uncheckable resources??
165165
final List<ProvisionedResource> resources = process.getProvisionedResourceSet().getResources().stream().filter(this::hasChecker).collect(Collectors.toList());
166166

167-
168167
//todo: comment this in if we want to error out uncheckable resources
169168
// var resourcesWithNoChecker = resources.stream().filter(resource -> statusCheckerRegistry.resolve(resource) == null).collect(Collectors.toList());
170169
// if (!resourcesWithNoChecker.isEmpty()) {
@@ -178,10 +177,9 @@ private int checkCompleted() {
178177
// update the process once ALL resources are completed
179178
if (resources.stream().allMatch(this::isComplete)) {
180179
process.transitionCompleted();
181-
transferProcessStore.update(process);
182180
monitor.debug("Process " + process.getId() + " is now " + TransferProcessStates.COMPLETED);
183181
}
184-
182+
transferProcessStore.update(process);
185183
}
186184
return processesInProgress.size();
187185
}
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import static org.assertj.core.api.Assertions.assertThat;
2525
import static org.easymock.EasyMock.*;
2626

27-
public class TransferManagerImplConsumerTest {
27+
class TransferProcessManagerImplConsumerTest {
2828

2929
private static final long TIMEOUT = 5;
3030
private TransferProcessManagerImpl transferProcessManager;
@@ -243,6 +243,8 @@ void verifyCompleted_notAllYetCompleted() throws InterruptedException {
243243
cdl.countDown();
244244
return Collections.emptyList();
245245
}).anyTimes();
246+
processStoreMock.update(eq(process));
247+
expectLastCall().anyTimes();
246248
replay(processStoreMock);
247249

248250
// prepare statuschecker registry

extensions/transfer/transfer-store-memory/src/main/java/com/microsoft/dagx/transfer/store/memory/InMemoryTransferProcessStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ public String processIdForTransferId(String id) {
4747
public @NotNull List<TransferProcess> nextForState(int state, int max) {
4848
return readLock(() -> {
4949
var set = stateCache.get(state);
50-
return set == null ? Collections.emptyList() : set.stream().limit(max).map(TransferProcess::copy).collect(toList());
50+
return set == null ? Collections.emptyList() : set.stream()
51+
.sorted(Comparator.comparingLong(TransferProcess::getStateTimestamp)) //order by state timestamp, oldest first
52+
.limit(max)
53+
.map(TransferProcess::copy)
54+
.collect(toList());
5155
});
5256
}
5357

@@ -67,6 +71,7 @@ public void create(TransferProcess process) {
6771
@Override
6872
public void update(TransferProcess process) {
6973
writeLock(() -> {
74+
process.updateStateTimestamp();
7075
delete(process.getId());
7176
TransferProcess internalCopy = process.copy();
7277
processesByExternalId.put(process.getDataRequest().getId(), internalCopy);

extensions/transfer/transfer-store-memory/src/test/java/com/microsoft/dagx/transfer/store/memory/InMemoryTransferProcessStoreTest.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,18 @@
55

66
package com.microsoft.dagx.transfer.store.memory;
77

8-
import com.microsoft.dagx.spi.types.domain.transfer.DataRequest;
9-
import com.microsoft.dagx.spi.types.domain.transfer.ResourceManifest;
10-
import com.microsoft.dagx.spi.types.domain.transfer.TransferProcess;
11-
import com.microsoft.dagx.spi.types.domain.transfer.TransferProcessStates;
8+
import com.microsoft.dagx.spi.types.domain.transfer.*;
129
import org.junit.jupiter.api.Assertions;
1310
import org.junit.jupiter.api.BeforeEach;
1411
import org.junit.jupiter.api.Test;
1512

1613
import java.util.List;
1714
import java.util.UUID;
1815

19-
import static org.junit.jupiter.api.Assertions.assertEquals;
20-
import static org.junit.jupiter.api.Assertions.assertNotNull;
21-
import static org.junit.jupiter.api.Assertions.assertNotSame;
22-
import static org.junit.jupiter.api.Assertions.assertNull;
23-
import static org.junit.jupiter.api.Assertions.assertTrue;
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.easymock.EasyMock.niceMock;
18+
import static org.easymock.EasyMock.replay;
19+
import static org.junit.jupiter.api.Assertions.*;
2420

2521
/**
2622
*
@@ -108,6 +104,46 @@ void verifyMutlipleRequets() {
108104

109105
}
110106

107+
@Test
108+
void verifyOrderingByTimestamp() {
109+
for (int i = 0; i < 100; i++) {
110+
final TransferProcess process = createProcess("test-process-" + i);
111+
store.create(process);
112+
}
113+
114+
final List<TransferProcess> processes = store.nextForState(TransferProcessStates.INITIAL.code(), 50);
115+
116+
assertThat(processes).hasSize(50);
117+
assertThat(processes).allMatch(p -> p.getStateTimestamp() > 0);
118+
}
119+
120+
@Test
121+
void verifyNextForState_avoidsStarvation() throws InterruptedException {
122+
for (int i = 0; i < 10; i++) {
123+
final TransferProcess process = createProcess("test-process-" + i);
124+
store.create(process);
125+
}
126+
127+
var list1 = store.nextForState(TransferProcessStates.INITIAL.code(), 5);
128+
Thread.sleep(50); //simulate a short delay to generate different timestamps
129+
list1.forEach(tp -> store.update(tp));
130+
var list2 = store.nextForState(TransferProcessStates.INITIAL.code(), 5);
131+
assertThat(list1).isNotEqualTo(list2).doesNotContainAnyElementsOf(list2);
132+
}
133+
134+
private TransferProcess createProcess(String name) {
135+
final DataRequest mock = niceMock(DataRequest.class);
136+
replay(mock);
137+
return TransferProcess.Builder.newInstance()
138+
.type(TransferProcess.Type.CLIENT)
139+
.id(name)
140+
.stateTimestamp(0)
141+
.state(TransferProcessStates.UNSAVED.code())
142+
.provisionedResourceSet(new ProvisionedResourceSet())
143+
.dataRequest(mock)
144+
.build();
145+
}
146+
111147

112148
@BeforeEach
113149
void setUp() {

spi/src/main/java/com/microsoft/dagx/spi/transfer/store/TransferProcessStore.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,20 @@ public interface TransferProcessStore {
2222
@Nullable
2323
String processIdForTransferId(String id);
2424

25+
/**
26+
* Returns a list of TransferProcesses that are in a specific state.
27+
* <p>
28+
* Implementors MUST handle the starvation scenario, i.e. when the number of processes is greater than the number
29+
* passedin via {@code max}.
30+
* E.g. database-based implementations should perform a query along the lines of {@code SELECT ... ORDER BY TransferProcess#stateTimestamp}.
31+
* Then, after the check, users of this method must update the {@code TransferProcess#stateTimestamp} even if the process
32+
* remains unchanged.
33+
* Some database frameworks such as Spring have automatic lastChanged columns.
34+
*
35+
* @param state The state that the processes of interest should be in.
36+
* @param max The maximum amount of result items.
37+
* @return A list of TransferProcesses (at most _max_) that are in the desired state.
38+
*/
2539
@NotNull
2640
List<TransferProcess> nextForState(int state, int max);
2741

spi/src/main/java/com/microsoft/dagx/spi/types/domain/transfer/TransferProcess.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,14 @@ public void transitionError(@Nullable String errorDetail) {
197197
state = TransferProcessStates.ERROR.code();
198198
this.errorDetail = errorDetail;
199199
stateCount = 1;
200-
stateTimestamp = Instant.now().toEpochMilli();
200+
updateStateTimestamp();
201201
}
202202

203203

204204
public void rollbackState(TransferProcessStates state) {
205205
this.state = state.code();
206206
stateCount = 1;
207-
stateTimestamp = Instant.now().toEpochMilli();
207+
updateStateTimestamp();
208208
}
209209

210210
public TransferProcess copy() {
@@ -222,7 +222,7 @@ private void transition(TransferProcessStates end, TransferProcessStates... star
222222
}
223223
stateCount = state == end.code() ? stateCount + 1 : 1;
224224
state = end.code();
225-
stateTimestamp = Instant.now().toEpochMilli();
225+
updateStateTimestamp();
226226
}
227227

228228
@Override
@@ -242,6 +242,18 @@ public int hashCode() {
242242
return Objects.hash(id);
243243
}
244244

245+
public void updateStateTimestamp() {
246+
stateTimestamp = Instant.now().toEpochMilli();
247+
}
248+
249+
@Override
250+
public String toString() {
251+
return "TransferProcess{" +
252+
"id='" + id + '\'' +
253+
", state=" + state +
254+
", stateTimestamp=" + Instant.ofEpochMilli(stateTimestamp) +
255+
'}';
256+
}
245257

246258
public enum Type {
247259
CLIENT, PROVIDER

0 commit comments

Comments
 (0)