2929import org .reactivestreams .Publisher ;
3030import org .reactivestreams .Subscriber ;
3131import org .reactivestreams .Subscription ;
32+ import org .slf4j .Logger ;
33+ import org .slf4j .LoggerFactory ;
3234import org .springframework .beans .factory .annotation .Autowired ;
3335import org .springframework .beans .factory .annotation .Qualifier ;
3436import org .springframework .stereotype .Component ;
3537
3638@ Component
3739public class ReactiveStreamIT implements CategorizedTestCase {
40+ private static final Logger LOGGER = LoggerFactory .getLogger (ReactiveStreamIT .class );
41+
3842 @ Autowired
3943 @ Qualifier ("reactiveStreamProvider" )
4044 ReactiveStreamClient reactiveStreamProvider ;
@@ -59,7 +63,6 @@ private void testSseStringWithParam(ReactiveStreamClient client) throws Exceptio
5963
6064 private String buildBufferString (Publisher <String > result ) throws Exception {
6165 CountDownLatch countDownLatch = new CountDownLatch (1 );
62- countDownLatch .await (20 , TimeUnit .SECONDS );
6366 StringBuilder buffer = new StringBuilder ();
6467 result .subscribe (new Subscriber <>() {
6568 Subscription subscription ;
@@ -72,18 +75,21 @@ public void onSubscribe(Subscription s) {
7275
7376 @ Override
7477 public void onNext (String s ) {
78+ LOGGER .info ("=========buildBufferString result===================>" + s );
7579 buffer .append (s );
7680 subscription .request (1 );
7781 }
7882
7983 @ Override
8084 public void onError (Throwable t ) {
85+ LOGGER .info ("=========buildBufferString onError===================>" );
8186 subscription .cancel ();
8287 countDownLatch .countDown ();
8388 }
8489
8590 @ Override
8691 public void onComplete () {
92+ LOGGER .info ("=========buildBufferString onComplete===================>" );
8793 countDownLatch .countDown ();
8894 }
8995 });
@@ -94,7 +100,6 @@ public void onComplete() {
94100 private void testSseModel (ReactiveStreamClient client ) throws Exception {
95101 Publisher <Model > result = client .sseModel ();
96102 CountDownLatch countDownLatch = new CountDownLatch (1 );
97- countDownLatch .await (20 , TimeUnit .SECONDS );
98103 StringBuilder buffer = new StringBuilder ();
99104 result .subscribe (new Subscriber <>() {
100105 Subscription subscription ;
@@ -108,17 +113,20 @@ public void onSubscribe(Subscription s) {
108113 @ Override
109114 public void onNext (Model s ) {
110115 buffer .append (s .getName ()).append (s .getAge ());
116+ LOGGER .info ("=========testSseModel result===================>" + buffer );
111117 subscription .request (1 );
112118 }
113119
114120 @ Override
115121 public void onError (Throwable t ) {
122+ LOGGER .info ("=========testSseModel error===================>" );
116123 subscription .cancel ();
117124 countDownLatch .countDown ();
118125 }
119126
120127 @ Override
121128 public void onComplete () {
129+ LOGGER .info ("=========testSseModel onComplete===================>" );
122130 countDownLatch .countDown ();
123131 }
124132 });
@@ -129,7 +137,6 @@ public void onComplete() {
129137 private void testSseResponseEntity (ReactiveStreamClient client ) throws Exception {
130138 Publisher <SseEventResponseEntity <Model >> result = client .sseResponseEntity ();
131139 CountDownLatch countDownLatch = new CountDownLatch (1 );
132- countDownLatch .await (20 , TimeUnit .SECONDS );
133140 StringBuilder buffer = new StringBuilder ();
134141 result .subscribe (new Subscriber <>() {
135142 Subscription subscription ;
@@ -147,17 +154,20 @@ public void onNext(SseEventResponseEntity<Model> responseEntity) {
147154 }
148155 buffer .append (((Model ) responseEntity .getData ()).getName ())
149156 .append (((Model ) responseEntity .getData ()).getAge ());
157+ LOGGER .info ("=========testSseResponseEntity result===================>" + buffer );
150158 subscription .request (1 );
151159 }
152160
153161 @ Override
154162 public void onError (Throwable t ) {
163+ LOGGER .info ("=========testSseResponseEntity error===================>" );
155164 subscription .cancel ();
156165 countDownLatch .countDown ();
157166 }
158167
159168 @ Override
160169 public void onComplete () {
170+ LOGGER .info ("=========testSseResponseEntity onComplete===================>" );
161171 countDownLatch .countDown ();
162172 }
163173 });
0 commit comments