Skip to content

Commit af92830

Browse files
committed
for test
1 parent 45625a0 commit af92830

File tree

3 files changed

+35
-13
lines changed

3 files changed

+35
-13
lines changed

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,21 @@
2020
import java.io.InputStream;
2121
import java.io.OutputStream;
2222
import java.nio.charset.StandardCharsets;
23+
import java.util.Arrays;
2324

2425
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
2627
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2730

2831
import com.fasterxml.jackson.databind.JavaType;
2932

3033
import jakarta.ws.rs.core.MediaType;
3134

3235
public class ProduceEventStreamProcessor implements ProduceProcessor {
36+
private static final Logger LOGGER = LoggerFactory.getLogger(ProduceEventStreamProcessor.class);
37+
3338
private int writeIndex = 0;
3439

3540
@Override
@@ -61,37 +66,35 @@ public void doEncodeResponse(OutputStream output, Object result) throws Exceptio
6166
@Override
6267
public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
6368
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
69+
LOGGER.info("=========doDecodeResponse buffer===================>" + buffer + "stack: {}", Arrays.toString(
70+
new Exception().getStackTrace()));
6471
SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>();
72+
boolean isResponseEntity = false;
6573
for (String line : buffer.split("\n")) {
6674
if (line.startsWith("eventId: ")) {
6775
responseEntity.eventId(Integer.parseInt(line.substring(9)));
6876
continue;
6977
}
7078
if (line.startsWith("event: ")) {
7179
responseEntity.event(line.substring(7));
80+
isResponseEntity = true;
7281
continue;
7382
}
7483
if (line.startsWith("retry: ")) {
7584
responseEntity.retry(Long.parseLong(line.substring(7)));
85+
isResponseEntity = true;
7686
continue;
7787
}
7888
if (line.startsWith("data: ")) {
7989
responseEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(line.substring(6), type));
8090
}
8191
}
82-
if (isNotResponseEntity(responseEntity)) {
83-
writeIndex++;
92+
if (!isResponseEntity) {
8493
return responseEntity.getData();
8594
}
8695
return responseEntity;
8796
}
8897

89-
private boolean isNotResponseEntity(SseEventResponseEntity<?> responseEntity) {
90-
return StringUtils.isEmpty(responseEntity.getEvent())
91-
&& responseEntity.getRetry() == null
92-
&& (responseEntity.getEventId() != null && responseEntity.getEventId() == writeIndex);
93-
}
94-
9598
@Override
9699
public void refreshEventId(int index) {
97100
this.writeIndex = index;

demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@
2929
import org.reactivestreams.Publisher;
3030
import org.reactivestreams.Subscriber;
3131
import org.reactivestreams.Subscription;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3234
import org.springframework.beans.factory.annotation.Autowired;
3335
import org.springframework.beans.factory.annotation.Qualifier;
3436
import org.springframework.stereotype.Component;
3537

3638
@Component
3739
public class ReactiveStreamIT implements CategorizedTestCase {
40+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamIT.class);
41+
3842
@Autowired
3943
@Qualifier("reactiveStreamProvider")
4044
ReactiveStreamClient reactiveStreamProvider;
@@ -58,8 +62,8 @@ private void testSseStringWithParam(ReactiveStreamClient client) throws Exceptio
5862
}
5963

