Skip to content

Commit 9f66f6a

Browse files
authored
Produce contingency list as a flux to avoid 'DataBufferLimitException' for very large lists (#10)
Signed-off-by: Slimane AMAR <[email protected]>
1 parent e6944e2 commit 9f66f6a

File tree

4 files changed

+70
-39
lines changed

4 files changed

+70
-39
lines changed

src/main/java/org/gridsuite/securityanalysis/server/service/ActionsService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
import org.springframework.core.ParameterizedTypeReference;
1313
import org.springframework.stereotype.Service;
1414
import org.springframework.web.reactive.function.client.WebClient;
15-
import reactor.core.publisher.Mono;
15+
import reactor.core.publisher.Flux;
1616

17-
import java.util.List;
1817
import java.util.Objects;
1918
import java.util.UUID;
2019

@@ -39,7 +38,7 @@ public ActionsService(WebClient webClient) {
3938
this.webClient = Objects.requireNonNull(webClient);
4039
}
4140

42-
public Mono<List<Contingency>> getContingencyList(String name, UUID networkUuid) {
41+
public Flux<Contingency> getContingencyList(String name, UUID networkUuid) {
4342
Objects.requireNonNull(name);
4443
Objects.requireNonNull(networkUuid);
4544
return webClient.get()
@@ -48,6 +47,7 @@ public Mono<List<Contingency>> getContingencyList(String name, UUID networkUuid)
4847
.queryParam("networkUuid", networkUuid.toString())
4948
.build(name))
5049
.retrieve()
51-
.bodyToMono(new ParameterizedTypeReference<>() { });
50+
.bodyToFlux(new ParameterizedTypeReference<>() {
51+
});
5252
}
5353
}

src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,7 @@
3131
import reactor.core.publisher.Mono;
3232
import reactor.core.scheduler.Schedulers;
3333

34-
import java.util.ArrayList;
35-
import java.util.List;
36-
import java.util.Map;
37-
import java.util.Objects;
38-
import java.util.Set;
39-
import java.util.UUID;
34+
import java.util.*;
4035
import java.util.concurrent.CancellationException;
4136
import java.util.concurrent.CompletableFuture;
4237
import java.util.concurrent.ConcurrentHashMap;
@@ -138,8 +133,7 @@ private Mono<SecurityAnalysisResult> run(SecurityAnalysisRunContext context, UUI
138133
Mono<Network> network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids());
139134

140135
Mono<List<Contingency>> contingencies = Flux.fromIterable(context.getContingencyListNames())
141-
.flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid())
142-
.flatMapMany(Flux::fromIterable))
136+
.flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid()))
143137
.collectList();
144138

145139
return Mono.zip(network, contingencies)

src/test/java/org/gridsuite/securityanalysis/server/SecurityAnalysisControllerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import com.powsybl.network.store.client.NetworkStoreService;
1212
import com.powsybl.network.store.client.PreloadingStrategy;
1313
import org.gridsuite.securityanalysis.server.service.ActionsService;
14-
import org.gridsuite.securityanalysis.server.service.UuidGeneratorService;
1514
import org.gridsuite.securityanalysis.server.service.SecurityAnalysisConfigService;
1615
import org.gridsuite.securityanalysis.server.service.SecurityAnalysisService;
16+
import org.gridsuite.securityanalysis.server.service.UuidGeneratorService;
1717
import org.junit.Before;
1818
import org.junit.Test;
1919
import org.junit.runner.RunWith;
@@ -31,7 +31,7 @@
3131
import org.springframework.test.context.junit4.SpringRunner;
3232
import org.springframework.test.web.reactive.server.WebTestClient;
3333
import org.springframework.web.reactive.config.EnableWebFlux;
34-
import reactor.core.publisher.Mono;
34+
import reactor.core.publisher.Flux;
3535

3636
import java.util.UUID;
3737

