Skip to content

Commit 14c05e7

Browse files
committed
When reconnecting, refresh the eventId based on the last-event-id and add SseEventResponseEntity<T> adapt setting retry/event attributes
1 parent 40d7c41 commit 14c05e7

File tree

11 files changed

+239
-18
lines changed

11 files changed

+239
-18
lines changed

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import java.io.OutputStream;
2222
import java.nio.charset.StandardCharsets;
2323

24+
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
26+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
2527

2628
import com.fasterxml.jackson.databind.JavaType;
2729

@@ -42,21 +44,76 @@ public int getOrder() {
4244

4345
@Override
4446
public void doEncodeResponse(OutputStream output, Object result) throws Exception {
45-
String buffer = "id: " + (writeIndex++) + "\n"
46-
+ "data: "
47-
+ RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(result)
48-
+ "\n\n";
49-
output.write(buffer.getBytes(StandardCharsets.UTF_8));
47+
StringBuilder bufferBuilder = new StringBuilder();
48+
if (result instanceof SseEventResponseEntity<?> responseEntity) {
49+
appendEventId(bufferBuilder, responseEntity.getEventId());
50+
appendEvent(bufferBuilder, responseEntity.getEvent());
51+
appendRetry(bufferBuilder, responseEntity.getRetry());
52+
appendData(bufferBuilder, responseEntity.getData());
53+
} else {
54+
appendEventId(bufferBuilder, writeIndex++);
55+
appendData(bufferBuilder, result);
56+
}
57+
bufferBuilder.append("\n");
58+
output.write(bufferBuilder.toString().getBytes(StandardCharsets.UTF_8));
5059
}
5160

5261
@Override
5362
public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
5463
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
64+
SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>();
5565
for (String line : buffer.split("\n")) {
66+
if (line.startsWith("eventId: ")) {
67+
responseEntity.eventId(Integer.parseInt(line.substring(9)));
68+
continue;
69+
}
70+
if (line.startsWith("event: ")) {
71+
responseEntity.event(line.substring(7));
72+
continue;
73+
}
74+
if (line.startsWith("retry: ")) {
75+
responseEntity.retry(Long.parseLong(line.substring(7)));
76+
continue;
77+
}
5678
if (line.startsWith("data: ")) {
57-
return RestObjectMapperFactory.getRestObjectMapper().readValue(line.substring(5), type);
79+
responseEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(line.substring(6), type));
5880
}
5981
}
60-
return null;
82+
return responseEntity;
83+
}
84+
85+
@Override
86+
public void refreshEventId(int index) {
87+
this.writeIndex = index;
88+
}
89+
90+
private void appendEventId(StringBuilder eventBuilder, Integer eventId) {
91+
if (eventId == null) {
92+
return;
93+
}
94+
eventBuilder.append("eventId: ").append(eventId.intValue()).append("\n");
95+
}
96+
97+
private void appendEvent(StringBuilder eventBuilder, String event) {
98+
if (StringUtils.isEmpty(event)) {
99+
return;
100+
}
101+
eventBuilder.append("event: ").append(event).append("\n");
102+
}
103+
104+
private void appendRetry(StringBuilder eventBuilder, Long retry) {
105+
if (retry == null) {
106+
return;
107+
}
108+
eventBuilder.append("retry: ").append(retry.longValue()).append("\n");
109+
}
110+
111+
private void appendData(StringBuilder eventBuilder, Object data) throws Exception {
112+
if (data == null) {
113+
throw new Exception("sse response data is null!");
114+
}
115+
eventBuilder.append("data: ")
116+
.append(RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(data))
117+
.append("\n");
61118
}
62119
}

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,7 @@ default Object decodeResponse(Buffer buffer, JavaType type) throws Exception {
7676
return doDecodeResponse(input, type);
7777
}
7878
}
79+
80+
default void refreshEventId(int index) {
81+
}
7982
}

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424

