diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 97bc07a6f710..b98d28c85db6 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -33,6 +33,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.migration.PropertyConfiguration; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -75,6 +76,7 @@ @ReadsAttribute(attribute = AbstractAMQPProcessor.AMQP_CLUSTER_ID_ATTRIBUTE, description = "The ID of the AMQP Cluster"), }) public class PublishAMQP extends AbstractAMQPProcessor { + private static final long MAXIMUM_INPUT_FLOWFILE_SIZE_LIMIT = 128 * 1024 * 1024L; public static final PropertyDescriptor EXCHANGE = new PropertyDescriptor.Builder() .name("Exchange Name") @@ -95,6 +97,16 @@ public class PublishAMQP extends AbstractAMQPProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor MAXIMUM_INPUT_FLOWFILE_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Input FlowFile Size") + .description("Maximum size of an input FlowFile that will be read into memory before publishing. PublishAMQP reads FlowFile content into a byte array " + + "before publishing, so FlowFiles larger than this value are routed to failure before content is read. Configure this value according to " + + "broker limits and available JVM memory.") + .required(true) + .defaultValue("128 MB") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.createDataSizeBoundsValidator(1, MAXIMUM_INPUT_FLOWFILE_SIZE_LIMIT)) + .build(); public static final PropertyDescriptor HEADERS_SOURCE = new PropertyDescriptor.Builder() .name("Headers Source") .description("The source of the headers which will be applied to the published message.") @@ -136,6 +148,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { Stream.of( EXCHANGE, ROUTING_KEY, + MAXIMUM_INPUT_FLOWFILE_SIZE, HEADERS_SOURCE, HEADERS_PATTERN, HEADER_SEPARATOR @@ -151,7 +164,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { /** * Will construct AMQP message by extracting its body from the incoming {@link FlowFile}. AMQP Properties will be extracted from the * {@link FlowFile} and converted to {@link BasicProperties} to be sent along with the message. Upon success the incoming {@link FlowFile} is - * transferred to 'success' {@link Relationship} and upon failure FlowFile is penalized and transferred to the 'failure' {@link Relationship} + * transferred to 'success' {@link Relationship} and upon failure FlowFile is transferred to the 'failure' {@link Relationship} *
*

* NOTE: Attributes extracted from {@link FlowFile} are considered candidates for AMQP properties if their names are prefixed with @@ -166,6 +179,14 @@ protected void processResource(final Connection connection, final AMQPPublisher return; } + final long maximumInputFlowFileSize = context.getProperty(MAXIMUM_INPUT_FLOWFILE_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue(); + if (flowFile.getSize() > maximumInputFlowFileSize) { + getLogger().warn("FlowFile {} with size {} bytes exceeds configured maximum input FlowFile size of {} bytes; routing to failure", + flowFile, flowFile.getSize(), maximumInputFlowFileSize); + session.transfer(flowFile, REL_FAILURE); + return; + } + final String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue(); if (routingKey == null) { throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" @@ -187,7 +208,7 @@ protected void processResource(final Connection connection, final AMQPPublisher session.rollback(); throw e; } catch (AMQPException e) { - session.transfer(session.penalize(flowFile), REL_FAILURE); + session.transfer(flowFile, REL_FAILURE); throw e; } @@ -220,7 +241,7 @@ public void migrateProperties(final PropertyConfiguration config) { * Extracts contents of the {@link FlowFile} as byte array. */ private byte[] extractMessage(final FlowFile flowFile, ProcessSession session) { - final byte[] messageContent = new byte[(int) flowFile.getSize()]; + final byte[] messageContent = new byte[Math.toIntExact(flowFile.getSize())]; session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true)); return messageContent; } @@ -317,9 +338,9 @@ private Map validateAMQPHeaderProperty(final String amqpPropValu for (String strEntry : strEntries) { final String[] kv = strEntry.split("=", -1); // without using limit, trailing delimiter would be ignored if (kv.length == 2) { - headers.put(kv[0].trim(), kv[1].trim()); + addHeader(headers, amqpPropValue, strEntry, kv[0], kv[1].trim()); } else if (kv.length == 1) { - headers.put(kv[0].trim(), null); + addHeader(headers, amqpPropValue, strEntry, kv[0], null); } else { getLogger().warn("Malformed key value pair in AMQP header property ({}): {}", amqpPropValue, strEntry); } @@ -327,6 +348,16 @@ private Map validateAMQPHeaderProperty(final String amqpPropValu return headers; } + private void addHeader(final Map headers, final String amqpPropValue, final String strEntry, final String headerKey, final Object headerValue) { + final String trimmedHeaderKey = headerKey.trim(); + if (trimmedHeaderKey.isEmpty()) { + getLogger().warn("Skipping AMQP header with empty key in property ({}): {}", amqpPropValue, strEntry); + return; + } + + headers.put(trimmedHeaderKey, headerValue); + } + protected Pattern getPattern(ProcessContext context, InputHeaderSource selectedHeaderSource) { return switch (selectedHeaderSource) { case FLOWFILE_ATTRIBUTES -> Pattern.compile(context.getProperty(HEADERS_PATTERN).evaluateAttributeExpressions().getValue()); diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index e2a2f0697aed..85064c4a3c49 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -37,7 +37,9 @@ import java.util.concurrent.ExecutorService; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class PublishAMQPTest { @@ -151,7 +153,7 @@ public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception expectedHeaders.put("foo3", null); final Map attributes = new HashMap<>(); - attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed"); + attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=(bar,bar)|foo2=bar2|foo3|foo4=malformed=|foo5=mal=formed||=ignored"); runner.enqueue("Hello Joe".getBytes(), attributes); @@ -171,6 +173,33 @@ public void validateMalformedHeaderIgnoredAndPublishToSuccess() throws Exception assertNotNull(channel.basicGet("queue2", true)); } + @Test + public void validateEmptyHeaderKeysIgnoredAndPublishToSuccess() throws Exception { + setConnectionProperties(runner); + runner.setProperty(PublishAMQP.HEADER_SEPARATOR, "|"); + + final Map expectedHeaders = new HashMap<>(); + expectedHeaders.put("foo", "bar"); + expectedHeaders.put("foo2", null); + expectedHeaders.put("foo3", ""); + + final Map attributes = new HashMap<>(); + attributes.put(AbstractAMQPProcessor.AMQP_HEADERS_ATTRIBUTE, "foo=bar|=missing| |foo2| foo3 = "); + + runner.enqueue("Hello Joe".getBytes(), attributes); + + runner.run(); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).getFirst(); + assertNotNull(successFF); + + final Channel channel = pubProc.getConnection().createChannel(); + final GetResponse msg1 = channel.basicGet("queue1", true); + assertNotNull(msg1); + + assertEquals(expectedHeaders, msg1.getProps().getHeaders()); + } + @Test public void validateFailedPublishAndTransferToFailure() { setConnectionProperties(runner); @@ -181,7 +210,42 @@ public void validateFailedPublishAndTransferToFailure() { runner.run(); assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); - assertNotNull(runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst()); + final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst(); + assertNotNull(failureFlowFile); + assertFalse(failureFlowFile.isPenalized()); + runner.assertPenalizeCount(0); + } + + @Test + public void validateOversizedFlowFileTransferredToFailureWithoutPublishing() throws Exception { + setConnectionProperties(runner); + runner.setProperty(PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE, "4 B"); + + runner.enqueue("Hello".getBytes()); + + runner.run(); + + assertTrue(runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); + final MockFlowFile failureFlowFile = runner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst(); + assertNotNull(failureFlowFile); + assertFalse(failureFlowFile.isPenalized()); + runner.assertPenalizeCount(0); + + final Channel channel = pubProc.getConnection().createChannel(); + assertNull(channel.basicGet("queue1", true)); + } + + @Test + public void validateMaximumInputFlowFileSizeProperty() { + assertEquals("Maximum Input FlowFile Size", PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE.getName()); + assertEquals("128 MB", PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE.getDefaultValue()); + + setConnectionProperties(runner); + runner.setProperty(PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE, "128 MB"); + runner.assertValid(); + + runner.setProperty(PublishAMQP.MAXIMUM_INPUT_FLOWFILE_SIZE, "129 MB"); + runner.assertNotValid(); } @Test