|
21 | 21 | import java.util.concurrent.TimeUnit; |
22 | 22 |
|
23 | 23 | import org.apache.commons.lang3.StringUtils; |
| 24 | +import org.apache.servicecomb.provider.springmvc.reference.RestTemplateBuilder; |
24 | 25 | import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; |
25 | 26 | import org.apache.servicecomb.demo.CategorizedTestCase; |
26 | 27 | import org.apache.servicecomb.demo.TestMgr; |
|
31 | 32 | import org.reactivestreams.Subscription; |
32 | 33 | import org.springframework.beans.factory.annotation.Autowired; |
33 | 34 | import org.springframework.beans.factory.annotation.Qualifier; |
| 35 | +import org.springframework.http.HttpEntity; |
| 36 | +import org.springframework.http.HttpHeaders; |
| 37 | +import org.springframework.http.HttpMethod; |
| 38 | +import org.springframework.http.ResponseEntity; |
34 | 39 | import org.springframework.stereotype.Component; |
| 40 | +import org.springframework.web.client.RestTemplate; |
| 41 | + |
| 42 | +import jakarta.ws.rs.core.MediaType; |
35 | 43 |
|
36 | 44 | @Component |
37 | 45 | public class ReactiveStreamIT implements CategorizedTestCase { |
38 | 46 | @Autowired |
39 | 47 | @Qualifier("reactiveStreamProvider") |
40 | 48 | ReactiveStreamClient reactiveStreamProvider; |
41 | 49 |
|
| 50 | + private RestTemplate restTemplate = RestTemplateBuilder.create(); |
| 51 | + |
42 | 52 | @Override |
43 | 53 | public void testRestTransport() throws Exception { |
44 | 54 | testSseString(reactiveStreamProvider); |
45 | 55 | testSseStringWithParam(reactiveStreamProvider); |
46 | 56 | testSseModel(reactiveStreamProvider); |
47 | 57 | testSseResponseEntity(reactiveStreamProvider); |
48 | 58 | testSseMultipleData(reactiveStreamProvider); |
| 59 | + sseStringWithAccept(); |
49 | 60 | } |
50 | 61 |
|
51 | 62 | private void testSseString(ReactiveStreamClient client) throws Exception { |
@@ -91,6 +102,48 @@ public void onComplete() { |
91 | 102 | return buffer.toString(); |
92 | 103 | } |
93 | 104 |
|
| 105 | + private void sseStringWithAccept() throws Exception { |
| 106 | + HttpHeaders headers = new HttpHeaders(); |
| 107 | + headers.add(HttpHeaders.ACCEPT, MediaType.SERVER_SENT_EVENTS); |
| 108 | + HttpEntity<?> requestEntity = new HttpEntity<>(headers); |
| 109 | + ResponseEntity<Publisher> responseEntity = restTemplate |
| 110 | + .exchange("cse://springmvc/sseString", HttpMethod.GET, requestEntity, Publisher.class); |
| 111 | + Publisher result = responseEntity.getBody(); |
| 112 | + CountDownLatch countDownLatch = new CountDownLatch(1); |
| 113 | + StringBuilder buffer = new StringBuilder(); |
| 114 | + result.subscribe(new Subscriber<>() { |
| 115 | + Subscription subscription; |
| 116 | + |
| 117 | + @Override |
| 118 | + public void onSubscribe(Subscription s) { |
| 119 | + subscription = s; |
| 120 | + subscription.request(1); |
| 121 | + } |
| 122 | + |
| 123 | + @Override |
| 124 | + public void onNext(Object entity) { |
| 125 | + SseEventResponseEntity<String> response = (SseEventResponseEntity<String>) entity; |
| 126 | + for (String str : response.getData()) { |
| 127 | + buffer.append(str); |
| 128 | + } |
| 129 | + subscription.request(1); |
| 130 | + } |
| 131 | + |
| 132 | + @Override |
| 133 | + public void onError(Throwable t) { |
| 134 | + subscription.cancel(); |
| 135 | + countDownLatch.countDown(); |
| 136 | + } |
| 137 | + |
| 138 | + @Override |
| 139 | + public void onComplete() { |
| 140 | + countDownLatch.countDown(); |
| 141 | + } |
| 142 | + }); |
| 143 | + countDownLatch.await(10, TimeUnit.SECONDS); |
| 144 | + TestMgr.check("abc", buffer.toString()); |
| 145 | + } |
| 146 | + |
94 | 147 | private void testSseModel(ReactiveStreamClient client) throws Exception { |
95 | 148 | Publisher<Model> result = client.sseModel(); |
96 | 149 | CountDownLatch countDownLatch = new CountDownLatch(1); |
|
0 commit comments