Skip to content

Commit 83bec13

Browse files
committed
MINIFICPP-2596 Add XMLRecordSetWriter controller service
1 parent ff5b603 commit 83bec13

File tree

4 files changed

+634
-0
lines changed

4 files changed

+634
-0
lines changed

CONTROLLERS.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ limitations under the License.
3434
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
3535
- [VolatileMapStateStorage](#VolatileMapStateStorage)
3636
- [XMLReader](#XMLReader)
37+
- [XMLRecordSetWriter](#XMLRecordSetWriter)
3738

3839

3940
## AWSCredentialsService
@@ -366,3 +367,23 @@ In the list below, the names of required properties appear in bold. Any other pr
366367
| **Parse XML Attributes** | false | true<br/>false | When 'Schema Access Strategy' is 'Infer Schema' and this property is 'true' then XML attributes are parsed and added to the record as new fields. When the schema is inferred but this property is 'false', XML attributes and their values are ignored. |
367368
| Attribute Prefix | | | If this property is set, the name of attributes will be prepended with a prefix when they are added to a record. |
368369
| **Expect Records as Array** | false | true<br/>false | This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element". Because XML does not provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire XML blob with a "wrapper element". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element" that will be ignored. |
370+
371+
372+
## XMLRecordSetWriter
373+
374+
### Description
375+
376+
Writes a RecordSet to XML. The records are wrapped by a root tag.
377+
378+
### Properties
379+
380+
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
381+
382+
| Name | Default Value | Allowable Values | Description |
383+
|-----------------------------|---------------|-----------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
384+
| Array Tag Name | | | Name of the tag used by property "Wrap Elements of Arrays" to write arrays |
385+
| **Wrap Elements of Arrays** | No Wrapping | Use Property as Wrapper<br/>Use Property for Elements<br/>No Wrapping | Specifies how the writer wraps elements of fields of type array |
386+
| **Omit XML Declaration** | false | true<br/>false | Specifies whether or not to include XML declaration |
387+
| **Pretty Print XML** | false | true<br/>false | Specifies whether or not the XML should be pretty printed |
388+
| **Name of Record Tag** | | | Specifies the name of the XML record tag wrapping the record fields. |
389+
| **Name of Root Tag** | | | Specifies the name of the XML root tag wrapping the record set. This property has to be defined if the writer is supposed to write multiple records in a single FlowFile. |
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#include "XMLRecordSetWriter.h"
18+
19+
#include "core/Resource.h"
20+
#include "Exception.h"
21+
#include "utils/TimeUtil.h"
22+
23+
namespace org::apache::nifi::minifi::standard {
24+
25+
void XMLRecordSetWriter::onEnable() {
26+
if (auto wrap_elements_of_arrays = magic_enum::enum_cast<WrapElementsOfArraysOptions>(getProperty(WrapElementsOfArrays.name).value_or("No Wrapping"))) {
27+
wrap_elements_of_arrays_ = *wrap_elements_of_arrays;
28+
} else {
29+
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for Wrap Elements of Arrays property: " + getProperty(WrapElementsOfArrays.name).value_or(""));
30+
}
31+
32+
array_tag_name_ = getProperty(ArrayTagName.name).value_or("");
33+
if (array_tag_name_.empty() &&
34+
(wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyAsWrapper ||
35+
wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyForElements)) {
36+
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Array Tag Name property must be set when Wrap Elements of Arrays is set to Use Property as Wrapper or Use Property for Elements");
37+
}
38+
39+
omit_xml_declaration_ = getProperty(OmitXMLDeclaration.name).value_or("false") == "true";
40+
pretty_print_xml_ = getProperty(PrettyPrintXML.name).value_or("false") == "true";
41+
42+
name_of_record_tag_ = getProperty(NameOfRecordTag.name).value_or("");
43+
if (name_of_record_tag_.empty()) {
44+
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Name of Record Tag property must be set");
45+
}
46+
47+
name_of_root_tag_ = getProperty(NameOfRootTag.name).value_or("");
48+
if (name_of_root_tag_.empty()) {
49+
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Name of Root Tag property must be set");
50+
}
51+
}
52+
53+
/**
 * Serialises the given document into a string, honouring the cached formatting options.
 *
 * Pretty printing selects pugi::format_indent, otherwise pugi::format_raw is used;
 * pugi::format_no_declaration is added when the XML declaration is to be omitted.
 *
 * @param xml_doc document to serialise
 * @return the serialised XML text
 */
std::string XMLRecordSetWriter::formatXmlOutput(pugi::xml_document& xml_doc) const {
  unsigned int format_flags = pretty_print_xml_ ? pugi::format_indent : pugi::format_raw;
  if (omit_xml_declaration_) {
    format_flags |= pugi::format_no_declaration;
  }

  std::ostringstream serialized;
  xml_doc.save(serialized, " ", format_flags);
  return serialized.str();
}
67+
68+
/**
 * Writes an array-valued record field beneath parent_node according to the configured wrapping mode.
 *
 *  - UsePropertyAsWrapper:   a child named after the array tag name is appended to parent_node and every
 *                            element is written below it under field_name.
 *  - UsePropertyForElements: a child named field_name is appended to parent_node and every element is
 *                            written below it under the array tag name.
 *  - any other mode:         every element is written directly under parent_node using field_name.
 *
 * @param field_name  name of the record field holding the array
 * @param field       record field; its value must hold a core::RecordArray (std::get throws otherwise)
 * @param parent_node XML node receiving the generated children
 */
void XMLRecordSetWriter::convertRecordArrayField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const {
  const auto& elements = std::get<core::RecordArray>(field.value_);

  switch (wrap_elements_of_arrays_) {
    case WrapElementsOfArraysOptions::UsePropertyAsWrapper: {
      // The wrapper is created even when the array has no elements.
      auto wrapper_node = parent_node.append_child(array_tag_name_.c_str());
      for (const auto& element : elements) {
        convertRecordField(field_name, element, wrapper_node);
      }
      break;
    }
    case WrapElementsOfArraysOptions::UsePropertyForElements: {
      auto wrapper_node = parent_node.append_child(field_name.c_str());
      for (const auto& element : elements) {
        convertRecordField(array_tag_name_, element, wrapper_node);
      }
      break;
    }
    default: {
      for (const auto& element : elements) {
        convertRecordField(field_name, element, parent_node);
      }
      break;
    }
  }
}
86+
87+
/**
 * Writes a single record field as a child element of parent_node.
 *
 * Arrays are delegated to convertRecordArrayField (which decides about wrapper elements). For every other
 * value an element named field_name is appended and filled as follows:
 *  - string:      the text as is
 *  - int64/uint64: decimal representation
 *  - double:      shortest representation that round-trips the value
 *  - bool:        "true" or "false"
 *  - time point:  date-time string of the value truncated to whole seconds
 *  - object:      one nested child per member, written recursively
 * A value of any other alternative produces an empty element.
 *
 * @param field_name  tag name of the element to create
 * @param field       record field to serialise
 * @param parent_node XML node receiving the new element
 */
void XMLRecordSetWriter::convertRecordField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const {
  if (std::holds_alternative<core::RecordArray>(field.value_)) {
    convertRecordArrayField(field_name, field, parent_node);
    return;
  }

  pugi::xml_node field_node = parent_node.append_child(field_name.c_str());
  if (const auto* text_value = std::get_if<std::string>(&field.value_)) {
    field_node.text().set(*text_value);
  } else if (const auto* signed_value = std::get_if<int64_t>(&field.value_)) {
    field_node.text().set(std::to_string(*signed_value).c_str());
  } else if (const auto* unsigned_value = std::get_if<uint64_t>(&field.value_)) {
    field_node.text().set(std::to_string(*unsigned_value).c_str());
  } else if (const auto* double_value = std::get_if<double>(&field.value_)) {
    // "{:g}" keeps only 6 significant digits (123456789.0 -> "1.23457e+08"), silently corrupting data.
    // The default "{}" presentation emits the shortest text that parses back to the same double.
    field_node.text().set(fmt::format("{}", *double_value).c_str());
  } else if (const auto* bool_value = std::get_if<bool>(&field.value_)) {
    field_node.text().set(*bool_value ? "true" : "false");
  } else if (const auto* time_value = std::get_if<std::chrono::system_clock::time_point>(&field.value_)) {
    const auto time_str = utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(*time_value));
    field_node.text().set(time_str.c_str());
  } else if (const auto* object_value = std::get_if<core::RecordObject>(&field.value_)) {
    for (const auto& [member_name, member_field] : *object_value) {
      convertRecordField(member_name, member_field, field_node);
    }
  }
}
115+
116+
/**
 * Builds an XML document from the record set and returns it as text.
 *
 * The document consists of one root element (root tag name) containing one element per record
 * (record tag name); the fields of each record are written below its record element.
 * Expects both tag names to be non-empty.
 *
 * @param record_set records to serialise
 * @return the XML text produced by formatXmlOutput
 */
std::string XMLRecordSetWriter::convertRecordSetToXml(const core::RecordSet& record_set) const {
  gsl_Expects(!name_of_record_tag_.empty() && !name_of_root_tag_.empty());

  pugi::xml_document document;
  auto root_element = document.append_child(name_of_root_tag_.c_str());
  for (const auto& record : record_set) {
    auto record_element = root_element.append_child(name_of_record_tag_.c_str());
    for (const auto& [field_name, field_value] : record) {
      convertRecordField(field_name, field_value, record_element);
    }
  }

  return formatXmlOutput(document);
}
130+
131+
/**
 * Serialises the record set to XML and writes it as the content of the given flow file.
 *
 * A null flow_file is logged as an error and nothing is written.
 *
 * @param record_set records to serialise
 * @param flow_file  flow file whose content is replaced by the XML text
 * @param session    process session used to perform the write
 */
void XMLRecordSetWriter::write(const core::RecordSet& record_set, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) {
  if (!flow_file) {
    logger_->log_error("FlowFile is null, cannot write RecordSet to XML");
    return;
  }

  const auto xml_content = convertRecordSetToXml(record_set);
  session.write(flow_file, [&xml_content](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
    const auto bytes_written = stream->write(reinterpret_cast<const uint8_t*>(xml_content.data()), xml_content.size());
    // Report a failed stream write to the session instead of claiming that all bytes were written.
    if (io::isError(bytes_written)) {
      return -1;
    }
    return gsl::narrow<int64_t>(bytes_written);
  });
}
143+
144+
REGISTER_RESOURCE(XMLRecordSetWriter, ControllerService);
145+
} // namespace org::apache::nifi::minifi::standard
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include "controllers/RecordSetWriter.h"
20+
#include "core/PropertyDefinitionBuilder.h"
21+
#include "core/logging/Logger.h"
22+
#include "core/logging/LoggerFactory.h"
23+
#include "pugixml.hpp"
24+
25+
namespace org::apache::nifi::minifi::standard {
26+
// Selects how XMLRecordSetWriter lays out the elements of array-valued record fields.
enum class WrapElementsOfArraysOptions {
27+
// A wrapper element named after the "Array Tag Name" property encloses the elements, which use the field name.
UsePropertyAsWrapper,
28+
// A wrapper element named after the field encloses the elements, which use the "Array Tag Name" property.
UsePropertyForElements,
29+
// No wrapper element: every array element is written under the field name directly in the parent.
NoWrapping
30+
};
31+
} // namespace org::apache::nifi::minifi::standard
32+
33+
// magic_enum customization: maps each WrapElementsOfArraysOptions enumerator to the human-readable
// string used as the property's allowable value, so enum_name/enum_cast work with those strings.
namespace magic_enum::customize {
34+
using WrapElementsOfArraysOptions = org::apache::nifi::minifi::standard::WrapElementsOfArraysOptions;
35+
36+
template <>
37+
constexpr customize_t enum_name<WrapElementsOfArraysOptions>(WrapElementsOfArraysOptions value) noexcept {
38+
switch (value) {
39+
case WrapElementsOfArraysOptions::UsePropertyAsWrapper:
40+
return "Use Property as Wrapper";
41+
case WrapElementsOfArraysOptions::UsePropertyForElements:
42+
return "Use Property for Elements";
43+
case WrapElementsOfArraysOptions::NoWrapping:
44+
return "No Wrapping";
45+
}
46+
// Reached only for a value that matches none of the enumerators above.
return invalid_tag;
47+
}
48+
} // namespace magic_enum::customize
49+
50+
namespace org::apache::nifi::minifi::standard {
51+
52+
// Controller service that serialises a core::RecordSet into an XML document and writes it into a FlowFile.
// The output has one root element containing one element per record; the tag names and formatting
// options are taken from the properties declared below and cached by onEnable().
class XMLRecordSetWriter final : public core::RecordSetWriterImpl {
53+
public:
54+
explicit XMLRecordSetWriter(const std::string_view name, const utils::Identifier& uuid = {}) : RecordSetWriterImpl(name, uuid) {}
55+
56+
// Neither copyable nor movable.
XMLRecordSetWriter(XMLRecordSetWriter&&) = delete;
57+
XMLRecordSetWriter(const XMLRecordSetWriter&) = delete;
58+
XMLRecordSetWriter& operator=(XMLRecordSetWriter&&) = delete;
59+
XMLRecordSetWriter& operator=(const XMLRecordSetWriter&) = delete;
60+
61+
~XMLRecordSetWriter() override = default;
62+
63+
EXTENSIONAPI static constexpr const char* Description = "Writes a RecordSet to XML. The records are wrapped by a root tag.";
64+
65+
// Optional in the definition, but onEnable() rejects an empty value when the wrapping mode is
// "Use Property as Wrapper" or "Use Property for Elements".
EXTENSIONAPI static constexpr auto ArrayTagName = core::PropertyDefinitionBuilder<>::createProperty("Array Tag Name")
66+
.withDescription("Name of the tag used by property \"Wrap Elements of Arrays\" to write arrays")
67+
.build();
68+
// Allowed values are the display names of WrapElementsOfArraysOptions; defaults to "No Wrapping".
EXTENSIONAPI static constexpr auto WrapElementsOfArrays = core::PropertyDefinitionBuilder<3>::createProperty("Wrap Elements of Arrays")
69+
.withDescription("Specifies how the writer wraps elements of fields of type array")
70+
.withDefaultValue(magic_enum::enum_name(WrapElementsOfArraysOptions::NoWrapping))
71+
.withAllowedValues(magic_enum::enum_names<WrapElementsOfArraysOptions>())
72+
.isRequired(true)
73+
.build();
74+
// When "true" the XML declaration is left out of the output (see formatXmlOutput).
EXTENSIONAPI static constexpr auto OmitXMLDeclaration = core::PropertyDefinitionBuilder<>::createProperty("Omit XML Declaration")
75+
.withDescription("Specifies whether or not to include XML declaration")
76+
.isRequired(true)
77+
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
78+
.withDefaultValue("false")
79+
.build();
80+
EXTENSIONAPI static constexpr auto PrettyPrintXML = core::PropertyDefinitionBuilder<>::createProperty("Pretty Print XML")
81+
.withDescription("Specifies whether or not the XML should be pretty printed")
82+
.isRequired(true)
83+
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
84+
.withDefaultValue("false")
85+
.build();
86+
EXTENSIONAPI static constexpr auto NameOfRecordTag = core::PropertyDefinitionBuilder<>::createProperty("Name of Record Tag")
87+
.withDescription("Specifies the name of the XML record tag wrapping the record fields.")
88+
.withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
89+
.isRequired(true)
90+
.build();
91+
EXTENSIONAPI static constexpr auto NameOfRootTag = core::PropertyDefinitionBuilder<>::createProperty("Name of Root Tag")
92+
.withDescription("Specifies the name of the XML root tag wrapping the record set. This property has to be defined if the writer is supposed to write multiple records in a single FlowFile.")
93+
.withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR)
94+
.isRequired(true)
95+
.build();
96+
97+
// All supported properties; registered in initialize().
EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 6>{
98+
ArrayTagName, WrapElementsOfArrays, OmitXMLDeclaration, PrettyPrintXML, NameOfRecordTag, NameOfRootTag
99+
};
100+
101+
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
102+
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
103+
104+
// Converts record_set to XML and writes it as the content of flow_file through session.
void write(const core::RecordSet& record_set, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) override;
105+
106+
void initialize() override {
107+
setSupportedProperties(Properties);
108+
}
109+
// Reads and validates the properties; throws on an invalid or missing required value.
void onEnable() override;
110+
void yield() override {}
111+
bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }
112+
bool isWorkAvailable() override { return false; }
113+
114+
private:
115+
// Serialises xml_doc to a string using the pretty-print and declaration settings.
std::string formatXmlOutput(pugi::xml_document& xml_doc) const;
116+
// Builds the root/record/field element tree for record_set and returns it as XML text.
std::string convertRecordSetToXml(const core::RecordSet& record_set) const;
117+
// Writes an array-valued field below parent_node according to wrap_elements_of_arrays_.
void convertRecordArrayField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const;
118+
// Writes a single field (scalar, object or array) as a child of parent_node.
void convertRecordField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const;
119+
120+
// Property values cached by onEnable().
WrapElementsOfArraysOptions wrap_elements_of_arrays_ = WrapElementsOfArraysOptions::NoWrapping;
121+
std::string array_tag_name_;
122+
bool omit_xml_declaration_ = false;
123+
bool pretty_print_xml_ = false;
124+
std::string name_of_record_tag_;
125+
std::string name_of_root_tag_;
126+
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<XMLRecordSetWriter>::getLogger();
127+
};
128+
129+
} // namespace org::apache::nifi::minifi::standard

0 commit comments

Comments
 (0)