17
17
import com .fasterxml .jackson .core .JsonProcessingException ;
18
18
import com .fasterxml .jackson .databind .ObjectMapper ;
19
19
import com .fasterxml .jackson .databind .ObjectWriter ;
20
- import com .google .common .collect .ImmutableSet ;
21
20
22
21
import io .micrometer .core .instrument .MeterRegistry ;
23
22
import io .micrometer .core .instrument .simple .SimpleMeterRegistry ;
@@ -133,7 +132,7 @@ private void withFilters(String filterStudyUuid, String filterUpdateType, boolea
133
132
}
134
133
135
134
var notificationWebSocketHandler = new NotificationWebSocketHandler (objectMapper , meterRegistry , Integer .MAX_VALUE );
136
- var atomicRef = new AtomicReference <FluxSink <Message <NetworkImpactsInfos >>>();
135
+ var atomicRef = new AtomicReference <FluxSink <Message <String >>>();
137
136
var flux = Flux .create (atomicRef ::set );
138
137
notificationWebSocketHandler .consumeNotification ().accept (flux );
139
138
var sink = atomicRef .get ();
@@ -148,7 +147,7 @@ private void withFilters(String filterStudyUuid, String filterUpdateType, boolea
148
147
}
149
148
}
150
149
151
- List <GenericMessage <NetworkImpactsInfos >> refMessages = Stream .<Map <String , Object >>of (
150
+ List <GenericMessage <String >> refMessages = Stream .<Map <String , Object >>of (
152
151
Map .of (HEADER_STUDY_UUID , "foo" , HEADER_UPDATE_TYPE , "oof" ),
153
152
Map .of (HEADER_STUDY_UUID , "bar" , HEADER_UPDATE_TYPE , "oof" ),
154
153
Map .of (HEADER_STUDY_UUID , "baz" , HEADER_UPDATE_TYPE , "oof" ),
@@ -174,7 +173,7 @@ private void withFilters(String filterStudyUuid, String filterUpdateType, boolea
174
173
HEADER_PARENT_NODE , UUID .randomUUID ().toString (), HEADER_REMOVE_CHILDREN , true ),
175
174
176
175
Map .of (HEADER_STUDY_UUID , "" , HEADER_UPDATE_TYPE , "indexation_status_updated" , HEADER_INDEXATION_STATUS , "INDEXED" ))
177
- .map (map -> new GenericMessage <>(new NetworkImpactsInfos ( ImmutableSet . of (), ImmutableSet . of ( new EquipmentDeletionInfos ()), ImmutableSet . of ()) , map ))
176
+ .map (map -> new GenericMessage <>("" , map ))
178
177
.collect (Collectors .toList ());
179
178
180
179
@ SuppressWarnings ("unchecked" )
@@ -300,7 +299,7 @@ public void testWsReceiveFilters() throws JsonProcessingException {
300
299
when (ws2 .getAttributes ()).thenReturn (map );
301
300
302
301
var notificationWebSocketHandler = new NotificationWebSocketHandler (new ObjectMapper (), meterRegistry , 60 );
303
- var flux = Flux .<Message <NetworkImpactsInfos >>empty ();
302
+ var flux = Flux .<Message <String >>empty ();
304
303
notificationWebSocketHandler .consumeNotification ().accept (flux );
305
304
notificationWebSocketHandler .receive (ws2 ).subscribe ();
306
305
@@ -327,7 +326,7 @@ public void testWsRemoveFilters() throws JsonProcessingException {
327
326
assertEquals ("updateType" , ws2 .getAttributes ().get (FILTER_UPDATE_TYPE ));
328
327
assertEquals ("studyUuid" , ws2 .getAttributes ().get (FILTER_STUDY_UUID ));
329
328
var notificationWebSocketHandler = new NotificationWebSocketHandler (new ObjectMapper (), meterRegistry , Integer .MAX_VALUE );
330
- var flux = Flux .<Message <NetworkImpactsInfos >>empty ();
329
+ var flux = Flux .<Message <String >>empty ();
331
330
notificationWebSocketHandler .consumeNotification ().accept (flux );
332
331
notificationWebSocketHandler .receive (ws2 ).subscribe ();
333
332
@@ -348,7 +347,7 @@ public void testWsReceiveEmptyFilters() throws JsonProcessingException {
348
347
when (ws2 .getAttributes ()).thenReturn (map );
349
348
350
349
var notificationWebSocketHandler = new NotificationWebSocketHandler (new ObjectMapper (), meterRegistry , Integer .MAX_VALUE );
351
- var flux = Flux .<Message <NetworkImpactsInfos >>empty ();
350
+ var flux = Flux .<Message <String >>empty ();
352
351
notificationWebSocketHandler .consumeNotification ().accept (flux );
353
352
notificationWebSocketHandler .receive (ws2 ).subscribe ();
354
353
@@ -366,7 +365,7 @@ public void testWsReceiveUnprocessableFilter() {
366
365
when (ws2 .getAttributes ()).thenReturn (map );
367
366
368
367
var notificationWebSocketHandler = new NotificationWebSocketHandler (new ObjectMapper (), meterRegistry , 60 );
369
- var flux = Flux .<Message <NetworkImpactsInfos >>empty ();
368
+ var flux = Flux .<Message <String >>empty ();
370
369
notificationWebSocketHandler .consumeNotification ().accept (flux );
371
370
notificationWebSocketHandler .receive (ws2 ).subscribe ();
372
371
@@ -379,7 +378,7 @@ public void testHeartbeat() {
379
378
setUpUriComponentBuilder ("userId" );
380
379
381
380
var notificationWebSocketHandler = new NotificationWebSocketHandler (null , meterRegistry , 1 );
382
- var flux = Flux .<Message <NetworkImpactsInfos >>empty ();
381
+ var flux = Flux .<Message <String >>empty ();
383
382
notificationWebSocketHandler .consumeNotification ().accept (flux );
384
383
notificationWebSocketHandler .handle (ws );
385
384
@@ -394,13 +393,13 @@ public void testDiscard() {
394
393
setUpUriComponentBuilder ("userId" );
395
394
396
395
var notificationWebSocketHandler = new NotificationWebSocketHandler (objectMapper , meterRegistry , Integer .MAX_VALUE );
397
- var atomicRef = new AtomicReference <FluxSink <Message <NetworkImpactsInfos >>>();
396
+ var atomicRef = new AtomicReference <FluxSink <Message <String >>>();
398
397
var flux = Flux .create (atomicRef ::set );
399
398
notificationWebSocketHandler .consumeNotification ().accept (flux );
400
399
var sink = atomicRef .get ();
401
400
Map <String , Object > headers = Map .of (HEADER_STUDY_UUID , "foo" , HEADER_UPDATE_TYPE , "oof" );
402
401
403
- sink .next (new GenericMessage <>(new NetworkImpactsInfos ( ImmutableSet . of (), ImmutableSet . of ( new EquipmentDeletionInfos ()), ImmutableSet . of ()) , headers )); // should be discarded, no client connected
402
+ sink .next (new GenericMessage <>("" , headers )); // should be discarded, no client connected
404
403
405
404
notificationWebSocketHandler .handle (ws );
406
405
@@ -411,7 +410,7 @@ public void testDiscard() {
411
410
Disposable d1 = out1 .map (WebSocketMessage ::getPayloadAsText ).subscribe (messages1 ::add );
412
411
d1 .dispose ();
413
412
414
- sink .next (new GenericMessage <>(new NetworkImpactsInfos ( ImmutableSet . of (), ImmutableSet . of ( new EquipmentDeletionInfos ()), ImmutableSet . of ()) , headers )); // should be discarded, first client disconnected
413
+ sink .next (new GenericMessage <>("" , headers )); // should be discarded, first client disconnected
415
414
416
415
notificationWebSocketHandler .handle (ws );
417
416
0 commit comments