diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7f6ea33548..8a3ed89cc0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -381,6 +381,10 @@ if (ENABLE_ALL OR ENABLE_PROMETHEUS OR ENABLE_GRAFANA_LOKI OR ENABLE_CIVET)
endif()
## Add extensions
+
+# PugiXML required for standard processors and WEL extension
+include(PugiXml)
+
file(GLOB extension-directories "extensions/*")
foreach(extension-dir ${extension-directories})
if (IS_DIRECTORY ${extension-dir} AND EXISTS ${extension-dir}/CMakeLists.txt)
diff --git a/CONTROLLERS.md b/CONTROLLERS.md
index be3313ca2a..5e325f999b 100644
--- a/CONTROLLERS.md
+++ b/CONTROLLERS.md
@@ -32,6 +32,7 @@ limitations under the License.
- [SSLContextService](#SSLContextService)
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
- [VolatileMapStateStorage](#VolatileMapStateStorage)
+- [XMLReader](#XMLReader)
## AWSCredentialsService
@@ -332,3 +333,21 @@ In the list below, the names of required properties appear in bold. Any other pr
| Name | Default Value | Allowable Values | Description |
|-----------------|---------------|------------------|--------------------------------|
| Linked Services | | | Referenced Controller Services |
+
+
+## XMLReader
+
+### Description
+
+Reads XML content and creates Record objects. Records are expected in the second level of XML data, embedded in an enclosing root tag. Types for records are inferred automatically based on the content of the XML tags. For timestamps, the format is expected to be ISO 8601 compliant.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|-----------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Field Name for Content | | | If tags with content (e. g. `<field>content</field>`) are defined as nested records in the schema, the name of the tag will be used as name for the record and the value of this property will be used as name for the field. If the tag contains subnodes besides the content (e.g. `<field>content<subfield>subcontent</subfield></field>`), or a node attribute is present, we need to define a name for the text content, so that it can be distinguished from the subnodes. If this property is not set, the default name 'value' will be used for the text content of the tag in this case. |
+| **Parse XML Attributes** | false | true<br/>false | When this property is 'true' then XML attributes are parsed and added to the record as new fields, otherwise XML attributes and their values are ignored. |
+| Attribute Prefix | | | If this property is set, the name of attributes will be prepended with a prefix when they are added to a record. |
+| **Expect Records as Array** | false | true<br/>false | This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element". Because XML does not provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire XML blob with a "wrapper element". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element" that will be ignored. |
diff --git a/LICENSE b/LICENSE
index 28eb012c24..9a547ba6be 100644
--- a/LICENSE
+++ b/LICENSE
@@ -2354,29 +2354,6 @@ This product bundles 'zlib' within 'OpenCV' under the following license:
Comments) 1950 to 1952 in the files http://tools.ietf.org/html/rfc1950
(zlib format), rfc1951 (deflate format) and rfc1952 (gzip format).
-This product bundles 'TinyXml2' within 'AWS SDK for C++' under a zlib license:
-
-Original code by Lee Thomason (www.grinninglizard.com)
-
-This software is provided 'as-is', without any express or implied
-warranty. In no event will the authors be held liable for any
-damages arising from the use of this software.
-
-Permission is granted to anyone to use this software for any
-purpose, including commercial applications, and to alter it and
-redistribute it freely, subject to the following restrictions:
-
-1. The origin of this software must not be misrepresented; you must
-not claim that you wrote the original software. If you use this
-software in a product, an acknowledgment in the product documentation
-would be appreciated but is not required.
-
-2. Altered source versions must be plainly marked as such, and
-must not be misrepresented as being the original software.
-
-3. This notice may not be removed or altered from any source
-distribution.
-
This product bundles 'cJSON' within 'AWS SDK for C++' under an MIT license:
diff --git a/NOTICE b/NOTICE
index e32c361174..455ad3fecf 100644
--- a/NOTICE
+++ b/NOTICE
@@ -43,7 +43,6 @@ THIRD PARTY COMPONENTS
This software includes third party software subject to the following copyrights:
- Very fast, header-only/compiled, C++ logging library from spdlog - Copyright (c) 2016 Gabi Melman
- An open-source formatting library for C++ from fmt - Copyright (c) 2012 - present, Victor Zverovich
-- XML parsing and utility functions from TinyXml2 - Lee Thomason
- JSON parsing and utility functions from JsonCpp - Copyright (c) 2007-2010 Baptiste Lepilleur
- OpenSSL build files for cmake used for Android Builds - Copyright (C) 2007-2012 LuaDist and Copyright (C) 2013 Brian Sidebotham
- Android tool chain cmake build files - Copyright (c) 2010-2011, Ethan Rublee and Copyright (c) 2011-2014, Andrey Kamaev
@@ -78,6 +77,7 @@ This software includes third party software subject to the following copyrights:
- llhttp - Copyright Fedor Indutny, 2018.
- benchmark - Copyright 2015 Google Inc.
- llama.cpp - Copyright (c) 2023-2024 The ggml authors
+- pugixml - Copyright (C) 2006-2025, by Arseny Kapoulkine; based on the pugxml parser - Copyright (C) 2003, by Kristen Wegner (kristen@tima.net)
The licenses for these third party components are included in LICENSE.txt
diff --git a/cmake/BundledPugiXml.cmake b/cmake/BundledPugiXml.cmake
deleted file mode 100644
index d2a07da717..0000000000
--- a/cmake/BundledPugiXml.cmake
+++ /dev/null
@@ -1,59 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-function(use_bundled_pugixml SOURCE_DIR BINARY_DIR)
- # Define byproducts
- if (WIN32)
- set(BYPRODUCT "lib/pugixml.lib")
- else()
- set(BYPRODUCT "lib/libpugixml.a")
- endif()
-
- # Set build options
- set(PUGI_BYPRODUCT_DIR "${CMAKE_CURRENT_BINARY_DIR}/thirdparty/pugixml-install")
-
- set(PUGI_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
- "-DCMAKE_INSTALL_PREFIX=${PUGI_BYPRODUCT_DIR}"
- "-DBUILD_TESTS=OFF"
- "-DBUILD_SHARED_AND_STATIC_LIBS=OFF"
- "-DBUILD_SHARED_LIBS=OFF")
-
- # Build project
- ExternalProject_Add(
- pugixml-external
- URL "https://github.com/zeux/pugixml/releases/download/v1.9/pugixml-1.9.tar.gz"
- URL_HASH "SHA256=d156d35b83f680e40fd6412c4455fdd03544339779134617b9b28d19e11fdba6"
- SOURCE_DIR "${BINARY_DIR}/thirdparty/pugixml-src"
- CMAKE_ARGS ${PUGI_CMAKE_ARGS}
- BUILD_BYPRODUCTS "${PUGI_BYPRODUCT_DIR}/${BYPRODUCT}"
- EXCLUDE_FROM_ALL TRUE
- DOWNLOAD_NO_PROGRESS TRUE
- TLS_VERIFY TRUE
- )
-
- # Set variables
- set(PUGIXML_FOUND "YES" CACHE STRING "" FORCE)
- set(PUGIXML_INCLUDE_DIR "${PUGI_BYPRODUCT_DIR}/include" CACHE STRING "" FORCE)
- set(PUGIXML_LIBRARY "${PUGI_BYPRODUCT_DIR}/${BYPRODUCT}" CACHE STRING "" FORCE)
-
- # Create imported targets
- add_library(PUGI::libpugixml STATIC IMPORTED)
- set_target_properties(PUGI::libpugixml PROPERTIES IMPORTED_LOCATION "${PUGIXML_LIBRARY}")
- add_dependencies(PUGI::libpugixml pugixml-external)
- file(MAKE_DIRECTORY ${PUGIXML_INCLUDE_DIR})
- set_property(TARGET PUGI::libpugixml APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${PUGIXML_INCLUDE_DIR})
-endfunction(use_bundled_pugixml)
diff --git a/cmake/PugiXml.cmake b/cmake/PugiXml.cmake
new file mode 100644
index 0000000000..ba5a4df386
--- /dev/null
+++ b/cmake/PugiXml.cmake
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+include(FetchContent)
+# Switch off pugixml's PUGIXML_BUILD_TESTS option; FORCE overwrites any value already stored in the cache.
+set(PUGIXML_BUILD_TESTS OFF CACHE BOOL "" FORCE)
+# Fetch the v1.15 source archive; URL_HASH makes the download fail if the archive content differs.
+FetchContent_Declare(
+ pugixml
+ URL https://github.com/zeux/pugixml/archive/refs/tags/v1.15.tar.gz
+ URL_HASH SHA256=b39647064d9e28297a34278bfb897092bf33b7c487906ddfc094c9e8868bddcb
+)
+FetchContent_MakeAvailable(pugixml)
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index 7c643c0876..6d2608348b 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -27,7 +27,7 @@ target_include_directories(minifi-standard-processors PUBLIC "${CMAKE_SOURCE_DIR
include(RangeV3)
include(Asio)
-target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
+target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio pugixml::pugixml)  # namespaced alias: a misspelled target fails at configure time
include(Coroutines)
enable_coroutines()
diff --git a/extensions/standard-processors/controllers/XMLReader.cpp b/extensions/standard-processors/controllers/XMLReader.cpp
new file mode 100644
index 0000000000..56749b258e
--- /dev/null
+++ b/extensions/standard-processors/controllers/XMLReader.cpp
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "XMLReader.h"
+
+#include <algorithm>
+#include <ranges>
+
+#include "core/Resource.h"
+#include "utils/TimeUtil.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::standard {
+
+namespace {
+// True when at least one direct child of `node` is an element; every other node type is ignored.
+bool hasChildNodes(const pugi::xml_node& node) {
+  const auto is_element = [](const pugi::xml_node& candidate) { return candidate.type() == pugi::node_element; };
+  return std::ranges::any_of(node, is_element);
+}
+// Stores `field` under `name`; when the name repeats, the entry becomes an array collecting every value added for it.
+void addRecordFieldToObject(core::RecordObject& record_object, const std::string& name, const core::RecordField& field) {
+  auto it = record_object.find(name);
+  if (it == record_object.end()) {
+    record_object.emplace(name, field);
+    return;
+  }
+  // The name already holds an array: append the new value to it.
+  if (auto* existing_array = std::get_if<core::RecordArray>(&it->second.value_)) {
+    existing_array->emplace_back(field);
+    return;
+  }
+  // Second occurrence of the name: wrap the previous value and the new one into an array.
+  core::RecordArray array;
+  array.emplace_back(it->second);
+  array.emplace_back(field);
+  it->second = core::RecordField(std::move(array));
+}
+} // namespace
+// Stores `value` under `name`, typed as the first match of: bool, timestamp, unsigned integer, signed integer, double, string.
+void XMLReader::writeRecordField(core::RecordObject& record_object, const std::string& name, const std::string& value, bool write_pcdata_node) const {
+  // If the name is the value set in the Field Name for Content property, we should only add this value to the RecordObject if we are writing a plain character data node.
+  if (!write_pcdata_node && name == field_name_for_content_) {
+    return;
+  }
+
+  if (value == "true" || value == "false") {
+    addRecordFieldToObject(record_object, name, core::RecordField(value == "true"));
+    return;
+  } else if (auto date = utils::timeutils::parseDateTimeStr(value)) {
+    addRecordFieldToObject(record_object, name, core::RecordField(*date));
+    return;
+  } else if (auto date = utils::timeutils::parseRfc3339(value)) {
+    addRecordFieldToObject(record_object, name, core::RecordField(*date));
+    return;
+  }
+
+  // The <cctype> classifiers are undefined for negative char values (any non-ASCII byte), so classify as unsigned char.
+  const auto is_digit = [](unsigned char character) { return ::isdigit(character) != 0; };
+  if (std::ranges::all_of(value, is_digit)) {
+    try {
+      uint64_t value_as_uint64 = std::stoull(value);
+      addRecordFieldToObject(record_object, name, core::RecordField(value_as_uint64));
+      return;
+    } catch (const std::exception&) {}  // empty or out of range: try the next type
+  }
+  if (value.starts_with('-') && std::ranges::all_of(value | std::views::drop(1), is_digit)) {
+    try {
+      int64_t value_as_int64 = std::stoll(value);
+      addRecordFieldToObject(record_object, name, core::RecordField(value_as_int64));
+      return;
+    } catch (const std::exception&) {}  // lone '-' or out of range: try the next type
+  }
+  try {
+    size_t parsed_length = 0;
+    const double value_as_double = std::stod(value, &parsed_length);
+    if (parsed_length == value.size()) {  // stod also succeeds on a numeric prefix ("12abc"), which is text, not a number
+      addRecordFieldToObject(record_object, name, core::RecordField(value_as_double));
+      return;
+    }
+  } catch (const std::exception&) {}  // not a number at all: keep it as a string
+
+  addRecordFieldToObject(record_object, name, core::RecordField(value));
+}
+
+// Adds the element `node` to `record_object`, either as a plain field or as a nested record.
+void XMLReader::parseNodeElement(core::RecordObject& record_object, const pugi::xml_node& node) const {
+  gsl_Expects(node.type() == pugi::node_element);
+  // An element becomes a nested record when its attributes are parsed or when it has child elements;
+  // otherwise it is a plain field holding the element's text.
+  const bool has_parsed_attributes = parse_xml_attributes_ && node.first_attribute();
+  if (!has_parsed_attributes && !hasChildNodes(node)) {
+    writeRecordField(record_object, node.name(), node.child_value());
+    return;
+  }
+
+  core::RecordObject nested_record;
+  if (has_parsed_attributes) {
+    for (const pugi::xml_attribute& attribute : node.attributes()) {
+      writeRecordField(nested_record, attribute_prefix_ + attribute.name(), attribute.value());
+    }
+  }
+  // Child elements and text content are added after the attributes.
+  parseXmlNode(nested_record, node);
+  addRecordFieldToObject(record_object, node.name(), core::RecordField(std::move(nested_record)));
+}
+// Adds every child element of `node` to `record_object`, plus the node's concatenated text under the content field name.
+void XMLReader::parseXmlNode(core::RecordObject& record_object, const pugi::xml_node& node) const {
+  std::string pc_data_value;
+  for (pugi::xml_node child : node.children()) {
+    if (child.type() == pugi::node_element) {
+      parseNodeElement(record_object, child);
+    } else if (child.type() == pugi::node_pcdata || child.type() == pugi::node_cdata) {  // CDATA sections carry text as well
+      pc_data_value.append(child.value());
+    }
+  }
+
+  if (!pc_data_value.empty()) {
+    writeRecordField(record_object, field_name_for_content_, pc_data_value, true);
+  }
+}
+
+// Builds one Record from the contents of `node` and appends it to `record_set`.
+void XMLReader::addRecordFromXmlNode(const pugi::xml_node& node, core::RecordSet& record_set) const {
+  core::RecordObject fields;
+  parseXmlNode(fields, node);
+  record_set.emplace_back(core::Record(std::move(fields)));
+}
+// Parses `xml_content` and appends the records found to `record_set`; returns false when the content cannot be parsed.
+bool XMLReader::parseRecordsFromXml(core::RecordSet& record_set, const std::string& xml_content) const {
+  pugi::xml_document doc;
+  if (!doc.load_buffer(xml_content.data(), xml_content.size())) {  // explicit length: no reliance on a NUL terminator
+    logger_->log_error("Failed to parse XML content: {}", xml_content);
+    return false;
+  }
+
+  const pugi::xml_node root = doc.first_child();
+  if (expect_records_as_array_) {
+    for (pugi::xml_node record_node : root.children()) {
+      if (record_node.type() == pugi::node_element) {  // stray text or CDATA in the wrapper element is not a record
+        addRecordFromXmlNode(record_node, record_set);
+      }
+    }
+    return true;
+  }
+  if (!root.first_child()) {
+    logger_->log_info("XML content does not contain any records: {}", xml_content);
+    return true;
+  }
+  addRecordFromXmlNode(root, record_set);
+  return true;
+}
+// Caches the configured property values; an unset or empty boolean property counts as false, an unparsable one throws.
+void XMLReader::onEnable() {
+  const auto read_bool = [this](std::string_view property_name) -> bool {
+    const auto raw_value = getProperty(property_name);
+    if (!raw_value || raw_value->empty()) {
+      return false;
+    }
+    if (const auto parsed = parsing::parseBool(*raw_value)) {
+      return *parsed;
+    }
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid value for {} property: {}", property_name, *raw_value));
+  };
+  field_name_for_content_ = getProperty(FieldNameForContent.name).value_or("value");
+  parse_xml_attributes_ = read_bool(ParseXMLAttributes.name);
+  attribute_prefix_ = getProperty(AttributePrefix.name).value_or("");
+  expect_records_as_array_ = read_bool(ExpectRecordsAsArray.name);
+}
+
+// Reads the whole stream, parses it as XML and returns the records found.
+// Any read or parse failure is reported as std::errc::invalid_argument.
+nonstd::expected<core::RecordSet, std::error_code> XMLReader::read(io::InputStream& input_stream) {
+  std::string content;
+  content.resize(input_stream.size());
+  const auto read_ret = input_stream.read(as_writable_bytes(std::span(content)));
+  if (io::isError(read_ret)) {
+    logger_->log_error("Failed to read XML data from input stream");
+    return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
+  }
+
+  // Drop the unused tail after a short read so that no zero padding is handed to the parser.
+  content.resize(read_ret);
+
+  core::RecordSet record_set{};
+  if (!parseRecordsFromXml(record_set, content)) {
+    return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
+  }
+  return record_set;
+}
+
+REGISTER_RESOURCE(XMLReader, ControllerService);
+} // namespace org::apache::nifi::minifi::standard
diff --git a/extensions/standard-processors/controllers/XMLReader.h b/extensions/standard-processors/controllers/XMLReader.h
new file mode 100644
index 0000000000..44b045cf13
--- /dev/null
+++ b/extensions/standard-processors/controllers/XMLReader.h
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "controllers/RecordSetReader.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerFactory.h"
+#include "pugixml.hpp"
+
+namespace org::apache::nifi::minifi::standard {
+// Controller service that parses XML content into Records; its behaviour is configured by the four properties declared below.
+class XMLReader final : public core::RecordSetReaderImpl {
+ public:
+  explicit XMLReader(const std::string_view name, const utils::Identifier& uuid = {}) : RecordSetReaderImpl(name, uuid) {}
+
+  XMLReader(XMLReader&&) = delete;
+  XMLReader(const XMLReader&) = delete;
+  XMLReader& operator=(XMLReader&&) = delete;
+  XMLReader& operator=(const XMLReader&) = delete;
+
+  ~XMLReader() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description = "Reads XML content and creates Record objects. Records are expected in the second level of XML data, embedded in an enclosing root tag. "
+    "Types for records are inferred automatically based on the content of the XML tags. For timestamps, the format is expected to be ISO 8601 compliant.";
+
+  EXTENSIONAPI static constexpr auto FieldNameForContent = core::PropertyDefinitionBuilder<>::createProperty("Field Name for Content")
+    .withDescription("If tags with content (e. g. <field>content</field>) are defined as nested records in the schema, the name of the tag will be used as name for the record and the value of "
+                     "this property will be used as name for the field. If the tag contains subnodes besides the content (e.g. <field>content<subfield>subcontent</subfield></field>), "
+                     "or a node attribute is present, we need to define a name for the text content, so that it can be distinguished from the subnodes. If this property is not set, the default "
+                     "name 'value' will be used for the text content of the tag in this case.")
+    .build();
+  EXTENSIONAPI static constexpr auto ParseXMLAttributes = core::PropertyDefinitionBuilder<>::createProperty("Parse XML Attributes")
+    .withDescription("When this property is 'true' then XML attributes are parsed and added to the record as new fields, otherwise XML attributes and their values are ignored.")
+    .isRequired(true)
+    .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
+    .withDefaultValue("false")
+    .build();
+  EXTENSIONAPI static constexpr auto AttributePrefix = core::PropertyDefinitionBuilder<>::createProperty("Attribute Prefix")
+    .withDescription("If this property is set, the name of attributes will be prepended with a prefix when they are added to a record.")
+    .build();
+  EXTENSIONAPI static constexpr auto ExpectRecordsAsArray = core::PropertyDefinitionBuilder<>::createProperty("Expect Records as Array")
+    .withDescription("This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\". Because XML does not provide "
+                     "for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire XML blob "
+                     "with a \"wrapper element\". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\" "
+                     "that will be ignored.")
+    .isRequired(true)
+    .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
+    .withDefaultValue("false")
+    .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::array{FieldNameForContent, ParseXMLAttributes, AttributePrefix, ExpectRecordsAsArray};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr auto ImplementsApis = std::array{ RecordSetReader::ProvidesApi };
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+  nonstd::expected<core::RecordSet, std::error_code> read(io::InputStream& input_stream) override;
+
+  void initialize() override {
+    setSupportedProperties(Properties);
+  }
+  void onEnable() override;
+  void yield() override {}
+  bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }
+  bool isWorkAvailable() override { return false; }
+
+ private:
+  void writeRecordField(core::RecordObject& record_object, const std::string& name, const std::string& value, bool write_pcdata_node = false) const;
+  void parseNodeElement(core::RecordObject& record_object, const pugi::xml_node& node) const;
+  void parseXmlNode(core::RecordObject& record_object, const pugi::xml_node& node) const;
+  void addRecordFromXmlNode(const pugi::xml_node& node, core::RecordSet& record_set) const;
+  bool parseRecordsFromXml(core::RecordSet& record_set, const std::string& xml_content) const;
+
+  std::string field_name_for_content_;  // name of the field that receives an element's text content
+  bool parse_xml_attributes_ = false;
+  std::string attribute_prefix_;  // prepended to attribute names when they are added to a record
+  bool expect_records_as_array_ = false;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<XMLReader>::getLogger();
+};
+
+} // namespace org::apache::nifi::minifi::standard
diff --git a/extensions/standard-processors/tests/unit/JsonRecordTests.cpp b/extensions/standard-processors/tests/unit/JsonRecordTests.cpp
index 858c1bb902..f6df180660 100644
--- a/extensions/standard-processors/tests/unit/JsonRecordTests.cpp
+++ b/extensions/standard-processors/tests/unit/JsonRecordTests.cpp
@@ -12,7 +12,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License.c
+ * limitations under the License.
*/
#include
@@ -80,9 +80,9 @@ constexpr std::string_view array_pretty_str = R"([
bool testJsonEquality(const std::string_view expected_str, const std::string_view actual_str) {
rapidjson::Document expected;
- expected.Parse(expected_str.data());
+ expected.Parse(expected_str.data(), expected_str.size());
rapidjson::Document actual;
- actual.Parse(actual_str.data());
+ actual.Parse(actual_str.data(), actual_str.size());
return actual == expected;
}
@@ -100,7 +100,7 @@ TEST_CASE("JsonRecordSetWriter tests") {
CHECK(json_record_set_writer.setProperty(JsonRecordSetWriter::OutputGrouping.name, output_grouping));
CHECK(json_record_set_writer.setProperty(JsonRecordSetWriter::PrettyPrint.name, prety_print));
json_record_set_writer.onEnable();
- CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, [expected = output_str](auto serialized_record_set) -> bool {
+ CHECK(core::test::testRecordWriter(json_record_set_writer, record_set, [expected = output_str](const auto& serialized_record_set) -> bool {
return testJsonEquality(expected, serialized_record_set);
}));
}
diff --git a/extensions/standard-processors/tests/unit/RecordSetTests.cpp b/extensions/standard-processors/tests/unit/RecordSetTests.cpp
index 7d1db9994c..9fe9ef3f8a 100644
--- a/extensions/standard-processors/tests/unit/RecordSetTests.cpp
+++ b/extensions/standard-processors/tests/unit/RecordSetTests.cpp
@@ -12,7 +12,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License.c
+ * limitations under the License.
*/
#include
diff --git a/extensions/standard-processors/tests/unit/XMLReaderTests.cpp b/extensions/standard-processors/tests/unit/XMLReaderTests.cpp
new file mode 100644
index 0000000000..a987151276
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/XMLReaderTests.cpp
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include
+
+#include "catch2/generators/catch_generators.hpp"
+#include "catch2/catch_approx.hpp"
+#include "controllers/XMLReader.h"
+#include "unit/Catch.h"
+#include "unit/TestBase.h"
+
+namespace org::apache::nifi::minifi::standard::test {
+
+class XMLReaderTestFixture {  // Catch2 fixture: owns the XMLReader under test and the BufferStream it reads from
+ public:
+ XMLReaderTestFixture() : xml_reader_("XMLReader") {
+ LogTestController::getInstance().clear();  // start from an empty log so contains() checks only see this test's output
+ LogTestController::getInstance().setTrace();
+ }
+
+ auto readRecordsFromXml(const std::string& xml_input, const std::unordered_map& properties = {}) {  // applies properties, loads xml_input, returns the result of read()
+ initializeTestObject(xml_input, properties);
+ return xml_reader_.read(buffer_stream_);
+ }
+
+ private:
+ void initializeTestObject(const std::string& xml_input, const std::unordered_map& properties = {}) {  // configures and enables the reader, then fills the stream
+ xml_reader_.initialize();
+ for (const auto& [key, value] : properties) {
+ REQUIRE(xml_reader_.setProperty(key, std::string{value}));  // a rejected property aborts the test immediately
+ }
+ xml_reader_.onEnable();  // called only after every property has been set
+ buffer_stream_.write(reinterpret_cast(xml_input.data()), xml_input.size());
+ }
+
+ XMLReader xml_reader_;
+ io::BufferStream buffer_stream_;  // holds the XML bytes handed to xml_reader_.read()
+};
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Invalid XML input or empty input results in error", "[XMLReader]") {
+ const std::string xml_input = GENERATE("", "");  // Catch2 re-runs the test body once per generated value
+ auto record_set = readRecordsFromXml(xml_input);
+ REQUIRE_FALSE(record_set);  // read() must report failure instead of returning a record set
+ REQUIRE(LogTestController::getInstance().contains("Failed to parse XML content: " + xml_input));  // the logged error is expected to echo the rejected input
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with only root node results in empty record set", "[XMLReader]") {
+ auto record_set = readRecordsFromXml("");
+ REQUIRE(record_set);  // the read itself succeeds
+ REQUIRE(record_set->empty());  // ...but yields no records
+ REQUIRE(LogTestController::getInstance().contains("XML content does not contain any records: "));  // the empty result must also be logged
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML contains a single data node results in a single record with default content field name key", "[XMLReader]") {
+ auto record_set = readRecordsFromXml("text");
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ CHECK(std::get(record.at("value").value_) == "text");  // no FieldNameForContent is set in this test, so the text is looked up under "value"
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with one empty node", "[XMLReader]") {
+ auto record_set = readRecordsFromXml("");
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);  // an empty node still produces one record
+ auto& record = record_set->at(0);
+ CHECK(std::get(record.at("node").value_).empty());  // the "node" field must exist and hold an empty value
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with a single string child node results in a single record", "[XMLReader]") {
+ auto record_set = readRecordsFromXml("text");
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ CHECK(std::get(record.at("child").value_) == "text");  // the text is expected under the "child" key
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with several child nodes with different types result in a single record", "[XMLReader]") {
+ const std::string xml_input = "text42-23true3.142023-03-15T12:34:56Z";
+ auto record_set = readRecordsFromXml(xml_input);
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);  // all typed fields are expected on one record
+ auto& record = record_set->at(0);
+ CHECK(std::get(record.at("string").value_) == "text");
+ CHECK(std::get(record.at("number").value_) == 42);
+ CHECK(std::get(record.at("signed").value_) == -23);
+ CHECK(std::get(record.at("boolean").value_) == true);
+ CHECK(std::get(record.at("double").value_) == Catch::Approx(3.14));  // approximate comparison for the floating-point field
+ auto timestamp = std::get(record.at("timestamp").value_);
+ auto expected_time = utils::timeutils::parseRfc3339("2023-03-15T12:34:56Z");
+ REQUIRE(expected_time);  // the reference value must parse before it is dereferenced below
+ CHECK(timestamp == *expected_time);
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with multiple subnodes result in a single record with record object", "[XMLReader]") {
+ const std::string xml_input = "text1text2text3";
+ auto record_set = readRecordsFromXml(xml_input);
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ auto record_object = std::get(record.at("node").value_);  // "node" holds a nested object, taken by value here
+ REQUIRE(record_object.size() == 2);  // the two entries checked below: subnode1 and subnode2
+ CHECK(std::get(record_object.at("subnode1").value_) == "text1");
+ CHECK(std::get(std::get(record_object.at("subnode2").value_).at("subsub1").value_) == "text2");  // subnode2 is itself an object, one level deeper
+ CHECK(std::get(std::get(record_object.at("subnode2").value_).at("subsub2").value_) == "text3");
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with nodes and text data is parsed correctly", "[XMLReader]") {
+ const std::string xml_input = "outtext1nodetextsubtextouttext2";
+ auto record_set = readRecordsFromXml(xml_input);
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ CHECK(std::get(std::get(record.at("node").value_).at("subnode").value_) == "subtext");
+ CHECK(std::get(std::get(record.at("node").value_).at("value").value_) == "nodetext");  // text inside "node" is expected under the nested "value" key
+ CHECK(std::get(record.at("value").value_) == "outtext1outtext2");  // top-level text fragments are expected joined with no separator
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML with same nodes are converted to arrays", "[XMLReader]") {
+ const std::string xml_input = "<root><array><item>value1</item><item>value2</item></array></root>";  // two sibling <item> elements under <array>
+ auto record_set = readRecordsFromXml(xml_input);
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ auto& array_field = std::get(record.at("array").value_);
+ REQUIRE(array_field.size() == 1);  // "item" is the only key inside "array"
+ auto& item_array = std::get(array_field.at("item").value_);
+ REQUIRE(item_array.size() == 2);  // the repeated tag is collected into a single two-element array
+ CHECK(std::get(item_array[0].value_) == "value1");
+ CHECK(std::get(item_array[1].value_) == "value2");
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "XML nodes with default value tag are ignored if text data is present", "[XMLReader]") {
+ const std::string xml_input = "s1s2s3";
+ auto record_set = readRecordsFromXml(xml_input);
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ CHECK(std::get(record.at("value").value_) == "s1");  // only "s1" is expected under "value"; the s2/s3 parts of the input must not end up there
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Specify Field Name for Content property for tagless values", "[XMLReader]") {
+ const std::string xml_input = "outtextnodetext";
+ auto record_set = readRecordsFromXml(xml_input, {{XMLReader::FieldNameForContent.name, "tagvalue"}});  // override the key used for tagless text
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ CHECK(std::get(record.at("node").value_) == "nodetext");
+ CHECK(std::get(record.at("tagvalue").value_) == "outtext");  // tagless text is looked up under the configured name
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Parse attributes as record fields if Parse XML Attributes property is set", "[XMLReader]") {
+ const std::string xml_input = R"(nodetext)";
+ auto record_set = readRecordsFromXml(xml_input, {{XMLReader::ParseXMLAttributes.name, "true"}});  // enable attribute parsing for this read
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ CHECK(std::get(std::get(record.at("node").value_).at("attribute").value_) == "attr_value");  // the attribute appears as a field of the "node" object
+ CHECK(std::get(std::get(record.at("node").value_).at("value").value_) == "nodetext");  // the node's own text sits next to it under "value"
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Parse attributes as in an XML with nested node array", "[XMLReader]") {
+ const std::string xml_input = R"(1nodetext2)";
+ auto record_set = readRecordsFromXml(xml_input, {{XMLReader::ParseXMLAttributes.name, "true"}});
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ auto& node_object = std::get(record.at("node").value_);
+ CHECK(node_object.size() == 3);  // the three keys read below: attribute, value, subnode
+ CHECK(std::get(node_object.at("attribute").value_) == "attr_value");
+ CHECK(std::get(node_object.at("value").value_) == "nodetext");
+ auto& subnodes = std::get(node_object.at("subnode").value_);
+ CHECK(subnodes.size() == 2);  // "subnode" is indexed as a two-element array below
+ const auto& subnode_object = std::get(subnodes[0].value_);  // first element is an object with "subattr" and "value"
+ CHECK(std::get(subnode_object.at("subattr").value_) == "subattr_value");
+ CHECK(std::get(subnode_object.at("value").value_) == 1);
+ CHECK(std::get(subnodes[1].value_) == 2);  // second element is compared directly as a plain value
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Attributes clashing with the content field name are ignored", "[XMLReader]") {
+ const std::string xml_input = R"(value)";
+ auto record_set = readRecordsFromXml(xml_input, {
+ {XMLReader::ParseXMLAttributes.name, "true"},
+ {XMLReader::FieldNameForContent.name, "tagvalue"}
+ });
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ auto& node_object = std::get(record.at("node").value_);
+ auto& a_object = std::get(node_object.at("subnode").value_);
+ CHECK(a_object.size() == 2);  // only "attr" and "tagvalue" are expected
+ CHECK(std::get(a_object.at("attr").value_) == "attr_value");
+ CHECK(std::get(a_object.at("tagvalue").value_) == "value");  // "tagvalue" must hold the node text
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Nodes clashing with the content field name are ignored", "[XMLReader]") {
+ const std::string xml_input = R"(valueignored)";
+ auto record_set = readRecordsFromXml(xml_input, {{XMLReader::FieldNameForContent.name, "tagvalue"}});
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ auto& node_object = std::get(record.at("node").value_);
+ CHECK(node_object.size() == 1);  // a single entry is expected in "node"
+ CHECK(std::get(node_object.at("tagvalue").value_) == "value");  // that entry is "value"; the "ignored" part of the input must not replace it
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Attributes are prefixed with the defined prefix", "[XMLReader]") {
+ const std::string xml_input = R"(value)";
+ auto record_set = readRecordsFromXml(xml_input, {
+ {XMLReader::ParseXMLAttributes.name, "true"},
+ {XMLReader::FieldNameForContent.name, "fieldname"},
+ {XMLReader::AttributePrefix.name, "attr_"}
+ });
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 1);
+ auto& record = record_set->at(0);
+ auto& node_object = std::get(record.at("node").value_);
+ auto& a_object = std::get(node_object.at("subnode").value_);
+ CHECK(a_object.size() == 3);  // attr_mykey, attr_fieldname and fieldname
+ CHECK(std::get(a_object.at("attr_mykey").value_) == "myattrval");  // attribute keys carry the configured "attr_" prefix
+ CHECK(std::get(a_object.at("attr_fieldname").value_) == "myattrval2");  // stored separately from the "fieldname" content key
+ CHECK(std::get(a_object.at("fieldname").value_) == "value");
+}
+
+TEST_CASE_METHOD(XMLReaderTestFixture, "Read multiple records from XML", "[XMLReader]") {
+ const std::string xml_input = "TonyBobHelloHi!";
+ auto record_set = readRecordsFromXml(xml_input, {{XMLReader::ExpectRecordsAsArray.name, "true"}});  // ExpectRecordsAsArray is switched on for this read
+ REQUIRE(record_set);
+ REQUIRE(record_set->size() == 2);  // two separate records are expected
+ auto& record1 = record_set->at(0);
+ auto& message_record = std::get(record1.at("message").value_);
+ CHECK(message_record.size() == 3);  // from, to and body
+ CHECK(std::get(message_record.at("from").value_) == "Tony");
+ CHECK(std::get(message_record.at("to").value_) == "Bob");
+ CHECK(std::get(message_record.at("body").value_) == "Hello");
+ auto& record2 = record_set->at(1);
+ CHECK(std::get(record2.at("value").value_) == "Hi!");  // the second record carries its text under "value"
+}
+
+} // namespace org::apache::nifi::minifi::standard::test
diff --git a/extensions/windows-event-log/CMakeLists.txt b/extensions/windows-event-log/CMakeLists.txt
index 38bd6ff9c5..c10179ee86 100644
--- a/extensions/windows-event-log/CMakeLists.txt
+++ b/extensions/windows-event-log/CMakeLists.txt
@@ -21,9 +21,6 @@ if (NOT (WIN32 AND (ENABLE_ALL OR ENABLE_WEL)))
return()
endif()
-include(BundledPugiXml)
-use_bundled_pugixml(${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
-
include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
file(GLOB SOURCES "*.cpp" "wel/*.cpp")
@@ -31,6 +28,6 @@ file(GLOB SOURCES "*.cpp" "wel/*.cpp")
add_minifi_library(minifi-wel SHARED ${SOURCES})
target_link_libraries(minifi-wel ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-wel PUGI::libpugixml ZLIB::ZLIB Wevtapi.lib)
+target_link_libraries(minifi-wel pugixml ZLIB::ZLIB Wevtapi.lib)
register_extension(minifi-wel "WEL EXTENSIONS" WEL-EXTENSION "Enables the suite of Windows Event Log extensions." "extensions/windows-event-log/tests")
diff --git a/minifi-api/include/minifi-cpp/core/Record.h b/minifi-api/include/minifi-cpp/core/Record.h
index 684da33acf..5c2ca3a615 100644
--- a/minifi-api/include/minifi-cpp/core/Record.h
+++ b/minifi-api/include/minifi-cpp/core/Record.h
@@ -30,6 +30,7 @@ namespace org::apache::nifi::minifi::core {
class Record final {
public:
Record() = default;
+ explicit Record(core::RecordObject&& record_object) : fields_(std::move(record_object)) {}  // moves the given fields into the record; explicit to prevent silent RecordObject -> Record conversions
Record(Record&& rhs) noexcept = default;
Record& operator=(Record&& rhs) noexcept = default;