Skip to content

Commit 4dafdb0

Browse files
authored
Regroup equipmentDeletion notifications (#52)
Signed-off-by: Ayoub LABIDI <[email protected]>
1 parent c5fd10f commit 4dafdb0

File tree

5 files changed

+61
-20
lines changed

5 files changed

+61
-20
lines changed

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@
100100
<groupId>org.springframework.cloud</groupId>
101101
<artifactId>spring-cloud-stream</artifactId>
102102
</dependency>
103+
<dependency>
104+
<groupId>org.projectlombok</groupId>
105+
<artifactId>lombok</artifactId>
106+
</dependency>
103107

104108
<!-- Runtime dependencies -->
105109
<dependency>

src/main/java/org/gridsuite/study/notification/server/NotificationWebSocketHandler.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import com.fasterxml.jackson.core.JsonProcessingException;
1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
21+
import org.gridsuite.study.notification.server.dto.NetworkImpactsInfos;
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224
import org.springframework.beans.factory.annotation.Value;
@@ -56,8 +58,7 @@ public class NotificationWebSocketHandler implements WebSocketHandler {
5658
static final String HEADER_TIMESTAMP = "timestamp";
5759
static final String HEADER_ERROR = "error";
5860
static final String HEADER_SUBSTATIONS_IDS = "substationsIds";
59-
static final String HEADER_DELETED_EQUIPMENT_ID = "deletedEquipmentId";
60-
static final String HEADER_DELETED_EQUIPMENT_TYPE = "deletedEquipmentType";
61+
static final String HEADER_DELETED_EQUIPMENTS = "deletedEquipments";
6162
static final String HEADER_NODE = "node";
6263
static final String HEADER_NODES = "nodes";
6364
static final String HEADER_PARENT_NODE = "parentNode";
@@ -75,12 +76,12 @@ public NotificationWebSocketHandler(ObjectMapper jacksonObjectMapper, @Value("${
7576
this.heartbeatInterval = heartbeatInterval;
7677
}
7778

78-
Flux<Message<String>> flux;
79+
Flux<Message<NetworkImpactsInfos>> flux;
7980

8081
@Bean
81-
public Consumer<Flux<Message<String>>> consumeNotification() {
82+
public Consumer<Flux<Message<NetworkImpactsInfos>>> consumeNotification() {
8283
return f -> {
83-
ConnectableFlux<Message<String>> c = f.log(CATEGORY_BROKER_INPUT, Level.FINE).publish();
84+
ConnectableFlux<Message<NetworkImpactsInfos>> c = f.log(CATEGORY_BROKER_INPUT, Level.FINE).publish();
8485
this.flux = c;
8586
c.connect();
8687
// Force connect 1 fake subscriber to consumme messages as they come.
@@ -97,7 +98,7 @@ private Flux<WebSocketMessage> notificationFlux(WebSocketSession webSocketSessio
9798
String filterStudyUuid,
9899
String filterUpdateType) {
99100
return flux.transform(f -> {
100-
Flux<Message<String>> res = f;
101+
Flux<Message<NetworkImpactsInfos>> res = f;
101102
if (filterStudyUuid != null) {
102103
res = res.filter(m -> filterStudyUuid.equals(m.getHeaders().get(HEADER_STUDY_UUID)));
103104
}
@@ -124,8 +125,6 @@ private static Map<String, Object> toResultHeader(Map<String, Object> messageHea
124125
passHeader(messageHeader, resHeader, HEADER_STUDY_UUID);
125126
passHeader(messageHeader, resHeader, HEADER_ERROR);
126127
passHeader(messageHeader, resHeader, HEADER_SUBSTATIONS_IDS);
127-
passHeader(messageHeader, resHeader, HEADER_DELETED_EQUIPMENT_ID);
128-
passHeader(messageHeader, resHeader, HEADER_DELETED_EQUIPMENT_TYPE);
129128
passHeader(messageHeader, resHeader, HEADER_PARENT_NODE);
130129
passHeader(messageHeader, resHeader, HEADER_INSERT_MODE);
131130
passHeader(messageHeader, resHeader, HEADER_REMOVE_CHILDREN);
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Copyright (c) 2022, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
8+
package org.gridsuite.study.notification.server.dto;
9+
10+
import lombok.Getter;
11+
import lombok.NoArgsConstructor;
12+
13+
@Getter
14+
@NoArgsConstructor
15+
public class EquipmentDeletionInfos {
16+
String equipmentId;
17+
String equipmentType;
18+
}
19+
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright (c) 2022, RTE (http://www.rte-france.com)
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
6+
*/
7+
8+
package org.gridsuite.study.notification.server.dto;
9+
10+
import java.util.Set;
11+
12+
import lombok.AllArgsConstructor;
13+
import lombok.Getter;
14+
15+
@Getter
16+
@AllArgsConstructor
17+
public class NetworkImpactsInfos {
18+
private Set<String> impactedSubstationsIds;
19+
private Set<EquipmentDeletionInfos> deletedEquipments;
20+
}

src/test/java/org/gridsuite/study/notification/server/NotificationWebSocketHandlerTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
import com.fasterxml.jackson.core.JsonProcessingException;
2121
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import com.google.common.collect.ImmutableSet;
23+
24+
import org.gridsuite.study.notification.server.dto.EquipmentDeletionInfos;
25+
import org.gridsuite.study.notification.server.dto.NetworkImpactsInfos;
2226
import org.junit.Before;
2327
import org.junit.Test;
2428
import org.mockito.ArgumentCaptor;
@@ -52,7 +56,6 @@ public class NotificationWebSocketHandlerTest {
5256
private ObjectMapper objectMapper;
5357
private WebSocketSession ws;
5458
private HandshakeInfo handshakeinfo;
55-
private Flux<Message<String>> flux;
5659

5760
@Before
5861
public void setup() {
@@ -106,13 +109,13 @@ private void withFilters(String filterStudyUuid, String filterUpdateType) {
106109
setUpUriComponentBuilder(connectedUserId, filterStudyUuid, filterUpdateType);
107110

108111
var notificationWebSocketHandler = new NotificationWebSocketHandler(objectMapper, Integer.MAX_VALUE);
109-
var atomicRef = new AtomicReference<FluxSink<Message<String>>>();
112+
var atomicRef = new AtomicReference<FluxSink<Message<NetworkImpactsInfos>>>();
110113
var flux = Flux.create(atomicRef::set);
111114
notificationWebSocketHandler.consumeNotification().accept(flux);
112115
var sink = atomicRef.get();
113116
notificationWebSocketHandler.handle(ws);
114117

115-
List<GenericMessage<String>> refMessages = Stream.<Map<String, Object>>of(
118+
List<GenericMessage<NetworkImpactsInfos>> refMessages = Stream.<Map<String, Object>>of(
116119
Map.of(HEADER_STUDY_UUID, "foo", HEADER_UPDATE_TYPE, "oof"),
117120
Map.of(HEADER_STUDY_UUID, "bar", HEADER_UPDATE_TYPE, "oof"),
118121
Map.of(HEADER_STUDY_UUID, "baz", HEADER_UPDATE_TYPE, "oof"),
@@ -126,8 +129,6 @@ private void withFilters(String filterStudyUuid, String filterUpdateType) {
126129
Map.of(HEADER_STUDY_UUID, "foo bar/bar", HEADER_UPDATE_TYPE, "foobar"),
127130
Map.of(HEADER_STUDY_UUID, "bar", HEADER_UPDATE_TYPE, "studies", HEADER_ERROR, "error_message"),
128131
Map.of(HEADER_STUDY_UUID, "bar", HEADER_UPDATE_TYPE, "rab", HEADER_SUBSTATIONS_IDS, "s1"),
129-
Map.of(HEADER_STUDY_UUID, "bar", HEADER_UPDATE_TYPE, "rab", HEADER_DELETED_EQUIPMENT_ID, "id1"),
130-
Map.of(HEADER_STUDY_UUID, "bar", HEADER_UPDATE_TYPE, "rab", HEADER_DELETED_EQUIPMENT_TYPE, "type1"),
131132

132133
Map.of(HEADER_STUDY_UUID, "public_" + connectedUserId, HEADER_UPDATE_TYPE, "oof", HEADER_USER_ID, connectedUserId),
133134
Map.of(HEADER_STUDY_UUID, "public_" + otherUserId, HEADER_UPDATE_TYPE, "rab", HEADER_USER_ID, otherUserId),
@@ -138,7 +139,7 @@ private void withFilters(String filterStudyUuid, String filterUpdateType) {
138139
Map.of(HEADER_STUDY_UUID, "nodes", HEADER_UPDATE_TYPE, "update", HEADER_NODE, UUID.randomUUID().toString()),
139140
Map.of(HEADER_STUDY_UUID, "nodes", HEADER_UPDATE_TYPE, "delete", HEADER_NODES, List.of(UUID.randomUUID().toString()),
140141
HEADER_PARENT_NODE, UUID.randomUUID().toString(), HEADER_REMOVE_CHILDREN, true))
141-
.map(map -> new GenericMessage<>("", map))
142+
.map(map -> new GenericMessage<>(new NetworkImpactsInfos(ImmutableSet.of(), ImmutableSet.of(new EquipmentDeletionInfos())), map))
142143
.collect(Collectors.toList());
143144

144145
@SuppressWarnings("unchecked")
@@ -180,8 +181,6 @@ private Map<String, Object> toResultHeader(Map<String, Object> messageHeader) {
180181
passHeaderRef(messageHeader, resHeader, HEADER_STUDY_UUID);
181182
passHeaderRef(messageHeader, resHeader, HEADER_ERROR);
182183
passHeaderRef(messageHeader, resHeader, HEADER_SUBSTATIONS_IDS);
183-
passHeaderRef(messageHeader, resHeader, HEADER_DELETED_EQUIPMENT_ID);
184-
passHeaderRef(messageHeader, resHeader, HEADER_DELETED_EQUIPMENT_TYPE);
185184
passHeaderRef(messageHeader, resHeader, HEADER_NEW_NODE);
186185
passHeaderRef(messageHeader, resHeader, HEADER_NODE);
187186
passHeaderRef(messageHeader, resHeader, HEADER_NODES);
@@ -230,7 +229,7 @@ public void testHeartbeat() {
230229
setUpUriComponentBuilder("userId");
231230

232231
var notificationWebSocketHandler = new NotificationWebSocketHandler(null, 1);
233-
var flux = Flux.<Message<String>>empty();
232+
var flux = Flux.<Message<NetworkImpactsInfos>>empty();
234233
notificationWebSocketHandler.consumeNotification().accept(flux);
235234
notificationWebSocketHandler.handle(ws);
236235

@@ -245,13 +244,13 @@ public void testDiscard() {
245244
setUpUriComponentBuilder("userId");
246245

247246
var notificationWebSocketHandler = new NotificationWebSocketHandler(objectMapper, Integer.MAX_VALUE);
248-
var atomicRef = new AtomicReference<FluxSink<Message<String>>>();
247+
var atomicRef = new AtomicReference<FluxSink<Message<NetworkImpactsInfos>>>();
249248
var flux = Flux.create(atomicRef::set);
250249
notificationWebSocketHandler.consumeNotification().accept(flux);
251250
var sink = atomicRef.get();
252251
Map<String, Object> headers = Map.of(HEADER_STUDY_UUID, "foo", HEADER_UPDATE_TYPE, "oof");
253252

254-
sink.next(new GenericMessage<>("", headers)); // should be discarded, no client connected
253+
sink.next(new GenericMessage<>(new NetworkImpactsInfos(ImmutableSet.of(), ImmutableSet.of(new EquipmentDeletionInfos())), headers)); // should be discarded, no client connected
255254

256255
notificationWebSocketHandler.handle(ws);
257256

@@ -262,7 +261,7 @@ public void testDiscard() {
262261
Disposable d1 = out1.map(WebSocketMessage::getPayloadAsText).subscribe(messages1::add);
263262
d1.dispose();
264263

265-
sink.next(new GenericMessage<>("", headers)); // should be discarded, first client disconnected
264+
sink.next(new GenericMessage<>(new NetworkImpactsInfos(ImmutableSet.of(), ImmutableSet.of(new EquipmentDeletionInfos())), headers)); // should be discarded, first client disconnected
266265

267266
notificationWebSocketHandler.handle(ws);
268267

0 commit comments

Comments
 (0)