Skip to content

Commit 10abce4

Browse files
committed
Cleaner modificationapplication contextx schedule implementation
1 parent 3e2e66d commit 10abce4

File tree

1 file changed

+38
-21
lines changed

1 file changed

+38
-21
lines changed

src/main/java/org/gridsuite/modification/server/service/NetworkModificationService.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -203,30 +203,47 @@ public CompletableFuture<NetworkModificationsResult> createNetworkModification(@
203203
* Apply modifications on several networks
204204
*/
205205
private CompletableFuture<List<Optional<NetworkModificationResult>>> applyModifications(UUID groupUuid, List<ModificationEntity> modifications, List<ModificationApplicationContext> applicationContexts) {
206-
// Do we want to do these in parallel (CompletableFuture.allOf) or sequential (sometimes called "Waterfall") ?
206+
// Do we want to do these all in parallel (CompletableFuture.allOf) or sequentially (like in Flux.concatMap) or something in between ?
207207
// sequentially like before for now
208-
// Is there a library or something to simplify this code ?
209-
List<Optional<NetworkModificationResult>> results = new ArrayList<>(applicationContexts.size());
210-
CompletableFuture<List<Optional<NetworkModificationResult>>> chainedFutures = CompletableFuture.completedFuture(results);
211-
for (ModificationApplicationContext modificationApplicationContext : applicationContexts) {
212-
// thencompose, this should add the computation result to the list and
213-
// and schedule the next computation in the same thread as the task
214-
// The list is accessed from different threads but not concurrently and
215-
// with happens-before semantics.
216-
chainedFutures = chainedFutures.thenCompose(accumulatingresults ->
208+
List<CompletableFuture<Optional<NetworkModificationResult>>> results = new ArrayList<>(applicationContexts.size());
209+
return scheduleApplyModifications(
210+
modificationApplicationContext ->
217211
applyModifications(
218-
modificationApplicationContext.networkUuid(),
219-
modificationApplicationContext.variantId(),
220-
new ModificationApplicationGroup(groupUuid,
221-
modifications.stream().filter(m -> !modificationApplicationContext.excludedModifications().contains(m.getId())).toList(),
222-
new ReportInfos(modificationApplicationContext.reportUuid(), modificationApplicationContext.reporterId())
223-
)).thenApply(result -> {
224-
accumulatingresults.add(result);
225-
return accumulatingresults;
226-
})
227-
);
212+
modificationApplicationContext.networkUuid(),
213+
modificationApplicationContext.variantId(),
214+
new ModificationApplicationGroup(groupUuid,
215+
modifications.stream().filter(m -> !modificationApplicationContext.excludedModifications().contains(m.getId())).toList(),
216+
new ReportInfos(modificationApplicationContext.reportUuid(), modificationApplicationContext.reporterId())
217+
)
218+
),
219+
applicationContexts, results
220+
).thenApply(unused ->
221+
results.stream().map(CompletableFuture::resultNow).toList());
222+
}
223+
224+
/**
225+
* @param results should pass an empty list to be filled with the results
226+
*/
227+
// The signature of this method is chosen so that we can implement easily sequential or parallel schedule
228+
// If we change it (for example to parallel scheduling), we should keep the exceptional behavior consistent,
229+
// call the apply function inside a thenCompose anyway to wrap its exceptions in exceptional future completions.
230+
private static CompletableFuture<Void> scheduleApplyModifications(
231+
Function<ModificationApplicationContext, CompletableFuture<Optional<NetworkModificationResult>>> func,
232+
List<ModificationApplicationContext> applicationContexts,
233+
List<CompletableFuture<Optional<NetworkModificationResult>>> results) {
234+
CompletableFuture<?> chainedFutures = CompletableFuture.completedFuture(null);
235+
for (ModificationApplicationContext applicationContext : applicationContexts) {
236+
chainedFutures = chainedFutures.thenCompose(unused -> {
237+
var cf = func.apply(applicationContext);
238+
// thencompose, this should add the computation result to the list and
239+
// and schedule the next computation in the same thread as the task
240+
// The list is accessed from different threads but not concurrently and
241+
// with happens-before semantics.
242+
results.add(cf);
243+
return cf;
244+
});
228245
}
229-
return chainedFutures;
246+
return chainedFutures.thenCompose(unused -> CompletableFuture.completedFuture(null));
230247
}
231248

232249
public Network cloneNetworkVariant(UUID networkUuid,

0 commit comments

Comments
 (0)