11package com .ukubuka .core .execute ;
22
3- import java .io .IOException ;
4- import java .util .ArrayList ;
5- import java .util .HashMap ;
6- import java .util .HashSet ;
7- import java .util .List ;
83import java .util .Map ;
94
105import org .slf4j .Logger ;
116import org .slf4j .LoggerFactory ;
127import org .springframework .beans .factory .annotation .Autowired ;
13- import org .springframework .beans .factory .annotation .Qualifier ;
148import org .springframework .stereotype .Service ;
15- import org .springframework .util .CollectionUtils ;
169
17- import com .fasterxml .jackson .databind .ObjectMapper ;
18- import com .fasterxml .jackson .databind .ObjectReader ;
1910import com .ukubuka .core .exception .ParserException ;
20- import com .ukubuka .core .exception .ReaderException ;
2111import com .ukubuka .core .exception .TransformException ;
2212import com .ukubuka .core .exception .WriterException ;
2313import com .ukubuka .core .model .FileContents ;
24- import com .ukubuka .core .model .FileRecord ;
25- import com .ukubuka .core .model .LoadOperation ;
26- import com .ukubuka .core .model .SupportedFileType ;
27- import com .ukubuka .core .model .SupportedSource ;
2814import com .ukubuka .core .model .UkubukaSchema ;
29- import com .ukubuka .core .model .UkubukaSchema .Extract ;
30- import com .ukubuka .core .model .UkubukaSchema .Load ;
31- import com .ukubuka .core .model .UkubukaSchema .Transform ;
32- import com .ukubuka .core .model .UkubukaSchema .TransformOperations ;
15+ import com .ukubuka .core .operations .extract .UkubukaExtractor ;
16+ import com .ukubuka .core .operations .load .UkubukaLoader ;
3317import com .ukubuka .core .operations .transform .UkubukaTransformer ;
34- import com .ukubuka .core .parser .UkubukaParser ;
35- import com .ukubuka .core .reader .UkubukaReader ;
36- import com .ukubuka .core .utilities .Constants ;
37- import com .ukubuka .core .utilities .Utilities ;
38- import com .ukubuka .core .writer .UkubukaWriter ;
18+ import com .ukubuka .core .schema .UkubukaSchemaReader ;
3919
4020/**
4121 * Ukubuka Executor Service
@@ -50,27 +30,18 @@ public class UkubukaExecutorService {
5030 private static final Logger LOGGER = LoggerFactory
5131 .getLogger (UkubukaExecutorService .class );
5232
53- /*********************************** Global Variables ***********************************/
54- private static final ObjectReader SCHEMA_READER = new ObjectMapper ()
55- .readerFor (UkubukaSchema .class );
56-
5733 /********************************* Dependency Injections ********************************/
5834 @ Autowired
59- private UkubukaReader reader ;
60-
61- @ Autowired
62- private UkubukaWriter writer ;
35+ private UkubukaSchemaReader ukubukaSchemaReader ;
6336
6437 @ Autowired
65- @ Qualifier ("UkubukaXMLParser" )
66- private UkubukaParser xmlParser ;
38+ private UkubukaExtractor ukubukaExtractor ;
6739
6840 @ Autowired
69- @ Qualifier ("UkubukaDFileParser" )
70- private UkubukaParser delimitedFileParser ;
41+ private UkubukaTransformer ukubukaTransformer ;
7142
7243 @ Autowired
73- private UkubukaTransformer ukubukaTransformer ;
44+ private UkubukaLoader ukubukaLoader ;
7445
7546 /**
7647 * Execute Ukubuka
@@ -83,176 +54,26 @@ public class UkubukaExecutorService {
8354 public void execute (final String ukubukaSchemaFile )
8455 throws ParserException , TransformException , WriterException {
8556 /* Create An In-Memory Data Store */
86- Map <String , FileContents > dataFiles = new HashMap <>() ;
57+ Map <String , FileContents > dataFiles ;
8758
8859 /* Read File*/
89- UkubukaSchema ukubukaSchema = readSchema (ukubukaSchemaFile );
60+ LOGGER .info ("Reading Ukubuka Schema..." );
61+ UkubukaSchema ukubukaSchema = ukubukaSchemaReader
62+ .readSchema (ukubukaSchemaFile );
9063
91- /* Iterate Extracts */
92- LOGGER .info ("Performing Extracts..." );
93- for (final Extract extract : ukubukaSchema .getExtracts ()) {
94- LOGGER .info ("Performing Extract: HC" + extract .hashCode ());
95- FileContents fileContents = null ;
64+ /* Perform Extracts */
65+ LOGGER .info ("Performing Extract(s)..." );
66+ dataFiles = ukubukaExtractor
67+ .performOperations (ukubukaSchema .getExtracts ());
9668
97- /* Get File Type */
98- switch (extract .getType ()) {
99- /* Delimited File */
100- case CSV :
101- fileContents = delimitedFileParser
102- .parseFile (extract .getLocation (), extract .getFlags ());
103- break ;
104- /* XML File */
105- case XML :
106- fileContents = xmlParser .parseFile (extract .getLocation (),
107- extract .getFlags ());
108- break ;
109- /* Unsupported File */
110- default :
111- throw new ParserException ("File Type Not Supported!" );
112- }
113-
114- /* Perform Transformations */
115- LOGGER .info ("Performing Transformations..." );
116- performTransformation (extract .getId (),
117- ukubukaSchema .getTransforms (), fileContents );
118-
119- /* Store DataSet */
120- dataFiles .put (extract .getId (), fileContents );
121- }
69+ /* Perform Transformations */
70+ LOGGER .info ("Performing Transformation(s)..." );
71+ dataFiles = ukubukaTransformer .performOperations (dataFiles ,
72+ ukubukaSchema .getTransforms ());
12273
12374 /* Perform Load */
124- LOGGER .info ("Performing Load..." );
125- performLoad (ukubukaSchema .getLoads (), dataFiles );
126- }
127-
128- /**
129- * Perform Transformations
130- *
131- * @param fileId
132- * @param transforms
133- * @param fileContents
134- * @throws TransformException
135- */
136- private void performTransformation (final String fileId ,
137- final List <Transform > transforms , FileContents fileContents )
138- throws TransformException {
139- /* Get File Transformation */
140- List <TransformOperations > fileTransforms = getFileTransformationDetails (
141- fileId , transforms );
142- if (!CollectionUtils .isEmpty (fileTransforms )) {
143- LOGGER .info ("Transform Count: #" + fileTransforms .size ());
144- ukubukaTransformer .performOperations (fileContents .getHeader (),
145- fileContents .getData (), fileTransforms );
146- }
147- }
148-
149- /**
150- * Perform Loads
151- *
152- * @param load
153- * @param dataFiles
154- * @throws WriterException
155- */
156- private void performLoad (final Load load ,
157- final Map <String , FileContents > dataFiles ) throws WriterException {
158- /* Check Whether Valid Load Operations */
159- if (null != load ) {
160- LOGGER .info ("Performing Load: HC" + load .hashCode ());
161-
162- /* Get File Contents */
163- FileContents fileContents = new FileContents (
164- new ArrayList <String >(), new ArrayList <FileRecord >());
165- fileContents .setHeader (dataFiles
166- .get (load .getOperations ().getHeader ()).getHeader ());
167-
168- /* Iterate Data Sources */
169- for (final String fileId : load .getOperations ().getData ()) {
170- /* Check Flag For DISTINCT */
171- fileContents .getData ()
172- .addAll (LoadOperation .DISTINCT == load .getOperations ()
173- .getFilter ()
174- ? new HashSet <>(
175- dataFiles .get (fileId ).getData ())
176- : dataFiles .get (fileId ).getData ());
177- }
178-
179- /* Write File */
180- LOGGER .info ("Writing File..." );
181- try {
182- LOGGER .info ("ID: " + load .getId () + " | Type: " + load .getType ()
183- + " | Location: " + load .getLocation ());
184- writeFile (load .getType (), load .getLocation (),
185- fileContents .getHeader (), fileContents .getData ());
186- } catch (ParserException | IOException ex ) {
187- throw new WriterException (ex );
188- }
189- }
190- }
191-
192- /**
193- * Write File
194- *
195- * @param supportedFileType
196- * @param completeFileName
197- * @param header
198- * @param data
199- * @throws IOException
200- * @throws ParserException
201- */
202- private void writeFile (final SupportedFileType supportedFileType ,
203- final String completeFileName , List <String > header ,
204- List <FileRecord > data ) throws IOException , ParserException {
205- /* Get File Type */
206- switch (supportedFileType ) {
207- /* Delimited File */
208- case CSV :
209- Utilities .writeFile (completeFileName ,
210- writer .writeCSV (header , data ).toString ());
211- break ;
212- /* XML File */
213- case JSON :
214- Utilities .writeFile (completeFileName , writer
215- .prettyPrint (writer .writeJSON (header , data ).toString ()));
216- break ;
217- /* Unsupported File */
218- default :
219- throw new ParserException ("File Type Not Supported!" );
220- }
221- }
222-
223- /**
224- * Get File Transformation Details
225- *
226- * @param fileId
227- * @param transforms
228- * @return File Transforms
229- */
230- private List <TransformOperations > getFileTransformationDetails (
231- final String fileId , List <Transform > transforms ) {
232- /* Iterate Transforms */
233- for (final Transform transform : transforms ) {
234- if (transform .getId ().equals (fileId )) {
235- return transform .getOperations ();
236- }
237- }
238- return null ;
239- }
240-
241- /**
242- * Read Ukubuka Schema
243- *
244- * @param ukubukaSchemaFile
245- * @return Ukubuka Schema
246- * @throws ParserException
247- */
248- private UkubukaSchema readSchema (final String ukubukaSchemaFile )
249- throws ParserException {
250- try {
251- return SCHEMA_READER .readValue (reader .readFileAsString (
252- SupportedSource .FILE , ukubukaSchemaFile ,
253- Constants .DEFAULT_FILE_ENCODING ));
254- } catch (ReaderException | IOException ex ) {
255- throw new ParserException (ex );
256- }
75+ LOGGER .info ("Performing Load(s)..." );
76+ dataFiles = ukubukaLoader .performOperations (dataFiles ,
77+ ukubukaSchema .getLoads ());
25778 }
25879}
0 commit comments