diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java index 69f6e4d29..ca73cd165 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java @@ -39,7 +39,6 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.qpid.server.QpidException; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.common.ServerPropertyNames; import org.apache.qpid.server.protocol.ProtocolVersion; @@ -74,6 +73,7 @@ public class ProxyConnection extends ChannelInboundHandlerAdapter implements private LookupHandler lookupHandler; private AMQShortString virtualHost; private String vhost; + private String remoteAddress; private List connectMsgList = new ArrayList<>(); @@ -81,6 +81,7 @@ private enum State { Init, RedirectLookup, RedirectToBroker, + Closing, Closed } @@ -99,12 +100,16 @@ public ProxyConnection(ProxyService proxyService) throws PulsarClientException { public void channelActive(ChannelHandlerContext cnx) throws Exception { super.channelActive(cnx); this.cnx = cnx; + this.remoteAddress = cnx.channel().remoteAddress().toString(); + log.info("[{}] New proxy connection established", remoteAddress); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - this.close(); + resetProxyHandler(); + log.info("[{}] ProxyConnection closed.", remoteAddress); + this.state = State.Closed; } @Override @@ -136,7 +141,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception try { brokerDecoder.decodeBuffer(QpidByteBuffer.wrap(buffer.nioBuffer())); } catch (Throwable e) { - log.error("error while handle command:", e); + log.error("Closing the proxy connection, error while handle command:", e); close(); } @@ -180,7 +185,7 @@ public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) { "PLAIN token".getBytes(US_ASCII), "en_US".getBytes(US_ASCII)); writeFrame(responseBody.generateFrame(0)); - } catch (QpidException e) { + } catch (Exception e) { log.error("Received unsupported protocol initiation for protocol version: {} ", getProtocolVersion(), e); } } @@ -243,7 +248,8 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap public void handleConnect(AtomicInteger retryTimes) { log.info("handle connect residue retryTimes: {}", retryTimes); if (retryTimes.get() == 0) { - log.warn("Handle connect retryTimes is 0."); + log.warn("[{}] Closing the proxy connection, retry times for handling connect is exhausted.", + remoteAddress); close(); return; } @@ -265,8 +271,7 @@ public void handleConnect(AtomicInteger retryTimes) { handleConnectComplete(pair.getLeft(), pair.getRight(), retryTimes); }); } catch (Exception e) { - log.error("Lookup broker failed.", e); - resetProxyHandler(); + log.error("[{}] Closing the proxy connection, lookup broker failed.", remoteAddress, e); close(); } } @@ -363,22 +368,26 @@ public synchronized void writeFrame(AMQDataBlock frame) { if (log.isDebugEnabled()) { log.debug("send: " + frame); } - cnx.writeAndFlush(frame); + cnx.writeAndFlush(frame) + .addListener(future -> { + if (!future.isSuccess()) { + log.error("[{}] ProxyConnection failed to write frame.", remoteAddress, future.cause()); + } + }); } - public void close() { - log.info("ProxyConnection close."); - if (log.isDebugEnabled()) { - log.debug("ProxyConnection close."); - } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + log.error("[{}] Closing the proxy connection, exception caught: ", remoteAddress, cause); + close(); + } - if (proxyHandler != null) { - resetProxyHandler(); - } - if (cnx != null) { - cnx.close(); + public void close() { + if (state != State.Closed) { + state = State.Closing; } - state = State.Closed; + cnx.close(); } } diff --git a/pom.xml b/pom.xml index 5801ea0a3..2512c7cb5 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,41 @@ + + + org.apache.qpid + qpid-broker-plugins-amqp-0-8-protocol + ${qpid-protocol-plugin.version} + + + + org.apache.qpid + qpid-broker-core + ${qpid-protocol-plugin.version} + + + jackson-core + com.fasterxml.jackson.core + + + jackson-databind + com.fasterxml.jackson.core + + + guava + com.google.guava + + + slf4j-api + org.slf4j + + + commons-lang3 + org.apache.commons + + + + io.streamnative streamnative-bom diff --git a/scripts/build.sh b/scripts/build.sh index 71151a695..d78c5eb2f 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -28,5 +28,5 @@ ASSETS_DIR=release mkdir $ASSETS_DIR mvn clean install -DskipTests -Dmaven.wagon.http.retryHandler.count=3 -mv amqp-impl/target/pulsar-protocol-handler-amqp-*.jar ./$ASSETS_DIR/pulsar-protocol-handler-amqp-"${version}".nar +mv amqp-impl/target/pulsar-protocol-handler-amqp-*.nar ./$ASSETS_DIR/pulsar-protocol-handler-amqp-"${version}".nar cp README.md ./$ASSETS_DIR/pulsar-protocol-handler-amqp-readme.md