Skip to content

Commit e16a9d8

Browse files
committed
improve proxy connection closing logic
1 parent fbadbfd commit e16a9d8

File tree

1 file changed

+18
-18
lines changed

1 file changed

+18
-18
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,15 @@ public class ProxyConnection extends ChannelInboundHandlerAdapter implements
7373
private LookupHandler lookupHandler;
7474
private AMQShortString virtualHost;
7575
private String vhost;
76+
private String remoteAddress;
7677

7778
private List<Object> connectMsgList = new ArrayList<>();
7879

7980
private enum State {
8081
Init,
8182
RedirectLookup,
8283
RedirectToBroker,
84+
Closing,
8385
Closed
8486
}
8587

@@ -98,12 +100,16 @@ public ProxyConnection(ProxyService proxyService) throws PulsarClientException {
98100
public void channelActive(ChannelHandlerContext cnx) throws Exception {
99101
super.channelActive(cnx);
100102
this.cnx = cnx;
103+
this.remoteAddress = cnx.channel().remoteAddress().toString();
104+
log.info("[{}] New proxy connection established", remoteAddress);
101105
}
102106

103107
@Override
104108
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
105109
super.channelInactive(ctx);
106-
this.close();
110+
resetProxyHandler();
111+
log.info("[{}] ProxyConnection closed.", remoteAddress);
112+
this.state = State.Closed;
107113
}
108114

109115
@Override
@@ -135,7 +141,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
135141
try {
136142
brokerDecoder.decodeBuffer(QpidByteBuffer.wrap(buffer.nioBuffer()));
137143
} catch (Throwable e) {
138-
log.error("error while handle command:", e);
144+
log.error("Closing the proxy connection, error while handle command:", e);
139145
close();
140146
}
141147

@@ -242,7 +248,8 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap
242248
public void handleConnect(AtomicInteger retryTimes) {
243249
log.info("handle connect residue retryTimes: {}", retryTimes);
244250
if (retryTimes.get() == 0) {
245-
log.warn("Handle connect retryTimes is 0.");
251+
log.warn("[{}] Closing the proxy connection, retry times for handling connect is exhausted.",
252+
remoteAddress);
246253
close();
247254
return;
248255
}
@@ -264,8 +271,7 @@ public void handleConnect(AtomicInteger retryTimes) {
264271
handleConnectComplete(pair.getLeft(), pair.getRight(), retryTimes);
265272
});
266273
} catch (Exception e) {
267-
log.error("Lookup broker failed.", e);
268-
resetProxyHandler();
274+
log.error("[{}] Closing the proxy connection, lookup broker failed.", remoteAddress, e);
269275
close();
270276
}
271277
}
@@ -365,29 +371,23 @@ public synchronized void writeFrame(AMQDataBlock frame) {
365371
cnx.writeAndFlush(frame)
366372
.addListener(future -> {
367373
if (!future.isSuccess()) {
368-
log.error("ProxyConnection failed to write frame.", future.cause());
374+
log.error("[{}] ProxyConnection failed to write frame.", remoteAddress, future.cause());
369375
}
370376
});
371377
}
372378

373379
@Override
374380
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
375-
log.error("ProxyConnection exception caught: ", cause);
381+
super.exceptionCaught(ctx, cause);
382+
log.error("[{}] Closing the proxy connection, exception caught: ", remoteAddress, cause);
383+
close();
376384
}
377385

378386
public void close() {
379-
log.info("ProxyConnection close.");
380-
if (log.isDebugEnabled()) {
381-
log.debug("ProxyConnection close.");
382-
}
383-
384-
if (proxyHandler != null) {
385-
resetProxyHandler();
386-
}
387-
if (cnx != null) {
388-
cnx.close();
387+
if (state != State.Closed) {
388+
state = State.Closing;
389389
}
390-
state = State.Closed;
390+
cnx.close();
391391
}
392392

393393
}

0 commit comments

Comments
 (0)