22
33import io .rsocket .RSocketClientTest .ClientSocketRule ;
44import io .rsocket .util .EmptyPayload ;
5+ import java .nio .channels .ClosedChannelException ;
6+ import java .time .Duration ;
7+ import java .util .Arrays ;
8+ import java .util .function .Function ;
59import org .junit .Rule ;
610import org .junit .Test ;
711import org .junit .runner .RunWith ;
1115import reactor .core .publisher .Mono ;
1216import reactor .test .StepVerifier ;
1317
14- import java .nio .channels .ClosedChannelException ;
15- import java .time .Duration ;
16- import java .util .Arrays ;
17- import java .util .function .Function ;
18-
1918@ RunWith (Parameterized .class )
2019public class RSocketClientTerminationTest {
2120
22- @ Rule
23- public final ClientSocketRule rule = new ClientSocketRule ();
21+ @ Rule public final ClientSocketRule rule = new ClientSocketRule ();
2422 private Function <RSocket , ? extends Publisher <?>> interaction ;
2523
2624 public RSocketClientTerminationTest (Function <RSocket , ? extends Publisher <?>> interaction ) {
@@ -31,9 +29,7 @@ public RSocketClientTerminationTest(Function<RSocket, ? extends Publisher<?>> in
3129 public void testCurrentStreamIsTerminatedOnConnectionClose () {
3230 RSocketClient rSocket = rule .socket ;
3331
34- Mono .delay (Duration .ofSeconds (1 ))
35- .doOnNext (v -> rule .connection .dispose ())
36- .subscribe ();
32+ Mono .delay (Duration .ofSeconds (1 )).doOnNext (v -> rule .connection .dispose ()).subscribe ();
3733
3834 StepVerifier .create (interaction .apply (rSocket ))
3935 .expectError (ClosedChannelException .class )
0 commit comments