Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
40d7c41
[#4873] Support for SSE interface RPC calls and register SSE schema
chengyouling Jul 8, 2025
14c05e7
When reconnecting, refresh the eventId based on the last-event-id and…
chengyouling Jul 23, 2025
7eda5d9
fixed test problems
chengyouling Jul 23, 2025
517480b
fixed style problem
chengyouling Jul 23, 2025
500e8b3
add testcase
chengyouling Jul 24, 2025
5f3a593
add testcase
chengyouling Jul 24, 2025
a7135bd
only test rest
chengyouling Jul 24, 2025
58050a5
test reactive testcase
chengyouling Jul 25, 2025
45625a0
move testcase to spring-boot-springmvc
chengyouling Jul 25, 2025
af92830
for test
chengyouling Jul 25, 2025
0332c62
adjust Flowable result
chengyouling Jul 29, 2025
0cd0c47
fixed testcase
chengyouling Jul 29, 2025
0e3e646
adjust flowable response decode
chengyouling Jul 30, 2025
9f7a9dc
for test
chengyouling Jul 30, 2025
7076c84
add flushBuffer
chengyouling Jul 30, 2025
76da23e
change using flushStreamBuffer for test
chengyouling Jul 30, 2025
2b32da4
fixed testcase
chengyouling Jul 30, 2025
255dee9
add response parsing
chengyouling Jul 31, 2025
7fb5f75
fixed Multiple transfer-encoding=chunked in Rest Over Servlet
chengyouling Jul 31, 2025
cfbe610
fixed review options,compatible use \r\n \n \r,add PublisherProduce…
chengyouling Aug 12, 2025
10255be
delete not use EVENTS_MEDIA_TYPE
chengyouling Aug 12, 2025
1adfc0b
add warn for encoding
chengyouling Aug 13, 2025
4c2c34e
adjust eventId to id, resolution concurrency using writeIndex problem
chengyouling Aug 14, 2025
63e6c1a
Compatibility settings for multiple data
chengyouling Aug 15, 2025
0812b8b
Compatible with both client and server, addressing scenarios of incon…
chengyouling Aug 18, 2025
e38bb52
add catch for subscribe
chengyouling Aug 18, 2025
0da8280
Compatible with both packet aggregation and packet fragmentation scen…
chengyouling Aug 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- name: Set up jdk
uses: actions/setup-java@v3
with:
java-version: '21'
java-version: '17'
distribution: 'temurin'
- name: Set up Maven
uses: stCarolas/[email protected]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e)

if (!(response.getResult() instanceof ServerWebSocket)) {
try {
responseEx.flushBuffer();
responseEx.endResponse();
} catch (Throwable flushException) {
LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
getMicroserviceQualifiedName(), requestEx.getRequestURI(), flushException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ protected Invocation sendCreateInvocationException(Throwable throwable) {
requestEx.getRequestURI(), e);
}

flushResponse("UNKNOWN_OPERATION");
endResponse("UNKNOWN_OPERATION");
return null;
}

@Override
protected void sendResponse(Invocation invocation, Response response) {
if (isDownloadFileResponseType(invocation, response)) {
responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()))
.whenComplete((r, e) -> flushResponse(invocation.getMicroserviceQualifiedName()));
.whenComplete((r, e) -> endResponse(invocation.getMicroserviceQualifiedName()));
return;
}

flushResponse(invocation.getMicroserviceQualifiedName());
endResponse(invocation.getMicroserviceQualifiedName());
}

