Skip to content

Commit 500e8b3

Browse files
committed
add testcase
1 parent 517480b commit 500e8b3

File tree

7 files changed

+340
-5
lines changed

7 files changed

+340
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.springboot.springmvc.client;
19+
20+
import org.apache.servicecomb.provider.pojo.Invoker;
21+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
22+
import org.reactivestreams.Publisher;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.web.bind.annotation.GetMapping;
26+
import org.springframework.web.bind.annotation.RequestMapping;
27+
import org.springframework.web.bind.annotation.RequestParam;
28+
29+
@Configuration
30+
public class ReactiveStreamConfiguration {
31+
@RequestMapping(path = "/")
32+
public interface ReactiveStreamClient {
33+
class Model {
34+
private String name;
35+
36+
private int age;
37+
38+
public Model() {
39+
40+
}
41+
42+
public Model(String name, int age) {
43+
this.name = name;
44+
this.age = age;
45+
}
46+
47+
public int getAge() {
48+
return age;
49+
}
50+
51+
public Model setAge(int age) {
52+
this.age = age;
53+
return this;
54+
}
55+
56+
public String getName() {
57+
return name;
58+
}
59+
60+
public Model setName(String name) {
61+
this.name = name;
62+
return this;
63+
}
64+
}
65+
66+
@GetMapping("/sseString")
67+
Publisher<String> sseString();
68+
69+
@GetMapping("/sseStringWithParam")
70+
Publisher<String> sseStringWithParam(@RequestParam(name = "name") String name);
71+
72+
@GetMapping("/sseModel")
73+
Publisher<Model> sseModel();
74+
75+
@GetMapping("/sseResponseEntity")
76+
Publisher<SseEventResponseEntity<Model>> sseResponseEntity();
77+
}
78+
79+
@Bean("reactiveStreamProvider")
80+
public ReactiveStreamClient reactiveStreamProvider() {
81+
return Invoker.createProxy("springmvc", "ReactiveStreamController", ReactiveStreamClient.class);
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.springboot.springmvc.client;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.apache.servicecomb.demo.CategorizedTestCase;
24+
import org.apache.servicecomb.demo.TestMgr;
25+
import org.apache.servicecomb.springboot.springmvc.client.ReactiveStreamConfiguration.ReactiveStreamClient;
26+
import org.apache.servicecomb.springboot.springmvc.client.ReactiveStreamConfiguration.ReactiveStreamClient.Model;
27+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
28+
import org.reactivestreams.Publisher;
29+
import org.reactivestreams.Subscriber;
30+
import org.reactivestreams.Subscription;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.beans.factory.annotation.Qualifier;
33+
import org.springframework.stereotype.Component;
34+
35+
@Component
36+
public class ReactiveStreamIT implements CategorizedTestCase {
37+
@Autowired
38+
@Qualifier("reactiveStreamProvider")
39+
ReactiveStreamClient reactiveStreamProvider;
40+
41+
@Override
42+
public void testRestTransport() throws Exception {
43+
testSseString(reactiveStreamProvider);
44+
testSseStringWithParam(reactiveStreamProvider);
45+
testSseModel(reactiveStreamProvider);
46+
testSseResponseEntity(reactiveStreamProvider);
47+
}
48+
49+
private void testSseResponseEntity(ReactiveStreamClient client) throws Exception {
50+
Publisher<SseEventResponseEntity<Model>> result = client.sseResponseEntity();
51+
StringBuilder buffer = new StringBuilder();
52+
CountDownLatch countDownLatch = new CountDownLatch(1);
53+
result.subscribe(new Subscriber<>() {
54+
Subscription subscription;
55+
56+
@Override
57+
public void onSubscribe(Subscription s) {
58+
subscription = s;
59+
subscription.request(1);
60+
}
61+
62+
@Override
63+
public void onNext(SseEventResponseEntity<Model> s) {
64+
buffer.append(s.getEvent())
65+
.append(((Model) s.getData()).getName())
66+
.append(((Model) s.getData()).getAge());
67+
subscription.request(1);
68+
}
69+
70+
@Override
71+
public void onError(Throwable t) {
72+
subscription.cancel();
73+
countDownLatch.countDown();
74+
}
75+
76+
@Override
77+
public void onComplete() {
78+
countDownLatch.countDown();
79+
}
80+
});
81+
countDownLatch.await(10, TimeUnit.SECONDS);
82+
TestMgr.check("test0jack0test1jack1test2jack2", buffer.toString());
83+
}
84+
85+
private void testSseModel(ReactiveStreamClient client) throws Exception {
86+
Publisher<Model> result = client.sseModel();
87+
StringBuilder buffer = new StringBuilder();
88+
CountDownLatch countDownLatch = new CountDownLatch(1);
89+
result.subscribe(new Subscriber<>() {
90+
Subscription subscription;
91+
92+
@Override
93+
public void onSubscribe(Subscription s) {
94+
subscription = s;
95+
subscription.request(1);
96+
}
97+
98+
@Override
99+
public void onNext(Model s) {
100+
buffer.append(s.getName()).append(s.getAge());
101+
subscription.request(1);
102+
}
103+
104+
@Override
105+
public void onError(Throwable t) {
106+
subscription.cancel();
107+
countDownLatch.countDown();
108+
}
109+
110+
@Override
111+
public void onComplete() {
112+
countDownLatch.countDown();
113+
}
114+
});
115+
countDownLatch.await(10, TimeUnit.SECONDS);
116+
TestMgr.check("jack0jack1jack2jack3jack4", buffer.toString());
117+
}
118+
119+
private void testSseString(ReactiveStreamClient client) throws Exception {
120+
Publisher<String> result = client.sseString();
121+
TestMgr.check("abc", buildStringBuffer(result));
122+
}
123+
124+
private void testSseStringWithParam(ReactiveStreamClient client) throws Exception {
125+
Publisher<String> result = client.sseStringWithParam("d");
126+
TestMgr.check("abcd", buildStringBuffer(result));
127+
}
128+
129+
private String buildStringBuffer(Publisher<String> result) throws Exception {
130+
StringBuilder buffer = new StringBuilder();
131+
CountDownLatch countDownLatch = new CountDownLatch(1);
132+
result.subscribe(new Subscriber<>() {
133+
Subscription subscription;
134+
135+
@Override
136+
public void onSubscribe(Subscription s) {
137+
subscription = s;
138+
subscription.request(1);
139+
}
140+
141+
@Override
142+
public void onNext(String s) {
143+
buffer.append(s);
144+
subscription.request(1);
145+
}
146+
147+
@Override
148+
public void onError(Throwable t) {
149+
subscription.cancel();
150+
countDownLatch.countDown();
151+
}
152+
153+
@Override
154+
public void onComplete() {
155+
countDownLatch.countDown();
156+
}
157+
});
158+
countDownLatch.await(10, TimeUnit.SECONDS);
159+
return buffer.toString();
160+
}
161+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.servicecomb.springboot.springmvc.server;
18+
19+
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import org.apache.servicecomb.provider.rest.common.RestSchema;
24+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
25+
import org.reactivestreams.Publisher;
26+
import org.springframework.web.bind.annotation.GetMapping;
27+
import org.springframework.web.bind.annotation.RequestMapping;
28+
import org.springframework.web.bind.annotation.RequestParam;
29+
30+
import io.reactivex.rxjava3.core.Flowable;
31+
32+
@RestSchema(schemaId = "ReactiveStreamController")
33+
@RequestMapping(path = "/")
34+
public class ReactiveStreamController {
35+
public static class Model {
36+
private String name;
37+
38+
private int age;
39+
40+
public Model() {
41+
42+
}
43+
44+
public Model(String name, int age) {
45+
this.name = name;
46+
this.age = age;
47+
}
48+
49+
public int getAge() {
50+
return age;
51+
}
52+
53+
public Model setAge(int age) {
54+
this.age = age;
55+
return this;
56+
}
57+
58+
public String getName() {
59+
return name;
60+
}
61+
62+
public Model setName(String name) {
63+
this.name = name;
64+
return this;
65+
}
66+
}
67+
68+
@GetMapping("/sseString")
69+
public Publisher<String> sseString() {
70+
return Flowable.fromArray("a", "b", "c");
71+
}
72+
73+
@GetMapping("/sseStringWithParam")
74+
public Publisher<String> sseStringWithParam(@RequestParam(name = "name") String name) {
75+
return Flowable.fromArray("a", "b", "c", name);
76+
};
77+
78+
@GetMapping("/sseModel")
79+
public Publisher<Model> sseModel() {
80+
return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
81+
.map(item -> new Model("jack", item.intValue()));
82+
}
83+
84+
@GetMapping("/sseResponseEntity")
85+
public Publisher<SseEventResponseEntity<Model>> sseResponseEntity() {
86+
AtomicInteger index = new AtomicInteger(0);
87+
return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)
88+
.map(item -> new SseEventResponseEntity<Model>()
89+
.event("test" + index)
90+
.eventId(index.getAndIncrement())
91+
.retry(System.currentTimeMillis())
92+
.data(new Model("jack", item.intValue())));
93+
};
94+
}

swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
package org.apache.servicecomb.swagger.generator.core.processor.response;
1919

2020
import java.lang.reflect.Type;
21-
import java.util.Arrays;
2221
import java.util.List;
2322

2423
import org.apache.servicecomb.swagger.generator.OperationGenerator;
25-
import org.apache.servicecomb.swagger.generator.SwaggerConst;
2624
import org.apache.servicecomb.swagger.generator.SwaggerGenerator;
2725
import org.apache.servicecomb.swagger.jakarta.ModelConvertersAdapterJakarta;
2826
import org.reactivestreams.Publisher;

swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/sse/SseEventResponseEntity.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class SseEventResponseEntity<T> {
3939
* business data
4040
*/
4141
@NotNull
42-
private Object data;
42+
private T data;
4343

4444
public SseEventResponseEntity<T> eventId(int eventId) {
4545
this.eventId = eventId;

swagger/swagger-generator/generator-springmvc/src/main/java/org/apache/servicecomb/swagger/generator/springmvc/processor/response/SseEventResponseEntityProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.servicecomb.swagger.generator.core.processor.response.DefaultResponseTypeProcessor;
2020
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
21-
import org.springframework.http.ResponseEntity;
2221

2322
public class SseEventResponseEntityProcessor extends DefaultResponseTypeProcessor {
2423
public SseEventResponseEntityProcessor() {

transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ protected Response extractResponse(Invocation invocation, HttpServletResponseEx
108108
try {
109109
if (responseEx.getAttribute(Const.FLOWABLE_CLIENT_RESPONSE) == null) {
110110
result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(), responseType);
111-
}else {
111+
} else {
112112
Flowable<Buffer> flowable = (Flowable<Buffer>) responseEx.getAttribute(Const.FLOWABLE_CLIENT_RESPONSE);
113113
ProduceProcessor finalProduceProcessor = produceProcessor;
114114
result = flowable.map(buffer -> extractFlowableBody(finalProduceProcessor, responseType, buffer));

0 commit comments

Comments
 (0)