Closed
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -381,6 +381,10 @@ if (ENABLE_ALL OR ENABLE_PROMETHEUS OR ENABLE_GRAFANA_LOKI OR ENABLE_CIVET)
endif()

## Add extensions

# PugiXML required for standard processors and WEL extension
include(PugiXml)

file(GLOB extension-directories "extensions/*")
foreach(extension-dir ${extension-directories})
if (IS_DIRECTORY ${extension-dir} AND EXISTS ${extension-dir}/CMakeLists.txt)
19 changes: 19 additions & 0 deletions CONTROLLERS.md
@@ -32,6 +32,7 @@ limitations under the License.
- [SSLContextService](#SSLContextService)
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
- [VolatileMapStateStorage](#VolatileMapStateStorage)
- [XMLReader](#XMLReader)


## AWSCredentialsService
@@ -332,3 +333,21 @@ In the list below, the names of required properties appear in bold. Any other pr
| Name | Default Value | Allowable Values | Description |
|-----------------|---------------|------------------|--------------------------------|
| Linked Services | | | Referenced Controller Services |


## XMLReader

### Description

Reads XML content and creates Record objects. Records are expected in the second level of XML data, embedded in an enclosing root tag. Types for records are inferred automatically based on the content of the XML tags. For timestamps, the format is expected to be ISO 8601 compliant.

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|-----------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Field Name for Content | | | If tags with content (e.g. <field>content</field>) are defined as nested records in the schema, the name of the tag will be used as the name of the record and the value of this property will be used as the name of the field. If the tag contains subnodes besides the content (e.g. <field>content<subfield>subcontent</subfield></field>), or a node attribute is present, a name must be defined for the text content so that it can be distinguished from the subnodes. If this property is not set, the default name 'value' is used for the text content of the tag in this case. |
| **Parse XML Attributes** | false | true<br/>false | When this property is 'true' then XML attributes are parsed and added to the record as new fields, otherwise XML attributes and their values are ignored. |
| Attribute Prefix | | | If this property is set, its value is prepended to the name of each attribute when the attribute is added to a record. |
| **Expect Records as Array** | false | true<br/>false | This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element". Because XML does not provide a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire XML blob with a "wrapper element". When this property is 'true', the reader treats the root element as such a wrapper, ignores it, and reads each of its child elements as a Record; otherwise the FlowFile is read as a single Record. |
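
For reviewers, the automatic type inference mentioned in the XMLReader description can be sketched roughly as follows. This is a stand-alone illustration and not the MiNiFi implementation: `InferredValue`, `allDigits` and `inferValue` are hypothetical names, a plain `std::variant` stands in for `core::RecordField`, and timestamp detection is left out. The sketch also assumes a value only counts as a floating point number when the whole string is consumed by the conversion.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <string>
#include <variant>

// Hypothetical stand-in for core::RecordField; timestamps are omitted in this sketch.
using InferredValue = std::variant<bool, uint64_t, int64_t, double, std::string>;

// True if text[from..] is non-empty and consists only of ASCII digits.
bool allDigits(const std::string& text, std::size_t from) {
  if (from >= text.size()) {
    return false;
  }
  return std::all_of(text.begin() + static_cast<std::ptrdiff_t>(from), text.end(),
                     [](unsigned char c) { return std::isdigit(c) != 0; });
}

// Tries the candidate types in order: bool, unsigned integer, signed integer, double, string.
InferredValue inferValue(const std::string& text) {
  if (text == "true" || text == "false") {
    return text == "true";
  }
  if (allDigits(text, 0)) {
    try {
      return static_cast<uint64_t>(std::stoull(text));
    } catch (const std::exception&) {
      // Out of range for uint64_t, try the remaining candidates.
    }
  }
  if (!text.empty() && text[0] == '-' && allDigits(text, 1)) {
    try {
      return static_cast<int64_t>(std::stoll(text));
    } catch (const std::exception&) {
      // Out of range for int64_t, try the remaining candidates.
    }
  }
  try {
    std::size_t parsed_length = 0;
    const double as_double = std::stod(text, &parsed_length);
    if (parsed_length == text.size()) {
      return as_double;
    }
  } catch (const std::exception&) {
    // Not a number at all.
  }
  return text;
}
```

Under these assumptions, `inferValue("42")` holds a `uint64_t`, `inferValue("-7")` an `int64_t`, `inferValue("3.5")` a `double`, and a mixed value such as `"12abc"` stays a string.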
23 changes: 0 additions & 23 deletions LICENSE
@@ -2354,29 +2354,6 @@ This product bundles 'zlib' within 'OpenCV' under the following license:
Comments) 1950 to 1952 in the files http://tools.ietf.org/html/rfc1950
(zlib format), rfc1951 (deflate format) and rfc1952 (gzip format).

This product bundles 'TinyXml2' within 'AWS SDK for C++' under a zlib license:

Original code by Lee Thomason (www.grinninglizard.com)

This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any
damages arising from the use of this software.

Permission is granted to anyone to use this software for any
purpose, including commercial applications, and to alter it and
redistribute it freely, subject to the following restrictions:

1. The origin of this software must not be misrepresented; you must
not claim that you wrote the original software. If you use this
software in a product, an acknowledgment in the product documentation
would be appreciated but is not required.

2. Altered source versions must be plainly marked as such, and
must not be misrepresented as being the original software.

3. This notice may not be removed or altered from any source
distribution.


This product bundles 'cJSON' within 'AWS SDK for C++' under an MIT license:

2 changes: 1 addition & 1 deletion NOTICE
@@ -43,7 +43,6 @@ THIRD PARTY COMPONENTS
This software includes third party software subject to the following copyrights:
- Very fast, header-only/compiled, C++ logging library from spdlog - Copyright (c) 2016 Gabi Melman
- An open-source formatting library for C++ from fmt - Copyright (c) 2012 - present, Victor Zverovich
- XML parsing and utility functions from TinyXml2 - Lee Thomason
- JSON parsing and utility functions from JsonCpp - Copyright (c) 2007-2010 Baptiste Lepilleur
- OpenSSL build files for cmake used for Android Builds - Copyright (C) 2007-2012 LuaDist and Copyright (C) 2013 Brian Sidebotham
- Android tool chain cmake build files - Copyright (c) 2010-2011, Ethan Rublee and Copyright (c) 2011-2014, Andrey Kamaev
@@ -78,6 +77,7 @@ This software includes third party software subject to the following copyrights:
- llhttp - Copyright Fedor Indutny, 2018.
- benchmark - Copyright 2015 Google Inc.
- llama.cpp - Copyright (c) 2023-2024 The ggml authors
- pugixml - Copyright (C) 2003, by Kristen Wegner ([email protected])

The licenses for these third party components are included in LICENSE.txt

59 changes: 0 additions & 59 deletions cmake/BundledPugiXml.cmake

This file was deleted.

26 changes: 26 additions & 0 deletions cmake/PugiXml.cmake
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
include(FetchContent)

set(PUGIXML_BUILD_TESTS OFF CACHE BOOL "" FORCE)

FetchContent_Declare(
pugixml
URL https://github.com/zeux/pugixml/archive/refs/tags/v1.15.tar.gz
URL_HASH SHA256=b39647064d9e28297a34278bfb897092bf33b7c487906ddfc094c9e8868bddcb
)
FetchContent_MakeAvailable(pugixml)
2 changes: 1 addition & 1 deletion extensions/standard-processors/CMakeLists.txt
@@ -27,7 +27,7 @@ target_include_directories(minifi-standard-processors PUBLIC "${CMAKE_SOURCE_DIR

include(RangeV3)
include(Asio)
target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio pugixml)

include(Coroutines)
enable_coroutines()
207 changes: 207 additions & 0 deletions extensions/standard-processors/controllers/XMLReader.cpp
@@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "XMLReader.h"

#include <algorithm>
#include <cctype>
#include <ranges>
#include <string>
#include <variant>

#include "core/Resource.h"
#include "utils/TimeUtil.h"
#include "utils/gsl.h"

namespace org::apache::nifi::minifi::standard {

namespace {
bool hasChildNodes(const pugi::xml_node& node) {
return std::ranges::any_of(node, [] (const pugi::xml_node& child) {
return child.type() == pugi::node_element;
});
}

void addRecordFieldToObject(core::RecordObject& record_object, const std::string& name, const core::RecordField& field) {
auto it = record_object.find(name);
if (it == record_object.end()) {
record_object.emplace(name, field);
return;
}

if (std::holds_alternative<core::RecordArray>(it->second.value_)) {
std::get<core::RecordArray>(it->second.value_).emplace_back(field);
return;
}

core::RecordArray array;
array.emplace_back(it->second);
array.emplace_back(field);
it->second = core::RecordField(std::move(array));
}
} // namespace

void XMLReader::writeRecordField(core::RecordObject& record_object, const std::string& name, const std::string& value, bool write_pcdata_node) const {
// If the name is the value set in the Field Name for Content property, we should only add this value to the RecordObject if we are writing a plain character data node.
if (!write_pcdata_node && name == field_name_for_content_) {
return;
}

if (value == "true" || value == "false") {
addRecordFieldToObject(record_object, name, core::RecordField(value == "true"));
return;
} else if (auto date = utils::timeutils::parseDateTimeStr(value)) {
addRecordFieldToObject(record_object, name, core::RecordField(*date));
return;
} else if (auto date = utils::timeutils::parseRfc3339(value)) {
addRecordFieldToObject(record_object, name, core::RecordField(*date));
return;
}

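// Take the character as unsigned char: passing a negative char (e.g. a UTF-8 byte) to isdigit is undefined behavior.
if (std::ranges::all_of(value, [](unsigned char c) { return ::isdigit(c) != 0; })) {
try {
uint64_t value_as_uint64 = std::stoull(value);
addRecordFieldToObject(record_object, name, core::RecordField(value_as_uint64));
return;
} catch (const std::exception&) {
// Empty or out-of-range value, fall through to the next candidate type.
}
}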

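// Take the character as unsigned char: passing a negative char (e.g. a UTF-8 byte) to isdigit is undefined behavior.
if (value.starts_with('-') && std::ranges::all_of(value | std::views::drop(1), [](unsigned char c) { return ::isdigit(c) != 0; })) {
try {
int64_t value_as_int64 = std::stoll(value);
addRecordFieldToObject(record_object, name, core::RecordField(value_as_int64));
return;
} catch (const std::exception&) {
// A lone '-' or an out-of-range value, fall through to the next candidate type.
}
}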

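try {
// std::stod stops at the first character it cannot parse, so require that the whole value was consumed;
// otherwise a value such as "12abc" would be stored as the number 12.
std::size_t parsed_length = 0;
auto value_as_double = std::stod(value, &parsed_length);
if (parsed_length == value.size()) {
addRecordFieldToObject(record_object, name, core::RecordField(value_as_double));
return;
}
} catch (const std::exception&) {
// Not a floating point number, fall through and store the value as a string.
}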

addRecordFieldToObject(record_object, name, core::RecordField(value));
}

void XMLReader::parseNodeElement(core::RecordObject& record_object, const pugi::xml_node& node) const {
gsl_Expects(node.type() == pugi::node_element);
if (parse_xml_attributes_ && node.first_attribute()) {
core::RecordObject child_record_object;
for (const pugi::xml_attribute& attr : node.attributes()) {
writeRecordField(child_record_object, attribute_prefix_ + attr.name(), attr.value());
}
parseXmlNode(child_record_object, node);
addRecordFieldToObject(record_object, node.name(), core::RecordField(std::move(child_record_object)));
return;
}

if (hasChildNodes(node)) {
core::RecordObject child_record_object;
parseXmlNode(child_record_object, node);
addRecordFieldToObject(record_object, node.name(), core::RecordField(std::move(child_record_object)));
return;
}

writeRecordField(record_object, node.name(), node.child_value());
}

void XMLReader::parseXmlNode(core::RecordObject& record_object, const pugi::xml_node& node) const {
std::string pc_data_value;
for (pugi::xml_node child : node.children()) {
if (child.type() == pugi::node_element) {
parseNodeElement(record_object, child);
} else if (child.type() == pugi::node_pcdata) {
pc_data_value.append(child.value());
}
}

if (!pc_data_value.empty()) {
writeRecordField(record_object, field_name_for_content_, pc_data_value, true);
}
}

void XMLReader::addRecordFromXmlNode(const pugi::xml_node& node, core::RecordSet& record_set) const {
core::RecordObject record_object;
parseXmlNode(record_object, node);
core::Record record(std::move(record_object));
record_set.emplace_back(std::move(record));
}

bool XMLReader::parseRecordsFromXml(core::RecordSet& record_set, const std::string& xml_content) const {
pugi::xml_document doc;
if (!doc.load_string(xml_content.c_str())) {
logger_->log_error("Failed to parse XML content: {}", xml_content);
return false;
}

if (expect_records_as_array_) {
pugi::xml_node root = doc.first_child();
for (pugi::xml_node record_node : root.children()) {
addRecordFromXmlNode(record_node, record_set);
}
return true;
}

pugi::xml_node root = doc.first_child();
if (!root.first_child()) {
logger_->log_info("XML content does not contain any records: {}", xml_content);
return true;
}
addRecordFromXmlNode(root, record_set);
return true;
}

void XMLReader::onEnable() {
auto parseBoolProperty = [this](std::string_view property_name) -> bool {
if (auto property_value_str = getProperty(property_name); property_value_str && !property_value_str->empty()) {
if (auto property_value = parsing::parseBool(*property_value_str)) {
return *property_value;
}
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid value for {} property: {}", property_name, *property_value_str));
}
return false;
};

field_name_for_content_ = getProperty(FieldNameForContent.name).value_or("value");
parse_xml_attributes_ = parseBoolProperty(ParseXMLAttributes.name);
attribute_prefix_ = getProperty(AttributePrefix.name).value_or("");
expect_records_as_array_ = parseBoolProperty(ExpectRecordsAsArray.name);
}

nonstd::expected<core::RecordSet, std::error_code> XMLReader::read(io::InputStream& input_stream) {
core::RecordSet record_set{};
const auto read_result = [this, &record_set](io::InputStream& input_stream) -> size_t {
std::string content;
content.resize(input_stream.size());
const auto read_ret = input_stream.read(as_writable_bytes(std::span(content)));
if (io::isError(read_ret)) {
logger_->log_error("Failed to read XML data from input stream");
return io::STREAM_ERROR;
}
if (!parseRecordsFromXml(record_set, content)) {
return io::STREAM_ERROR;
}
return read_ret;
}(input_stream);
if (io::isError(read_result)) {
return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
}
return record_set;
}

REGISTER_RESOURCE(XMLReader, ControllerService);
} // namespace org::apache::nifi::minifi::standard
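
As a review aid, the way `addRecordFieldToObject` handles repeated tag names (the first occurrence is stored directly, later occurrences turn the entry into an array) can be sketched in isolation. The types below (`Field`, `FieldArray`, `Object`) and the function `addField` are hypothetical stand-ins for `core::RecordField`, `core::RecordArray` and `core::RecordObject`, reduced to string values; this is an illustration under those assumptions, not the MiNiFi code.

```cpp
#include <map>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical, simplified record model: a field is either a string or an array of fields.
struct Field;
using FieldArray = std::vector<Field>;
struct Field {
  std::variant<std::string, FieldArray> value;
};
using Object = std::map<std::string, Field>;

// Adds a field under the given name; repeated names are collected into an array.
void addField(Object& object, const std::string& name, Field field) {
  auto it = object.find(name);
  if (it == object.end()) {
    // First occurrence: store the field as-is.
    object.emplace(name, std::move(field));
    return;
  }
  if (auto* array = std::get_if<FieldArray>(&it->second.value)) {
    // Already an array: append the new field.
    array->push_back(std::move(field));
    return;
  }
  // Second occurrence: wrap the existing field and the new one in an array.
  FieldArray array;
  array.push_back(std::move(it->second));
  array.push_back(std::move(field));
  it->second = Field{std::move(array)};
}
```

With this model, adding three `item` fields to the same object leaves a single `item` entry holding an array of three fields, which corresponds to how repeated sibling tags such as `<item>a</item><item>b</item><item>c</item>` would be represented.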