Skip to content

Commit ff5b603

Browse files
committed
MINIFICPP-2594 Add XMLReader controller service
- Upgrade pugixml library to v1.15
1 parent c151058 commit ff5b603

File tree

9 files changed

+598
-64
lines changed

9 files changed

+598
-64
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,10 @@ if (ENABLE_ALL OR ENABLE_PROMETHEUS OR ENABLE_GRAFANA_LOKI OR ENABLE_CIVET)
356356
endif()
357357

358358
## Add extensions
359+
360+
# PugiXML required for standard processors and WEL extension
361+
include(PugiXml)
362+
359363
file(GLOB extension-directories "extensions/*")
360364
foreach(extension-dir ${extension-directories})
361365
if (IS_DIRECTORY ${extension-dir} AND EXISTS ${extension-dir}/CMakeLists.txt)

CONTROLLERS.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ limitations under the License.
3333
- [SSLContextService](#SSLContextService)
3434
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
3535
- [VolatileMapStateStorage](#VolatileMapStateStorage)
36+
- [XMLReader](#XMLReader)
3637

3738

3839
## AWSCredentialsService
@@ -347,3 +348,21 @@ In the list below, the names of required properties appear in bold. Any other pr
347348
| Name | Default Value | Allowable Values | Description |
348349
|-----------------|---------------|------------------|--------------------------------|
349350
| Linked Services | | | Referenced Controller Services |
351+
352+
353+
## XMLReader
354+
355+
### Description
356+
357+
Reads XML content and creates Record objects. Records are expected in the second level of XML data, embedded in an enclosing root tag. Types for records are inferred automatically based on the content of the XML tags. For timestamps, the format is expected to be ISO 8601 compliant.
358+
359+
### Properties
360+
361+
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
362+
363+
| Name | Default Value | Allowable Values | Description |
364+
|-----------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
365+
| Field Name for Content | | | If tags with content (e.g. `<field>content</field>`) are defined as nested records in the schema, the name of the tag will be used as name for the record and the value of this property will be used as name for the field. If the tag contains subnodes besides the content (e.g. `<field>content<subfield>subcontent</subfield></field>`), or a node attribute is present, we need to define a name for the text content, so that it can be distinguished from the subnodes. If this property is not set, the default name 'value' will be used for the text content of the tag in this case. |
366+
| **Parse XML Attributes** | false | true<br/>false | When 'Schema Access Strategy' is 'Infer Schema' and this property is 'true' then XML attributes are parsed and added to the record as new fields. When the schema is inferred but this property is 'false', XML attributes and their values are ignored. |
367+
| Attribute Prefix | | | If this property is set, the name of attributes will be prepended with a prefix when they are added to a record. |
368+
| **Expect Records as Array** | false | true<br/>false | This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element". Because XML does not provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire XML blob with a "wrapper element". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element" that will be ignored. |

cmake/BundledPugiXml.cmake

Lines changed: 0 additions & 59 deletions
This file was deleted.

cmake/PugiXml.cmake

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
# Fetches the pugixml sources with CMake's FetchContent module and adds them to the build.
include(FetchContent)
18+
19+
# Force the PUGIXML_BUILD_TESTS cache option to OFF before the dependency is configured
# (presumably this disables pugixml's own test targets - confirm against pugixml's CMakeLists.txt).
set(PUGIXML_BUILD_TESTS OFF CACHE BOOL "" FORCE)
20+
21+
# Pin the dependency to the v1.15 release tarball; URL_HASH lets CMake verify the download.
FetchContent_Declare(
22+
pugixml
23+
URL https://github.com/zeux/pugixml/archive/refs/tags/v1.15.tar.gz
24+
URL_HASH SHA256=b39647064d9e28297a34278bfb897092bf33b7c487906ddfc094c9e8868bddcb
25+
)
26+
# Download (if needed) and add the declared pugixml content to this build.
FetchContent_MakeAvailable(pugixml)

extensions/standard-processors/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ target_include_directories(minifi-standard-processors PUBLIC "${CMAKE_SOURCE_DIR
2727

2828
include(RangeV3)
2929
include(Asio)
30-
target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio)
30+
target_link_libraries(minifi-standard-processors ${LIBMINIFI} Threads::Threads range-v3 asio pugixml)
3131

3232
include(Coroutines)
3333
enable_coroutines()
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "XMLReader.h"
19+
20+
#include <algorithm>
21+
22+
#include "core/Resource.h"
23+
#include "utils/TimeUtil.h"
24+
#include "utils/gsl.h"
25+
26+
namespace org::apache::nifi::minifi::standard {
27+
28+
namespace {
29+
// Returns true if at least one direct child of `node` is an XML element
// (text, CDATA and other non-element children do not count).
bool hasChildNodes(const pugi::xml_node& node) {
  for (auto child_it = node.begin(); child_it != node.end(); ++child_it) {
    if (child_it->type() == pugi::node_element) {
      return true;
    }
  }
  return false;
}
34+
35+
// Stores `field` in `record_object` under `name`, aggregating repeated names into an array:
//  - if `name` is not present yet, the field is inserted as-is;
//  - if `name` already holds a RecordArray, the field is appended to that array;
//  - otherwise the existing value and the new field are wrapped into a new RecordArray
//    (existing value first) which replaces the existing entry.
// The field is taken by value so that temporaries passed by callers are moved into the
// container instead of being copied (fields may hold whole nested objects).
void addRecordFieldToObject(core::RecordObject& record_object, const std::string& name, core::RecordField field) {
  auto it = record_object.find(name);
  if (it == record_object.end()) {
    record_object.emplace(name, std::move(field));
    return;
  }

  if (std::holds_alternative<core::RecordArray>(it->second.value_)) {
    std::get<core::RecordArray>(it->second.value_).emplace_back(std::move(field));
    return;
  }

  // Second occurrence of a name: promote the single stored value to a two-element array.
  core::RecordArray array;
  array.emplace_back(std::move(it->second));
  array.emplace_back(std::move(field));
  it->second = core::RecordField(std::move(array));
}
52+
} // namespace
53+
54+
// Infers a type for the textual `value` and adds it to `record_object` under `name`.
// Inference order: boolean ("true"/"false"), date-time (parseDateTimeStr, then parseRfc3339),
// unsigned integer, negative integer, floating point, and finally plain string.
// `write_pcdata_node` must be true when the caller is writing the text content of a tag; a
// field whose name equals the configured content field name is skipped otherwise.
void XMLReader::writeRecordField(core::RecordObject& record_object, const std::string& name, const std::string& value, bool write_pcdata_node) const {
  // If the name is the value set in the Field Name for Content property, we should only add this value to the RecordObject if we are writing a plain character data node.
  if (!write_pcdata_node && name == field_name_for_content_) {
    return;
  }

  if (value == "true" || value == "false") {
    addRecordFieldToObject(record_object, name, core::RecordField(value == "true"));
    return;
  }
  if (auto date = utils::timeutils::parseDateTimeStr(value)) {
    addRecordFieldToObject(record_object, name, core::RecordField(*date));
    return;
  }
  if (auto date = utils::timeutils::parseRfc3339(value)) {
    addRecordFieldToObject(record_object, name, core::RecordField(*date));
    return;
  }

  // isdigit has undefined behaviour for negative char values (e.g. UTF-8 bytes), so go through unsigned char.
  const auto is_digit = [](char character) {
    return ::isdigit(static_cast<unsigned char>(character)) != 0;
  };

  if (!value.empty() && std::all_of(value.begin(), value.end(), is_digit)) {
    try {
      uint64_t value_as_uint64 = std::stoull(value);
      addRecordFieldToObject(record_object, name, core::RecordField(value_as_uint64));
      return;
    } catch (const std::exception&) {
      // Conversion failed (e.g. too many digits for the target type): fall through to the next candidate types.
    }
  }

  if (value.size() > 1 && value.starts_with('-') && std::all_of(value.begin() + 1, value.end(), is_digit)) {
    try {
      int64_t value_as_int64 = std::stoll(value);
      addRecordFieldToObject(record_object, name, core::RecordField(value_as_int64));
      return;
    } catch (const std::exception&) {
      // Conversion failed: fall through to the floating point attempt.
    }
  }

  try {
    // std::stod stops at the first character it cannot use, so "12abc" would parse as 12.
    // Only accept the conversion when the whole value was consumed.
    std::size_t parsed_length = 0;
    const double value_as_double = std::stod(value, &parsed_length);
    if (parsed_length == value.size()) {
      addRecordFieldToObject(record_object, name, core::RecordField(value_as_double));
      return;
    }
  } catch (const std::exception&) {
    // Not a number: stored as a string below.
  }

  addRecordFieldToObject(record_object, name, core::RecordField(value));
}
98+
99+
// Adds a field to `record_object` for `node`: the field name is the node's tag name and the
// field value is whatever node.child_value() returns for it.
void XMLReader::writeRecordFieldFromXmlNode(core::RecordObject& record_object, const pugi::xml_node& node) const {
  const auto* const tag_name = node.name();
  const auto* const tag_text = node.child_value();
  writeRecordField(record_object, tag_name, tag_text);
}
102+
103+
// Converts a single XML element into a field of `record_object`.
//  - Elements with attributes (when attribute parsing is enabled) or with child elements become
//    nested record objects; attribute names are prepended with the configured attribute prefix.
//  - All other elements become a simple field named after the tag.
// Nested objects go through addRecordFieldToObject, so repeated sibling elements with the same
// name are collected into an array instead of all but the first being dropped.
// Precondition: `node` is an element node.
void XMLReader::parseNodeElement(core::RecordObject& record_object, const pugi::xml_node& node) const {
  gsl_Expects(node.type() == pugi::node_element);

  const bool has_attributes_to_parse = parse_xml_attributes_ && node.first_attribute();
  if (!has_attributes_to_parse && !hasChildNodes(node)) {
    writeRecordFieldFromXmlNode(record_object, node);
    return;
  }

  core::RecordObject child_record_object;
  if (has_attributes_to_parse) {
    for (const pugi::xml_attribute& attr : node.attributes()) {
      writeRecordField(child_record_object, attribute_prefix_ + attr.name(), attr.value());
    }
  }
  parseXmlNode(child_record_object, node);
  addRecordFieldToObject(record_object, node.name(), core::RecordField(std::move(child_record_object)));
}
124+
125+
// Walks the direct children of `node` and writes them into `record_object`:
// child elements are parsed via parseNodeElement, while the values of all text children
// (plain character data and CDATA sections) are concatenated and, if non-empty, stored under
// the configured content field name. CDATA is included so that text is handled the same way
// here as for simple elements, whose value is read with child_value().
void XMLReader::parseXmlNode(core::RecordObject& record_object, const pugi::xml_node& node) const {
  std::string text_content;
  for (pugi::xml_node child : node.children()) {
    switch (child.type()) {
      case pugi::node_element:
        parseNodeElement(record_object, child);
        break;
      case pugi::node_pcdata:
      case pugi::node_cdata:
        text_content.append(child.value());
        break;
      default:
        // Other node kinds carry no record data.
        break;
    }
  }

  if (!text_content.empty()) {
    writeRecordField(record_object, field_name_for_content_, text_content, true);
  }
}
139+
140+
// Builds one record from the children of `node` and appends it to `record_set`.
void XMLReader::addRecordFromXmlNode(const pugi::xml_node& node, core::RecordSet& record_set) const {
  core::RecordObject fields;
  parseXmlNode(fields, node);
  record_set.emplace_back(core::Record(std::move(fields)));
}
146+
147+
// Parses `xml_content` and appends the records found in it to `record_set`.
//  - When records are expected as an array, every child element of the root element becomes
//    one record; non-element children of the root (e.g. stray text) are skipped so they do not
//    produce empty records.
//  - Otherwise the root element itself is the single record; a root without children yields
//    no record.
// Returns false only if the XML could not be parsed.
bool XMLReader::parseRecordsFromXml(core::RecordSet& record_set, const std::string& xml_content) const {
  pugi::xml_document doc;
  if (!doc.load_string(xml_content.c_str())) {
    logger_->log_error("Failed to parse XML content: {}", xml_content);
    return false;
  }

  const pugi::xml_node root = doc.document_element();

  if (expect_records_as_array_) {
    for (pugi::xml_node record_node : root.children()) {
      if (record_node.type() != pugi::node_element) {
        continue;
      }
      addRecordFromXmlNode(record_node, record_set);
    }
    return true;
  }

  if (!root.first_child()) {
    logger_->log_info("XML content does not contain any records: {}", xml_content);
    return true;
  }
  addRecordFromXmlNode(root, record_set);
  return true;
}
170+
171+
void XMLReader::onEnable() {
172+
field_name_for_content_ = getProperty(FieldNameForContent.name).value_or("value");
173+
parse_xml_attributes_ = getProperty(ParseXMLAttributes.name).value_or("false") == "true";
174+
attribute_prefix_ = getProperty(AttributePrefix.name).value_or("");
175+
expect_records_as_array_ = getProperty(ExpectRecordsAsArray.name).value_or("false") == "true";
176+
}
177+
178+
// Reads the whole input stream into memory and parses it as XML records.
// Returns the parsed record set, or std::errc::invalid_argument if the stream could not be
// read or its content could not be parsed as XML.
nonstd::expected<core::RecordSet, std::error_code> XMLReader::read(io::InputStream& input_stream) {
  std::string content;
  content.resize(input_stream.size());

  // Check the raw read result for errors before any integer conversion.
  // NOTE(review): if the stream reports errors through a sentinel that does not fit into
  // int64_t, a checked narrowing conversion done first would fail before io::isError could
  // see the value - confirm against io::isError.
  const auto read_ret = input_stream.read(as_writable_bytes(std::span(content)));
  if (io::isError(read_ret)) {
    logger_->log_error("Failed to read XML data from input stream");
    return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
  }

  core::RecordSet record_set{};
  if (!parseRecordsFromXml(record_set, content)) {
    return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
  }
  return record_set;
}
198+
199+
REGISTER_RESOURCE(XMLReader, ControllerService);
200+
} // namespace org::apache::nifi::minifi::standard

0 commit comments

Comments
 (0)