Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class CompositeTypeConstructorGenerator extends AbstractProcessor
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy",
"org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability");

private static final List<String> SINGLETONS = List.of("Accepted");

@Override
public SourceVersion getSupportedSourceVersion()
Expand Down Expand Up @@ -188,7 +189,14 @@ private void generateCompositeTypeConstructor(final Filer filer, final TypeEleme
pw.println(" @Override");
pw.println(" protected " + objectSimpleName + " construct(final FieldValueReader fieldValueReader) throws AmqpErrorException");
pw.println(" {");
pw.println(" " + objectSimpleName + " obj = new " + objectSimpleName + "();");
if (SINGLETONS.contains(objectSimpleName))
{
pw.println(" " + objectSimpleName + " obj = " + objectSimpleName + ".INSTANCE;");
}
else
{
pw.println(" " + objectSimpleName + " obj = new " + objectSimpleName + "();");
}
pw.println();
generateAssigners(pw, typeElement);
pw.println(" return obj;");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
import org.apache.qpid.server.security.limit.ConnectionLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
Expand All @@ -74,6 +73,7 @@
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
Expand Down Expand Up @@ -117,6 +117,7 @@
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.security.limit.ConnectionLimitException;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
Expand Down Expand Up @@ -146,31 +147,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();


private static final byte[] SASL_HEADER = new byte[]
{
(byte) 'A',
(byte) 'M',
(byte) 'Q',
(byte) 'P',
(byte) 3,
(byte) 1,
(byte) 0,
(byte) 0
};

private static final byte[] AMQP_HEADER = new byte[]
{
(byte) 'A',
(byte) 'M',
(byte) 'Q',
(byte) 'P',
(byte) 0,
(byte) 1,
(byte) 0,
(byte) 0
};

private final FrameWriter _frameWriter;
private ProtocolHandler _frameHandler;
private volatile boolean _transportBlockedForWriting;
Expand Down Expand Up @@ -286,7 +262,7 @@ else if(getNetwork().getSelectedHost() != null)
}
String mechanism = saslInit.getMechanism().toString();
final Binary initialResponse = saslInit.getInitialResponse();
byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
byte[] response = initialResponse == null ? Bytes.EMPTY_BYTE_ARRAY : initialResponse.getArray();

List<String> availableMechanisms =
_subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
Expand All @@ -306,7 +282,7 @@ public void receiveSaslResponse(final SaslResponse saslResponse)
{
assertState(ConnectionState.AWAIT_SASL_RESPONSE);
final Binary responseBinary = saslResponse.getResponse();
byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
byte[] response = responseBinary == null ? Bytes.EMPTY_BYTE_ARRAY : responseBinary.getArray();

processSaslResponse(response);
}
Expand Down Expand Up @@ -338,7 +314,7 @@ private void processSaslResponse(final byte[] response)
SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
if (authenticationResult == null)
{
authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : Bytes.EMPTY_BYTE_ARRAY);
challenge = authenticationResult.getChallenge();
}

