@@ -197,23 +197,30 @@ class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer(
197197 val handler = Flow [Message ]
198198 .watchTermination()(Keep .right)
199199 .mapMaterializedValue(handlerTermination.completeWith(_))
200- .map(m => TextMessage .Strict (s " Echo [ $m ] " ))
200+ .map(m => TextMessage .Strict (s " Echo [ ${m.asTextMessage.getStrictText} ] " ))
201201
202202 val bindingFuture = Http ().newServerAt(" localhost" , 0 ).bindSync(_.attribute(webSocketUpgrade).get.handleMessages(handler, None ))
203203 val binding = Await .result(bindingFuture, 3 .seconds.dilated)
204204 val myPort = binding.localAddress.getPort
205205
206- val ((switch, connection), completion) =
207- Source .maybe
206+ val clientMessageOut = TestPublisher .probe[Message ]()
207+ val clientMessageIn = TestSubscriber .probe[Message ]()
208+
209+ val switch =
210+ Source .fromPublisher(clientMessageOut)
208211 .viaMat {
209212 Http ().webSocketClientLayer(WebSocketRequest (" ws://localhost:" + myPort))
210213 .atop(TLSPlacebo ())
211214 .atopMat(KillSwitches .singleBidi[ByteString , ByteString ])(Keep .right)
212- .joinMat (Tcp ().outgoingConnection(new InetSocketAddress (" localhost" , myPort), halfClose = true ))( Keep .both )
215+ .join (Tcp ().outgoingConnection(new InetSocketAddress (" localhost" , myPort), halfClose = true ))
213216 }(Keep .right)
214- .toMat(Sink .ignore) (Keep .both )
217+ .toMat(Sink .fromSubscriber(clientMessageIn)) (Keep .left )
215218 .run()
216- connection.futureValue
219+
220+ // simulate message exchange to make sure handler has been installed
221+ clientMessageOut.sendNext(TextMessage (" Test" ))
222+ clientMessageIn.requestNext(TextMessage (" Echo [Test]" ))
223+
217224 switch.abort(new IllegalStateException (" Connection aborted" ))
218225
219226 // Should fail, not complete:
0 commit comments