11package org .apache .nifi .processors .ngsi ;
22
3+ import com .google .gson .JsonObject ;
34import org .apache .nifi .annotation .behavior .InputRequirement ;
45import org .apache .nifi .annotation .behavior .SupportsBatching ;
56import org .apache .nifi .annotation .documentation .CapabilityDescription ;
1112import org .apache .nifi .processor .exception .ProcessException ;
1213import org .apache .nifi .processor .util .StandardValidators ;
1314import org .apache .nifi .processor .util .pattern .RollbackOnFailure ;
14- import org .apache .nifi .processors .ngsi .ngsi .backends . ckan . CkanBackend ;
15- import org .apache .nifi .processors .ngsi .ngsi .utils . Entity ;
16- import org .apache .nifi .processors .ngsi .ngsi .utils .NGSIEvent ;
17- import org . apache . nifi . processors . ngsi . ngsi . utils . NGSIUtils ;
15+ import org .apache .nifi .processors .ngsi .ngsi .aggregators . CKANAggregator ;
16+ import org .apache .nifi .processors .ngsi .ngsi .backends . ckan . CKANBackend ;
17+ import org .apache .nifi .processors .ngsi .ngsi .utils .* ;
18+
1819import java .util .ArrayList ;
1920import java .util .HashSet ;
2021import java .util .List ;
@@ -134,6 +135,15 @@ public class NGSIToCKAN extends AbstractProcessor {
134135 .addValidator (StandardValidators .NON_EMPTY_VALIDATOR )
135136 .build ();
136137
138+ protected static final PropertyDescriptor CREATE_DATASTORE = new PropertyDescriptor .Builder ()
139+ .name ("create-datastore" )
140+ .displayName ("Create DataStore" )
141+ .description ("true or false, true applies create the DataStore resource" )
142+ .required (false )
143+ .allowableValues ("true" , "false" )
144+ .defaultValue ("true" )
145+ .build ();
146+
137147 protected static final PropertyDescriptor ENABLE_ENCODING = new PropertyDescriptor .Builder ()
138148 .name ("enable-encoding" )
139149 .displayName ("Enable Encoding" )
@@ -198,8 +208,6 @@ public class NGSIToCKAN extends AbstractProcessor {
198208 + "such as an invalid query or an integrity constraint violation" )
199209 .build ();
200210
201-
202-
203211 @ Override
204212 protected List <PropertyDescriptor > getSupportedPropertyDescriptors () {
205213 final List <PropertyDescriptor > properties = new ArrayList <>();
@@ -214,6 +222,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
214222 properties .add (ATTR_PERSISTENCE );
215223 properties .add (DEFAULT_SERVICE );
216224 properties .add (DEFAULT_SERVICE_PATH );
225+ properties .add (CREATE_DATASTORE );
217226 properties .add (ENABLE_ENCODING );
218227 properties .add (ENABLE_LOWERCASE );
219228 properties .add (BATCH_SIZE );
@@ -244,24 +253,65 @@ protected void persistFlowFile(final ProcessContext context, final FlowFile flow
244253 final int maxConnectionsPerRoute = context .getProperty (MAX_CONNECTIONS_PER_ROUTE ).asInteger ();
245254 final boolean enableEncoding = context .getProperty (ENABLE_ENCODING ).asBoolean ();
246255 final boolean enableLowercase = context .getProperty (ENABLE_LOWERCASE ).asBoolean ();
247- final CkanBackend ckanBackend = new CkanBackend (apiKey ,host ,port ,orioUrl ,ssl ,maxConnections ,maxConnectionsPerRoute ,ckanViewer );
248- NGSIUtils n = new NGSIUtils ();
256+ final boolean createDataStore = context .getProperty (CREATE_DATASTORE ).asBoolean ();
257+ final String attrPersistence = context .getProperty (ATTR_PERSISTENCE ).getValue ();
258+ final CKANBackend ckanBackend = new CKANBackend (apiKey ,host ,port ,orioUrl ,ssl ,maxConnections ,maxConnectionsPerRoute ,ckanViewer );
259+ final NGSIUtils n = new NGSIUtils ();
260+ final BuildDCATMetadata buildDCATMetadata = new BuildDCATMetadata ();
261+ final DCATMetadata dcatMetadata = buildDCATMetadata .getMetadataFromFlowFile (flowFile ,session );
249262 final String ngsiVersion =context .getProperty (NGSI_VERSION ).getValue ();
250263 final String dataModel =context .getProperty (DATA_MODEL ).getValue ();
251-
252264 final NGSIEvent event =n .getEventFromFlowFile (flowFile ,session ,ngsiVersion );
253265 final long creationTime = event .getCreationTime ();
254266 final String fiwareService = (event .getFiwareService ().compareToIgnoreCase ("nd" )==0 )?context .getProperty (DEFAULT_SERVICE ).getValue ():event .getFiwareService ();
255267 final String fiwareServicePath = ("ld" .equals (context .getProperty (NGSI_VERSION ).getValue ()))?"" :(event .getFiwareServicePath ().compareToIgnoreCase ("/nd" )==0 )?context .getProperty (DEFAULT_SERVICE_PATH ).getValue ():event .getFiwareServicePath ();
268+ CKANAggregator aggregator = new CKANAggregator () {
269+ @ Override
270+ public void aggregate (Entity entity , long creationTime , String dataModel ) {
271+
272+ }
273+ };
274+ aggregator = aggregator .getAggregator (("row" .equals (attrPersistence ))?true :false );
256275 try {
257- final String orgName = ckanBackend .buildOrgName (fiwareService ,dataModel ,enableEncoding ,enableLowercase ,ngsiVersion );
276+
277+ final String orgName = ckanBackend .buildOrgName (fiwareService ,dataModel ,enableEncoding ,enableLowercase ,ngsiVersion ,dcatMetadata );
258278 ArrayList <Entity > entities = new ArrayList <>();
259279 entities = ("ld" .equals (context .getProperty (NGSI_VERSION ).getValue ()))?event .getEntitiesLD ():event .getEntities ();
260- for (Entity entity : event .getEntities ()) {
261- final String pkgName = ckanBackend .buildPkgName (fiwareService ,entity ,dataModel ,enableEncoding ,enableLowercase ,ngsiVersion );
262- final String resName = ckanBackend .buildResName (entity ,dataModel ,enableEncoding ,enableLowercase ,ngsiVersion );
263-
264- } // for
280+ getLogger ().info ("[] Persisting data at NGSICKANSink (orgName=" + orgName + ", " );
281+ System .out .println (dcatMetadata .toString ());
282+
283+ for (Entity entity : entities ) {
284+ final String pkgName = ckanBackend .buildPkgName (fiwareService ,entity ,dataModel ,enableEncoding ,enableLowercase ,ngsiVersion ,dcatMetadata );
285+ final String resName = ckanBackend .buildResName (entity ,dataModel ,enableEncoding ,enableLowercase ,ngsiVersion ,dcatMetadata );
286+ aggregator .initialize (entity ,context .getProperty (NGSI_VERSION ).getValue ());
287+ aggregator .aggregate (entity , creationTime , context .getProperty (NGSI_VERSION ).getValue ());
288+ ArrayList <JsonObject > jsonObjects = CKANAggregator .linkedHashMapToJson (aggregator .getAggregationToPersist ());
289+ String aggregation = "" ;
290+
291+ for (JsonObject jsonObject : jsonObjects ) {
292+ if (aggregation .isEmpty ()) {
293+ aggregation = jsonObject .toString ();
294+ } else {
295+ aggregation += "," + jsonObject ;
296+ }
297+ }
298+
299+
300+ getLogger ().info ("[] Persisting data at NGSICKANSink (orgName=" + orgName
301+ + ", pkgName=" + pkgName + ", resName=" + resName + ", data=(" + aggregation + ")" );
302+
303+ // Do try-catch only for metrics gathering purposes... after that, re-throw
304+ try {
305+ if (aggregator instanceof CKANAggregator .RowAggregator ) {
306+ ckanBackend .persist (orgName , pkgName , resName , aggregation , true , dcatMetadata ,createDataStore );
307+ } else {
308+ ckanBackend .persist (orgName , pkgName , resName , aggregation , false , dcatMetadata ,createDataStore );
309+ } // if else
310+
311+ } catch (Exception e ) {
312+ throw e ;
313+ } // catch
314+ } // for
265315
266316 }catch (Exception e ){
267317 getLogger ().error (e .toString ());
0 commit comments