Expand Down Expand Up @@ -1204,21 +1180,30 @@ public int sendFrame(final int channel, final FrameBody body, final QpidByteBuff
{
if (!_closedForOutput)
{
ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
if (payload == null)
final int payloadRemaining = payload == null ? 0 : payload.remaining();
final boolean hasPayload = payloadRemaining > 0;

if (hasPayload && !(body instanceof Transfer))
{
throw new ConnectionScopedRuntimeException("Non-empty payload is only supported for Transfer frames. " +
"body=" + (body == null ? "null" : body.getClass().getName()) +
", payloadRemaining=" + payloadRemaining);
}

ValueWriter<FrameBody> writer = body == null ? null : _describedTypeRegistry.getValueWriter(body);
if (!hasPayload)
{
send(new TransportFrame(channel, body));
send(new TransportFrame(channel, body, writer));
return 0;
}
else
{
int size = writer.getEncodedSize();
int maxPayloadSize = _maxFrameSize - (size + 9);
long payloadLength = (long) payload.remaining();
if (payloadLength <= maxPayloadSize)
if (payloadRemaining <= maxPayloadSize)
{
send(new TransportFrame(channel, body, payload));
return (int)payloadLength;
send(new TransportFrame(channel, body, payload, writer));
return payloadRemaining;
}
else
{
Expand All @@ -1231,7 +1216,7 @@ public int sendFrame(final int channel, final FrameBody body, final QpidByteBuff
try (QpidByteBuffer payloadDup = payload.view(0, maxPayloadSize))
{
payload.position(payload.position() + maxPayloadSize);
send(new TransportFrame(channel, body, payloadDup));
send(new TransportFrame(channel, body, payloadDup, writer));
}

return maxPayloadSize;
Expand Down Expand Up @@ -1361,14 +1346,16 @@ private void processProtocolHeader(final QpidByteBuffer msg)

final AuthenticationProvider<?> authenticationProvider = getPort().getAuthenticationProvider();

if(Arrays.equals(header, SASL_HEADER))
final byte[] amqpHeader = Bytes.amqpHeader();
final byte[] saslHeader = Bytes.saslHeader();
if (Arrays.equals(header, saslHeader))
{
if(_saslComplete)
{
throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
}

try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(SASL_HEADER))
try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(saslHeader))
{
getSender().send(protocolHeader);
}
Expand All @@ -1384,7 +1371,7 @@ private void processProtocolHeader(final QpidByteBuffer msg)
_connectionState = ConnectionState.AWAIT_SASL_INIT;
_frameHandler = getFrameHandler(true);
}
else if(Arrays.equals(header, AMQP_HEADER))
else if(Arrays.equals(header, amqpHeader))
{
if(!_saslComplete)
{
Expand All @@ -1406,7 +1393,7 @@ else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
}

}
try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(AMQP_HEADER))
try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(amqpHeader))
{
getSender().send(protocolHeader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

public class AnonymousRelayDestination implements ReceivingDestination
{
private static final Symbol[] CAPABILITIES = { DELAYED_DELIVERY };

private final Target _target;
private final NamedAddressSpace _addressSpace;
private final EventLogger _eventLogger;
Expand All @@ -53,10 +55,17 @@ public class AnonymousRelayDestination implements ReceivingDestination
.contains(DISCARD_UNROUTABLE);
}

/**
* Returns the target capabilities for an anonymous relay.
* <br>
* Note: returns a shared array instance to avoid per-call allocation. The returned array must be treated as
* immutable and must not be modified by callers.
* @return {@link Symbol} array
*/
@Override
public Symbol[] getCapabilities()
{
return new Symbol[]{DELAYED_DELIVERY};
return CAPABILITIES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -68,6 +67,8 @@
class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
private static final UnsettledAction DO_NOTHING_ACTION = new DoNothingAction();

private final boolean _acquires;

private long _deliveryTag = 0L;
Expand Down Expand Up @@ -189,55 +190,82 @@ public void doSend(final MessageInstanceConsumer consumer, final MessageInstance

headerSection = header.createEncodingRetainingSection();
}
List<QpidByteBuffer> payload = new ArrayList<>();
if(headerSection != null)

final EncodingRetainingSection<?> deliveryAnnotationsSection = message.getDeliveryAnnotationsSection();
final EncodingRetainingSection<?> messageAnnotationsSection = message.getMessageAnnotationsSection();
final EncodingRetainingSection<?> propertiesSection = message.getPropertiesSection();
final EncodingRetainingSection<?> applicationPropertiesSection = message.getApplicationPropertiesSection();
final EncodingRetainingSection<?> footerSection = message.getFooterSection();

final boolean bodyOnly = headerSection == null &&
deliveryAnnotationsSection == null &&
messageAnnotationsSection == null &&
propertiesSection == null &&
applicationPropertiesSection == null &&
footerSection == null;

if (bodyOnly)
{
payload.add(headerSection.getEncodedForm());
headerSection.dispose();
if (bodyContent != null)
{
transfer.setPayload(bodyContent);
bodyContent.dispose();
}
}
EncodingRetainingSection<?> section;
if((section = message.getDeliveryAnnotationsSection()) != null)
else
{
payload.add(section.getEncodedForm());
section.dispose();
}
final List<QpidByteBuffer> payload = new ArrayList<>();

if((section = message.getMessageAnnotationsSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if (headerSection != null)
{
payload.add(headerSection.getEncodedForm());
headerSection.dispose();
}

if((section = message.getPropertiesSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if (deliveryAnnotationsSection != null)
{
payload.add(deliveryAnnotationsSection.getEncodedForm());
deliveryAnnotationsSection.dispose();
}

if((section = message.getApplicationPropertiesSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if (messageAnnotationsSection != null)
{
payload.add(messageAnnotationsSection.getEncodedForm());
messageAnnotationsSection.dispose();
}

payload.add(bodyContent);
if (propertiesSection != null)
{
payload.add(propertiesSection.getEncodedForm());
propertiesSection.dispose();
}

if((section = message.getFooterSection()) != null)
{
payload.add(section.getEncodedForm());
section.dispose();
}
if (applicationPropertiesSection != null)
{
payload.add(applicationPropertiesSection.getEncodedForm());
applicationPropertiesSection.dispose();
}

try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload))
{
transfer.setPayload(combined);
}
if (bodyContent != null)
{
payload.add(bodyContent);
}

if (footerSection != null)
{
payload.add(footerSection.getEncodedForm());
footerSection.dispose();
}

payload.forEach(QpidByteBuffer::dispose);
try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload))
{
transfer.setPayload(combined);
}

payload.forEach(QpidByteBuffer::dispose);
}

byte[] data = new byte[8];
ByteBuffer.wrap(data).putLong(_deliveryTag++);
final Binary tag = new Binary(data);
final Binary tag = Binary.ofDeliveryTag(_deliveryTag++);

transfer.setDeliveryTag(tag);

Expand All @@ -249,7 +277,7 @@ public void doSend(final MessageInstanceConsumer consumer, final MessageInstance
transfer.setSettled(true);
if (_acquires && _transactionId == null)
{
transfer.setState(new Accepted());
transfer.setState(Accepted.INSTANCE);
}
}
else
Expand All @@ -262,7 +290,7 @@ public void doSend(final MessageInstanceConsumer consumer, final MessageInstance
}
else
{
action = new DoNothingAction();
action = DO_NOTHING_ACTION;
}

_linkEndpoint.addUnsettled(tag, action, entry);
Expand Down
Loading