diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 3d5f372d76f..5788af888b0 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -36,7 +36,7 @@ jobs: - name: Set up jdk uses: actions/setup-java@v3 with: - java-version: '21' + java-version: '17' distribution: 'temurin' - name: Set up Maven uses: stCarolas/setup-maven@v4.5 diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java index 1f3d18766cd..ede3e94d4e9 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java @@ -351,7 +351,7 @@ protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) if (!(response.getResult() instanceof ServerWebSocket)) { try { - responseEx.flushBuffer(); + responseEx.endResponse(); } catch (Throwable flushException) { LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}", getMicroserviceQualifiedName(), requestEx.getRequestURI(), flushException); diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java index 70c3a5a3a01..673e45ee294 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java @@ -53,7 +53,7 @@ protected Invocation sendCreateInvocationException(Throwable throwable) { requestEx.getRequestURI(), e); } - flushResponse("UNKNOWN_OPERATION"); + endResponse("UNKNOWN_OPERATION"); return null; } @@ -61,16 +61,16 @@ protected Invocation sendCreateInvocationException(Throwable throwable) { protected void sendResponse(Invocation invocation, Response response) { if (isDownloadFileResponseType(invocation, response)) { responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult())) - .whenComplete((r, e) -> flushResponse(invocation.getMicroserviceQualifiedName())); + .whenComplete((r, e) -> endResponse(invocation.getMicroserviceQualifiedName())); return; } - flushResponse(invocation.getMicroserviceQualifiedName()); + endResponse(invocation.getMicroserviceQualifiedName()); } - private void flushResponse(String operationName) { + private void endResponse(String operationName) { try { - responseEx.flushBuffer(); + responseEx.endResponse(); } catch (Throwable flushException) { LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}", operationName, requestEx.getRequestURI(), flushException); diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java new file mode 100644 index 00000000000..109a460cd9d --- /dev/null +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.common.rest.codec.produce; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory; +import org.apache.servicecomb.foundation.vertx.stream.BufferInputStream; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JavaType; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.reactivex.rxjava3.core.Flowable; +import io.vertx.core.buffer.Buffer; +import jakarta.ws.rs.core.MediaType; + +public class ProduceEventStreamProcessor implements ProduceProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(ProduceEventStreamProcessor.class); + + private static final String CR_STR = "\r"; + + private static final byte[] CR = CR_STR.getBytes(StandardCharsets.UTF_8); + + private static final String LF_STR = "\n"; + + private static final byte[] LF = LF_STR.getBytes(StandardCharsets.UTF_8); + + private static final String CRLF_STR = "\r\n"; + + private static final byte[] CRLF = CRLF_STR.getBytes(StandardCharsets.UTF_8); + + private String lineDelimiter; + + private byte[] lineDelimiterBytes; + + private int writeIndex = 0; + + @Override + public String getName() { + return MediaType.SERVER_SENT_EVENTS; + } + + @Override + public int getOrder() { + return 0; + } + + @Override + public void doEncodeResponse(OutputStream output, Object result) throws Exception { + StringBuilder eventBuilder = new StringBuilder(); + if (result instanceof SseEventResponseEntity responseEntity) { + appendId(eventBuilder, responseEntity.getId()); + appendEvent(eventBuilder, responseEntity.getEvent()); + appendRetry(eventBuilder, responseEntity.getRetry()); + appendData(eventBuilder, responseEntity.getData()); + eventBuilder.append("\n"); + output.write(eventBuilder.toString().getBytes(StandardCharsets.UTF_8)); + } else { + LOGGER.warn("Does not support encoding objects other than SseEventResponseEntity!"); + } + } + + private enum ProcessStatus { + DETERMINE_LINE_DELIMITER, + MATCHING_CR, + MATCHING_LF, + MATCHING_CRLF, + MATCHING_LINE, + END_OF_MESSAGE, + /** + * The whole SSE stream is closed. + * Be careful: there may be remaining buffer should be processed. + */ + END_OF_STREAM + } + + private ProcessStatus loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER; + + private int matchingDelimiterIndex = 0; + + final ByteBuf buffer = Unpooled.buffer(); + + private SseEventResponseEntity currentEntity = new SseEventResponseEntity<>(); + + private List> entityList = new ArrayList<>(); + + private JavaType type; + + @Override + public List> doDecodeResponse(InputStream input, JavaType type) throws Exception { + this.type = type; + final byte[] readCache = new byte[Math.min(128, input.available())]; + int bytesRead; + while ((bytesRead = input.read(readCache)) > 0) { + processAllBytes(readCache, bytesRead); + } + final List> resultList = entityList; + entityList = new ArrayList<>(); + return resultList; + } + + private void processAllBytes(byte[] readCache, int cacheEndPos) { + int lastProcessedPosition = innerLoop(readCache, 0, cacheEndPos); + while (lastProcessedPosition < cacheEndPos) { + lastProcessedPosition = innerLoop(readCache, lastProcessedPosition, cacheEndPos); + } + } + + private int innerLoop(final byte[] readCache, final int startPos, final int cacheEndPos) { + if (startPos >= cacheEndPos) { + return cacheEndPos; + } + switch (loopStatus) { + case MATCHING_CR -> { + return tryToMatchDelimiterCR(readCache, startPos, cacheEndPos); + } + case MATCHING_CRLF -> { + return tryToMatchDelimiterCRLF(readCache, startPos, cacheEndPos); + } + case MATCHING_LF -> { + return tryToMatchDelimiterLF(readCache, startPos, cacheEndPos); + } + case DETERMINE_LINE_DELIMITER -> { + return searchFirstLineDelimiter(readCache, startPos, cacheEndPos); + } + case MATCHING_LINE -> { + return bufferReadCacheAndProcessLines(readCache, startPos, cacheEndPos); + } + case END_OF_STREAM -> { + return processLeftBuffer(cacheEndPos); + } + default -> throw new IllegalStateException("unexpected case"); + } + } + + private int processLeftBuffer(int cacheEndPos) { + final byte[] bytes = readAllBytesFromBuffer(buffer); + final String bufferStr = new String(bytes, StandardCharsets.UTF_8); + processStringBuffer(bufferStr); + return cacheEndPos; + } + + private int bufferReadCacheAndProcessLines(byte[] readCache, int startPos, int cacheEndPos) { + buffer.writeBytes(readCache, startPos, cacheEndPos - startPos); + processAllAvailableBufferLines(); + return cacheEndPos; + } + + private int tryToMatchDelimiterCR(byte[] readCache, int startPos, int cacheEndPos) { + int bytesProcessed = 0; + for (; matchingDelimiterIndex < CR.length && startPos + bytesProcessed < cacheEndPos; ++bytesProcessed) { + if (readCache[startPos + bytesProcessed] == CR[matchingDelimiterIndex]) { + buffer.writeByte(readCache[startPos + bytesProcessed]); + ++matchingDelimiterIndex; + } else { + loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER; + matchingDelimiterIndex = 0; + return startPos + bytesProcessed; + } + } + if (matchingDelimiterIndex == CR.length) { + // matched all CR bytes, attempting to further match CRLF. + loopStatus = ProcessStatus.MATCHING_CRLF; + } + return startPos + bytesProcessed; + } + + private int tryToMatchDelimiterCRLF(byte[] readCache, int startPos, int cacheEndPos) { + // If you enter this branch, it means that at least CR should be used as the line break character. + int bytesProcessed = 0; + for (; matchingDelimiterIndex < CRLF.length && startPos + bytesProcessed < cacheEndPos; ++bytesProcessed) { + if (readCache[startPos + bytesProcessed] == CRLF[matchingDelimiterIndex]) { + buffer.writeByte(readCache[startPos + bytesProcessed]); + ++matchingDelimiterIndex; + } else { + determineDelimiter(CR_STR, CR); + return startPos + bytesProcessed; + } + } + if (matchingDelimiterIndex == CRLF.length) { + determineDelimiter(CRLF_STR, CRLF); + } + return startPos + bytesProcessed; + } + + private int tryToMatchDelimiterLF(byte[] readCache, int startPos, int cacheEndPos) { + int bytesProcessed = 0; + for (; matchingDelimiterIndex < LF.length && startPos + bytesProcessed < cacheEndPos; ++bytesProcessed) { + if (readCache[startPos + bytesProcessed] == LF[matchingDelimiterIndex]) { + buffer.writeByte(readCache[startPos + bytesProcessed]); + ++matchingDelimiterIndex; + } else { + loopStatus = ProcessStatus.DETERMINE_LINE_DELIMITER; + matchingDelimiterIndex = 0; + return startPos + bytesProcessed; + } + } + if (matchingDelimiterIndex == LF.length) { + determineDelimiter(LF_STR, LF); + } + return startPos + bytesProcessed; + } + + private void determineDelimiter(String delimiterStr, byte[] delimiterBytes) { + lineDelimiter = delimiterStr; + lineDelimiterBytes = delimiterBytes; + matchingDelimiterIndex = 0; + loopStatus = ProcessStatus.MATCHING_LINE; + } + + private int searchFirstLineDelimiter(byte[] readCache, int startPos, int cacheEndPos) { + for (int i = startPos; i < cacheEndPos; ++i) { + if (readCache[i] == CR[0]) { + loopStatus = ProcessStatus.MATCHING_CR; + matchingDelimiterIndex = 0; + return i; + } else if (readCache[i] == LF[0]) { + loopStatus = ProcessStatus.MATCHING_LF; + matchingDelimiterIndex = 0; + return i; + } else { + buffer.writeByte(readCache[i]); + } + } + return cacheEndPos; + } + + private void processAllAvailableBufferLines() { + while (buffer.readableBytes() > 0) { + final byte[] bytes = readALineOfBytesFromBuffer(buffer); + if (bytes == null || bytes.length == 0) { + return; + } + final String bufferStr = new String(bytes, StandardCharsets.UTF_8); + processStringBuffer(bufferStr); + } + } + + private void processStringBuffer(String bufferStr) { + int cursor = 0; + int delimiterIdx; + while ((delimiterIdx = bufferStr.indexOf(lineDelimiter, cursor)) >= 0) { + final String line = bufferStr.substring(cursor, delimiterIdx); + processStringLine(line); + cursor = delimiterIdx + lineDelimiter.length(); + } + if (cursor < bufferStr.length()) { + buffer.writeBytes(bufferStr.substring(cursor).getBytes(StandardCharsets.UTF_8)); + } + } + + private void processStringLine(String line) { + if (StringUtils.isBlank(line)) { + if (currentEntity.isEmpty()) { + return; + } + entityList.add(currentEntity); + currentEntity = new SseEventResponseEntity<>(); + return; + } + final String[] split = line.split(":", 2); + if (split.length < 2) { + LOGGER.error("get a line of sse event without colon! stream is breaking!"); + throw new IllegalStateException("get a line of sse event without colon!"); + } + switch (split[0]) { + case "event" -> { + if (StringUtils.isNotBlank(split[1])) { + currentEntity.event(split[1].trim()); + } + } + case "id" -> { + if (StringUtils.isNotBlank(split[1])) { + currentEntity.id(Integer.parseInt(split[1].trim())); + } + } + case "data" -> { + try { + currentEntity.data(RestObjectMapperFactory.getRestObjectMapper().readValue(split[1].trim(), type)); + } catch (JsonProcessingException e) { + LOGGER.error("failed to process data of sse event: [{}]", e.getMessage()); + throw new IllegalStateException("failed to process data of sse event", e); + } + } + case "retry" -> { + if (StringUtils.isNotBlank(split[1])) { + currentEntity.retry(Long.parseLong(split[1].trim())); + } + } + default -> { + LOGGER.debug("unrecognized sse message line! ignored string segment length=[{}]", line.length()); + } + } + } + + private byte[] readALineOfBytesFromBuffer(ByteBuf buffer) { + matchingDelimiterIndex = 0; + try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(buffer.readableBytes())) { + while (buffer.readableBytes() > 0 && matchingDelimiterIndex < lineDelimiterBytes.length) { + final byte b = buffer.readByte(); + if (b == lineDelimiterBytes[matchingDelimiterIndex]) { + ++matchingDelimiterIndex; + } + bos.write(b); + } + if (matchingDelimiterIndex < lineDelimiterBytes.length) { + // The newline character was not matched, so this part of the buffer does not constitute a complete line of + // content and needs to remain in the buffer, waiting for the next segment to arrive for processing. + buffer.writeBytes(bos.toByteArray()); + return null; + } + matchingDelimiterIndex = 0; + return bos.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException("impossible error while closing ByteArrayOutputStream", e); + } + } + + private byte[] readAllBytesFromBuffer(ByteBuf buffer) { + try (final ByteArrayOutputStream bos = new ByteArrayOutputStream(buffer.readableBytes())) { + buffer.readBytes(bos, buffer.readableBytes()); + return bos.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException("impossible error while closing ByteArrayOutputStream", e); + } + } + + private void appendId(StringBuilder eventBuilder, Integer eventId) { + int id = eventId != null ? eventId : writeIndex++; + eventBuilder.append("id: ").append(id).append("\n"); + } + + private void appendEvent(StringBuilder eventBuilder, String event) { + if (StringUtils.isEmpty(event)) { + return; + } + eventBuilder.append("event: ").append(event).append("\n"); + } + + private void appendRetry(StringBuilder eventBuilder, Long retry) { + if (retry == null) { + return; + } + eventBuilder.append("retry: ").append(retry.longValue()).append("\n"); + } + + private void appendData(StringBuilder eventBuilder, List datas) throws Exception { + if (CollectionUtils.isEmpty(datas)) { + throw new Exception("sse response data is null!"); + } + for (Object data : datas) { + eventBuilder.append("data: ") + .append(RestObjectMapperFactory.getRestObjectMapper().writeValueAsString(data)) + .append("\n"); + } + } + + @Override + public Publisher> decodeResponse(Buffer buffer, JavaType type) throws Exception { + if (buffer.length() == 0) { + return Flowable.empty(); + } + + try (BufferInputStream input = new BufferInputStream(buffer.getByteBuf())) { + final List> list = doDecodeResponse(input, type); + return Flowable.fromIterable(list); + } + } + + public Publisher> close() throws Exception { + if (type == null) { + return Flowable.empty(); + } + try (final ByteArrayInputStream input = new ByteArrayInputStream( + (lineDelimiter + lineDelimiter).getBytes(StandardCharsets.UTF_8))) { + // Write two additional newline characters into the buffer to ensure that the processor completes + // processing all remaining content in the buffer. + final List> list = doDecodeResponse(input, type); + return Flowable.fromIterable(list); + } + } +} diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java index 5061ef783cc..9da6d06f32d 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java @@ -88,6 +88,8 @@ public class RestOperationMeta { // 快速构建URL path private URLPathBuilder pathBuilder; + protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS; + public void init(OperationMeta operationMeta) { this.operationMeta = operationMeta; @@ -258,6 +260,12 @@ protected void doCreateProduceProcessors(Class serialViewClass) { ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass)); } else { for (String produce : produces) { + if (produce.contains(EVENTS_MEDIA_TYPE)) { + // When the produce type is event-stream, the ProduceEventStreamProcessor implementation class corresponding + // to event-stream is not added, and it is set to the default type ProduceJsonProcessor. + // In case of an exception, the response result is parsed. + continue; + } if (produce.contains(";")) { produce = produce.substring(0, produce.indexOf(";")); } diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java index eaf62b632a4..f556ffa0507 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/RestServerCodecFilter.java @@ -44,8 +44,8 @@ import org.apache.servicecomb.swagger.invocation.Response; import org.springframework.stereotype.Component; -import io.netty.buffer.Unpooled; import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; @Component public class RestServerCodecFilter implements ProducerFilter { @@ -103,7 +103,7 @@ public static CompletableFuture encodeResponse(Response response, bool } responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8"); - try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) { + try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) { produceProcessor.encodeResponse(output, response.getResult()); responseEx.setBodyBuffer(output.getBuffer()); diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java index bef5b48a477..281769d756d 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java @@ -19,11 +19,13 @@ import static org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter.isDownloadFileResponseType; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.servicecomb.common.rest.RestConst; import org.apache.servicecomb.common.rest.codec.RestCodec; +import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; import org.apache.servicecomb.common.rest.definition.RestOperationMeta; import org.apache.servicecomb.common.rest.filter.HttpServerFilter; @@ -36,15 +38,22 @@ import org.apache.servicecomb.swagger.invocation.Response; import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.netflix.config.DynamicPropertyFactory; -import io.netty.buffer.Unpooled; +import io.vertx.core.buffer.Buffer; public class ServerRestArgsFilter implements HttpServerFilter { private static final boolean enabled = DynamicPropertyFactory.getInstance().getBooleanProperty ("servicecomb.http.filter.server.serverRestArgs.enabled", true).get(); + private static final Logger LOGGER = LoggerFactory.getLogger(ServerRestArgsFilter.class); + @Override public int getOrder() { return -100; @@ -74,10 +83,15 @@ public CompletableFuture beforeSendResponseAsync(Invocation invocation, Ht return responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult())); } - responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8"); + if (isServerSendEvent(response)) { + produceProcessor = new ProduceEventStreamProcessor(); + responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8"); + return writeServerSendEvent(invocation, response, produceProcessor, responseEx); + } + responseEx.setContentType(produceProcessor.getName() + "; charset=utf-8"); CompletableFuture future = new CompletableFuture<>(); - try (BufferOutputStream output = new BufferOutputStream(Unpooled.compositeBuffer())) { + try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) { if (failed) { produceProcessor.encodeResponse(output, ((InvocationException) response.getResult()).getErrorData()); } else { @@ -91,4 +105,76 @@ public CompletableFuture beforeSendResponseAsync(Invocation invocation, Ht } return future; } + + public static boolean isServerSendEvent(Response response) { + return response.getResult() instanceof Publisher; + } + + private static CompletableFuture writeServerSendEvent(Invocation invocation, Response response, + ProduceProcessor produceProcessor, HttpServletResponseEx responseEx) { + responseEx.setChunkedForEvent(true); + CompletableFuture result = new CompletableFuture<>(); + Publisher publisher = response.getResult(); + publisher.subscribe(new Subscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + subscription = s; + } + + @Override + public void onNext(Object o) { + try { + writeResponse(responseEx, produceProcessor, o, response).whenComplete((r, e) -> { + if (e != null) { + subscription.cancel(); + result.completeExceptionally(e); + return; + } + subscription.request(1); + }); + } catch (Throwable e) { + LOGGER.warn("Failed to subscribe event: {}", o, e); + result.completeExceptionally(e); + } + } + + @Override + public void onError(Throwable t) { + result.completeExceptionally(t); + } + + @Override + public void onComplete() { + result.complete(null); + } + }); + return result; + } + + private static CompletableFuture writeResponse( + HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, Object data, Response response) { + try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) { + produceProcessor.encodeResponse(output, data); + CompletableFuture result = new CompletableFuture<>(); + responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> { + if (e != null) { + result.completeExceptionally(e); + } + try { + responseEx.flushBuffer(); + } catch (IOException ex) { + LOGGER.warn("Failed to flush buffer for Server Send Events", ex); + } + }); + result.complete(response); + return result; + } catch (Throwable e) { + LOGGER.error("internal service error must be fixed.", e); + responseEx.setStatus(500); + return CompletableFuture.failedFuture(e); + } + } } diff --git a/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor b/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor index 9353b9dce7c..a05c6126d7f 100644 --- a/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor +++ b/common/common-rest/src/main/resources/META-INF/services/org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor @@ -16,4 +16,5 @@ # org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor -org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor \ No newline at end of file +org.apache.servicecomb.common.rest.codec.produce.ProduceTextPlainProcessor +org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor \ No newline at end of file diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java index 0d327ae0646..f0ac3345635 100644 --- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java +++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java @@ -916,7 +916,7 @@ public void setStatus(int sc) { } @Override - public void flushBuffer() { + public void endResponse() { endCount.value = endCount.value + 1; } @@ -956,7 +956,7 @@ public void setStatus(int sc) { } @Override - public void flushBuffer() { + public void endResponse() { endCount.value = endCount.value + 1; } diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java new file mode 100644 index 00000000000..c1cbf95b976 --- /dev/null +++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/produce/TestProduceEventStreamProcessor.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.common.rest.codec.produce; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Test; + +import com.fasterxml.jackson.databind.JavaType; + +import io.reactivex.rxjava3.core.Flowable; + +/** + * Test ProduceEventStreamProcessor + * + * @since 2025-08-19 + */ +public class TestProduceEventStreamProcessor { + @Test + public void doDecodeResponse() throws Exception { + doDecodeResponseTemplateComposite("\n"); + doDecodeResponseTemplateComposite("\r"); + doDecodeResponseTemplateComposite("\r\n"); + } + + private void doDecodeResponseTemplateComposite(String lineDelimiter) throws Exception { + doDecodeResponseTemplate(lineDelimiter, 2); + doDecodeResponseTemplate(lineDelimiter, 1); + doDecodeResponseTemplate(lineDelimiter, 0); + } + + private void doDecodeResponseTemplate(String lineDelimiter, int delimiterInTheFirstSegment) throws Exception { + final ProduceEventStreamProcessor processor = new ProduceEventStreamProcessor(); + final ByteArrayInputStream stream0 = prepareStream( + "id: 0" + lineDelimiter + "data: \"aaa\"" + lineDelimiter.repeat(delimiterInTheFirstSegment)); + final Object o0 = processor.doDecodeResponse(stream0, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + if (delimiterInTheFirstSegment > 1) { + checkDecodeResult(o0, new ItemChecker(0, Collections.singletonList("aaa"), null, null)); + } else { + checkDecodeResult(o0); + } + + final ByteArrayInputStream stream1 = prepareStream( + lineDelimiter.repeat(2 - delimiterInTheFirstSegment) + "id: 1" + lineDelimiter + "data: \"bbb\"" + + lineDelimiter); + final Object o1 = processor.doDecodeResponse(stream1, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + if (delimiterInTheFirstSegment > 1) { + checkDecodeResult(o1); + } else { + checkDecodeResult(o1, new ItemChecker(0, Collections.singletonList("aaa"), null, null)); + } + + final ByteArrayInputStream stream2 = prepareStream( + lineDelimiter + "id: 2" + lineDelimiter + "data: \"ccc\"" + lineDelimiter + "event: test" + lineDelimiter + + "retry: 123"); + final Object o2 = processor.doDecodeResponse(stream2, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o2, + new ItemChecker(1, Collections.singletonList("bbb"), null, null)); + + final ByteArrayInputStream stream3 = prepareStream( + lineDelimiter + lineDelimiter + "id: 3" + lineDelimiter + "data: \"ddd\"" + lineDelimiter + + "event: test3" + lineDelimiter + "retry: 321" + lineDelimiter); + final Object o3 = processor.doDecodeResponse(stream3, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o3, + new ItemChecker(2, Collections.singletonList("ccc"), "test", 123L)); + final Object o4 = ((Flowable) processor.close()).toList().blockingGet(); + checkDecodeResult(o4, new ItemChecker(3, Collections.singletonList("ddd"), "test3", 321L)); + } + + @Test + public void doDecodeResponseMultiData() throws Exception { + doDecodeResponseMultiDataTemplate("\n"); + doDecodeResponseMultiDataTemplate("\r"); + doDecodeResponseMultiDataTemplate("\r\n"); + } + + private void doDecodeResponseMultiDataTemplate(String lineDelimiter) throws Exception { + final ProduceEventStreamProcessor processor = new ProduceEventStreamProcessor(); + + final ByteArrayInputStream stream0 = prepareStream( + "id: 0" + lineDelimiter + "data: \"aaa1\"" + lineDelimiter + "data: \"aaa2\"" + lineDelimiter + lineDelimiter); + final Object o0 = processor.doDecodeResponse(stream0, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o0, new ItemChecker(0, Arrays.asList("aaa1", "aaa2"), null, null)); + + final ByteArrayInputStream stream1 = prepareStream( + "id: 1" + lineDelimiter + "data: \"aaa3\"" + lineDelimiter + "data: \"aaa4\"" + lineDelimiter + "data: \"aaa5\"" + + lineDelimiter + "data: \"aaa6\""); + final Object o1 = processor.doDecodeResponse(stream1, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o1); + final Object o2 = ((Flowable) processor.close()).toList().blockingGet(); + checkDecodeResult(o2, new ItemChecker(1, Arrays.asList("aaa3", "aaa4", "aaa5", "aaa6"), null, null)); + } + + @Test + public void doDecodeResponseMultiPackage() throws Exception { + doDecodeResponseMultiPackageTemplate("\n"); + doDecodeResponseMultiPackageTemplate("\r"); + doDecodeResponseMultiPackageTemplate("\r\n"); + } + + private void doDecodeResponseMultiPackageTemplate(String lineDelimiter) throws Exception { + final ProduceEventStreamProcessor processor = new ProduceEventStreamProcessor(); + + final ByteArrayInputStream stream0 = prepareStream( + "id: 0" + lineDelimiter + "data: \"aaa\"" + lineDelimiter + lineDelimiter + lineDelimiter); + final Object o0 = processor.doDecodeResponse(stream0, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o0, new ItemChecker(0, Collections.singletonList("aaa"), null, null)); + + final ByteArrayInputStream stream1 = prepareStream("id: 1" + lineDelimiter + "data: \"bbb\"" + lineDelimiter); + final Object o1 = processor.doDecodeResponse(stream1, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o1); + + final ByteArrayInputStream stream2 = prepareStream( + lineDelimiter + "id: 2" + lineDelimiter + "data: \"ccc\"" + lineDelimiter + "event: test" + lineDelimiter + + "retry: 123" + lineDelimiter + lineDelimiter + "id: 3" + lineDelimiter + "data: \"ddd\"" + lineDelimiter + + "event: test3" + lineDelimiter + "retry: 321" + lineDelimiter + lineDelimiter); + final Object o2 = processor.doDecodeResponse(stream2, + RestObjectMapperFactory.getRestObjectMapper().constructType(String.class)); + checkDecodeResult(o2, + new ItemChecker(1, Collections.singletonList("bbb"), null, null), + new ItemChecker(2, Collections.singletonList("ccc"), "test", 123L), + new ItemChecker(3, Collections.singletonList("ddd"), "test3", 321L)); + final Object o4 = ((Flowable) processor.close()).toList().blockingGet(); + checkDecodeResult(o4); + } + + @Test + public void doDecodeResponseHalfPackage() { + for (int splitIndexesCount = 1; splitIndexesCount < 3; splitIndexesCount++) { + for (int trailingDelimiterCount = 0; trailingDelimiterCount < 3; trailingDelimiterCount++) { + doDecodeResponseHalfPackageTemplate("\n", splitIndexesCount, trailingDelimiterCount); + doDecodeResponseHalfPackageTemplate("\r", splitIndexesCount, trailingDelimiterCount); + doDecodeResponseHalfPackageTemplate("\r\n", splitIndexesCount, trailingDelimiterCount); + } + } + } + + private void doDecodeResponseHalfPackageTemplate(String lineDelimiter, int splitIndexesCount, + int trailingDelimiterCount) { + final String messageTemplate = + "data: \"中文aaa\"" + lineDelimiter + + "id: 0" + lineDelimiter + + lineDelimiter + + "id: 1" + lineDelimiter + + "data: \"bbb中文\"" + lineDelimiter + + "data: \"123汉语\"" + lineDelimiter + + lineDelimiter + + "data: \"~!@#$%^&*()_+=-0987654321`中文[]{}\\\\|;':\\\",./<>?abc\"" + lineDelimiter + + "data: \"文字ccc\"" + lineDelimiter + + "data: \"中文321\"" + lineDelimiter + + "id: 2" + lineDelimiter + + "retry: 3600" + lineDelimiter + + "event: test" + lineDelimiter.repeat(trailingDelimiterCount); + final byte[] messageTemplateBytes = messageTemplate.getBytes(StandardCharsets.UTF_8); + + int[] splitIndexes = new int[splitIndexesCount]; + try { + runDecodeResponseHalfPackageTemplate(0, splitIndexes, messageTemplateBytes); + } catch (AssertionError e) { + throw new AssertionError(e.getMessage() + ", messageTemplate=[" + + messageTemplate + + "]", e); + } + } + + private void runDecodeResponseHalfPackageTemplate(int cursor, int[] splitIndexes, byte[] messageTemplateBytes) { + if (cursor != splitIndexes.length) { + for (int i = cursor == 0 ? 0 : splitIndexes[cursor - 1]; i <= messageTemplateBytes.length; ++i) { + splitIndexes[cursor] = i; + runDecodeResponseHalfPackageTemplate(cursor + 1, splitIndexes, messageTemplateBytes); + } + return; + } + + final List byteArrayInputStreams = splitByteArrayInputStreams( + splitIndexes, messageTemplateBytes); + + final ProduceEventStreamProcessor processor = new ProduceEventStreamProcessor(); + final JavaType javaType = RestObjectMapperFactory.getRestObjectMapper().constructType(String.class); + + final List> entityList = new ArrayList<>(); + for (ByteArrayInputStream byteArrayInputStream : byteArrayInputStreams) { + final List> entities; + try { + entities = (List>) processor.decodeResponse( + byteArrayInputStream, javaType); + } catch (Exception e) { + throw new RuntimeException(e); + } + entityList.addAll(entities); + } + try { + entityList.addAll(((Flowable>) processor.close()).toList().blockingGet()); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + checkDecodeResult(entityList, + new ItemChecker(0, Collections.singletonList("中文aaa"), null, null), + new ItemChecker(1, Arrays.asList("bbb中文", "123汉语"), null, null), + new ItemChecker(2, Arrays.asList("~!@#$%^&*()_+=-0987654321`中文[]{}\\|;':\",./<>?abc", + "文字ccc", "中文321"), "test", 3600L)); + } catch (AssertionError e) { + throw new AssertionError(e.getMessage() + System.lineSeparator() + + ", splitIndexes=" + Arrays.toString(splitIndexes), e); + } + } + + private List splitByteArrayInputStreams(int[] splitIndexes, byte[] messageTemplateBytes) { + int splitHeader = 0; + final List byteArrayInputStreams = new ArrayList<>(); + for (int split : splitIndexes) { + final byte[] segment = Arrays.copyOfRange(messageTemplateBytes, splitHeader, split); + final ByteArrayInputStream stream = new ByteArrayInputStream(segment); + byteArrayInputStreams.add(stream); + splitHeader = split; + } + + final byte[] segment = Arrays.copyOfRange(messageTemplateBytes, splitIndexes[splitIndexes.length - 1], + messageTemplateBytes.length); + final ByteArrayInputStream stream = new ByteArrayInputStream(segment); + byteArrayInputStreams.add(stream); + + return byteArrayInputStreams; + } + + private void checkDecodeResult(Object obj) { + MatcherAssert.assertThat(obj, Matchers.instanceOf(List.class)); + MatcherAssert.assertThat(((List) obj).size(), Matchers.equalTo(0)); + } + + private void checkDecodeResult(Object obj, ItemChecker... checkers) { + MatcherAssert.assertThat(obj, Matchers.instanceOf(List.class)); + final List objList = (List) obj; + MatcherAssert.assertThat("expect size of objList is " + checkers.length, + objList.size(), Matchers.equalTo(checkers.length)); + + for (int i = 0; i < objList.size(); i++) { + Object result = objList.get(i); + checkers[i] + .check(result); + } + } + + private ByteArrayInputStream prepareStream(String buffer) { + return new ByteArrayInputStream( + buffer.getBytes(StandardCharsets.UTF_8)); + } + + private static class ItemChecker { + private int id; + + private List expectDataList; + + private String event; + + private Long retry; + + public ItemChecker(int id, List expectDataList, String event, Long retry) { + this.id = id; + this.expectDataList = expectDataList; + this.event = event; + this.retry = retry; + } + + public void check(Object actualResult) { + final SseEventResponseEntity entity0 = ((SseEventResponseEntity) actualResult); + MatcherAssert.assertThat(entity0.getData(), Matchers.instanceOf(List.class)); + final List data = entity0.getData(); + MatcherAssert.assertThat("actual data = " + data.toString(), data.size(), + Matchers.equalTo(expectDataList.size())); + for (int i = 0; i < data.size(); i++) { + MatcherAssert.assertThat(data.get(i), Matchers.equalTo(expectDataList.get(i))); + } + MatcherAssert.assertThat("actual event = " + entity0.getEvent(), + entity0.getEvent(), event == null ? Matchers.nullValue() : Matchers.equalTo(event)); + MatcherAssert.assertThat("actual retry = " + entity0.getRetry(), + entity0.getRetry(), retry == null ? Matchers.nullValue() : Matchers.equalTo(retry)); + MatcherAssert.assertThat("actual id = " + entity0.getId(), + entity0.getId(), Matchers.equalTo(id)); + } + } +} diff --git a/demo/demo-cse-v2/gateway/pom.xml b/demo/demo-cse-v2/gateway/pom.xml index 3dc9ed6c3ca..4d491cdf8a3 100644 --- a/demo/demo-cse-v2/gateway/pom.xml +++ b/demo/demo-cse-v2/gateway/pom.xml @@ -33,6 +33,20 @@ org.apache.servicecomb java-chassis-spring-boot-starter-standalone + + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + + + org.apache.logging.log4j + log4j-to-slf4j + + org.apache.servicecomb diff --git a/demo/demo-cse-v2/provider/pom.xml b/demo/demo-cse-v2/provider/pom.xml index c5b57816f28..e186908d3bf 100644 --- a/demo/demo-cse-v2/provider/pom.xml +++ b/demo/demo-cse-v2/provider/pom.xml @@ -52,6 +52,10 @@ + + io.reactivex.rxjava3 + rxjava + diff --git a/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java new file mode 100644 index 00000000000..7cfdf6bdf5e --- /dev/null +++ b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/model/Model.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.demo.model; + +public class Model { + private String name; + + private int age; + + public Model() { + + } + + public Model(String name, int age) { + this.name = name; + this.age = age; + } + + public int getAge() { + return age; + } + + public Model setAge(int age) { + this.age = age; + return this; + } + + public String getName() { + return name; + } + + public Model setName(String name) { + this.name = name; + return this; + } +} diff --git a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java new file mode 100644 index 00000000000..4830f872cdd --- /dev/null +++ b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.springboot.springmvc.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.apache.servicecomb.demo.CategorizedTestCase; +import org.apache.servicecomb.demo.TestMgr; +import org.apache.servicecomb.demo.model.Model; +import org.apache.servicecomb.springboot.springmvc.client.ThirdSvcConfiguration.ReactiveStreamClient; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +@Component +public class ReactiveStreamIT implements CategorizedTestCase { + @Autowired + @Qualifier("reactiveStreamProvider") + ReactiveStreamClient reactiveStreamProvider; + + @Override + public void testRestTransport() throws Exception { + testSseString(reactiveStreamProvider); + testSseStringWithParam(reactiveStreamProvider); + testSseModel(reactiveStreamProvider); + testSseResponseEntity(reactiveStreamProvider); + testSseMultipleData(reactiveStreamProvider); + } + + private void testSseString(ReactiveStreamClient client) throws Exception { + Publisher result = client.sseString(); + TestMgr.check("abc", buildBufferString(result)); + } + + private void testSseStringWithParam(ReactiveStreamClient client) throws Exception { + Publisher result = client.sseStringWithParam("d"); + TestMgr.check("abcd", buildBufferString(result)); + } + + private String buildBufferString(Publisher result) throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(1); + StringBuilder buffer = new StringBuilder(); + result.subscribe(new Subscriber<>() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(String s) { + buffer.append(s); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + subscription.cancel(); + countDownLatch.countDown(); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + return buffer.toString(); + } + + private void testSseModel(ReactiveStreamClient client) throws Exception { + Publisher result = client.sseModel(); + CountDownLatch countDownLatch = new CountDownLatch(1); + StringBuilder buffer = new StringBuilder(); + result.subscribe(new Subscriber<>() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(Model model) { + buffer.append(model.getName()).append(model.getAge()); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + subscription.cancel(); + countDownLatch.countDown(); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check("jack0jack1jack2jack3jack4", buffer.toString()); + } + + private void testSseResponseEntity(ReactiveStreamClient client) throws Exception { + Publisher> result = client.sseResponseEntity(); + CountDownLatch countDownLatch = new CountDownLatch(1); + StringBuilder buffer = new StringBuilder(); + result.subscribe(new Subscriber<>() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(SseEventResponseEntity responseEntity) { + if (!StringUtils.isEmpty(responseEntity.getEvent())) { + buffer.append(responseEntity.getEvent()); + } + for (Model model : responseEntity.getData()) { + buffer.append(model.getName()).append(model.getAge()); + } + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + subscription.cancel(); + countDownLatch.countDown(); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check("test0jack0test1jack1test2jack2", buffer.toString()); + } + + private void testSseMultipleData(ReactiveStreamClient client) throws Exception { + Publisher> result = client.sseMultipleData(); + CountDownLatch countDownLatch = new CountDownLatch(1); + StringBuilder buffer = new StringBuilder(); + result.subscribe(new Subscriber<>() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(SseEventResponseEntity responseEntity) { + if (!StringUtils.isEmpty(responseEntity.getEvent())) { + buffer.append(responseEntity.getEvent()); + } + for (Model model : responseEntity.getData()) { + buffer.append(model.getName()).append(model.getAge()); + } + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + subscription.cancel(); + countDownLatch.countDown(); + } + + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }); + countDownLatch.await(10, TimeUnit.SECONDS); + TestMgr.check("test0jack0tom0test1jack1tom1test2jack2tom2", buffer.toString()); + } +} diff --git a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java new file mode 100644 index 00000000000..3bd1986d9b3 --- /dev/null +++ b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ThirdSvcConfiguration.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.springboot.springmvc.client; + +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.apache.servicecomb.demo.model.Model; +import org.apache.servicecomb.provider.pojo.Invoker; +import org.reactivestreams.Publisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; + +@Configuration +public class ThirdSvcConfiguration { + @RequestMapping(path = "/") + public interface ReactiveStreamClient { + @GetMapping("/sseString") + Publisher sseString(); + + @GetMapping("/sseStringWithParam") + Publisher sseStringWithParam(String param); + + @GetMapping("/sseModel") + Publisher sseModel(); + + @GetMapping("/sseResponseEntity") + Publisher> sseResponseEntity(); + + @GetMapping("/sseMultipleData") + Publisher> sseMultipleData(); + } + + @Bean("reactiveStreamProvider") + public ReactiveStreamClient reactiveStreamProvider() { + return Invoker.createProxy("springmvc", "ReactiveStreamController", ReactiveStreamClient.class); + } +} diff --git a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java new file mode 100644 index 00000000000..323d3fbc307 --- /dev/null +++ b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-server/src/main/java/org/apache/servicecomb/springboot/springmvc/server/ReactiveStreamController.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.springboot.springmvc.server; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.apache.servicecomb.demo.model.Model; +import org.apache.servicecomb.provider.rest.common.RestSchema; +import org.reactivestreams.Publisher; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; + +import io.reactivex.rxjava3.core.Flowable; + +@RestSchema(schemaId = "ReactiveStreamController") +@RequestMapping(path = "/") +public class ReactiveStreamController { + @GetMapping("/sseString") + public Publisher sseString() { + return Flowable.fromArray("a", "b", "c"); + } + + @GetMapping("/sseStringWithParam") + public Publisher sseStringWithParam(@RequestParam(name = "param") String param) { + return Flowable.fromArray("a", "b", "c", param); + }; + + @GetMapping("/sseModel") + public Publisher sseModel() { + return Flowable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS) + .map(item -> new Model("jack", item.intValue())); + } + + @GetMapping("/sseResponseEntity") + public Publisher> sseResponseEntity() { + AtomicInteger index = new AtomicInteger(0); + return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS) + .map(item -> new SseEventResponseEntity() + .event("test" + index) + .id(index.getAndIncrement()) + .retry(System.currentTimeMillis()) + .data(new Model("jack", item.intValue()))); + } + + @GetMapping("/sseMultipleData") + public Publisher> sseMultipleData() { + AtomicInteger index = new AtomicInteger(0); + return Flowable.intervalRange(0, 3, 0, 1, TimeUnit.SECONDS) + .map(item -> new SseEventResponseEntity() + .event("test" + index) + .id(index.getAndIncrement()) + .retry(System.currentTimeMillis()) + .data(new Model("jack", item.intValue())) + .data(new Model("tom", item.intValue()))); + } +} diff --git a/dependencies/default/pom.xml b/dependencies/default/pom.xml index 3132ff8ac66..efeb106ed54 100644 --- a/dependencies/default/pom.xml +++ b/dependencies/default/pom.xml @@ -107,6 +107,8 @@ 4.0.2 2.3.1 3.0.9 + 1.0.4 + 3.1.10 ${basedir}/../.. @@ -397,6 +399,21 @@ + + org.reactivestreams + reactive-streams + ${reactive-streams.version} + + + io.reactivex.rxjava3 + rxjava + ${rxjava3.version} + + + io.vertx + vertx-rx-java3 + ${vertx.version} + org.apache.maven diff --git a/foundations/foundation-vertx/pom.xml b/foundations/foundation-vertx/pom.xml index f9bcd9a25fa..c0f80507bc5 100644 --- a/foundations/foundation-vertx/pom.xml +++ b/foundations/foundation-vertx/pom.xml @@ -43,6 +43,10 @@ org.apache.servicecomb foundation-common + + io.vertx + vertx-rx-java3 + io.vertx diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java index 5d363e874d9..d8800800ff6 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java @@ -144,7 +144,7 @@ private boolean writeToBufferQueue(AbstractTcpClientPackage tcpClientPackage) { if (Status.WORKING.equals(status)) { // encode in sender thread try (TcpOutputStream os = tcpClientPackage.createStream()) { - write(os.getByteBuf()); + write(os.getBuffer()); tcpClientPackage.finishWriteToBuffer(); } return true; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java index 2a0559baba6..fc1fc17df89 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/AbstractHttpServletResponse.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import io.vertx.core.buffer.Buffer; import jakarta.servlet.ServletOutputStream; import jakarta.servlet.http.Cookie; import jakarta.servlet.http.Part; @@ -85,7 +86,7 @@ public int getBufferSize() { @Override public void flushBuffer() throws IOException { - throw new Error("not supported method"); + // for vert.x do noting } @Override @@ -222,4 +223,14 @@ public Object getAttribute(String key) { public CompletableFuture sendPart(Part body) { throw new Error("not supported method"); } + + @Override + public CompletableFuture sendBuffer(Buffer buffer) { + throw new Error("not supported method"); + } + + @Override + public void endResponse() throws IOException { + throw new Error("not supported method"); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java index 265ee658ec1..dfad8711a00 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/HttpServletResponseEx.java @@ -17,12 +17,15 @@ package org.apache.servicecomb.foundation.vertx.http; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.Part; import jakarta.ws.rs.core.Response.StatusType; +import io.reactivex.rxjava3.core.Flowable; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpHeaders; public interface HttpServletResponseEx extends HttpServletResponse, BodyBufferSupport { @@ -37,4 +40,16 @@ public interface HttpServletResponseEx extends HttpServletResponse, BodyBufferSu default void setChunked(boolean chunked) { setHeader(HttpHeaders.TRANSFER_ENCODING.toString(), HttpHeaders.CHUNKED.toString()); } + + CompletableFuture sendBuffer(Buffer buffer); + + default Flowable getFlowableBuffer() { + return null; + } + + void endResponse() throws IOException; + + default void setChunkedForEvent(boolean chunked) { + // not set header transfer-encoding=chunked in Rest Over Servlet, or will have Multiple in response. + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java index fbe459745e9..a227560cd28 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/StandardHttpServletResponseEx.java @@ -83,7 +83,7 @@ public StatusType getStatusType() { } @Override - public void flushBuffer() throws IOException { + public void endResponse() throws IOException { byte[] bytes = getBodyBytes(); if (bytes != null) { getOutputStream().write(bytes, 0, getBodyBytesLength()); @@ -122,4 +122,16 @@ public CompletableFuture sendPart(Part part) { Context context = Vertx.currentContext(); return new PumpFromPart(context, part).toOutputStream(outputStream, false); } + + @Override + public CompletableFuture sendBuffer(Buffer buffer) { + CompletableFuture future = new CompletableFuture<>(); + try { + getOutputStream().write(buffer.getBytes(), 0, buffer.length()); + future.complete(null); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java index da7e532174b..0d8eaef9f0e 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxClientResponseToHttpServletResponse.java @@ -24,6 +24,7 @@ import org.apache.servicecomb.foundation.common.http.HttpStatus; +import io.reactivex.rxjava3.core.Flowable; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientResponse; @@ -32,11 +33,18 @@ public class VertxClientResponseToHttpServletResponse extends AbstractHttpServle private StatusType statusType; + private Flowable flowableBuffer; + public VertxClientResponseToHttpServletResponse(HttpClientResponse clientResponse, Buffer bodyBuffer) { this.clientResponse = clientResponse; setBodyBuffer(bodyBuffer); } + public VertxClientResponseToHttpServletResponse(HttpClientResponse clientResponse, Flowable buffer) { + this.clientResponse = clientResponse; + this.flowableBuffer = buffer; + } + @Override public int getStatus() { return clientResponse.statusCode(); @@ -69,4 +77,9 @@ public Collection getHeaders(String name) { public Collection getHeaderNames() { return clientResponse.headers().names(); } + + @Override + public Flowable getFlowableBuffer() { + return flowableBuffer; + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java index e0c518b291b..5cb640f38f7 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerResponseToHttpServletResponse.java @@ -17,6 +17,7 @@ package org.apache.servicecomb.foundation.vertx.http; +import java.io.IOException; import java.util.Collection; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,7 @@ import io.vertx.core.Context; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerResponse; public class VertxServerResponseToHttpServletResponse extends AbstractHttpServletResponse { @@ -100,7 +102,7 @@ public Collection getHeaderNames() { } @Override - public void flushBuffer() { + public void endResponse() { if (context == Vertx.currentContext()) { internalFlushBuffer(); return; @@ -134,4 +136,26 @@ public CompletableFuture sendPart(Part part) { public void setChunked(boolean chunked) { serverResponse.setChunked(chunked); } + + @Override + public CompletableFuture sendBuffer(Buffer buffer) { + if (serverResponse.closed()) { + return CompletableFuture.failedFuture(new IOException("Response is closed before sending any data. " + + "Maybe client is timeout or check idle connection timeout for provider is properly configured.")); + } + CompletableFuture future = new CompletableFuture<>(); + serverResponse.write(buffer).onComplete(result -> { + if (result.failed()) { + future.completeExceptionally(result.cause()); + } else { + future.complete(null); + } + }); + return future; + } + + @Override + public void setChunkedForEvent(boolean chunked) { + this.setChunked(chunked); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java index f063d5d0656..42c9b31261d 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/BufferOutputStream.java @@ -20,9 +20,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import io.netty.buffer.ByteBuf; import io.vertx.core.buffer.Buffer; -import io.vertx.core.buffer.impl.VertxByteBufAllocator; /** * BufferOutputStream. @@ -32,40 +30,36 @@ public class BufferOutputStream extends OutputStream { private static final int DIRECT_BUFFER_SIZE = 1024; - protected ByteBuf byteBuf; + protected Buffer byteBuf; public BufferOutputStream() { - this(VertxByteBufAllocator.DEFAULT.heapBuffer(DIRECT_BUFFER_SIZE, Integer.MAX_VALUE)); + this(Buffer.buffer(DIRECT_BUFFER_SIZE)); } - public BufferOutputStream(ByteBuf buffer) { + public BufferOutputStream(Buffer buffer) { this.byteBuf = buffer; } - public ByteBuf getByteBuf() { - return byteBuf; - } - public Buffer getBuffer() { - return Buffer.buffer(byteBuf); + return byteBuf; } public int length() { - return byteBuf.readableBytes(); + return byteBuf.length(); } public void writeByte(byte value) { - byteBuf.writeByte(value); + byteBuf.appendByte(value); } // 实际是写byte @Override public void write(int byteValue) { - byteBuf.writeByte((byte) byteValue); + byteBuf.appendByte((byte) byteValue); } public void write(boolean value) { - byteBuf.writeBoolean(value); + byteBuf.appendByte(value ? (byte) 1 : (byte) 0); } public void writeInt(int pos, int value) { @@ -73,20 +67,20 @@ public void writeInt(int pos, int value) { } public void writeShort(short value) { - byteBuf.writeShort(value); + byteBuf.appendShort(value); } public void writeInt(int value) { - byteBuf.writeInt(value); + byteBuf.appendInt(value); } public void writeLong(long value) { - byteBuf.writeLong(value); + byteBuf.appendLong(value); } public void writeString(String value) { - byteBuf.writeInt(value.length()); - byteBuf.writeCharSequence(value, StandardCharsets.UTF_8); + writeInt(value.length()); + byteBuf.appendString(value, StandardCharsets.UTF_8.toString()); } @Override @@ -96,7 +90,7 @@ public void write(byte[] b) { @Override public void write(byte[] bytes, int offset, int len) { - byteBuf.writeBytes(bytes, offset, len); + byteBuf.appendBytes(bytes, offset, len); } @Override @@ -105,6 +99,6 @@ public void close() { } public int writerIndex() { - return byteBuf.writerIndex(); + return byteBuf.length(); } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java index 95f460013d2..396cf163400 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/tcp/TcpConnection.java @@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.vertx.core.Context; @@ -49,7 +48,7 @@ public class TcpConnection { // so this optimization: // 1.avoid vertx's lock // 2.reduce netty's task schedule - private final Queue writeQueue = new ConcurrentLinkedQueue<>(); + private final Queue writeQueue = new ConcurrentLinkedQueue<>(); private final AtomicLong writeQueueSize = new AtomicLong(); @@ -83,7 +82,7 @@ public void initNetSocket(NetSocketImpl netSocket) { this.context = netSocket.getContext(); } - public void write(ByteBuf buf) { + public void write(Buffer buf) { writeQueue.add(buf); long oldSize = writeQueueSize.getAndIncrement(); if (oldSize == 0) { @@ -99,13 +98,13 @@ protected void scheduleWrite() { protected void writeInContext() { CompositeByteBuf cbb = ByteBufAllocator.DEFAULT.compositeBuffer(); for (; ; ) { - ByteBuf buf = writeQueue.poll(); + Buffer buf = writeQueue.poll(); if (buf == null) { break; } writeQueueSize.decrementAndGet(); - cbb.addComponent(true, buf); + cbb.addComponent(true, buf.getByteBuf()); if (cbb.numComponents() == cbb.maxNumComponents()) { CompositeByteBuf last = cbb; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java index 2c9fdc895cc..cd468a93567 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestStream.java @@ -53,7 +53,7 @@ public void testBufferOutputStream() { Assertions.assertTrue((1 < oBufferOutputStream.length())); @SuppressWarnings("resource") - BufferInputStream oBufferInputStream = new BufferInputStream(oBufferOutputStream.getByteBuf()); + BufferInputStream oBufferInputStream = new BufferInputStream(oBufferOutputStream.getBuffer().getByteBuf()); Assertions.assertEquals("test", oBufferInputStream.readString()); Assertions.assertEquals(1, oBufferInputStream.readByte()); Assertions.assertEquals(true, oBufferInputStream.readBoolean()); diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java index 0767898f2d4..a10a0c19325 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java @@ -27,11 +27,11 @@ import org.junit.Test; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; import io.vertx.core.net.impl.NetSocketImpl; import mockit.Deencapsulation; @@ -90,14 +90,14 @@ public void send_inWorkingStatus(@Mocked AbstractTcpClientPackage tcpClientPacka Deencapsulation.setField(tcpClientConnection, "status", Status.WORKING); long msgId = 1; - ByteBuf byteBuf = Unpooled.buffer(); + Buffer byteBuf = Buffer.buffer(); new Expectations(tcpClientConnection) { { tcpClientPackage.getMsgId(); result = msgId; tcpClientPackage.createStream(); result = tcpOutputStream; - tcpOutputStream.getByteBuf(); + tcpOutputStream.getBuffer(); result = byteBuf; } }; @@ -172,7 +172,7 @@ public void send_disconnectedToWorking(@Mocked AbstractTcpClientPackage tcpClien { tcpClientPackage.getMsgId(); result = msgId; - tcpClientConnection.write((ByteBuf) any); + tcpClientConnection.write((Buffer) any); } }; new MockUp(context) { diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java index e6083edd0f8..053cc56e7de 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestAbstractHttpServletResponse.java @@ -91,8 +91,7 @@ public void testGetBufferSize() { @Test public void testFlushBuffer() { - Error error = Assertions.assertThrows(Error.class, () -> response.flushBuffer()); - checkError(error); + Assertions.assertDoesNotThrow(() -> response.flushBuffer()); } @Test diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java index c672fb28879..5c34814f1d3 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestStandardHttpServletResponseEx.java @@ -74,7 +74,7 @@ public void getBodyBytesLength() { } @Test - public void flushBuffer() throws IOException { + public void endResponse() throws IOException { Buffer buffer = Buffer.buffer(); ServletOutputStream output = new ServletOutputStream() { @Override @@ -97,12 +97,12 @@ public void write(int b) { responseEx = new StandardHttpServletResponseEx(response); // no body - responseEx.flushBuffer(); + responseEx.endResponse(); Assertions.assertEquals(0, buffer.length()); Buffer body = Buffer.buffer().appendString("body"); responseEx.setBodyBuffer(body); - responseEx.flushBuffer(); + responseEx.endResponse(); Assertions.assertEquals("body", buffer.toString()); } diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java index e438da8b024..5778091aceb 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java @@ -253,21 +253,21 @@ public void getHeaderNames() { } @Test - public void flushBuffer_sameContext() throws IOException { - response.flushBuffer(); + public void endResponse_sameContext() throws IOException { + response.endResponse(); Assertions.assertFalse(runOnContextInvoked); } @Test - public void flushBuffer_diffContext() throws IOException { + public void endResponse_diffContext() throws IOException { new Expectations() { { Vertx.currentContext(); result = null; } }; - response.flushBuffer(); + response.endResponse(); Assertions.assertTrue(runOnContextInvoked); } diff --git a/swagger/swagger-generator/generator-core/pom.xml b/swagger/swagger-generator/generator-core/pom.xml index c0aa020363a..4a58258000e 100644 --- a/swagger/swagger-generator/generator-core/pom.xml +++ b/swagger/swagger-generator/generator-core/pom.xml @@ -51,6 +51,10 @@ com.google.inject guice + + org.reactivestreams + reactive-streams + org.junit.jupiter diff --git a/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java b/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java new file mode 100644 index 00000000000..2f59fcb64a2 --- /dev/null +++ b/swagger/swagger-generator/generator-core/src/main/java/org/apache/servicecomb/swagger/generator/core/processor/response/PublisherProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.swagger.generator.core.processor.response; + +import java.lang.reflect.Type; +import java.util.List; + +import org.apache.servicecomb.swagger.generator.OperationGenerator; +import org.apache.servicecomb.swagger.generator.SwaggerGenerator; +import org.apache.servicecomb.swagger.jakarta.ModelConvertersAdapterJakarta; +import org.reactivestreams.Publisher; + +import io.swagger.models.Model; +import jakarta.ws.rs.core.MediaType; + +public class PublisherProcessor extends DefaultResponseTypeProcessor { + protected static final List EVENTS_PRODUCE = List.of(MediaType.SERVER_SENT_EVENTS); + + public PublisherProcessor() { + extractActualType = true; + } + + @Override + public Type getProcessType() { + return Publisher.class; + } + + @Override + public Model process(SwaggerGenerator swaggerGenerator, OperationGenerator operationGenerator, + Type genericResponseType) { + ModelConvertersAdapterJakarta.getInstance().addClassToSkip(Publisher.class.getName()); + operationGenerator.getOperation().produces(EVENTS_PRODUCE); + return super.process(swaggerGenerator, operationGenerator, genericResponseType); + } +} diff --git a/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor b/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor index 53678b70263..9abf37e8c9d 100644 --- a/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor +++ b/swagger/swagger-generator/generator-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor @@ -17,3 +17,4 @@ org.apache.servicecomb.swagger.generator.core.processor.response.CompletableFutureProcessor org.apache.servicecomb.swagger.generator.core.processor.response.OptionalProcessor +org.apache.servicecomb.swagger.generator.core.processor.response.PublisherProcessor diff --git a/swagger/swagger-invocation/invocation-core/pom.xml b/swagger/swagger-invocation/invocation-core/pom.xml index 1680a004422..3bb762136dd 100644 --- a/swagger/swagger-invocation/invocation-core/pom.xml +++ b/swagger/swagger-invocation/invocation-core/pom.xml @@ -31,6 +31,14 @@ org.apache.servicecomb swagger-generator-core + + io.vertx + vertx-rx-java3 + + + io.vertx + vertx-core + org.apache.servicecomb diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java new file mode 100644 index 00000000000..636af6558d9 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.swagger.invocation.response.consumer; + +import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; + +import io.reactivex.rxjava3.core.Flowable; + +public class PublisherConsumerResponseMapper implements ConsumerResponseMapper { + private final boolean shouldExtractEntity; + + public PublisherConsumerResponseMapper(boolean shouldExtractEntity) { + this.shouldExtractEntity = shouldExtractEntity; + } + + @Override + public Object mapResponse(Response response) { + Flowable> flowable = response.getResult(); + if (shouldExtractEntity) { + return flowable.concatMap(entity -> Flowable.fromIterable(entity.getData())); + } + return flowable; + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java new file mode 100644 index 00000000000..e7dd6d63c20 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/consumer/PublisherConsumerResponseMapperFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.swagger.invocation.response.consumer; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + +import org.apache.servicecomb.swagger.invocation.response.ResponseMapperFactorys; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.reactivestreams.Publisher; + +public class PublisherConsumerResponseMapperFactory implements ConsumerResponseMapperFactory { + @Override + public boolean isMatch(Type consumerType) { + if (!ParameterizedType.class.isAssignableFrom(consumerType.getClass())) { + return false; + } + + return ((ParameterizedType) consumerType).getRawType().equals(Publisher.class); + } + + @Override + public ConsumerResponseMapper createResponseMapper(ResponseMapperFactorys factorys, + Type consumerType) { + Type realConsumerType = ((ParameterizedType) consumerType).getActualTypeArguments()[0]; + return new PublisherConsumerResponseMapper( + !realConsumerType.getTypeName().contains(SseEventResponseEntity.class.getName())); + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java new file mode 100644 index 00000000000..c5f56bb53d6 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.swagger.invocation.response.producer; + +import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; + +import io.reactivex.rxjava3.core.Flowable; +import jakarta.ws.rs.core.Response.StatusType; + +public class PublisherProducerResponseMapper implements ProducerResponseMapper { + private final boolean shouldConstructEntity; + + public PublisherProducerResponseMapper(boolean shouldConstructEntity) { + this.shouldConstructEntity = shouldConstructEntity; + } + + @Override + public Response mapResponse(StatusType status, Object result) { + if (shouldConstructEntity) { + Flowable> responseEntity = ((Flowable) result).map(obj -> + new SseEventResponseEntity<>() + .data(obj)); + return Response.create(status, responseEntity); + } + return Response.create(status, result); + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java new file mode 100644 index 00000000000..9b35751f436 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapperFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.swagger.invocation.response.producer; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + +import org.apache.servicecomb.swagger.invocation.response.ResponseMapperFactorys; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.reactivestreams.Publisher; + +public class PublisherProducerResponseMapperFactory implements ProducerResponseMapperFactory { + @Override + public boolean isMatch(Type producerType) { + if (!ParameterizedType.class.isAssignableFrom(producerType.getClass())) { + return false; + } + + return ((ParameterizedType) producerType).getRawType().equals(Publisher.class); + } + + @Override + public ProducerResponseMapper createResponseMapper(ResponseMapperFactorys factorys, + Type providerType) { + Type realProducerType = ((ParameterizedType) providerType).getActualTypeArguments()[0]; + return new PublisherProducerResponseMapper( + !realProducerType.getTypeName().contains(SseEventResponseEntity.class.getName())); + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java new file mode 100644 index 00000000000..923949de6a8 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntity.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.swagger.invocation.sse; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import io.swagger.annotations.ApiModelProperty; + +public class SseEventResponseEntity { + private static final Logger LOGGER = LoggerFactory.getLogger(SseEventResponseEntity.class); + + /** + * event id + */ + private Integer id; + + /** + * event type + */ + private String event; + + /** + * reconnection time + */ + private Long retry; + + /** + * business data + */ + private List datas = new ArrayList<>(); + + public SseEventResponseEntity id(int id) { + if (this.id != null) { + LOGGER.warn("origin id: [{}] is exists, overridden by the current value: [{}]", this.id, id); + } + this.id = id; + return this; + } + + public SseEventResponseEntity event(String event) { + if (!StringUtils.isEmpty(this.event)) { + LOGGER.warn("origin event: [{}] is exists, overridden by the current value: [{}]", this.event, event); + } + this.event = event; + return this; + } + + public SseEventResponseEntity retry(long retry) { + if (this.retry != null) { + LOGGER.warn("origin retry: [{}] is exists, overridden by the current value: [{}]", this.retry, retry); + } + this.retry = retry; + return this; + } + + public SseEventResponseEntity data(T data) { + if (data == null) { + LOGGER.warn("The data content cannot be null!"); + } else { + datas.add(data); + } + return this; + } + + public Integer getId() { + return id; + } + + public String getEvent() { + return event; + } + + public Long getRetry() { + return retry; + } + + public List getData() { + return datas; + } + + @JsonIgnore + @ApiModelProperty(hidden = true) + public boolean isEmpty() { + return id == null + && event == null + && retry == null + && datas.isEmpty(); + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java new file mode 100644 index 00000000000..a72161cf258 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/sse/SseEventResponseEntityProcessor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.swagger.invocation.sse; + +import org.apache.servicecomb.swagger.generator.core.processor.response.DefaultResponseTypeProcessor; + +public class SseEventResponseEntityProcessor extends DefaultResponseTypeProcessor { + public SseEventResponseEntityProcessor() { + extractActualType = true; + } + + @Override + public Class getProcessType() { + return SseEventResponseEntity.class; + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor index bb2abfe5016..965674ee080 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor +++ b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.generator.ResponseTypeProcessor @@ -17,3 +17,4 @@ org.apache.servicecomb.swagger.invocation.generator.ScbResponseProcessor org.apache.servicecomb.swagger.invocation.ws.ServerWebSocketResponseProcessor +org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntityProcessor diff --git a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory index 0474058a9e3..5092ed6b32c 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory +++ b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.consumer.ConsumerResponseMapperFactory @@ -19,3 +19,4 @@ org.apache.servicecomb.swagger.invocation.response.consumer.CseResponseConsumerR org.apache.servicecomb.swagger.invocation.response.consumer.CompletableFutureConsumerResponseMapperFactory org.apache.servicecomb.swagger.invocation.response.consumer.DefaultConsumerResponseMapperFactory org.apache.servicecomb.swagger.invocation.response.consumer.OptionalConsumerResponseMapperFactory +org.apache.servicecomb.swagger.invocation.response.consumer.PublisherConsumerResponseMapperFactory \ No newline at end of file diff --git a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory index dac2a90e393..bf675cf9629 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory +++ b/swagger/swagger-invocation/invocation-core/src/main/resources/META-INF/services/org.apache.servicecomb.swagger.invocation.response.producer.ProducerResponseMapperFactory @@ -19,3 +19,4 @@ org.apache.servicecomb.swagger.invocation.response.producer.CseResponseProducerR org.apache.servicecomb.swagger.invocation.response.producer.CompletableFutureProducerResponseMapperFactory org.apache.servicecomb.swagger.invocation.response.producer.DefaultProducerResponseMapperFactory org.apache.servicecomb.swagger.invocation.response.producer.OptionalProducerResponseMapperFactory +org.apache.servicecomb.swagger.invocation.response.producer.PublisherProducerResponseMapperFactory \ No newline at end of file diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java index 36e0af59835..f973f9e1c75 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java @@ -58,6 +58,6 @@ private void logException(Throwable throwable) { @Override protected void sendResponse(Invocation invocation, Response response) { HighwayTransportContext transportContext = invocation.getTransportContext(); - connection.write(transportContext.getResponseBuffer().getByteBuf()); + connection.write(transportContext.getResponseBuffer()); } } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java index 98fab636217..0c9e8e79b99 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java @@ -153,7 +153,7 @@ private void sendResponse(Map context, Response response) { respBuffer = HighwayCodec.encodeResponse(msgId, header, bodySchema, response.getResult()); } invocation.getInvocationStageTrace().finishServerFiltersResponse(); - connection.write(respBuffer.getByteBuf()); + connection.write(respBuffer); } catch (Exception e) { // keep highway performance and simple, this encoding/decoding error not need handle by client String msg = String.format("encode response failed, %s, msgId=%d", diff --git a/transports/transport-rest/transport-rest-client/pom.xml b/transports/transport-rest/transport-rest-client/pom.xml index 0a22aaf8c80..c2265042ae8 100644 --- a/transports/transport-rest/transport-rest-client/pom.xml +++ b/transports/transport-rest/transport-rest-client/pom.xml @@ -40,6 +40,10 @@ org.apache.servicecomb common-rest + + io.vertx + vertx-rx-java3 + io.vertx diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java index af5ee1d10b9..7e37ebc9ac2 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java @@ -18,11 +18,14 @@ package org.apache.servicecomb.transport.rest.client.http; import java.util.Collection; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; import org.apache.servicecomb.common.rest.RestConst; +import org.apache.servicecomb.common.rest.codec.produce.ProduceEventStreamProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager; import org.apache.servicecomb.common.rest.definition.RestOperationMeta; @@ -35,12 +38,17 @@ import org.apache.servicecomb.swagger.invocation.context.HttpStatus; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; +import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.JavaType; import com.netflix.config.DynamicPropertyFactory; +import io.reactivex.rxjava3.core.Flowable; +import io.vertx.core.buffer.Buffer; + public class DefaultHttpClientFilter implements HttpClientFilter { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHttpClientFilter.class); @@ -76,6 +84,7 @@ protected ProduceProcessor findProduceProcessor(RestOperationMeta restOperation, if (idx != -1) { contentTypeForFind = contentType.substring(0, idx); } + return restOperation.findProduceProcessor(contentTypeForFind); } @@ -84,12 +93,11 @@ protected Response extractResponse(Invocation invocation, HttpServletResponseEx if (result != null) { return Response.create(responseEx.getStatusType(), result); } - OperationMeta operationMeta = invocation.getOperationMeta(); JavaType responseType = invocation.findResponseType(responseEx.getStatus()); RestOperationMeta swaggerRestOperation = operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION); ProduceProcessor produceProcessor = findProduceProcessor(swaggerRestOperation, responseEx); - if (produceProcessor == null) { + if (produceProcessor == null && !isEventStream(responseEx)) { // This happens outside the runtime such as Servlet filter response. Here we give a default json parser to it // and keep user data not get lose. LOGGER.warn("Response content-type {} is not supported. Method {}, path {}, statusCode {}, reasonPhrase {}.", @@ -103,7 +111,16 @@ protected Response extractResponse(Invocation invocation, HttpServletResponseEx } try { - result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(), responseType); + if (responseEx.getFlowableBuffer() == null) { + result = produceProcessor.decodeResponse(responseEx.getBodyBuffer(), responseType); + } else { + Flowable flowable = responseEx.getFlowableBuffer(); + ProduceEventStreamProcessor finalProduceProcessor = new ProduceEventStreamProcessor(); + result = flowable.concatMap(buffer -> extractFlowableBody(finalProduceProcessor, responseType, buffer)) + .doFinally(finalProduceProcessor::close) + .doOnCancel(finalProduceProcessor::close) + .filter(Objects::nonNull); + } Response response = Response.create(responseEx.getStatusType(), result); if (response.isFailed()) { LOGGER.warn("invoke operation [{}] failed, status={}, msg={}", invocation.getMicroserviceQualifiedName(), @@ -130,6 +147,16 @@ protected Response extractResponse(Invocation invocation, HttpServletResponseEx } } + private boolean isEventStream(HttpServletResponseEx responseEx) { + return responseEx.getHeader(HttpHeaders.CONTENT_TYPE) != null + && responseEx.getHeader(HttpHeaders.CONTENT_TYPE).contains(MediaType.SERVER_SENT_EVENTS); + } + + protected Publisher> extractFlowableBody(ProduceEventStreamProcessor produceProcessor, + JavaType responseType, Buffer buffer) throws Exception { + return produceProcessor.decodeResponse(buffer, responseType); + } + @Override public Response afterReceiveResponse(Invocation invocation, HttpServletResponseEx responseEx) { Response response = extractResponse(invocation, responseEx); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java index f749f1173d0..1cb7c20e71a 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response.Status; import org.apache.commons.lang3.StringUtils; @@ -53,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.reactivex.rxjava3.core.Flowable; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; @@ -60,6 +62,7 @@ import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.RequestOptions; +import io.vertx.rxjava3.FlowableHelper; public class RestClientInvocation { private static final Logger LOGGER = LoggerFactory.getLogger(RestClientInvocation.class); @@ -87,6 +90,8 @@ public class RestClientInvocation { private boolean alreadyFailed = false; + protected static final String EVENTS_MEDIA_TYPE = MediaType.SERVER_SENT_EVENTS; + public RestClientInvocation(HttpClientWithContext httpClientWithContext, List httpClientFilters) { this.httpClientWithContext = httpClientWithContext; this.httpClientFilters = httpClientFilters; @@ -218,6 +223,11 @@ protected void handleResponse(HttpClientResponse httpClientResponse) { return; } + if (isServerSendEvents(httpClientResponse)) { + processFlowableResponseBody(FlowableHelper.toFlowable(httpClientResponse)); + return; + } + httpClientResponse.exceptionHandler(e -> { invocation.getTraceIdLogger().error(LOGGER, "Failed to receive response, local:{}, remote:{}, message={}.", getLocalAddress(), httpClientResponse.netSocket().remoteAddress(), @@ -228,17 +238,38 @@ protected void handleResponse(HttpClientResponse httpClientResponse) { clientResponse.bodyHandler(this::processResponseBody); } + private boolean isServerSendEvents(HttpClientResponse httpClientResponse) { + if (httpClientResponse.getHeader("Content-Type") == null) { + return false; + } + return httpClientResponse.getHeader("Content-Type").contains(EVENTS_MEDIA_TYPE); + } + /** * after this method, connection will be recycled to connection pool * @param responseBuf response body buffer, when download, responseBuf is null, because download data by ReadStreamPart */ protected void processResponseBody(Buffer responseBuf) { + HttpServletResponseEx responseEx = + new VertxClientResponseToHttpServletResponse(clientResponse, responseBuf); + doProcessResponseBody(responseEx); + } + + /** + * after this method, connection will be recycled to connection pool + * @param flowable sse flowable response + */ + protected void processFlowableResponseBody(Flowable flowable) { + HttpServletResponseEx responseEx = + new VertxClientResponseToHttpServletResponse(clientResponse, flowable); + doProcessResponseBody(responseEx); + } + + private void doProcessResponseBody(HttpServletResponseEx responseEx) { invocation.getInvocationStageTrace().finishReceiveResponse(); invocation.getResponseExecutor().execute(() -> { try { invocation.getInvocationStageTrace().startClientFiltersResponse(); - HttpServletResponseEx responseEx = - new VertxClientResponseToHttpServletResponse(clientResponse, responseBuf); for (HttpClientFilter filter : httpClientFilters) { if (filter.enabled()) { Response response = filter.afterReceiveResponse(invocation, responseEx); diff --git a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java index 5c0b891e4a2..2b5192a5460 100644 --- a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java +++ b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestDefaultHttpClientFilter.java @@ -141,6 +141,8 @@ public void extractResult_decodeError(@Mocked Invocation invocation, @Mocked Rea result = 400; responseEx.getBodyBuffer(); result = new BufferImpl().appendString("abc"); + responseEx.getFlowableBuffer(); + result = null; } }; new MockUp() { @@ -189,6 +191,8 @@ public void extractResult_decodeError200(@Mocked Invocation invocation, @Mocked result = 200; responseEx.getBodyBuffer(); result = new BufferImpl().appendString("abc"); + responseEx.getFlowableBuffer(); + result = null; } }; new MockUp() { @@ -237,6 +241,8 @@ public void testAfterReceiveResponseNullProduceProcessor(@Mocked Invocation invo result = Status.FORBIDDEN; responseEx.getBodyBuffer(); result = Buffer.buffer(JsonUtils.writeValueAsString(data).getBytes()); + responseEx.getFlowableBuffer(); + result = null; } }; @@ -284,6 +290,8 @@ public void testAfterReceiveResponseNormal(@Mocked Invocation invocation, responseEx.getStatusType(); result = Status.OK; + responseEx.getFlowableBuffer(); + result = null; } };