Skip to content

Commit f7ade49

Browse files
committed
Merge branch '3.5.x'
2 parents 99ae861 + 42ef89e commit f7ade49

File tree

4 files changed

+77
-23
lines changed

4 files changed

+77
-23
lines changed

framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/HttpClassicRequestAssembler.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.netty.handler.codec.http.LastHttpContent;
3333
import io.netty.util.Attribute;
3434
import io.netty.util.AttributeKey;
35+
import io.netty.util.AttributeMap;
3536
import modelengine.fit.http.protocol.HttpResponseStatus;
3637
import modelengine.fit.http.server.ErrorResponse;
3738
import modelengine.fit.http.server.HttpClassicServer;
@@ -88,16 +89,24 @@ public HttpClassicRequestAssembler(HttpClassicServer server, boolean secure, Con
8889
}
8990

9091
private static void setRequest(ChannelHandlerContext ctx, NettyHttpServerRequest serverRequest) {
91-
Attribute<NettyHttpServerRequest> attr = ctx.channel().attr(REQUEST);
92+
Attribute<NettyHttpServerRequest> attr = ((AttributeMap) ctx).attr(REQUEST);
9293
attr.set(serverRequest);
9394
}
9495

9596
private static NettyHttpServerRequest getRequest(ChannelHandlerContext ctx) {
96-
return ctx.channel().attr(REQUEST).get();
97+
return ((AttributeMap) ctx).attr(REQUEST).get();
9798
}
9899

99100
private static void clearRequest(ChannelHandlerContext ctx) {
100-
ctx.channel().attr(REQUEST).set(null);
101+
NettyHttpServerRequest request = getRequest(ctx);
102+
if (request != null) {
103+
((AttributeMap) ctx).attr(REQUEST).set(null);
104+
try {
105+
request.close();
106+
} catch (IOException e) {
107+
log.warn("Failed to close netty http server request, ignored.", e);
108+
}
109+
}
101110
}
102111

103112
@Override
@@ -117,12 +126,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
117126
super.channelInactive(ctx);
118127
}
119128

120-
@Override
121-
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
122-
this.stopExecution(ctx);
123-
super.channelUnregistered(ctx);
124-
}
125-
126129
private void stopExecution(ChannelHandlerContext ctx) {
127130
NettyHttpServerRequest request = getRequest(ctx);
128131
if (request != null) {
@@ -135,6 +138,7 @@ private void stopExecution(ChannelHandlerContext ctx) {
135138
@Override
136139
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
137140
if (msg instanceof HttpRequest) {
141+
clearRequest(ctx);
138142
this.handleHttpRequest(ctx, cast(msg));
139143
return;
140144
}
@@ -171,6 +175,11 @@ private void doHttpRequest(ChannelHandlerContext ctx, NettyHttpServerRequest req
171175
this.exceptionCaught(ctx, cause, request);
172176
} finally {
173177
request.removeExecuteThread();
178+
try {
179+
request.tryClose();
180+
} catch (IOException e) {
181+
log.warn("Failed to close netty http server request when request finished, ignored.", e);
182+
}
174183
}
175184
}
176185

@@ -184,15 +193,18 @@ private void handleHttpContent(ChannelHandlerContext ctx, HttpContent content) {
184193
ctx.channel().isOpen());
185194
throw new IllegalStateException(message);
186195
}
187-
this.receiveHttpContent(ctx, request, content);
196+
this.receiveHttpContent(request, content);
188197
}
189198

