Commit ef7dbcb

lordgamez authored and fgerlits committed
MINIFICPP-2595 Create ConvertRecord processor
Signed-off-by: Ferenc Gerlits <[email protected]> Closes #1996
1 parent f28961c commit ef7dbcb

8 files changed: +352 -0 lines changed

PROCESSORS.md

Lines changed: 32 additions & 0 deletions
@@ -27,6 +27,7 @@ limitations under the License.
 - [ConsumeKafka](#ConsumeKafka)
 - [ConsumeMQTT](#ConsumeMQTT)
 - [ConsumeWindowsEventLog](#ConsumeWindowsEventLog)
+- [ConvertRecord](#ConvertRecord)
 - [DefragmentText](#DefragmentText)
 - [DeleteAzureBlobStorage](#DeleteAzureBlobStorage)
 - [DeleteAzureDataLakeStorage](#DeleteAzureDataLakeStorage)
@@ -474,6 +475,37 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | Relationship for successfully consumed events. |
 
 
+## ConvertRecord
+
+### Description
+
+Converts records from one data format to another using configured Record Reader and Record Writer Controller Services.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|-----------------------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data |
+| **Record Writer** | | | Specifies the Controller Service to use for writing out the records |
+| **Include Zero Record FlowFiles** | true | true<br/>false | When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship. |
+
+### Relationships
+
+| Name | Description |
+|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship |
+| success | FlowFiles that are successfully transformed will be routed to this relationship |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|----------------------|--------------|-------------------------------------------------------------------------------------------|
+| record.count | success | The number of records in the FlowFile |
+| record.error.message | failure | This attribute provides on failure the error message encountered by the Reader or Writer. |
+
+
 ## DefragmentText
 
 ### Description

docker/test/integration/features/core_functionality.feature

Lines changed: 15 additions & 0 deletions
@@ -123,3 +123,18 @@ Feature: Core flow functionalities
       | config_format |
       | yaml |
       | json |
+
+  @CORE
+  Scenario: ConvertRecord processor can convert records from one format to another
+    Given a XMLReader controller service is set up
+    And a JsonRecordSetWriter controller service is set up with "Array" output grouping
+    And a GenerateFlowFile processor with the "Data Format" property set to "Text"
+    And the "Custom Text" property of the GenerateFlowFile processor is set to "<record><numbers>1</numbers><numbers>2</numbers></record>"
+    And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false"
+    And a ConvertRecord processor with the "Record Reader" property set to "XMLReader"
+    And the "Record Writer" property of the ConvertRecord processor is set to "JsonRecordSetWriter"
+    And the "success" relationship of the GenerateFlowFile processor is connected to the ConvertRecord
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ConvertRecord processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then a flowfile with the JSON content '[{"numbers":[1,2]}]' is placed in the monitored directory in less than 60 seconds
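
The scenario above expects the XML record with two repeated `numbers` elements to come out as one JSON array holding a single object. The following is a minimal Python sketch of that expected mapping only. It is not the XMLReader or JsonRecordSetWriter implementation; the function name `xml_record_to_json` and the rules it applies (repeated tags become a list, integer-looking text becomes a number) are assumptions chosen to reproduce the expected output in the test.

```python
import json
import xml.etree.ElementTree as ET


def xml_record_to_json(xml_text: str) -> str:
    """Convert one flat XML record into a JSON array holding one object."""
    root = ET.fromstring(xml_text)
    record = {}
    for child in root:
        text = child.text or ""
        # Assumption: text that parses as an integer becomes a JSON number.
        try:
            value = int(text)
        except ValueError:
            value = text
        if child.tag in record:
            # Assumption: a repeated tag is collected into a list.
            existing = record[child.tag]
            if isinstance(existing, list):
                existing.append(value)
            else:
                record[child.tag] = [existing, value]
        else:
            record[child.tag] = value
    # Mirrors the "Array" output grouping: all records wrapped in one array.
    return json.dumps([record], separators=(",", ":"))


print(xml_record_to_json("<record><numbers>1</numbers><numbers>2</numbers></record>"))
# prints [{"numbers":[1,2]}]
```

The compact separators are used so that the string matches the expected content in the scenario character for character.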

docker/test/integration/features/steps/steps.py

Lines changed: 8 additions & 0 deletions
@@ -26,6 +26,7 @@
 from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
 from minifi.controllers.JsonTreeReader import JsonTreeReader
 from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
+from minifi.controllers.XMLReader import XMLReader
 
 from behave import given, then, when
 from behave.model_describe import ModelDescriptor
@@ -405,6 +406,13 @@ def step_impl(context):
     container.add_controller(json_record_set_reader)
 
 
+@given("a XMLReader controller service is set up")
+def step_impl(context):
+    xml_reader = XMLReader("XMLReader")
+    container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+    container.add_controller(xml_reader)
+
+
 # Kubernetes
 def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
     kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.ControllerService import ControllerService
+
+
+class XMLReader(ControllerService):
+    def __init__(self, name=None):
+        super(XMLReader, self).__init__(name=name)
+        self.service_class = 'XMLReader'
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class ConvertRecord(Processor):
+    def __init__(self, context):
+        super(ConvertRecord, self).__init__(
+            context=context,
+            clazz='ConvertRecord',
+            auto_terminate=['success', 'failure'])
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConvertRecord.h"
+
+#include "core/Resource.h"
+#include "nonstd/expected.hpp"
+#include "utils/GeneralUtils.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+  record_set_reader_ = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
+  record_set_writer_ = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
+  include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles);
+}
+
+void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  gsl_Expects(record_set_reader_ && record_set_writer_);
+  const auto flow_file = session.get();
+  if (!flow_file) {
+    context.yield();
+    return;
+  }
+
+  nonstd::expected<core::RecordSet, std::error_code> record_set;
+  session.read(flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
+    record_set = record_set_reader_->read(*input_stream);
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
+  if (!record_set) {
+    logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message());
+    flow_file->setAttribute(processors::ConvertRecord::RecordErrorMessageOutputAttribute.name, record_set.error().message());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (!include_zero_record_flow_files_ && record_set->empty()) {
+    logger_->log_info("No records found in flow file, removing flow file");
+    session.remove(flow_file);
+    return;
+  }
+
+  record_set_writer_->write(*record_set, flow_file, session);
+  flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name, std::to_string(record_set->size()));
+  session.transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(ConvertRecord, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
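
The control flow of `onTrigger` above has three outcomes: route to `failure` with `record.error.message` when the reader fails, remove the flow file when it holds no records and `Include Zero Record FlowFiles` is false, and otherwise write the records, set `record.count`, and route to `success`. The sketch below models only that decision logic in Python. Everything in it is hypothetical scaffolding rather than MiNiFi API: the `FlowFile` dataclass, the `convert` function, and the JSON-based reader and writer are stand-ins, and a raised `ValueError` substitutes for the `std::error_code` carried by `nonstd::expected` in the C++ code.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class FlowFile:
    """Hypothetical stand-in for a flow file: content plus attributes."""
    content: str
    attributes: dict = field(default_factory=dict)


def convert(flow_file: FlowFile,
            read: Callable[[str], list],
            write: Callable[[list], str],
            include_zero_record_flow_files: bool = True) -> Optional[str]:
    """Return the relationship name, or None when the flow file is removed."""
    try:
        records = read(flow_file.content)
    except ValueError as error:
        # Reader failure: content stays unchanged, the message is attached.
        flow_file.attributes["record.error.message"] = str(error)
        return "failure"
    if not include_zero_record_flow_files and not records:
        return None  # models session.remove(flow_file)
    flow_file.content = write(records)
    flow_file.attributes["record.count"] = str(len(records))
    return "success"


def read_json_array(text: str) -> list:
    """Hypothetical reader: expects a JSON array of records."""
    records = json.loads(text)  # json.JSONDecodeError is a ValueError
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of records")
    return records


def write_json_lines(records: list) -> str:
    """Hypothetical writer: one JSON document per line."""
    return "\n".join(json.dumps(record) for record in records)
```

With these stand-ins, `convert(FlowFile("[]"), read_json_array, write_json_lines, include_zero_record_flow_files=False)` returns `None`, while the same call with the default setting returns `"success"` and sets `record.count` to `"0"`.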
Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string_view>
+#include <utility>
+
+#include "core/AbstractProcessor.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/RelationshipDefinition.h"
+#include "controllers/RecordSetReader.h"
+#include "controllers/RecordSetWriter.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConvertRecord : public core::AbstractProcessor<ConvertRecord> {
+ public:
+  using core::AbstractProcessor<ConvertRecord>::AbstractProcessor;
+
+  EXTENSIONAPI static constexpr const char* Description = "Converts records from one data format to another using configured Record Reader and Record Writer Controller Services.";
+
+  EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
+      .withDescription("Specifies the Controller Service to use for reading incoming data")
+      .isRequired(true)
+      .withAllowedTypes<minifi::core::RecordSetReader>()
+      .build();
+  EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
+      .withDescription("Specifies the Controller Service to use for writing out the records")
+      .isRequired(true)
+      .withAllowedTypes<minifi::core::RecordSetWriter>()
+      .build();
+  EXTENSIONAPI static constexpr auto IncludeZeroRecordFlowFiles = core::PropertyDefinitionBuilder<>::createProperty("Include Zero Record FlowFiles")
+      .withDescription("When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship.")
+      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
+      .withDefaultValue("true")
+      .isRequired(true)
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
+      RecordReader,
+      RecordWriter,
+      IncludeZeroRecordFlowFiles
+  });
+
+  EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
+      "If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship"};
+  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are successfully transformed will be routed to this relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Success};
+
+  EXTENSIONAPI static constexpr auto RecordCountOutputAttribute = core::OutputAttributeDefinition<1>{"record.count", {Success}, "The number of records in the FlowFile"};
+  EXTENSIONAPI static constexpr auto RecordErrorMessageOutputAttribute = core::OutputAttributeDefinition<1>{"record.error.message", {Failure},
+      "This attribute provides on failure the error message encountered by the Reader or Writer."};
+  EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{RecordCountOutputAttribute, RecordErrorMessageOutputAttribute};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+
+ private:
+  std::shared_ptr<core::RecordSetReader> record_set_reader_;
+  std::shared_ptr<core::RecordSetWriter> record_set_writer_;
+  bool include_zero_record_flow_files_ = true;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
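
The header above declares three required properties, of which only `Include Zero Record FlowFiles` carries a default (`"true"`) and a boolean validator. As a rough illustration of what that combination means for a flow configuration, the Python sketch below resolves a configured property map against those declarations. The `PROPERTIES` table and `resolve_properties` function are hypothetical and are not how MiNiFi validates properties. The case-insensitive acceptance of `true` and `false` is an assumption and may not match the validator's exact behavior.

```python
# Hypothetical mirror of the three property declarations in ConvertRecord.h.
PROPERTIES = {
    "Record Reader": {"required": True, "default": None, "boolean": False},
    "Record Writer": {"required": True, "default": None, "boolean": False},
    "Include Zero Record FlowFiles": {"required": True, "default": "true", "boolean": True},
}


def resolve_properties(configured: dict) -> dict:
    """Apply defaults, then check required and boolean constraints."""
    resolved = {}
    for name, spec in PROPERTIES.items():
        value = configured.get(name, spec["default"])
        if value is None:
            if spec["required"]:
                raise ValueError(f"required property '{name}' is not set")
            continue
        if spec["boolean"]:
            # Assumption: only "true"/"false" are accepted, ignoring case.
            if value.lower() not in ("true", "false"):
                raise ValueError(f"property '{name}' must be true or false")
            value = value.lower() == "true"
        resolved[name] = value
    return resolved


config = resolve_properties({"Record Reader": "XMLReader", "Record Writer": "JsonRecordSetWriter"})
print(config["Include Zero Record FlowFiles"])
# prints True
```

Because the third property has a default, a flow such as the one in the integration scenario only needs to name the reader and writer services.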
