Skip to content

Commit a819f61

Browse files
committed
Add Column mode to NGSIToMongo Processor
1 parent 81a6c31 commit a819f61

File tree

2 files changed

+7
-11
lines changed

2 files changed

+7
-11
lines changed

nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/AbstractMongoProcessor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
7676
.displayName("Attribute Persistence")
7777
.description("The mode of storing the data inside of the table")
7878
.required(false)
79-
.allowableValues("row")
79+
.allowableValues("row","column")
8080
.defaultValue("row")
8181
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
8282
.build();
@@ -215,8 +215,6 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
215215

216216
static {
217217
descriptors.add(URI);
218-
// descriptors.add(SSL_CONTEXT_SERVICE);
219-
// descriptors.add(CLIENT_AUTH);
220218
descriptors.add(NGSI_VERSION);
221219
descriptors.add(DATA_MODEL);
222220
descriptors.add(ATTR_PERSISTENCE);

nifi-ngsi-bundle/nifi-ngsi-processors/src/main/java/org/apache/nifi/processors/ngsi/ngsi/aggregators/MongoAggregator.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,24 +159,22 @@ public void aggregate(Entity entity, long creationTime,String dataModel) {
159159
// get the event body
160160
String entityId = entity.getEntityId();
161161
String entityType = entity.getEntityType();
162-
// iterate on all this context element attributes, if there are attributes
162+
// iterate on all this context element attributes, if there are attributes
163163
ArrayList<Attributes> contextAttributes = entity.getEntityAttrs();
164-
Document doc = createDoc(recvTimeTs, entityId, entityType);
164+
Document doc = createDoc(recvTimeTs, entityId, entityType, dataModel);
165165

166-
for (Attributes contextAttribute : contextAttributes) {
166+
for (Attributes contextAttribute : contextAttributes) {
167167
String attrName = contextAttribute.getAttrName();
168168
String attrType = contextAttribute.getAttrType();
169169
String attrValue = contextAttribute.getAttrValue();
170-
doc.append("attrName", attrName)
171-
.append("attrType", attrType)
172-
.append("attrValue", attrValue);
170+
doc.append( attrName,attrValue);
173171
} // for
174-
175172
aggregation.add(doc);
176173
} // aggregate
177174

178-
private Document createDoc(long recvTimeTs, String entityId, String entityType) {
175+
private Document createDoc(long recvTimeTs, String entityId, String entityType,String dataModel) {
179176
Document doc = new Document("recvTime", new Date(recvTimeTs));
177+
doc.append("recvTimeTs", recvTimeTs);
180178

181179
switch (dataModel) {
182180
case "db-by-service-path":

0 commit comments

Comments
 (0)