11package tech .ydb .coordination ;
22
3+ import org .jetbrains .annotations .NotNull ;
34import org .junit .Assert ;
45import org .junit .Before ;
56import org .mockito .Mockito ;
67import org .mockito .invocation .InvocationOnMock ;
78import org .mockito .stubbing .Answer ;
8- import org .mockito .stubbing .OngoingStubbing ;
99import org .slf4j .Logger ;
1010import org .slf4j .LoggerFactory ;
1111
2121import java .util .ArrayList ;
2222import java .util .List ;
2323import java .util .Queue ;
24- import java .util .concurrent .CompletableFuture ;
25- import java .util .concurrent .ConcurrentLinkedQueue ;
26- import java .util .concurrent .ScheduledExecutorService ;
27- import java .util .concurrent .ScheduledFuture ;
24+ import java .util .concurrent .*;
2825
2926public class CoordinationSessionBaseMockedTest {
3027 private static final Logger logger = LoggerFactory .getLogger (CoordinationSessionBaseMockedTest .class );
3128
3229 private final ScheduledExecutorService scheduler = Mockito .mock (ScheduledExecutorService .class );
3330 private final GrpcTransport transport = Mockito .mock (GrpcTransport .class );
3431 private final ScheduledFuture <?> emptyFuture = Mockito .mock (ScheduledFuture .class );
35- private final GrpcReadWriteStream <SessionResponse , SessionRequest > writeStream =
36- Mockito .mock (GrpcReadWriteStream .class );
3732 private final SchedulerAssert schedulerHelper = new SchedulerAssert ();
3833
3934 protected final CoordinationClient client = CoordinationClient .newClient (transport );
4035
41- private volatile MockedWriteStream streamMock = null ;
42-
4336 @ Before
4437 public void beforeEach () {
4538 Mockito .when (transport .getScheduler ()).thenReturn (scheduler );
46- Mockito .when (transport .readWriteStreamCall (Mockito .eq (CoordinationServiceGrpc .getSessionMethod ()), Mockito .any ()))
47- .thenReturn (writeStream ); // create mocked stream
48-
49- // Every writeStream.start updates mockedWriteStream
50- Mockito .when (writeStream .start (Mockito .any ()))
51- .thenAnswer (defaultStreamMockAnswer ());
52-
53- // Every writeStream.sendNext add message from client to mockedWriteStream.sent list
54- Mockito .doAnswer ((Answer <Void >) (InvocationOnMock iom ) -> {
55- streamMock .sent .add (iom .getArgument (0 , SessionRequest .class ));
56- return null ;
57- }).when (writeStream ).sendNext (Mockito .any ());
5839
5940 Mockito .when (scheduler .schedule (Mockito .any (Runnable .class ), Mockito .anyLong (), Mockito .any ()))
6041 .thenAnswer ((InvocationOnMock iom ) -> {
@@ -64,35 +45,41 @@ public void beforeEach() {
6445 });
6546 }
6647
67- protected MockedWriteStream currentStream () {
68- return streamMock ;
69- }
70-
7148 protected SchedulerAssert getScheduler () {
7249 return schedulerHelper ;
7350 }
7451
75- protected OngoingStubbing <CompletableFuture <Status >> mockStreams () {
76- return Mockito .when (writeStream .start (Mockito .any ()));
77- }
52+ protected StreamMock mockStream () {
53+ StreamMock streamMock = new StreamMock ();
7854
79- protected Answer <CompletableFuture <Status >> errorStreamMockAnswer (StatusCode code ) {
80- return (iom ) -> {
81- streamMock = null ;
82- return CompletableFuture .completedFuture (Status .of (code ));
83- };
84- }
55+ GrpcReadWriteStream <SessionResponse , SessionRequest > readWriteStream = Mockito .mock (GrpcReadWriteStream .class );
56+
57+ Mockito .when (readWriteStream .start (Mockito .any ())).thenAnswer (
58+ (InvocationOnMock iom ) -> {
59+ streamMock .setObserver (iom .getArgument (0 ));
60+ return streamMock .streamFuture ;
61+ }
62+ ).thenThrow (new RuntimeException ("Unexpected second start call" ));
8563
86- protected Answer <CompletableFuture <Status >> defaultStreamMockAnswer () {
87- return (InvocationOnMock iom ) -> {
88- streamMock = new MockedWriteStream (iom .getArgument (0 ));
89- return streamMock .streamFuture ;
90- };
64+ Mockito .doAnswer ((Answer <Void >) (InvocationOnMock iom ) -> {
65+ streamMock .sent .add (iom .getArgument (0 , SessionRequest .class ));
66+ return null ;
67+ }).when (readWriteStream ).sendNext (Mockito .any ());
68+
69+ Mockito .when (transport .readWriteStreamCall (Mockito .eq (CoordinationServiceGrpc .getSessionMethod ()), Mockito .any ()))
70+ .thenReturn (readWriteStream );
71+ return streamMock ;
9172 }
9273
93- protected static class SchedulerAssert {
74+ protected static class SchedulerAssert implements Executor {
9475 private final Queue <Runnable > tasks = new ConcurrentLinkedQueue <>();
9576
77+ @ Override
78+ public void execute (@ NotNull Runnable command ) {
79+ logger .debug ("scheduling command: " + command );
80+ tasks .add (command );
81+ }
82+
9683 public SchedulerAssert hasNoTasks () {
9784 Assert .assertTrue (tasks .isEmpty ());
9885 return this ;
@@ -116,16 +103,25 @@ public SchedulerAssert executeNextTasks(int count) {
116103 }
117104 }
118105
119- protected class MockedWriteStream {
120- private final GrpcReadWriteStream .Observer <SessionResponse > observer ;
121- private final CompletableFuture <Status > streamFuture = new CompletableFuture <>();
106+ protected class StreamMock {
107+ private final CompletableFuture <Status > streamFuture ;
122108 private final List <SessionRequest > sent = new ArrayList <>();
123109 private volatile int sentIdx = 0 ;
124110
125- public MockedWriteStream (GrpcReadWriteStream .Observer <SessionResponse > observer ) {
111+ private volatile GrpcReadWriteStream .Observer <SessionResponse > observer = null ;
112+
113+ public StreamMock () {
114+ streamFuture = new CompletableFuture <>();
115+ }
116+
117+ public void setObserver (GrpcReadWriteStream .Observer <SessionResponse > observer ) {
126118 this .observer = observer ;
127119 }
128120
121+ public void complete (StatusCode statusCode ) {
122+ streamFuture .complete (Status .of (statusCode ));
123+ }
124+
129125 public void complete (Status status ) {
130126 streamFuture .complete (status );
131127 }
@@ -151,6 +147,34 @@ public void responseSemaphoreAlreadyExists() {
151147 )
152148 )
153149 .build ();
150+ response (msg );
151+ }
152+
153+ public void responseSessionStarted (long sessionId ) {
154+ SessionResponse msg = SessionResponse .newBuilder ()
155+ .setSessionStarted (
156+ SessionResponse .SessionStarted .newBuilder ()
157+ .setSessionId (sessionId )
158+ .build ()
159+ )
160+ .build ();
161+ response (msg );
162+ }
163+
164+ public void responseAcquiredSuccessfully (long requestId ) {
165+ SessionResponse msg = SessionResponse .newBuilder ()
166+ .setAcquireSemaphoreResult (
167+ SessionResponse .AcquireSemaphoreResult .newBuilder ()
168+ .setReqId (requestId )
169+ .setAcquired (true )
170+ .setStatus (StatusCodesProtos .StatusIds .StatusCode .SUCCESS )
171+ )
172+ .build ();
173+ response (msg );
174+ }
175+
176+ private void response (SessionResponse msg ) {
177+ Assert .assertNotNull (observer );
154178 observer .onNext (msg );
155179 }
156180
@@ -163,9 +187,33 @@ public Checker(SessionRequest msg) {
163187 this .msg = msg ;
164188 }
165189
190+ public SessionRequest get () {
191+ return msg ;
192+ }
193+
166194 public Checker isAcquireSemaphore () {
167195 Assert .assertTrue ("next msg must be acquire semaphore" , msg .hasAcquireSemaphore ());
168196 return this ;
169197 }
198+
199+ public Checker isEphemeralSemaphore () {
200+ Assert .assertTrue ("next msg must be acquire ephemeral semaphore" , msg .getAcquireSemaphore ().getEphemeral ());
201+ return this ;
202+ }
203+
204+ public Checker hasSemaphoreName (String semaphoreName ) {
205+ Assert .assertEquals ("invalid semaphore name" , semaphoreName , msg .getAcquireSemaphore ().getName ());
206+ return this ;
207+ }
208+
209+ public Checker isSessionStart () {
210+ Assert .assertTrue ("next msg must be session start" , msg .hasSessionStart ());
211+ return this ;
212+ }
213+
214+ public Checker hasPath (String coordinationNodePath ) {
215+ Assert .assertEquals ("invalid coordination node path" , coordinationNodePath , msg .getSessionStart ().getPath ());
216+ return this ;
217+ }
170218 }
171219}
0 commit comments