@@ -93,9 +93,9 @@ public void setUp() {
9393

9494
// action service mocking
9595
given(actionsService.getContingencyList(CONTINGENCY_LIST_NAME, NETWORK_UUID))
96-
.willReturn(Mono.just(SecurityAnalysisFactoryMock.CONTINGENCIES));
96+
.willReturn(Flux.fromIterable(SecurityAnalysisFactoryMock.CONTINGENCIES));
9797
given(actionsService.getContingencyList(CONTINGENCY_LIST2_NAME, NETWORK_UUID))
98-
.willReturn(Mono.just(SecurityAnalysisFactoryMock.CONTINGENCIES));
98+
.willReturn(Flux.fromIterable(SecurityAnalysisFactoryMock.CONTINGENCIES));
9999

100100
// UUID service mocking to always generate the same result UUID
101101
given(uuidGeneratorService.generate()).willReturn(RESULT_UUID);

src/test/java/org/gridsuite/securityanalysis/server/service/ActionsServiceTest.java

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package org.gridsuite.securityanalysis.server.service;
88

9-
import com.fasterxml.jackson.core.JsonProcessingException;
109
import com.fasterxml.jackson.databind.ObjectMapper;
1110
import com.powsybl.contingency.BranchContingency;
1211
import com.powsybl.contingency.Contingency;
@@ -16,9 +15,11 @@
1615
import okhttp3.mockwebserver.MockWebServer;
1716
import okhttp3.mockwebserver.RecordedRequest;
1817
import org.gridsuite.securityanalysis.server.WebFluxConfig;
18+
import org.junit.After;
1919
import org.junit.Before;
2020
import org.junit.Test;
2121
import org.junit.runner.RunWith;
22+
import org.springframework.http.HttpStatus;
2223
import org.springframework.http.MediaType;
2324
import org.springframework.http.codec.json.Jackson2JsonDecoder;
2425
import org.springframework.http.codec.json.Jackson2JsonEncoder;
@@ -27,30 +28,41 @@
2728
import org.springframework.web.reactive.function.client.WebClient;
2829

2930
import java.io.IOException;
30-
import java.io.UncheckedIOException;
3131
import java.util.List;
32+
import java.util.Objects;
3233
import java.util.UUID;
34+
import java.util.stream.Collectors;
35+
import java.util.stream.IntStream;
3336

3437
import static org.junit.Assert.assertEquals;
3538

3639
/**
3740
* @author Geoffroy Jamgotchian <geoffroy.jamgotchian at rte-france.com>
41+
* @author Slimane Amar <slimane.amar at rte-france.com>
3842
*/
3943
@RunWith(SpringRunner.class)
4044
public class ActionsServiceTest {
4145

46+
private static final int DATA_BUFFER_LIMIT = 256 * 1024; // AbstractJackson2Decoder.maxInMemorySize
47+
4248
private static final String NETWORK_UUID = "7928181c-7977-4592-ba19-88027e4254e4";
4349

4450
private static final String LIST_NAME = "myList";
4551

52+
private static final String VERY_LARGE_LIST_NAME = "veryLargelist";
53+
4654
private static final Contingency CONTINGENCY = new Contingency("c1", new BranchContingency("b1"));
4755

4856
private final ObjectMapper objectMapper = WebFluxConfig.createObjectMapper();
4957

5058
private WebClient.Builder webClientBuilder;
5159

60+
private MockWebServer server;
61+
62+
private ActionsService actionsService;
63+
5264
@Before
53-
public void setUp() {
65+
public void setUp() throws IOException {
5466
webClientBuilder = WebClient.builder();
5567
ExchangeStrategies strategies = ExchangeStrategies
5668
.builder()
@@ -60,41 +72,66 @@ public void setUp() {
6072

6173
}).build();
6274
webClientBuilder.exchangeStrategies(strategies);
75+
76+
actionsService = new ActionsService(webClientBuilder, initMockWebServer());
6377
}
6478

65-
@Test
66-
public void test() throws IOException {
67-
MockWebServer server = new MockWebServer();
79+
@After
80+
public void tearDown() {
81+
try {
82+
server.shutdown();
83+
} catch (Exception e) {
84+
// Nothing to do
85+
}
86+
}
87+
88+
private String initMockWebServer() throws IOException {
89+
server = new MockWebServer();
6890
server.start();
91+
92+
String jsonExpected = objectMapper.writeValueAsString(List.of(CONTINGENCY));
93+
String veryLargeJsonExpected = objectMapper.writeValueAsString(createVeryLargeList());
94+
6995
final Dispatcher dispatcher = new Dispatcher() {
7096
@Override
7197
public MockResponse dispatch(RecordedRequest request) {
72-
if (("/" + ActionsService.ACTIONS_API_VERSION + "/contingency-lists/" + LIST_NAME + "/export?networkUuid=" + NETWORK_UUID)
73-
.equals(request.getPath())) {
74-
try {
75-
String json = objectMapper.writeValueAsString(List.of(CONTINGENCY));
76-
return new MockResponse()
77-
.setResponseCode(200)
78-
.setBody(json)
79-
.addHeader("Content-Type", "application/json; charset=utf-8");
80-
} catch (JsonProcessingException e) {
81-
throw new UncheckedIOException(e);
82-
}
98+
String requestPath = Objects.requireNonNull(request.getPath());
99+
if (requestPath.equals(String.format("/v1/contingency-lists/%s/export?networkUuid=%s", LIST_NAME, NETWORK_UUID))) {
100+
return new MockResponse().setResponseCode(HttpStatus.OK.value())
101+
.setBody(jsonExpected)
102+
.addHeader("Content-Type", "application/json; charset=utf-8");
103+
} else if (requestPath.equals(String.format("/v1/contingency-lists/%s/export?networkUuid=%s", VERY_LARGE_LIST_NAME, NETWORK_UUID))) {
104+
return new MockResponse().setResponseCode(HttpStatus.OK.value())
105+
.setBody(veryLargeJsonExpected)
106+
.addHeader("Content-Type", "application/json; charset=utf-8");
107+
} else {
108+
return new MockResponse().setResponseCode(HttpStatus.NOT_FOUND.value()).setBody("Path not supported: " + request.getPath());
83109
}
84-
return new MockResponse().setResponseCode(404);
85110
}
86111
};
112+
87113
server.setDispatcher(dispatcher);
88114

89-
// get server base URL
115+
// Ask the server for its URL. You'll need this to make HTTP requests.
90116
HttpUrl baseHttpUrl = server.url("");
91-
String baseUrl = baseHttpUrl.toString().substring(0, baseHttpUrl.toString().length() - 1);
92117

93-
ActionsService actionsService = new ActionsService(webClientBuilder, baseUrl);
118+
return baseHttpUrl.toString().substring(0, baseHttpUrl.toString().length() - 1);
119+
}
120+
121+
private List<Contingency> createVeryLargeList() {
122+
return IntStream.range(0, DATA_BUFFER_LIMIT).mapToObj(i -> new Contingency("l" + i, new BranchContingency("l" + i))).collect(Collectors.toList());
123+
}
94124

95-
List<Contingency> list = actionsService.getContingencyList(LIST_NAME, UUID.fromString(NETWORK_UUID)).block();
125+
@Test
126+
public void test() {
127+
List<Contingency> list = actionsService.getContingencyList(LIST_NAME, UUID.fromString(NETWORK_UUID)).collectList().block();
96128
assertEquals(List.of(CONTINGENCY), list);
129+
}
97130

98-
server.shutdown();
131+
@Test
132+
public void testVeryLargeList() {
133+
// DataBufferLimitException should not be thrown with this message : "Exceeded limit on max bytes to buffer : DATA_BUFFER_LIMIT"
134+
List<Contingency> list = actionsService.getContingencyList(VERY_LARGE_LIST_NAME, UUID.fromString(NETWORK_UUID)).collectList().block();
135+
assertEquals(createVeryLargeList(), list);
99136
}
100137
}

0 commit comments

Comments
 (0)