Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.qpid.server.QpidException;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.common.ServerPropertyNames;
import org.apache.qpid.server.protocol.ProtocolVersion;
Expand Down Expand Up @@ -74,13 +73,15 @@ public class ProxyConnection extends ChannelInboundHandlerAdapter implements
private LookupHandler lookupHandler;
private AMQShortString virtualHost;
private String vhost;
private String remoteAddress;

private List<Object> connectMsgList = new ArrayList<>();

private enum State {
Init,
RedirectLookup,
RedirectToBroker,
Closing,
Closed
}

Expand All @@ -99,12 +100,16 @@ public ProxyConnection(ProxyService proxyService) throws PulsarClientException {
public void channelActive(ChannelHandlerContext cnx) throws Exception {
super.channelActive(cnx);
this.cnx = cnx;
this.remoteAddress = cnx.channel().remoteAddress().toString();
log.info("[{}] New proxy connection established", remoteAddress);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
this.close();
resetProxyHandler();
log.info("[{}] ProxyConnection closed.", remoteAddress);
this.state = State.Closed;
}

@Override
Expand Down Expand Up @@ -136,7 +141,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
try {
brokerDecoder.decodeBuffer(QpidByteBuffer.wrap(buffer.nioBuffer()));
} catch (Throwable e) {
log.error("error while handle command:", e);
log.error("Closing the proxy connection, error while handle command:", e);
close();
}

Expand Down Expand Up @@ -180,7 +185,7 @@ public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) {
"PLAIN token".getBytes(US_ASCII),
"en_US".getBytes(US_ASCII));
writeFrame(responseBody.generateFrame(0));
} catch (QpidException e) {
} catch (Exception e) {
log.error("Received unsupported protocol initiation for protocol version: {} ", getProtocolVersion(), e);
}
}
Expand Down Expand Up @@ -243,7 +248,8 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap
public void handleConnect(AtomicInteger retryTimes) {
log.info("handle connect residue retryTimes: {}", retryTimes);
if (retryTimes.get() == 0) {
log.warn("Handle connect retryTimes is 0.");
log.warn("[{}] Closing the proxy connection, retry times for handling connect is exhausted.",
remoteAddress);
close();
return;
}
Expand All @@ -265,8 +271,7 @@ public void handleConnect(AtomicInteger retryTimes) {
handleConnectComplete(pair.getLeft(), pair.getRight(), retryTimes);
});
} catch (Exception e) {
log.error("Lookup broker failed.", e);
resetProxyHandler();
log.error("[{}] Closing the proxy connection, lookup broker failed.", remoteAddress, e);
close();
}
}
Expand Down Expand Up @@ -363,22 +368,26 @@ public synchronized void writeFrame(AMQDataBlock frame) {
if (log.isDebugEnabled()) {
log.debug("send: " + frame);
}
cnx.writeAndFlush(frame);
cnx.writeAndFlush(frame)
.addListener(future -> {
if (!future.isSuccess()) {
log.error("[{}] ProxyConnection failed to write frame.", remoteAddress, future.cause());
}
});
}

public void close() {
log.info("ProxyConnection close.");
if (log.isDebugEnabled()) {
log.debug("ProxyConnection close.");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
log.error("[{}] Closing the proxy connection, exception caught: ", remoteAddress, cause);
close();
}

if (proxyHandler != null) {
resetProxyHandler();
}
if (cnx != null) {
cnx.close();
public void close() {
if (state != State.Closed) {
state = State.Closing;
}
state = State.Closed;
cnx.close();
}

}
35 changes: 35 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,41 @@
<!-- dependency definitions -->
<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
<version>${qpid-protocol-plugin.version}</version>
</dependency>

<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
<version>${qpid-protocol-plugin.version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-core</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-databind</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-lang3</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.streamnative</groupId>
<artifactId>streamnative-bom</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ ASSETS_DIR=release
mkdir $ASSETS_DIR

mvn clean install -DskipTests -Dmaven.wagon.http.retryHandler.count=3
mv amqp-impl/target/pulsar-protocol-handler-amqp-*.jar ./$ASSETS_DIR/pulsar-protocol-handler-amqp-"${version}".nar
mv amqp-impl/target/pulsar-protocol-handler-amqp-*.nar ./$ASSETS_DIR/pulsar-protocol-handler-amqp-"${version}".nar
cp README.md ./$ASSETS_DIR/pulsar-protocol-handler-amqp-readme.md
Loading