25+
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.servicecomb.common.rest.RestConst;
2627
import org.apache.servicecomb.common.rest.codec.RestCodec;
2728
import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
@@ -85,7 +86,7 @@ public CompletableFuture<Void> beforeSendResponseAsync(Invocation invocation, Ht
8586
if (isServerSendEvent(response)) {
8687
produceProcessor = new ProduceEventStreamProcessor();
8788
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
88-
return writeServerSendEvent(response, produceProcessor, responseEx);
89+
return writeServerSendEvent(invocation, response, produceProcessor, responseEx);
8990
}
9091

9192
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
@@ -109,9 +110,10 @@ public static boolean isServerSendEvent(Response response) {
109110
return response.getResult() instanceof Publisher<?>;
110111
}
111112

112-
private static CompletableFuture<Void> writeServerSendEvent(Response response, ProduceProcessor produceProcessor,
113-
HttpServletResponseEx responseEx) {
113+
private static CompletableFuture<Void> writeServerSendEvent(Invocation invocation, Response response,
114+
ProduceProcessor produceProcessor, HttpServletResponseEx responseEx) {
114115
responseEx.setChunked(true);
116+
refreshEventId(invocation.getRequestEx(), produceProcessor);
115117
CompletableFuture<Void> result = new CompletableFuture<>();
116118
Publisher<?> publisher = response.getResult();
117119
publisher.subscribe(new Subscriber<Object>() {
@@ -148,6 +150,14 @@ public void onComplete() {
148150
return result;
149151
}
150152

153+
private static void refreshEventId(HttpServletRequestEx requestEx, ProduceProcessor produceProcessor) {
154+
String lastEventId = requestEx.getHeader("last-event-id");
155+
if (StringUtils.isEmpty(lastEventId)) {
156+
return;
157+
}
158+
produceProcessor.refreshEventId(Integer.parseInt(lastEventId));
159+
}
160+
151161
private static CompletableFuture<Response> writeResponse(
152162
HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, Object data, Response response) {
153163
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {

demo/demo-cse-v2/consumer/src/main/java/org/apache/servicecomb/samples/ConsumerReactiveStreamController.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,23 @@
2020
import org.apache.servicecomb.core.annotation.Transport;
2121
import org.apache.servicecomb.provider.pojo.RpcReference;
2222
import org.apache.servicecomb.provider.rest.common.RestSchema;
23+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
2324
import org.reactivestreams.Publisher;
2425
import org.springframework.web.bind.annotation.GetMapping;
2526
import org.springframework.web.bind.annotation.RequestMapping;
27+
import org.springframework.web.bind.annotation.RequestParam;
28+
29+
import jakarta.servlet.http.HttpServletRequest;
2630

2731
@RestSchema(schemaId = "ReactiveStreamController")
2832
@RequestMapping(path = "/")
2933
public class ConsumerReactiveStreamController {
3034
interface ProviderReactiveStreamController {
31-
Publisher<String> sseString();
35+
Publisher<String> sseString(String param1);
3236

3337
Publisher<Model> sseModel();
38+
39+
Publisher<SseEventResponseEntity<Model>> sseResponseTest();
3440
}
3541

3642
@RpcReference(microserviceName = "provider", schemaId = "ReactiveStreamController")
@@ -71,13 +77,19 @@ public Model setName(String name) {
7177

7278
@GetMapping("/sseString")
7379
@Transport(name = "rest")
74-
public Publisher<String> sseString() {
75-
return controller.sseString();
80+
public Publisher<String> sseString(@RequestParam(name = "param1") String param1, HttpServletRequest request) {
81+
return controller.sseString(param1);
7682
}
7783

7884
@GetMapping("/sseModel")
7985
@Transport(name = "rest")
8086
public Publisher<Model> sseModel() {
8187
return controller.sseModel();
8288
}
89+
90+
@GetMapping("/sseResponseTest")
91+
@Transport(name = "rest")
92+
public Publisher<SseEventResponseEntity<Model>> sseResponseTest() {
93+
return controller.sseResponseTest();
94+
}
8395
}

demo/demo-cse-v2/gateway/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,20 @@
3333
<dependency>
3434
<groupId>org.apache.servicecomb</groupId>
3535
<artifactId>java-chassis-spring-boot-starter-standalone</artifactId>
36+
<exclusions>
37+
<exclusion>
38+
<groupId>ch.qos.logback</groupId>
39+
<artifactId>logback-core</artifactId>
40+
</exclusion>
41+
<exclusion>
42+
<groupId>ch.qos.logback</groupId>
43+
<artifactId>logback-classic</artifactId>
44+
</exclusion>
45+
<exclusion>
46+
<groupId>org.apache.logging.log4j</groupId>
47+
<artifactId>log4j-to-slf4j</artifactId>
48+
</exclusion>
49+
</exclusions>
3650
</dependency>
3751
<dependency>
3852
<groupId>org.apache.servicecomb</groupId>

demo/demo-cse-v2/provider/src/main/java/org/apache/servicecomb/samples/ReactiveStreamController.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919

2020
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import org.apache.servicecomb.core.annotation.Transport;
2324
import org.apache.servicecomb.provider.rest.common.RestSchema;
25+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
2426
import org.reactivestreams.Publisher;
2527
import org.springframework.web.bind.annotation.GetMapping;
2628
import org.springframework.web.bind.annotation.RequestMapping;
29+
import org.springframework.web.bind.annotation.RequestParam;
2730

2831
import io.reactivex.rxjava3.core.Flowable;
2932

@@ -65,13 +68,24 @@ public Model setName(String name) {
6568
}
6669

6770
@GetMapping("/sseString")
68-
public Publisher<String> sseString() {
69-
return Flowable.fromArray("a", "b", "c");
71+
public Publisher<String> sseString(@RequestParam(name = "param1") String param1) {
72+
return Flowable.fromArray("a", "b", "c", param1);
7073
}
7174

7275
@GetMapping("/sseModel")
7376
public Publisher<Model> sseModel() {
7477
return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
7578
.map(item -> new Model("jack", item.intValue()));
7679
}
80+
81+
@GetMapping("/sseResponseTest")
82+
public Publisher<SseEventResponseEntity<Model>> sseResponseTest() {
83+
AtomicInteger index = new AtomicInteger(0);
84+
return Flowable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
85+
.map(item -> new SseEventResponseEntity<Model>()
86+
.event("testEvent" + index)
87+
.eventId(index.getAndIncrement())
88+
.retry(System.currentTimeMillis())
89+
.data(new Model("jack", item.intValue())));
90+
}
7791
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.servicecomb.swagger.sse;
19+
20+
import jakarta.validation.constraints.NotNull;
21+
22+
public class SseEventResponseEntity<T> {
23+
/**
24+
* event id
25+
*/
26+
private Integer eventId;
27+
28+
/**
29+
* event type
30+
*/
31+
private String event;
32+
33+
/**
34+
* reconnection time
35+
*/
36+
private Long retry;
37+
38+
/**
39+
* business data
40+
*/
41+
@NotNull
42+
private Object data;
43+
44+
public SseEventResponseEntity<T> eventId(int eventId) {
45+
this.eventId = eventId;
46+
return this;
47+
}
48+
49+
public SseEventResponseEntity<T> event(String event) {
50+
this.event = event;
51+
return this;
52+
}
53+
54+
public SseEventResponseEntity<T> retry(long retry) {
55+
this.retry = retry;
56+
return this;
57+
}
58+
59+
public SseEventResponseEntity<T> data(T data) throws Exception {
60+
this.data = data;
61+
return this;
62+
}
63+
64+
public Integer getEventId() {
65+
return eventId;
66+
}
67+
68+
public String getEvent() {
69+
return event;
70+
}
71+
72+
public Long getRetry() {
73+
return retry;
74+
}
75+
76+
public Object getData() {
77+
return data;
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.servicecomb.swagger.generator.springmvc.processor.response;
18+
19+
import org.apache.servicecomb.swagger.generator.core.processor.response.DefaultResponseTypeProcessor;
20+
import org.apache.servicecomb.swagger.sse.SseEventResponseEntity;
21+
import org.springframework.http.ResponseEntity;
22+
23+
public class SseEventResponseEntityProcessor extends DefaultResponseTypeProcessor {
24+
public SseEventResponseEntityProcessor() {
25+
extractActualType = true;
26+
}
27+
28+
@Override
29+
public Class<?> getProcessType() {
30+
return SseEventResponseEntity.class;
31+
}
32+
}

swagger/swagger-generator/generator-springmvc/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@
1515
# limitations under the License.
1616
#
1717

18-
org.apache.servicecomb.swagger.generator.springmvc.processor.response.ResponseEntityProcessor
18+
org.apache.servicecomb.swagger.generator.springmvc.processor.response.ResponseEntityProcessor
19+
org.apache.servicecomb.swagger.generator.springmvc.processor.response.SseEventResponseEntityProcessor

transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ protected Response extractResponse(Invocation invocation, HttpServletResponseEx
105105
produceProcessor = ProduceProcessorManager.INSTANCE.findDefaultProcessor();
106106
}
107107

108-
109108
try {
110109
if (responseEx.getAttribute(Const.FLOWABLE_CLIENT_RESPONSE) == null) {
111110
result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(), responseType);

0 commit comments

Comments
 (0)