|
36 | 36 | import java.util.concurrent.CompletableFuture; |
37 | 37 | import java.util.concurrent.ConcurrentHashMap; |
38 | 38 | import java.util.function.Consumer; |
| 39 | +import java.util.logging.Level; |
39 | 40 | import java.util.stream.Collectors; |
40 | 41 |
|
41 | 42 | import static org.gridsuite.securityanalysis.server.service.SecurityAnalysisStoppedPublisherService.CANCEL_MESSAGE; |
@@ -141,79 +142,78 @@ private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUI |
141 | 142 |
|
142 | 143 | return Mono.zip(network, contingencies) |
143 | 144 | .flatMap(tuple -> { |
144 | | - SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); |
145 | | - CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); |
146 | | - if (resultUuid != null) { |
147 | | - futures.put(resultUuid, future); |
148 | | - } |
149 | 145 | if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { |
150 | 146 | return Mono.empty(); |
151 | 147 | } else { |
| 148 | + SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); |
| 149 | + CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); |
| 150 | + if (resultUuid != null) { |
| 151 | + futures.put(resultUuid, future); |
| 152 | + } |
152 | 153 | return Mono.fromCompletionStage(future); |
153 | 154 | } |
154 | 155 | }); |
155 | 156 | } |
156 | 157 |
|
157 | 158 | @Bean |
158 | | - public Consumer<Message<String>> consumeRun() { |
159 | | - return message -> { |
160 | | - |
161 | | - SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); |
162 | | - runRequests.add(resultContext.getResultUuid()); |
163 | | - |
164 | | - run(resultContext.getRunContext(), resultContext.getResultUuid()) |
165 | | - .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) |
166 | | - .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) |
167 | | - .then(Mono.just(result))) |
168 | | - .doOnSuccess(result -> { |
169 | | - if (result != null) { // result available |
170 | | - resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); |
171 | | - LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); |
172 | | - } else { // result not available : stop computation request |
173 | | - if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { |
174 | | - stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); |
175 | | - } |
176 | | - } |
177 | | - }) |
178 | | - .onErrorResume(throwable -> { |
179 | | - if (!(throwable instanceof CancellationException)) { |
180 | | - LOGGER.error(FAIL_MESSAGE, throwable); |
181 | | - stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage()); |
182 | | - return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty()); |
183 | | - } |
184 | | - return Mono.empty(); |
185 | | - }) |
186 | | - .doFinally(s -> { |
187 | | - futures.remove(resultContext.getResultUuid()); |
188 | | - cancelComputationRequests.remove(resultContext.getResultUuid()); |
189 | | - runRequests.remove(resultContext.getResultUuid()); |
190 | | - }) |
191 | | - .subscribe(); |
192 | | - }; |
| 159 | + public Consumer<Flux<Message<String>>> consumeRun() { |
| 160 | + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) |
| 161 | + .flatMap(message -> { |
| 162 | + SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); |
| 163 | + runRequests.add(resultContext.getResultUuid()); |
| 164 | + |
| 165 | + return run(resultContext.getRunContext(), resultContext.getResultUuid()) |
| 166 | + .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) |
| 167 | + .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) |
| 168 | + .then(Mono.just(result))) |
| 169 | + .doOnSuccess(result -> { |
| 170 | + if (result != null) { // result available |
| 171 | + resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); |
| 172 | + LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); |
| 173 | + } |
| 174 | + }) |
| 175 | + .onErrorResume(throwable -> { |
| 176 | + if (!(throwable instanceof CancellationException)) { |
| 177 | + LOGGER.error(FAIL_MESSAGE, throwable); |
| 178 | + stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage()); |
| 179 | + return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty()); |
| 180 | + } |
| 181 | + return Mono.empty(); |
| 182 | + }) |
| 183 | + .doFinally(s -> { |
| 184 | + futures.remove(resultContext.getResultUuid()); |
| 185 | + cancelComputationRequests.remove(resultContext.getResultUuid()); |
| 186 | + runRequests.remove(resultContext.getResultUuid()); |
| 187 | + }); |
| 188 | + }) |
| 189 | + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeRun", t)) |
| 190 | + .subscribe(); |
193 | 191 | } |
194 | 192 |
|
195 | 193 | @Bean |
196 | | - public Consumer<Message<String>> consumeCancel() { |
197 | | - return message -> { |
198 | | - SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); |
| 194 | + public Consumer<Flux<Message<String>>> consumeCancel() { |
| 195 | + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) |
| 196 | + .flatMap(message -> { |
| 197 | + SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); |
199 | 198 |
|
200 | | - if (runRequests.contains(cancelContext.getResultUuid())) { |
201 | | - cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); |
202 | | - } |
| 199 | + if (runRequests.contains(cancelContext.getResultUuid())) { |
| 200 | + cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); |
| 201 | + } |
203 | 202 |
|
204 | | - // find the completableFuture associated with result uuid |
205 | | - CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid()); |
206 | | - if (future != null) { |
207 | | - future.cancel(true); // cancel computation in progress |
208 | | - |
209 | | - resultRepository.delete(cancelContext.getResultUuid()) |
210 | | - .doOnSuccess(unused -> { |
211 | | - stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver()); |
212 | | - LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid()); |
213 | | - }) |
214 | | - .doOnError(throwable -> LOGGER.error(throwable.toString(), throwable)) |
215 | | - .subscribe(); |
216 | | - } |
217 | | - }; |
| 203 | + // find the completableFuture associated with result uuid |
| 204 | + CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid()); |
| 205 | + if (future != null) { |
| 206 | + future.cancel(true); // cancel computation in progress |
| 207 | + |
| 208 | + return resultRepository.delete(cancelContext.getResultUuid()) |
| 209 | + .doOnSuccess(unused -> { |
| 210 | + stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver()); |
| 211 | + LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid()); |
| 212 | + }); |
| 213 | + } |
| 214 | + return Mono.empty(); |
| 215 | + }) |
| 216 | + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeCancel", t)) |
| 217 | + .subscribe(); |
218 | 218 | } |
219 | 219 | } |
0 commit comments