190-
private void receiveHttpContent(ChannelHandlerContext ctx, NettyHttpServerRequest serverRequest,
191-
HttpContent content) {
199+
private void receiveHttpContent(NettyHttpServerRequest serverRequest, HttpContent content) {
192200
try {
193201
if (content instanceof LastHttpContent) {
194202
serverRequest.receiveLastHttpContent(cast(content));
195-
clearRequest(ctx);
203+
try {
204+
serverRequest.tryClose();
205+
} catch (IOException e) {
206+
log.warn("Failed to close netty http server request when received last http content, ignored.", e);
207+
}
196208
} else {
197209
serverRequest.receiveHttpContent(content);
198210
}

framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/main/java/modelengine/fit/http/server/netty/NettyHttpServerRequest.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import modelengine.fit.http.protocol.ServerRequest;
2525
import modelengine.fit.http.protocol.util.HeaderUtils;
2626
import modelengine.fitframework.log.Logger;
27+
import modelengine.fitframework.util.LockUtils;
2728
import modelengine.fitframework.util.ObjectUtils;
2829

2930
import java.io.IOException;
@@ -32,6 +33,9 @@
3233
import java.net.SocketAddress;
3334
import java.util.List;
3435
import java.util.Optional;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReadWriteLock;
3539

3640
/**
3741
* {@link ServerRequest} 的 Netty 实现。
@@ -51,8 +55,12 @@ public class NettyHttpServerRequest implements ServerRequest, OnHttpContentRecei
5155
private final RequestLine startLine;
5256
private final MessageHeaders headers;
5357
private final NettyReadableMessageBody body;
54-
private boolean isClosed;
55-
private Thread executeThread;
58+
private final AtomicBoolean isClosed = new AtomicBoolean(false);
59+
private final ReadWriteLock isClosedLock = LockUtils.newReentrantReadWriteLock();
60+
private final AtomicBoolean isComplete = new AtomicBoolean(false);
61+
private final AtomicBoolean isFinished = new AtomicBoolean(false);
62+
private final Lock tryCloseLock = LockUtils.newReentrantLock();
63+
private volatile Thread executeThread;
5664

5765
public NettyHttpServerRequest(HttpRequest request, ChannelHandlerContext ctx, boolean isSecure,
5866
long largeBodySize) {
@@ -127,6 +135,7 @@ public void receiveLastHttpContent(LastHttpContent content) throws IOException {
127135
this.checkIfClosed();
128136
ByteBuf byteBuf = content.content();
129137
this.body.write(byteBuf, true);
138+
LockUtils.synchronize(this.tryCloseLock, () -> this.isComplete.set(true));
130139
}
131140

132141
@Override
@@ -152,16 +161,48 @@ public boolean isActive() {
152161
}
153162

154163
private void checkIfClosed() throws IOException {
155-
if (this.isClosed) {
156-
throw new IOException("The netty http server request has already been closed.");
164+
this.isClosedLock.readLock().lock();
165+
try {
166+
if (this.isClosed.get()) {
167+
throw new IOException("The netty http server request has already been closed.");
168+
}
169+
} finally {
170+
this.isClosedLock.readLock().unlock();
171+
}
172+
}
173+
174+
/**
175+
* 尝试关闭。
176+
*
177+
* @throws IOException 当关闭失败时。
178+
*/
179+
void tryClose() throws IOException {
180+
this.tryCloseLock.lock();
181+
try {
182+
if (this.isFinished.get() && this.isComplete.get()) {
183+
this.close();
184+
}
185+
} finally {
186+
this.tryCloseLock.unlock();
157187
}
158188
}
159189

160190
@Override
161191
public void close() throws IOException {
162-
log.info("Netty http request closed. [id={}]", this.ctx.name());
163-
this.isClosed = true;
164-
this.body.close();
192+
if (this.isClosed.get()) {
193+
return;
194+
}
195+
this.isClosedLock.writeLock().lock();
196+
try {
197+
if (this.isClosed.get()) {
198+
return;
199+
}
200+
log.info("Netty http request closed. [id={}]", this.ctx.name());
201+
this.isClosed.set(true);
202+
this.body.close();
203+
} finally {
204+
this.isClosedLock.writeLock().unlock();
205+
}
165206
}
166207

167208
@Override
@@ -228,6 +269,7 @@ void setExecuteThread(Thread thread) {
228269
*/
229270
void removeExecuteThread() {
230271
this.executeThread = null;
272+
LockUtils.synchronize(this.tryCloseLock, () -> this.isFinished.set(true));
231273
}
232274

233275
/**

framework/fit/java/fit-builtin/plugins/fit-http-server-netty/src/test/java/modelengine/fit/http/server/netty/HttpClassicRequestAssemblerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.handler.codec.http.HttpMethod;
2323
import io.netty.handler.codec.http.HttpVersion;
2424
import io.netty.util.Attribute;
25+
import io.netty.util.AttributeMap;
2526
import modelengine.fit.http.protocol.HttpRequestMethod;
2627
import modelengine.fit.http.server.HttpHandler;
2728
import modelengine.fit.http.server.netty.support.DefaultNettyServerConfig;
@@ -72,9 +73,9 @@ void setup() {
7273
this.ctx = mock(ChannelHandlerContext.class);
7374
Channel channel = mock(Channel.class);
7475
when(this.ctx.channel()).thenReturn(channel);
75-
ChannelId channelId = mock(ChannelId.class);
7676
Attribute attribute = mock(Attribute.class);
77-
when(channel.attr(any())).thenReturn(attribute);
77+
when(((AttributeMap) this.ctx).attr(any())).thenReturn(attribute);
78+
ChannelId channelId = mock(ChannelId.class);
7879
when(channelId.asLongText()).thenReturn("requestId");
7980
this.requestAssembler = new HttpClassicRequestAssembler(classicServer,
8081
false,

framework/fit/java/fit-builtin/services/fit-http-classic/definition/src/main/java/modelengine/fit/http/server/support/DefaultHttpClassicServerRequest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ private byte[] actualEntityBytes() {
117117

118118
@Override
119119
public void close() throws IOException {
120-
this.serverRequest.close();
121120
if (this.entityLoader.isLoaded()) {
122121
Optional<Entity> entity = this.entityLoader.get();
123122
if (entity.isPresent()) {

0 commit comments

Comments
 (0)