Skip to content

Commit 5b72718

Browse files
committed
Merge branch '3.5.x'
2 parents b96fe46 + 9336441 commit 5b72718

File tree

6 files changed

+134
-89
lines changed

6 files changed

+134
-89
lines changed

framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
/*---------------------------------------------------------------------------------------------
2-
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3-
* This file is a part of the ModelEngine Project.
4-
* Licensed under the MIT License. See License.txt in the project root for license information.
5-
*--------------------------------------------------------------------------------------------*/
1+
/*
2+
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*/
66

77
package modelengine.fit.http.server.netty;
88

@@ -71,7 +71,7 @@ public NettyHttpServerRequest(HttpRequest request, ChannelHandlerContext ctx, bo
7171
this.startLine = this.initStartLine();
7272
this.headers = this.initHeaders();
7373
this.body = this.isLargeBody() ? NettyReadableMessageBody.large() : NettyReadableMessageBody.common();
74-
log.info("Netty http request initialized. [id={0}, request={1}]", ctx.name(), this.startLine());
74+
log.debug("Netty http request initialized. [id={0}, request={1}]", ctx.name(), this.startLine());
7575
}
7676

7777
private boolean isLargeBody() {
@@ -197,7 +197,7 @@ public void close() throws IOException {
197197
if (this.isClosed.get()) {
198198
return;
199199
}
200-
log.info("Netty http request closed. [id={}]", this.ctx.name());
200+
log.debug("Netty http request closed. [id={}]", this.ctx.name());
201201
this.isClosed.set(true);
202202
this.body.close();
203203
} finally {

framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerResponse.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
/*---------------------------------------------------------------------------------------------
2-
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3-
* This file is a part of the ModelEngine Project.
4-
* Licensed under the MIT License. See License.txt in the project root for license information.
5-
*--------------------------------------------------------------------------------------------*/
1+
/*
2+
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*/
66

77
package modelengine.fit.http.server.netty;
88

@@ -126,6 +126,11 @@ public boolean isActive() {
126126
return this.ctx.channel().isActive();
127127
}
128128

129+
@Override
130+
public void closeChannel() {
131+
this.ctx.close();
132+
}
133+
129134
@Override
130135
public void close() throws IOException {
131136
this.isClosed = true;

framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/entity/support/DefaultTextEvent.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
/*---------------------------------------------------------------------------------------------
2-
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3-
* This file is a part of the ModelEngine Project.
4-
* Licensed under the MIT License. See License.txt in the project root for license information.
5-
*--------------------------------------------------------------------------------------------*/
1+
/*
2+
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*/
66

77
package modelengine.fit.http.entity.support;
88

@@ -88,7 +88,7 @@ private static void appendComment(StringBuilder sb, String comment) {
8888

8989
private static void appendData(StringBuilder sb, ObjectSerializer objectSerializer, Object data) {
9090
if (data == null) {
91-
if (sb.length() == 0) {
91+
if (sb.isEmpty()) {
9292
sb.append(LF);
9393
}
9494
sb.append(LF);

framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponse.java

Lines changed: 95 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
/*---------------------------------------------------------------------------------------------
2-
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3-
* This file is a part of the ModelEngine Project.
4-
* Licensed under the MIT License. See License.txt in the project root for license information.
5-
*--------------------------------------------------------------------------------------------*/
1+
/*
2+
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*/
66

77
package modelengine.fit.http.server.support;
88

@@ -22,6 +22,7 @@
2222
import modelengine.fit.http.entity.Entity;
2323
import modelengine.fit.http.entity.FileEntity;
2424
import modelengine.fit.http.entity.ReadableBinaryEntity;
25+
import modelengine.fit.http.entity.TextEvent;
2526
import modelengine.fit.http.entity.TextEventStreamEntity;
2627
import modelengine.fit.http.entity.WritableBinaryEntity;
2728
import modelengine.fit.http.entity.support.DefaultWritableBinaryEntity;
@@ -36,6 +37,7 @@
3637
import modelengine.fit.http.server.HttpClassicServerResponse;
3738
import modelengine.fit.http.server.InternalServerErrorException;
3839
import modelengine.fit.http.support.AbstractHttpClassicResponse;
40+
import modelengine.fitframework.flowable.Subscription;
3941
import modelengine.fitframework.resource.UrlUtils;
4042
import modelengine.fitframework.serialization.ObjectSerializer;
4143
import modelengine.fitframework.util.ObjectUtils;
@@ -45,8 +47,6 @@
4547
import java.nio.charset.Charset;
4648
import java.nio.charset.StandardCharsets;
4749
import java.util.Optional;
48-
import java.util.concurrent.CountDownLatch;
49-
import java.util.concurrent.atomic.AtomicReference;
5050

5151
/**
5252
* 表示 {@link HttpClassicServerResponse} 的默认实现。
@@ -148,78 +148,106 @@ public void send() {
148148
if (this.entity == null) {
149149
this.headers().set(CONTENT_LENGTH, ZERO);
150150
this.serverResponse.writeStartLineAndHeaders();
151-
} else if (this.entity instanceof ReadableBinaryEntity) {
152-
if (this.entity instanceof FileEntity) {
153-
FileEntity actual = cast(this.entity);
154-
this.headers().set(CONTENT_LENGTH, String.valueOf(actual.length()));
155-
} else if (!this.headers().contains(CONTENT_LENGTH)) {
156-
this.headers().set(TRANSFER_ENCODING, CHUNKED);
157-
}
158-
this.serverResponse.writeStartLineAndHeaders();
159-
ReadableBinaryEntity readableBinaryEntity = cast(this.entity);
160-
byte[] bytes = new byte[512];
161-
int read;
162-
while ((read = readableBinaryEntity.read(bytes)) > -1) {
163-
this.serverResponse.writeBody(bytes, 0, read);
164-
}
165-
} else if (this.entity instanceof WritableBinaryEntity) {
166-
// WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。
151+
this.serverResponse.flush();
167152
} else if (this.entity instanceof TextEventStreamEntity) {
168153
this.headers().set(CACHE_CONTROL, NO_CACHE);
169154
this.headers().set(CONNECTION, KEEP_ALIVE);
170155
this.headers().set(TRANSFER_ENCODING, CHUNKED);
171156
this.serverResponse.writeStartLineAndHeaders();
172-
this.sendTextEventStream(cast(this.entity));
157+
this.sendTextEventStream(cast(this.entity), charset);
173158
} else {
174-
byte[] entityBytes = this.entitySerializer().serializeEntity(ObjectUtils.cast(this.entity), charset);
175-
this.headers().set(CONTENT_LENGTH, String.valueOf(entityBytes.length));
176-
this.serverResponse.writeStartLineAndHeaders();
177-
this.serverResponse.writeBody(entityBytes);
159+
this.sendDirectly(charset);
178160
}
179-
this.serverResponse.flush();
180161
} catch (IOException e) {
181162
throw new InternalServerErrorException("Failed to write response.", e);
182163
}
183164
}
184165

185-
@Override
186-
public boolean isActive() {
187-
return this.serverResponse.isActive();
188-
}
189-
190-
private void sendTextEventStream(TextEventStreamEntity eventStreamEntity) throws IOException {
166+
private void sendTextEventStream(TextEventStreamEntity eventStreamEntity, Charset charset) throws IOException {
191167
ObjectSerializer objectSerializer = this.jsonSerializer()
192168
.orElseThrow(() -> new IllegalStateException("The json serializer cannot be null."));
193-
AtomicReference<Exception> exception = new AtomicReference<>();
194-
CountDownLatch latch = new CountDownLatch(1);
195169
eventStreamEntity.stream()
196-
.map(sse -> sse.serialize(objectSerializer).getBytes(StandardCharsets.UTF_8))
197-
.subscribe(null, (subscription, bytes) -> {
198-
try {
199-
this.serverResponse.writeBody(bytes);
200-
} catch (IOException e) {
201-
subscription.cancel();
202-
exception.set(e);
203-
latch.countDown();
204-
}
205-
}, subscription -> latch.countDown(), (ignore, e) -> {
206-
exception.set(e);
207-
latch.countDown();
208-
});
170+
.map(event -> event.serialize(objectSerializer).getBytes(charset))
171+
.subscribe(null,
172+
(subscription, bytes) -> this.onSseMessage(subscription, bytes, charset),
173+
subscription -> this.onSseComplete(charset),
174+
(subscription, e) -> this.onSseError(e, charset));
175+
}
176+
177+
private void onSseMessage(Subscription subscription, byte[] bytes, Charset charset) {
209178
try {
210-
latch.await();
211-
} catch (InterruptedException e) {
212-
Thread.currentThread().interrupt();
213-
throw new InternalServerErrorException("Failed to execute handler.", e);
179+
this.serverResponse.writeBody(bytes);
180+
} catch (IOException e) {
181+
subscription.cancel();
182+
this.onSseError(e, charset);
214183
}
215-
Exception e = exception.get();
216-
if (e == null) {
217-
return;
184+
}
185+
186+
private void onSseComplete(Charset charset) {
187+
try {
188+
this.serverResponse.flush();
189+
} catch (IOException e) {
190+
this.onSseError(e, charset);
191+
} finally {
192+
try {
193+
this.close0();
194+
} catch (IOException e) {
195+
// ignore.
196+
}
197+
}
198+
}
199+
200+
private void onSseError(Throwable throwable, Charset charset) {
201+
try {
202+
TextEvent errorEvent = TextEvent.custom().event("error").data(throwable.getMessage()).build();
203+
ObjectSerializer objectSerializer = this.jsonSerializer()
204+
.orElseThrow(() -> new IllegalStateException("The json serializer cannot be null."));
205+
this.serverResponse.writeBody(errorEvent.serialize(objectSerializer).getBytes(charset));
206+
this.serverResponse.flush();
207+
} catch (IOException e) {
208+
this.serverResponse.closeChannel();
209+
InternalServerErrorException internalServerErrorException =
210+
new InternalServerErrorException("Failed to send error response when sse.", e);
211+
internalServerErrorException.addSuppressed(throwable);
212+
throw internalServerErrorException;
213+
} finally {
214+
try {
215+
this.close0();
216+
} catch (IOException e) {
217+
// ignore.
218+
}
218219
}
219-
if (e instanceof IOException) {
220-
throw (IOException) e;
220+
}
221+
222+
private void sendDirectly(Charset charset) throws IOException {
223+
if (this.entity instanceof ReadableBinaryEntity) {
224+
if (this.entity instanceof FileEntity) {
225+
FileEntity actual = cast(this.entity);
226+
this.headers().set(CONTENT_LENGTH, String.valueOf(actual.length()));
227+
} else if (!this.headers().contains(CONTENT_LENGTH)) {
228+
this.headers().set(TRANSFER_ENCODING, CHUNKED);
229+
}
230+
this.serverResponse.writeStartLineAndHeaders();
231+
ReadableBinaryEntity readableBinaryEntity = cast(this.entity);
232+
byte[] bytes = new byte[512];
233+
int read;
234+
while ((read = readableBinaryEntity.read(bytes)) > -1) {
235+
this.serverResponse.writeBody(bytes, 0, read);
236+
}
237+
} else if (this.entity instanceof WritableBinaryEntity) {
238+
// WritableBinaryEntity 已经在用户代码层面进行了输出,因此此处什么都不需要处理。
239+
} else {
240+
byte[] entityBytes = this.entitySerializer().serializeEntity(ObjectUtils.cast(this.entity), charset);
241+
this.headers().set(CONTENT_LENGTH, String.valueOf(entityBytes.length));
242+
this.serverResponse.writeStartLineAndHeaders();
243+
this.serverResponse.writeBody(entityBytes);
221244
}
222-
throw new InternalServerErrorException("Failed to execute handler.", e);
245+
this.serverResponse.flush();
246+
}
247+
248+
@Override
249+
public boolean isActive() {
250+
return this.serverResponse.isActive();
223251
}
224252

225253
@Override
@@ -239,6 +267,13 @@ protected void commit() {
239267

240268
@Override
241269
public void close() throws IOException {
270+
if (this.entity instanceof TextEventStreamEntity) {
271+
return;
272+
}
273+
this.close0();
274+
}
275+
276+
private void close0() throws IOException {
242277
this.serverResponse.close();
243278
if (this.entity != null) {
244279
this.entity.close();

framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/test/java/modelengine/fit/http/server/support/DefaultHttpClassicServerResponseTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
/*---------------------------------------------------------------------------------------------
2-
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3-
* This file is a part of the ModelEngine Project.
4-
* Licensed under the MIT License. See License.txt in the project root for license information.
5-
*--------------------------------------------------------------------------------------------*/
1+
/*
2+
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*/
66

77
package modelengine.fit.http.server.support;
88

@@ -99,7 +99,7 @@ void shouldThrowExceptionWhenSendTextStreamError() throws IOException {
9999
ServerResponse serverResponse = mock(ServerResponse.class);
100100
when(serverResponse.startLine()).thenReturn(mock(ConfigurableStatusLine.class));
101101
when(serverResponse.headers()).thenReturn(mock(ConfigurableMessageHeaders.class));
102-
doThrow(new IOException("Error")).when(serverResponse).writeBody(new byte[1]);
102+
doThrow(new IOException("Error")).when(serverResponse).flush();
103103
Choir<byte[]> mappedChoir = Choir.just(new byte[1], new byte[2]);
104104
Choir<TextEvent> stream = ObjectUtils.cast(mock(Choir.class));
105105
when(stream.map(Mockito.any())).thenReturn(ObjectUtils.cast(mappedChoir));

framework/fit/java/fit-builtin/services/fit-http-protocol/definition/src/main/java/modelengine/fit/http/protocol/ServerResponse.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
/*---------------------------------------------------------------------------------------------
2-
* Copyright (c) 2024 Huawei Technologies Co., Ltd. All rights reserved.
3-
* This file is a part of the ModelEngine Project.
4-
* Licensed under the MIT License. See License.txt in the project root for license information.
5-
*--------------------------------------------------------------------------------------------*/
1+
/*
2+
* Copyright (c) 2024-2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*/
66

77
package modelengine.fit.http.protocol;
88

@@ -82,4 +82,9 @@ default void writeBody(byte[] bytes) throws IOException {
8282
* @return true if the response is active; false otherwise.
8383
*/
8484
boolean isActive();
85+
86+
/**
87+
* Closes the underlying channel associated with the response.
88+
*/
89+
void closeChannel();
8590
}

0 commit comments

Comments
 (0)