Skip to content

Commit 45625a0

Browse files
committed
move testcase to spring-boot-springmvc
1 parent 58050a5 commit 45625a0

File tree

6 files changed

+227
-126
lines changed

6 files changed

+227
-126
lines changed

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,19 @@ public Object doDecodeResponse(InputStream input, JavaType type) throws Exceptio
7979
responseEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(line.substring(6), type));
8080
}
8181
}
82+
if (isNotResponseEntity(responseEntity)) {
83+
writeIndex++;
84+
return responseEntity.getData();
85+
}
8286
return responseEntity;
8387
}
8488

89+
private boolean isNotResponseEntity(SseEventResponseEntity<?> responseEntity) {
90+
return StringUtils.isEmpty(responseEntity.getEvent())
91+
&& responseEntity.getRetry() == null
92+
&& (responseEntity.getEventId() != null && responseEntity.getEventId() == writeIndex);
93+
}
94+
8595
@Override
8696
public void refreshEventId(int index) {
8797
this.writeIndex = index;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.springboot.springmvc.client;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.apache.servicecomb.demo.CategorizedTestCase;
25+
import org.apache.servicecomb.demo.TestMgr;
26+
import org.apache.servicecomb.demo.model.Model;
27+
import org.apache.servicecomb.springboot.springmvc.client.ThirdSvcConfiguration.ReactiveStreamClient;
28+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
29+
import org.reactivestreams.Publisher;
30+
import org.reactivestreams.Subscriber;
31+
import org.reactivestreams.Subscription;
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.beans.factory.annotation.Qualifier;
34+
import org.springframework.stereotype.Component;
35+
36+
@Component
37+
public class ReactiveStreamIT implements CategorizedTestCase {
38+
@Autowired
39+
@Qualifier("reactiveStreamProvider")
40+
ReactiveStreamClient reactiveStreamProvider;
41+
42+
@Override
43+
public void testRestTransport() throws Exception {
44+
testSseString(reactiveStreamProvider);
45+
testSseStringWithParam(reactiveStreamProvider);
46+
testSseModel(reactiveStreamProvider);
47+
testSseResponseEntity(reactiveStreamProvider);
48+
}
49+
50+
private void testSseString(ReactiveStreamClient client) throws Exception {
51+
Publisher<String> result = client.sseString();
52+
TestMgr.check("abc", buildBufferString(result));
53+
}
54+
55+
private void testSseStringWithParam(ReactiveStreamClient client) throws Exception {
56+
Publisher<String> result = client.sseStringWithParam("d");
57+
TestMgr.check("abcd", buildBufferString(result));
58+
}
59+
60+
private String buildBufferString(Publisher<String> result) throws Exception {
61+
StringBuilder buffer = new StringBuilder();
62+
CountDownLatch countDownLatch = new CountDownLatch(1);
63+
result.subscribe(new Subscriber<>() {
64+
Subscription subscription;
65+
66+
@Override
67+
public void onSubscribe(Subscription s) {
68+
subscription = s;
69+
subscription.request(1);
70+
}
71+
72+
@Override
73+
public void onNext(String s) {
74+
buffer.append(s);
75+
subscription.request(1);
76+
}
77+
78+
@Override
79+
public void onError(Throwable t) {
80+
subscription.cancel();
81+
countDownLatch.countDown();
82+
}
83+
84+
@Override
85+
public void onComplete() {
86+
countDownLatch.countDown();
87+
}
88+
});
89+
countDownLatch.await(10, TimeUnit.SECONDS);
90+
return buffer.toString();
91+
}
92+
93+
private void testSseModel(ReactiveStreamClient client) throws Exception {
94+
Publisher<Model> result = client.sseModel();
95+
CountDownLatch countDownLatch = new CountDownLatch(1);
96+
StringBuilder buffer = new StringBuilder();
97+
result.subscribe(new Subscriber<>() {
98+
Subscription subscription;
99+
100+
@Override
101+
public void onSubscribe(Subscription s) {
102+
subscription = s;
103+
subscription.request(1);
104+
}
105+
106+
@Override
107+
public void onNext(Model s) {
108+
buffer.append(s.getName()).append(s.getAge());
109+
subscription.request(1);
110+
}
111+
112+
@Override
113+
public void onError(Throwable t) {
114+
subscription.cancel();
115+
countDownLatch.countDown();
116+
}
117+
118+
@Override
119+
public void onComplete() {
120+
countDownLatch.countDown();
121+
}
122+
});
123+
countDownLatch.await(10, TimeUnit.SECONDS);
124+
TestMgr.check("jack0jack1jack2jack3jack4", buffer.toString());
125+
}
126+
127+
private void testSseResponseEntity(ReactiveStreamClient client) throws Exception {
128+
Publisher<SseEventResponseEntity<Model>> result = client.sseResponseEntity();
129+
CountDownLatch countDownLatch = new CountDownLatch(1);
130+
StringBuilder buffer = new StringBuilder();
131+
result.subscribe(new Subscriber<>() {
132+
Subscription subscription;
133+
134+
@Override
135+
public void onSubscribe(Subscription s) {
136+
subscription = s;
137+
subscription.request(1);
138+
}
139+
140+
@Override
141+
public void onNext(SseEventResponseEntity<Model> responseEntity) {
142+
if (!StringUtils.isEmpty(responseEntity.getEvent())) {
143+
buffer.append(responseEntity.getEvent());
144+
}
145+
buffer.append(((Model) responseEntity.getData()).getName())
146+
.append(((Model) responseEntity.getData()).getAge());
147+
subscription.request(1);
148+
}
149+
150+
@Override
151+
public void onError(Throwable t) {
152+
subscription.cancel();
153+
countDownLatch.countDown();
154+
}
155+
156+
@Override
157+
public void onComplete() {
158+
countDownLatch.countDown();
159+
}
160+
});
161+
countDownLatch.await(10, TimeUnit.SECONDS);
162+
TestMgr.check("test0jack0test1jack1test2jack2", buffer.toString());
163+
}
164+
}

demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/TestReactiveStream.java

Lines changed: 0 additions & 118 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.springboot.springmvc.client;
19+
20+
import org.apache.servicecomb.demo.model.Model;
21+
import org.apache.servicecomb.provider.pojo.Invoker;
22+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
23+
import org.reactivestreams.Publisher;
24+
import org.springframework.context.annotation.Bean;
25+
import org.springframework.context.annotation.Configuration;
26+
import org.springframework.web.bind.annotation.GetMapping;
27+
import org.springframework.web.bind.annotation.RequestMapping;
28+
29+
@Configuration
30+
public class ThirdSvcConfiguration {
31+
@RequestMapping(path = "/")
32+
public interface ReactiveStreamClient {
33+
@GetMapping("/sseString")
34+
Publisher<String> sseString();
35+
36+
@GetMapping("/sseStringWithParam")
37+
Publisher<String> sseStringWithParam(String param);
38+
39+
@GetMapping("/sseModel")
40+
Publisher<Model> sseModel();
41+
42+
@GetMapping("/sseResponseEntity")
43+
Publisher<SseEventResponseEntity<Model>> sseResponseEntity();
44+
}
45+
46+
@Bean("reactiveStreamProvider")
47+
public ReactiveStreamClient reactiveStreamProvider() {
48+
return Invoker.createProxy("springmvc", "ReactiveStreamController", ReactiveStreamClient.class);
49+
}
50+
}

demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ReactiveStreamController.java renamed to demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.servicecomb.demo.springmvc.server;
18-
17+
package org.apache.servicecomb.springboot.springmvc.server;
1918

2019
import java.util.concurrent.TimeUnit;
2120
import java.util.concurrent.atomic.AtomicInteger;
2221

23-
import org.apache.servicecomb.core.annotation.Transport;
2422
import org.apache.servicecomb.demo.model.Model;
2523
import org.apache.servicecomb.provider.rest.common.RestSchema;
2624
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
@@ -35,26 +33,22 @@
3533
@RequestMapping(path = "/")
3634
public class ReactiveStreamController {
3735
@GetMapping("/sseString")
38-
@Transport(name = "rest")
3936
public Publisher<String> sseString() {
4037
return Flowable.fromArray("a", "b", "c");
4138
}
4239

4340
@GetMapping("/sseStringWithParam")
44-
@Transport(name = "rest")
4541
public Publisher<String> sseStringWithParam(@RequestParam(name = "name") String name) {
4642
return Flowable.fromArray("a", "b", "c", name);
4743
};
4844

4945
@GetMapping("/sseModel")
50-
@Transport(name = "rest")
5146
public Publisher<Model> sseModel() {
5247
return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
5348
.map(item -> new Model("jack", item.intValue()));
5449
}
5550

5651
@GetMapping("/sseResponseEntity")
57-
@Transport(name = "rest")
5852
public Publisher<SseEventResponseEntity<Model>> sseResponseEntity() {
5953
AtomicInteger index = new AtomicInteger(0);
6054
return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)

0 commit comments

Comments
 (0)