Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
Expand Down Expand Up @@ -75,6 +76,7 @@
@ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"),
})
public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
private static final long MAXIMUM_INPUT_FLOWFILE_SIZE_LIMIT = 128 * 1024 * 1024L;

public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder()
.name("Exchange Name")
Expand All @@ -95,6 +97,16 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAXIMUM_INPUT_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Input FlowFile Size")
.description("Maximum size of an input FlowFile that will be read into memory before publishing. PublishAMQP reads FlowFile content into a byte array "
+ "before publishing, so FlowFiles larger than this value are routed to failure before content is read. Configure this value according to "
+ "broker limits and available JVM memory.")
.required(true)
.defaultValue("128 MB")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, MAXIMUM_INPUT_FLOWFILE_SIZE_LIMIT))
.build();
public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder()
.name("Headers Source")
.description("The source of the headers which will be applied to the published message.")
Expand Down Expand Up @@ -136,6 +148,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
Stream.of(
EXCHANGE,
ROUTING_KEY,
MAXIMUM_INPUT_FLOWFILE_SIZE,
HEADERS_SOURCE,
HEADERS_PATTERN,
HEADER_SEPARATOR
Expand All @@ -151,7 +164,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
/**
* Will construct AMQP message by extracting its body from the incoming {@link FlowFile}. AMQP Properties will be extracted from the
* {@link FlowFile} and converted to {@link BasicProperties} to be sent along with the message. Upon success the incoming {@link FlowFile} is
* transferred to 'success' {@link Relationship} and upon failure FlowFile is penalized and transferred to the 'failure' {@link Relationship}
* transferred to 'success' {@link Relationship} and upon failure FlowFile is transferred to the 'failure' {@link Relationship}
* <br>
* <p>
* NOTE: Attributes extracted from {@link FlowFile} are considered candidates for AMQP properties if their names are prefixed with
Expand All @@ -166,6 +179,14 @@ protected void processResource(final Connection connection, final AMQPPublisher
return;
}

final long maximumInputFlowFileSize = context.getProperty(MAXIMUM_INPUT_FLOWFILE_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
if (flowFile.getSize() > maximumInputFlowFileSize) {
getLogger().warn("FlowFile {} with size {} bytes exceeds configured maximum input FlowFile size of {} bytes; routing to failure",
flowFile, flowFile.getSize(), maximumInputFlowFileSize);
session.transfer(flowFile, REL_FAILURE);
return;
}

final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null) {
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
Expand All @@ -187,7 +208,7 @@ protected void processResource(final Connection connection, final AMQPPublisher
session.rollback();
throw e;
} catch (AMQPException e) {
session.transfer(session.penalize(flowFile), REL_FAILURE);
session.transfer(flowFile, REL_FAILURE);
throw e;
}

Expand Down Expand Up @@ -220,7 +241,7 @@ public void migrateProperties(final PropertyConfiguration config) {
* Extracts contents of the {@link FlowFile} as byte array.
*/
private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) {
final byte[] messageContent = new byte[(int) flowFile.getSize()];
final byte[] messageContent = new byte[Math.toIntExact(flowFile.getSize())];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is Math.toIntExact required since we filter out larger flowfiles before?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved by checking against the configured size limit before allocation, so the remaining cast is bounded.

session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true));
return messageContent;
}
Expand Down Expand Up @@ -317,16 +338,26 @@ private Map<String, Object> validateAMQPHeaderProperty(final String amqpPropValu
for (String strEntry : strEntries) {
final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored
if (kv.length == 2) {
headers.put(kv[0].trim(), kv[1].trim());
addHeader(headers, amqpPropValue, strEntry, kv[0], kv[1].trim());
} else if (kv.length == 1) {
headers.put(kv[0].trim(), null);
addHeader(headers, amqpPropValue, strEntry, kv[0], null);
} else {
getLogger().warn("Malformed key value pair in AMQP header property ({}): {}", amqpPropValue, strEntry);
}
}
return headers;
}

private void addHeader(final Map<String, Object> headers, final String amqpPropValue, final String strEntry, final String headerKey, final Object headerValue) {
final String trimmedHeaderKey = headerKey.trim();
if (trimmedHeaderKey.isEmpty()) {
getLogger().warn("Skipping AMQP header with empty key in property ({}): {}", amqpPropValue, strEntry);
return;
}

headers.put(trimmedHeaderKey, headerValue);
}

protected Pattern getPattern(ProcessContext context, InputHeaderSource selectedHeaderSource) {
return switch (selectedHeaderSource) {
case FLOWFILE_ATTRIBUTES -> Pattern.compile(context.getProperty(HEADERS_PATTERN).evaluateAttributeExpressions().getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.concurrent.ExecutorService;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class PublishAMQPTest {
Expand Down Expand Up @@ -151,7 +153,7 @@ public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception
expectedHeaders.put("foo3", null);

final Map<String, String> attributes = new HashMap<>();
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed");
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed||=ignored");

runner.enqueue("Hello Joe".getBytes(), attributes);

Expand All @@ -171,6 +173,33 @@ public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception
assertNotNull(channel.basicGet("queue2", true));
}

@Test
public void validateEmptyHeaderKeysIgnoredAndPublishToSuccess() throws Exception {
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.HEADER_SEPARATOR, "|");

final Map<String, Object> expectedHeaders = new HashMap<>();
expectedHeaders.put("foo", "bar");
expectedHeaders.put("foo2", null);
expectedHeaders.put("foo3", "");

final Map<String, String> attributes = new HashMap<>();
attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=bar|=missing| |foo2| foo3 = ");

runner.enqueue("Hello Joe".getBytes(), attributes);

runner.run();

final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).getFirst();
assertNotNull(successFF);

final Channel channel = pubProc.getConnection().createChannel();
final GetResponse msg1 = channel.basicGet("queue1", true);
assertNotNull(msg1);

assertEquals(expectedHeaders, msg1.getProps().getHeaders());
}

@Test
public void validateFailedPublishAndTransferToFailure() {
setConnectionProperties(runner);
Expand All @@ -181,7 +210,42 @@ public void validateFailedPublishAndTransferToFailure() {
runner.run();

assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst());
final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst();
assertNotNull(failureFlowFile);
assertFalse(failureFlowFile.isPenalized());
runner.assertPenalizeCount(0);
}

@Test
public void validateOversizedFlowFileTransferredToFailureWithoutPublishing() throws Exception {
setConnectionProperties(runner);
runner.setProperty(PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE, "4 B");

runner.enqueue("Hello".getBytes());

runner.run();

assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty());
final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst();
assertNotNull(failureFlowFile);
assertFalse(failureFlowFile.isPenalized());
runner.assertPenalizeCount(0);

final Channel channel = pubProc.getConnection().createChannel();
assertNull(channel.basicGet("queue1", true));
}

@Test
public void validateMaximumInputFlowFileSizeProperty() {
assertEquals("Maximum Input FlowFile Size", PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE.getName());
assertEquals("128 MB", PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE.getDefaultValue());

setConnectionProperties(runner);
runner.setProperty(PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE, "128 MB");
runner.assertValid();

runner.setProperty(PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE, "129 MB");
runner.assertNotValid();
}

@Test
Expand Down