private void flushResponse(String operationName) {
private void endResponse(String operationName) {
try {
responseEx.flushBuffer();
responseEx.endResponse();
} catch (Throwable flushException) {
LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
operationName, requestEx.getRequestURI(), flushException);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.servicecomb.common.rest.codec.produce;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;

import com.fasterxml.jackson.databind.JavaType;

import jakarta.ws.rs.core.MediaType;

public class ProduceEventStreamProcessor implements ProduceProcessor {
public static final List<String> DEFAULT_DELIMITERS = Arrays.asList("\r\n", "\n", "\r");

@Override
public String getName() {
return MediaType.SERVER_SENT_EVENTS;
}

@Override
public int getOrder() {
return 0;
}

@Override
public void doEncodeResponse(OutputStream output, Object result) throws Exception {
StringBuilder bufferBuilder = new StringBuilder();
if (result instanceof SseEventResponseEntity<?> responseEntity) {
appendEventId(bufferBuilder, responseEntity.getEventId());
appendEvent(bufferBuilder, responseEntity.getEvent());
appendRetry(bufferBuilder, responseEntity.getRetry());
appendData(bufferBuilder, responseEntity.getData());
bufferBuilder.append("\n");
output.write(bufferBuilder.toString().getBytes(StandardCharsets.UTF_8));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议保留 else 分支, 打印告警日志 ---- 如果 result 类型不是 SseEventResponseEntity, 我们不应该悄无声息地把错误放过去了.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

@Override
public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
List<String> lines = new ArrayList<>();
splitStringByDelimiters(buffer, lines);
SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>();
for (String line : lines) {
if (line.startsWith("eventId:")) {
responseEntity.eventId(Integer.parseInt(line.substring("eventId:".length()).trim()));
continue;
}
if (line.startsWith("event:")) {
responseEntity.event(line.substring("event:".length()).trim());
continue;
}
if (line.startsWith("retry:")) {
responseEntity.retry(Long.parseLong(line.substring("retry:".length()).trim()));
continue;
}
if (line.startsWith("data:")) {
responseEntity.data(RestObjectMapperFactory.getRestObjectMapper()
.readValue(line.substring("data:".length()).trim(), type));
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里要不要加个消息合法性校验, 比如之前集成测试遇到过的消息粘包导致的解析问题. 如果字段重复了, 打一个告警日志会好分析很多.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return responseEntity;
}

private void splitStringByDelimiters(String str, List<String> lines) {
boolean isContainsDelimiters = false;
for (String split : DEFAULT_DELIMITERS) {
if (str.contains(split)) {
isContainsDelimiters = true;
splitStrings(str.split(split), lines);
}
}
if (!isContainsDelimiters) {
lines.add(str);
}
}

private void splitStrings(String[] strings, List<String> lines) {
for (String str : strings) {
if (StringUtils.isEmpty(str)) {
continue;
}
splitStringByDelimiters(str, lines);
}
}

private void appendEventId(StringBuilder eventBuilder, Integer eventId) {
if (eventId == null) {
return;
}
eventBuilder.append("eventId: ").append(eventId.intValue()).append("\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

协议规定的消息ID的名字是"id", 不是"eventId", 这里需要改一下

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

private void appendEvent(StringBuilder eventBuilder, String event) {
if (StringUtils.isEmpty(event)) {
return;
}
eventBuilder.append("event: ").append(event).append("\n");
}

private void appendRetry(StringBuilder eventBuilder, Long retry) {
if (retry == null) {
return;
}
eventBuilder.append("retry: ").append(retry.longValue()).append("\n");
}

private void appendData(StringBuilder eventBuilder, Object data) throws Exception {
if (data == null) {
throw new Exception("sse response data is null!");
}
eventBuilder.append("data: ")
.append(RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(data))
.append("\n");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class RestOperationMeta {
// 快速构建URL path
private URLPathBuilder pathBuilder;

protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS;

public void init(OperationMeta operationMeta) {
this.operationMeta = operationMeta;

Expand Down Expand Up @@ -258,6 +260,12 @@ protected void doCreateProduceProcessors(Class<?> serialViewClass) {
ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass));
} else {
for (String produce : produces) {
if (produce.equals(EVENTS_MEDIA_TYPE)) {
// When the produce type is event-stream, the ProduceEventStreamProcessor implementation class corresponding
// to event-stream is not added, and it is set to the default type ProduceJsonProcessor.
// In case of an exception, the response result is parsed.
continue;
}
if (produce.contains(";")) {
produce = produce.substring(0, produce.indexOf(";"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import org.apache.servicecomb.swagger.invocation.Response;
import org.springframework.stereotype.Component;

import io.netty.buffer.Unpooled;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;

@Component
public class RestServerCodecFilter implements ProducerFilter {
Expand Down Expand Up @@ -103,7 +103,7 @@ public static CompletableFuture<Response> encodeResponse(Response response, bool
}

responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) {
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
produceProcessor.encodeResponse(output, response.getResult());

responseEx.setBodyBuffer(output.getBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import static org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter.isDownloadFileResponseType;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.servicecomb.common.rest.RestConst;
import org.apache.servicecomb.common.rest.codec.RestCodec;
import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
Expand All @@ -36,15 +38,22 @@
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.config.DynamicPropertyFactory;

import io.netty.buffer.Unpooled;
import io.vertx.core.buffer.Buffer;

public class ServerRestArgsFilter implements HttpServerFilter {
private static final boolean enabled = DynamicPropertyFactory.getInstance().getBooleanProperty
("servicecomb.http.filter.server.serverRestArgs.enabled", true).get();

private static final Logger LOGGER = LoggerFactory.getLogger(ServerRestArgsFilter.class);

@Override
public int getOrder() {
return -100;
Expand Down Expand Up @@ -74,10 +83,15 @@ public CompletableFuture<Void> beforeSendResponseAsync(Invocation invocation, Ht
return responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()));
}

responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
if (isServerSendEvent(response)) {
produceProcessor = new ProduceEventStreamProcessor();
responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
return writeServerSendEvent(invocation, response, produceProcessor, responseEx);
}

responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8");
CompletableFuture<Void> future = new CompletableFuture<>();
try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) {
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
if (failed) {
produceProcessor.encodeResponse(output, ((InvocationException) response.getResult()).getErrorData());
} else {
Expand All @@ -91,4 +105,71 @@ public CompletableFuture<Void> beforeSendResponseAsync(Invocation invocation, Ht
}
return future;
}

public static boolean isServerSendEvent(Response response) {
return response.getResult() instanceof Publisher<?>;
}

private static CompletableFuture<Void> writeServerSendEvent(Invocation invocation, Response response,
ProduceProcessor produceProcessor, HttpServletResponseEx responseEx) {
responseEx.setChunkedForEvent(true);
CompletableFuture<Void> result = new CompletableFuture<>();
Publisher<?> publisher = response.getResult();
publisher.subscribe(new Subscriber<Object>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
s.request(1);
subscription = s;
}

@Override
public void onNext(Object o) {
writeResponse(responseEx, produceProcessor, o, response).whenComplete((r, e) -> {
if (e != null) {
subscription.cancel();
result.completeExceptionally(e);
return;
}
subscription.request(1);
});
}

@Override
public void onError(Throwable t) {
result.completeExceptionally(t);
}

@Override
public void onComplete() {
result.complete(null);
}
});
return result;
}

private static CompletableFuture<Response> writeResponse(
HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, Object data, Response response) {
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
produceProcessor.encodeResponse(output, data);
CompletableFuture<Response> result = new CompletableFuture<>();
responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
if (e != null) {
result.completeExceptionally(e);
}
try {
responseEx.flushBuffer();
} catch (IOException ex) {
LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
}
});
result.complete(response);
return result;
} catch (Throwable e) {
LOGGER.error("internal service error must be fixed.", e);
responseEx.setStatus(500);
return CompletableFuture.failedFuture(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
#

org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor
org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor
org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void setStatus(int sc) {
}

@Override
public void flushBuffer() {
public void endResponse() {
endCount.value = endCount.value + 1;
}

Expand Down Expand Up @@ -956,7 +956,7 @@ public void setStatus(int sc) {
}

@Override
public void flushBuffer() {
public void endResponse() {
endCount.value = endCount.value + 1;
}

Expand Down
Loading
Loading