6064
private String buildBufferString(Publisher<String> result) throws Exception {
61-
StringBuilder buffer = new StringBuilder();
6265
CountDownLatch countDownLatch = new CountDownLatch(1);
66+
StringBuilder buffer = new StringBuilder();
6367
result.subscribe(new Subscriber<>() {
6468
Subscription subscription;
6569

@@ -71,18 +75,21 @@ public void onSubscribe(Subscription s) {
7175

7276
@Override
7377
public void onNext(String s) {
78+
LOGGER.info("=========buildBufferString result===================>" + s);
7479
buffer.append(s);
7580
subscription.request(1);
7681
}
7782

7883
@Override
7984
public void onError(Throwable t) {
85+
LOGGER.info("=========buildBufferString onError===================>");
8086
subscription.cancel();
8187
countDownLatch.countDown();
8288
}
8389

8490
@Override
8591
public void onComplete() {
92+
LOGGER.info("=========buildBufferString onComplete===================>");
8693
countDownLatch.countDown();
8794
}
8895
});
@@ -106,17 +113,20 @@ public void onSubscribe(Subscription s) {
106113
@Override
107114
public void onNext(Model s) {
108115
buffer.append(s.getName()).append(s.getAge());
116+
LOGGER.info("=========testSseModel result===================>" + buffer);
109117
subscription.request(1);
110118
}
111119

112120
@Override
113121
public void onError(Throwable t) {
122+
LOGGER.info("=========testSseModel error===================>");
114123
subscription.cancel();
115124
countDownLatch.countDown();
116125
}
117126

118127
@Override
119128
public void onComplete() {
129+
LOGGER.info("=========testSseModel onComplete===================>");
120130
countDownLatch.countDown();
121131
}
122132
});
@@ -144,17 +154,20 @@ public void onNext(SseEventResponseEntity<Model> responseEntity) {
144154
}
145155
buffer.append(((Model) responseEntity.getData()).getName())
146156
.append(((Model) responseEntity.getData()).getAge());
157+
LOGGER.info("=========testSseResponseEntity result===================>" + buffer);
147158
subscription.request(1);
148159
}
149160

150161
@Override
151162
public void onError(Throwable t) {
163+
LOGGER.info("=========testSseResponseEntity error===================>");
152164
subscription.cancel();
153165
countDownLatch.countDown();
154166
}
155167

156168
@Override
157169
public void onComplete() {
170+
LOGGER.info("=========testSseResponseEntity onComplete===================>");
158171
countDownLatch.countDown();
159172
}
160173
});

demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.servicecomb.springboot.springmvc.server;
1818

19+
import java.util.Arrays;
20+
import java.util.List;
1921
import java.util.concurrent.TimeUnit;
2022
import java.util.concurrent.atomic.AtomicInteger;
2123

@@ -34,24 +36,28 @@
3436
public class ReactiveStreamController {
3537
@GetMapping("/sseString")
3638
public Publisher<String> sseString() {
37-
return Flowable.fromArray("a", "b", "c");
39+
List<String> data = Arrays.asList("a", "b", "c");
40+
return Flowable.intervalRange(0, 3, 5, 1, TimeUnit.SECONDS)
41+
.map(item -> data.get(item.intValue()));
3842
}
3943

4044
@GetMapping("/sseStringWithParam")
4145
public Publisher<String> sseStringWithParam(@RequestParam(name = "name") String name) {
42-
return Flowable.fromArray("a", "b", "c", name);
46+
List<String> data = Arrays.asList("a", "b", "c", name);
47+
return Flowable.intervalRange(0, 4, 5, 1, TimeUnit.SECONDS)
48+
.map(item -> data.get(item.intValue()));
4349
};
4450

4551
@GetMapping("/sseModel")
4652
public Publisher<Model> sseModel() {
47-
return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
53+
return Flowable.intervalRange(0, 5, 5, 1, TimeUnit.SECONDS)
4854
.map(item -> new Model("jack", item.intValue()));
4955
}
5056

5157
@GetMapping("/sseResponseEntity")
5258
public Publisher<SseEventResponseEntity<Model>> sseResponseEntity() {
5359
AtomicInteger index = new AtomicInteger(0);
54-
return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS)
60+
return Flowable.intervalRange(0, 3, 5, 1, TimeUnit.SECONDS)
5561
.map(item -> new SseEventResponseEntity<Model>()
5662
.event("test" + index)
5763
.eventId(index.getAndIncrement())

0 commit comments

Comments
 (0)