Skip to content

Commit ca4e825

Browse files
committed
NIFI-14861: Added support for complex field types
1 parent 67786cd commit ca4e825

File tree

3 files changed

+49
-25
lines changed

3 files changed

+49
-25
lines changed

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphClientService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,4 @@ public interface GraphClientService extends ControllerService {
5656
default String generateSetPropertiesStatement(final String componentType, final List<Tuple<String, String>> identifiersAndValues, final String nodeType, final Map<String, Object> propertyMap) {
5757
throw new UnsupportedOperationException("This capability is not implemented for this GraphClientService");
5858
}
59-
}
59+
}

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@
4545
import org.apache.nifi.serialization.RecordSetWriter;
4646
import org.apache.nifi.serialization.RecordSetWriterFactory;
4747
import org.apache.nifi.serialization.WriteResult;
48+
import org.apache.nifi.serialization.record.DataType;
49+
import org.apache.nifi.serialization.record.MapRecord;
4850
import org.apache.nifi.serialization.record.Record;
51+
import org.apache.nifi.serialization.record.RecordFieldType;
52+
import org.apache.nifi.serialization.record.type.ArrayDataType;
4953
import org.apache.nifi.util.Tuple;
5054

5155
import java.io.IOException;
@@ -70,7 +74,7 @@
7074
+ "will be added as properties on the node/edge",
7175
value = "The variable name to be set", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
7276
description = "A dynamic property specifying a RecordField Expression identifying field(s) for whose values will be added to the matched node as properties")
73-
public class EnrichGraphRecord extends AbstractGraphExecutor {
77+
public class EnrichGraphRecord extends AbstractGraphExecutor {
7478

7579
private static final AllowableValue NODES = new AllowableValue(
7680
GraphClientService.NODES_TYPE,
@@ -147,16 +151,16 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
147151
}
148152

149153
public static final Relationship ORIGINAL = new Relationship.Builder().name("original")
150-
.description("Original flow files that successfully interacted with " +
151-
"graph server.")
152-
.build();
154+
.description("Original flow files that successfully interacted with " +
155+
"graph server.")
156+
.build();
153157
public static final Relationship FAILURE = new Relationship.Builder().name("failure")
154-
.description("Flow files that fail to interact with graph server.")
155-
.build();
158+
.description("Flow files that fail to interact with graph server.")
159+
.build();
156160
public static final Relationship GRAPH = new Relationship.Builder().name("response")
157-
.description("The response object from the graph server.")
158-
.autoTerminateDefault(true)
159-
.build();
161+
.description("The response object from the graph server.")
162+
.autoTerminateDefault(true)
163+
.build();
160164

161165
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
162166
CLIENT_SERVICE,
@@ -210,7 +214,7 @@ private List<FieldValue> getRecordValue(Record record, RecordPath recordPath) {
210214
@Override
211215
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
212216
FlowFile input = session.get();
213-
if ( input == null ) {
217+
if (input == null) {
214218
return;
215219
}
216220

@@ -221,12 +225,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
221225
.keySet().stream()
222226
.filter(PropertyDescriptor::isDynamic)
223227
.forEach(it ->
224-
dynamic.put(it.getName(), recordPathCache.getCompiled(
225-
context
226-
.getProperty(it.getName())
227-
.evaluateAttributeExpressions(finalInput)
228-
.getValue()))
229-
);
228+
dynamic.put(it.getName(), recordPathCache.getCompiled(
229+
context
230+
.getProperty(it.getName())
231+
.evaluateAttributeExpressions(finalInput)
232+
.getValue()))
233+
);
230234

231235
long delta;
232236
FlowFile failedRecords = session.create(input);
@@ -261,29 +265,49 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
261265
}
262266
final List<FieldValue> propertyValues = getRecordValue(record, recordPathCache.getCompiled("/" + fieldName));
263267
// Use the first value if multiple are found
264-
if (propertyValues == null || propertyValues.isEmpty() || propertyValues.get(0).getValue() == null) {
265-
throw new IOException("Dynamic property field(s) not found in record (check the RecordPath Expression), sending this record to failure");
268+
if (propertyValues == null || propertyValues.isEmpty() || propertyValues.getFirst().getValue() == null) {
269+
continue;
266270
}
267271

268-
dynamicPropertyMap.put(fieldName, propertyValues.get(0).getValue());
272+
Object rawValue = propertyValues.getFirst().getValue();
273+
DataType rawDataType = propertyValues.getFirst().getField().getDataType();
274+
RecordFieldType rawValueType = rawDataType.getFieldType();
275+
// Change MapRecords to Maps recursively as needed
276+
if (RecordFieldType.ARRAY.equals(rawValueType)) {
277+
DataType arrayElementType = ((ArrayDataType) rawDataType).getElementType();
278+
if (RecordFieldType.RECORD.getDataType().equals(arrayElementType)) {
279+
Object[] rawValueArray = (Object[]) rawValue;
280+
Object[] mappedValueArray = new Object[rawValueArray.length];
281+
for (int i = 0; i < rawValueArray.length; i++) {
282+
MapRecord mapRecord = (MapRecord) rawValueArray[i];
283+
mappedValueArray[i] = mapRecord.toMap(true);
284+
}
285+
dynamicPropertyMap.put(fieldName, mappedValueArray);
286+
}
287+
} else if (RecordFieldType.RECORD.equals(rawValueType)) {
288+
MapRecord mapRecord = (MapRecord) rawValue;
289+
dynamicPropertyMap.put(fieldName, mapRecord.toMap(true));
290+
} else {
291+
dynamicPropertyMap.put(fieldName, rawValue);
292+
}
269293
}
270294
} else {
271-
for (String entry : dynamic.keySet()) {
295+
for (String entry : keySet) {
272296
if (!dynamicPropertyMap.containsKey(entry)) {
273297
final List<FieldValue> propertyValues = getRecordValue(record, dynamic.get(entry));
274298
// Use the first value if multiple are found
275-
if (propertyValues == null || propertyValues.isEmpty() || propertyValues.get(0).getValue() == null) {
299+
if (propertyValues == null || propertyValues.isEmpty() || propertyValues.getFirst().getValue() == null) {
276300
throw new IOException("Dynamic property field(s) not found in record (check the RecordPath Expression), sending this record to failure");
277301
}
278302

279-
dynamicPropertyMap.put(entry, propertyValues.get(0).getValue());
303+
dynamicPropertyMap.put(entry, propertyValues.getFirst().getValue());
280304
}
281305
}
282306
}
283307

284308
final String nodeType = context.getProperty(NODE_TYPE).evaluateAttributeExpressions(input).getValue();
285309
List<Tuple<String, String>> identifiersAndValues = new ArrayList<>(identifierValues.size());
286-
for (FieldValue fieldValue: identifierValues) {
310+
for (FieldValue fieldValue : identifierValues) {
287311
if (fieldValue.getValue() == null) {
288312
throw new IOException(String.format("Identifier field '%s' is null for record at index %d, sending this record to failure", identifierField, records));
289313
}

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/TestEnrichGraphRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,4 +166,4 @@ public void testFailedProcessing() {
166166
testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 1);
167167
testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 0);
168168
}
169-
}
169+
}

0 commit comments

Comments
 (0)