|
41 | 41 | import java.util.concurrent.CompletableFuture;
|
42 | 42 | import java.util.concurrent.ConcurrentHashMap;
|
43 | 43 | import java.util.function.Consumer;
|
| 44 | +import java.util.logging.Level; |
44 | 45 | import java.util.stream.Collectors;
|
45 | 46 |
|
46 | 47 | /**
|
@@ -144,76 +145,75 @@ private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUI
|
144 | 145 |
|
145 | 146 | return Mono.zip(network, contingencies)
|
146 | 147 | .flatMap(tuple -> {
|
147 |
| - SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); |
148 |
| - CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); |
149 |
| - if (resultUuid != null) { |
150 |
| - futures.put(resultUuid, future); |
151 |
| - } |
152 | 148 | if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) {
|
153 | 149 | return Mono.empty();
|
154 | 150 | } else {
|
| 151 | + SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); |
| 152 | + CompletableFuture<SecurityAnalysisResult> future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); |
| 153 | + if (resultUuid != null) { |
| 154 | + futures.put(resultUuid, future); |
| 155 | + } |
155 | 156 | return Mono.fromCompletionStage(future);
|
156 | 157 | }
|
157 | 158 | });
|
158 | 159 | }
|
159 | 160 |
|
160 | 161 | @Bean
|
161 |
| - public Consumer<Message<String>> consumeRun() { |
162 |
| - return message -> { |
163 |
| - |
164 |
| - SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); |
165 |
| - runRequests.add(resultContext.getResultUuid()); |
166 |
| - |
167 |
| - run(resultContext.getRunContext(), resultContext.getResultUuid()) |
168 |
| - .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) |
169 |
| - .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) |
170 |
| - .then(Mono.just(result))) |
171 |
| - .doOnSuccess(result -> { |
172 |
| - if (result != null) { // result available |
173 |
| - resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); |
174 |
| - LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); |
175 |
| - } else { // result not available : stop computation request |
176 |
| - if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { |
177 |
| - stoppedPublisherService.publish(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); |
178 |
| - } |
179 |
| - } |
180 |
| - }) |
181 |
| - .doOnError(throwable -> { |
182 |
| - if (!(throwable instanceof CancellationException)) { |
183 |
| - LOGGER.error(throwable.toString(), throwable); |
184 |
| - } |
185 |
| - }) |
186 |
| - .doFinally(s -> { |
187 |
| - futures.remove(resultContext.getResultUuid()); |
188 |
| - cancelComputationRequests.remove(resultContext.getResultUuid()); |
189 |
| - runRequests.remove(resultContext.getResultUuid()); |
190 |
| - }) |
191 |
| - .subscribe(); |
192 |
| - }; |
| 162 | + public Consumer<Flux<Message<String>>> consumeRun() { |
| 163 | + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) |
| 164 | + .flatMap(message -> { |
| 165 | + SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); |
| 166 | + runRequests.add(resultContext.getResultUuid()); |
| 167 | + |
| 168 | + return run(resultContext.getRunContext(), resultContext.getResultUuid()) |
| 169 | + .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) |
| 170 | + .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) |
| 171 | + .then(Mono.just(result))) |
| 172 | + .doOnSuccess(result -> { |
| 173 | + if (result != null) { // result available |
| 174 | + resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); |
| 175 | + LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); |
| 176 | + } |
| 177 | + }) |
| 178 | + .doFinally(s -> { |
| 179 | + futures.remove(resultContext.getResultUuid()); |
| 180 | + cancelComputationRequests.remove(resultContext.getResultUuid()); |
| 181 | + runRequests.remove(resultContext.getResultUuid()); |
| 182 | + }) |
| 183 | + .doOnError(throwable -> { |
| 184 | + if (!(throwable instanceof CancellationException)) { |
| 185 | + LOGGER.error("Exception in consumeRun", throwable); |
| 186 | + } |
| 187 | + }); |
| 188 | + }) |
| 189 | + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeRun", t)) |
| 190 | + .subscribe(); |
193 | 191 | }
|
194 | 192 |
|
195 | 193 | @Bean
|
196 |
| - public Consumer<Message<String>> consumeCancel() { |
197 |
| - return message -> { |
198 |
| - SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); |
| 194 | + public Consumer<Flux<Message<String>>> consumeCancel() { |
| 195 | + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) |
| 196 | + .flatMap(message -> { |
| 197 | + SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); |
199 | 198 |
|
200 |
| - if (runRequests.contains(cancelContext.getResultUuid())) { |
201 |
| - cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); |
202 |
| - } |
| 199 | + if (runRequests.contains(cancelContext.getResultUuid())) { |
| 200 | + cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); |
| 201 | + } |
203 | 202 |
|
204 |
| - // find the completableFuture associated with result uuid |
205 |
| - CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid()); |
206 |
| - if (future != null) { |
207 |
| - future.cancel(true); // cancel computation in progress |
208 |
| - |
209 |
| - resultRepository.delete(cancelContext.getResultUuid()) |
210 |
| - .doOnSuccess(unused -> { |
211 |
| - stoppedPublisherService.publish(cancelContext.getResultUuid(), cancelContext.getReceiver()); |
212 |
| - LOGGER.info("Security analysis stopped (resultUuid='{}')", cancelContext.getResultUuid()); |
213 |
| - }) |
214 |
| - .doOnError(throwable -> LOGGER.error(throwable.toString(), throwable)) |
215 |
| - .subscribe(); |
216 |
| - } |
217 |
| - }; |
| 203 | + // find the completableFuture associated with result uuid |
| 204 | + CompletableFuture<SecurityAnalysisResult> future = futures.get(cancelContext.getResultUuid()); |
| 205 | + if (future != null) { |
| 206 | + future.cancel(true); // cancel computation in progress |
| 207 | + |
| 208 | + return resultRepository.delete(cancelContext.getResultUuid()) |
| 209 | + .doOnSuccess(unused -> { |
| 210 | + stoppedPublisherService.publish(cancelContext.getResultUuid(), cancelContext.getReceiver()); |
| 211 | + LOGGER.info("Security analysis stopped (resultUuid='{}')", cancelContext.getResultUuid()); |
| 212 | + }); |
| 213 | + } |
| 214 | + return Mono.empty(); |
| 215 | + }) |
| 216 | + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeCancel", t)) |
| 217 | + .subscribe(); |
218 | 218 | }
|
219 | 219 | }
|
0 commit comments