18
18
import com .fasterxml .jackson .core .JsonProcessingException ;
19
19
import com .fasterxml .jackson .databind .ObjectMapper ;
20
20
21
+ import org .gridsuite .study .notification .server .dto .Filters ;
22
+ import org .gridsuite .study .notification .server .dto .FiltersToAdd ;
23
+ import org .gridsuite .study .notification .server .dto .FiltersToRemove ;
21
24
import org .gridsuite .study .notification .server .dto .NetworkImpactsInfos ;
22
25
import org .slf4j .Logger ;
23
26
import org .slf4j .LoggerFactory ;
@@ -51,7 +54,9 @@ public class NotificationWebSocketHandler implements WebSocketHandler {
51
54
private static final String CATEGORY_BROKER_INPUT = NotificationWebSocketHandler .class .getName () + ".messages.input-broker" ;
52
55
private static final String CATEGORY_WS_OUTPUT = NotificationWebSocketHandler .class .getName () + ".messages.output-websocket" ;
53
56
static final String QUERY_STUDY_UUID = "studyUuid" ;
57
+ static final String FILTER_STUDY_UUID = QUERY_STUDY_UUID ;
54
58
static final String QUERY_UPDATE_TYPE = "updateType" ;
59
+ static final String FILTER_UPDATE_TYPE = QUERY_UPDATE_TYPE ;
55
60
static final String HEADER_USER_ID = "userId" ;
56
61
static final String HEADER_STUDY_UUID = "studyUuid" ;
57
62
static final String HEADER_UPDATE_TYPE = "updateType" ;
@@ -94,18 +99,13 @@ public Consumer<Flux<Message<NetworkImpactsInfos>>> consumeNotification() {
94
99
/**
95
100
* map from the broker flux to the filtered flux for one websocket client, extracting only relevant fields.
96
101
*/
97
- private Flux <WebSocketMessage > notificationFlux (WebSocketSession webSocketSession ,
98
- String filterStudyUuid ,
99
- String filterUpdateType ) {
100
- return flux .transform (f -> {
101
- Flux <Message <NetworkImpactsInfos >> res = f ;
102
- if (filterStudyUuid != null ) {
103
- res = res .filter (m -> filterStudyUuid .equals (m .getHeaders ().get (HEADER_STUDY_UUID )));
104
- }
105
- if (filterUpdateType != null ) {
106
- res = res .filter (m -> filterUpdateType .equals (m .getHeaders ().get (HEADER_UPDATE_TYPE )));
107
- }
108
- return res ;
102
+ private Flux <WebSocketMessage > notificationFlux (WebSocketSession webSocketSession ) {
103
+ return flux .filter (message -> {
104
+ String filterStudyUuid = (String ) webSocketSession .getAttributes ().get (FILTER_STUDY_UUID );
105
+ return filterStudyUuid == null || filterStudyUuid .equals (message .getHeaders ().get (HEADER_STUDY_UUID ));
106
+ }).filter (message -> {
107
+ String filterUpdateType = (String ) webSocketSession .getAttributes ().get (FILTER_UPDATE_TYPE );
108
+ return filterUpdateType == null || filterUpdateType .equals (message .getHeaders ().get (HEADER_UPDATE_TYPE ));
109
109
}).map (m -> {
110
110
try {
111
111
return jacksonObjectMapper .writeValueAsString (Map .of (
@@ -151,6 +151,45 @@ private Flux<WebSocketMessage> heartbeatFlux(WebSocketSession webSocketSession)
151
151
.pingMessage (dbf -> dbf .wrap ((webSocketSession .getId () + "-" + n ).getBytes (StandardCharsets .UTF_8 ))));
152
152
}
153
153
154
+ public Flux <WebSocketMessage > receive (WebSocketSession webSocketSession ) {
155
+ return webSocketSession .receive ()
156
+ .doOnNext (webSocketMessage -> {
157
+ try {
158
+ //if it's not the heartbeat
159
+ if (webSocketMessage .getType ().equals (WebSocketMessage .Type .TEXT )) {
160
+ String wsPayload = webSocketMessage .getPayloadAsText ();
161
+ LOGGER .debug ("Message received : {} by session {}" , wsPayload , webSocketSession .getId ());
162
+ Filters receivedFilters = jacksonObjectMapper .readValue (webSocketMessage .getPayloadAsText (), Filters .class );
163
+ handleReceivedFilters (webSocketSession , receivedFilters );
164
+ }
165
+ } catch (JsonProcessingException e ) {
166
+ LOGGER .error (e .toString (), e );
167
+ }
168
+ });
169
+ }
170
+
171
+ private void handleReceivedFilters (WebSocketSession webSocketSession , Filters filters ) {
172
+ if (filters .getFiltersToRemove () != null ) {
173
+ FiltersToRemove filtersToRemove = filters .getFiltersToRemove ();
174
+ if (Boolean .TRUE .equals (filtersToRemove .getRemoveUpdateType ())) {
175
+ webSocketSession .getAttributes ().remove (FILTER_UPDATE_TYPE );
176
+ }
177
+ if (Boolean .TRUE .equals (filtersToRemove .getRemoveStudyUuid ())) {
178
+ webSocketSession .getAttributes ().remove (FILTER_STUDY_UUID );
179
+ }
180
+ }
181
+ if (filters .getFiltersToAdd () != null ) {
182
+ FiltersToAdd filtersToAdd = filters .getFiltersToAdd ();
183
+ //because null is not allowed in ConcurrentHashMap and will cause the websocket to close
184
+ if (filtersToAdd .getUpdateType () != null ) {
185
+ webSocketSession .getAttributes ().put (FILTER_UPDATE_TYPE , filtersToAdd .getUpdateType ());
186
+ }
187
+ if (filtersToAdd .getStudyUuid () != null ) {
188
+ webSocketSession .getAttributes ().put (FILTER_STUDY_UUID , filtersToAdd .getStudyUuid ());
189
+ }
190
+ }
191
+ }
192
+
154
193
@ Override
155
194
public Mono <Void > handle (WebSocketSession webSocketSession ) {
156
195
var uri = webSocketSession .getHandshakeInfo ().getUri ();
@@ -159,15 +198,20 @@ public Mono<Void> handle(WebSocketSession webSocketSession) {
159
198
if (filterStudyUuid != null ) {
160
199
try {
161
200
filterStudyUuid = URLDecoder .decode (filterStudyUuid , StandardCharsets .UTF_8 .toString ());
201
+ webSocketSession .getAttributes ().put (FILTER_STUDY_UUID , filterStudyUuid );
162
202
} catch (UnsupportedEncodingException e ) {
163
203
throw new NotificationServerRuntimeException (e .getMessage ());
164
204
}
165
205
}
166
206
String filterUpdateType = parameters .getFirst (QUERY_UPDATE_TYPE );
207
+ if (filterUpdateType != null ) {
208
+ webSocketSession .getAttributes ().put (FILTER_UPDATE_TYPE , filterUpdateType );
209
+ }
210
+
167
211
LOGGER .debug ("New websocket connection for studyUuid={}, updateType={}" , filterStudyUuid , filterUpdateType );
168
212
return webSocketSession
169
- .send (notificationFlux (webSocketSession , filterStudyUuid , filterUpdateType )
213
+ .send (notificationFlux (webSocketSession )
170
214
.mergeWith (heartbeatFlux (webSocketSession )))
171
- .and (webSocketSession . receive ());
215
+ .and (receive (webSocketSession ));
172
216
}
173
217
}
0 commit comments