@@ -52,7 +52,6 @@ public class NotificationWebSocketHandler implements WebSocketHandler {
52
52
static final String QUERY_UPDATE_TYPE = "updateType" ;
53
53
static final String HEADER_USER_ID = "userId" ;
54
54
static final String HEADER_STUDY_UUID = "studyUuid" ;
55
- static final String HEADER_IS_PUBLIC_STUDY = "isPublicStudy" ;
56
55
static final String HEADER_UPDATE_TYPE = "updateType" ;
57
56
static final String HEADER_TIMESTAMP = "timestamp" ;
58
57
static final String HEADER_ERROR = "error" ;
@@ -95,14 +94,10 @@ public Consumer<Flux<Message<String>>> consumeNotification() {
95
94
* map from the broker flux to the filtered flux for one websocket client, extracting only relevant fields.
96
95
*/
97
96
private Flux <WebSocketMessage > notificationFlux (WebSocketSession webSocketSession ,
98
- String userId ,
99
97
String filterStudyUuid ,
100
98
String filterUpdateType ) {
101
99
return flux .transform (f -> {
102
100
Flux <Message <String >> res = f ;
103
- if (userId != null ) {
104
- res = res .filter (m -> m .getHeaders ().get (HEADER_ERROR ) == null || userId .equals (m .getHeaders ().get (HEADER_USER_ID )));
105
- }
106
101
if (filterStudyUuid != null ) {
107
102
res = res .filter (m -> filterStudyUuid .equals (m .getHeaders ().get (HEADER_STUDY_UUID )));
108
103
}
@@ -138,6 +133,7 @@ private static Map<String, Object> toResultHeader(Map<String, Object> messageHea
138
133
passHeader (messageHeader , resHeader , HEADER_NODES );
139
134
passHeader (messageHeader , resHeader , HEADER_NEW_NODE );
140
135
passHeader (messageHeader , resHeader , HEADER_MOVED_NODE );
136
+ passHeader (messageHeader , resHeader , HEADER_USER_ID ); // to filter the display of error messages in the front end
141
137
142
138
return resHeader ;
143
139
}
@@ -159,7 +155,6 @@ private Flux<WebSocketMessage> heartbeatFlux(WebSocketSession webSocketSession)
159
155
@ Override
160
156
public Mono <Void > handle (WebSocketSession webSocketSession ) {
161
157
var uri = webSocketSession .getHandshakeInfo ().getUri ();
162
- String userId = webSocketSession .getHandshakeInfo ().getHeaders ().getFirst (HEADER_USER_ID );
163
158
MultiValueMap <String , String > parameters = UriComponentsBuilder .fromUri (uri ).build (true ).getQueryParams ();
164
159
String filterStudyUuid = parameters .getFirst (QUERY_STUDY_UUID );
165
160
if (filterStudyUuid != null ) {
@@ -172,7 +167,7 @@ public Mono<Void> handle(WebSocketSession webSocketSession) {
172
167
String filterUpdateType = parameters .getFirst (QUERY_UPDATE_TYPE );
173
168
LOGGER .debug ("New websocket connection for studyUuid={}, updateType={}" , filterStudyUuid , filterUpdateType );
174
169
return webSocketSession
175
- .send (notificationFlux (webSocketSession , userId , filterStudyUuid , filterUpdateType )
170
+ .send (notificationFlux (webSocketSession , filterStudyUuid , filterUpdateType )
176
171
.mergeWith (heartbeatFlux (webSocketSession )))
177
172
.and (webSocketSession .receive ());
178
173
}
0 commit comments