1919package org .apache .atlas .impala ;
2020
2121import org .apache .atlas .impala .hook .ImpalaLineageHook ;
22-
23- import java .io .*;
24- import java .util .ArrayList ;
25- import java .util .Arrays ;
26- import java .util .List ;
27- import org .apache .commons .cli .DefaultParser ;
2822import org .apache .commons .cli .CommandLine ;
23+ import org .apache .commons .cli .DefaultParser ;
2924import org .apache .commons .cli .Options ;
3025import org .apache .commons .cli .ParseException ;
3126import org .apache .commons .io .FileUtils ;
3530import org .slf4j .Logger ;
3631import org .slf4j .LoggerFactory ;
3732
33+ import java .io .BufferedWriter ;
34+ import java .io .File ;
35+ import java .io .FileFilter ;
36+ import java .io .FileWriter ;
37+ import java .util .ArrayList ;
38+ import java .util .Arrays ;
39+ import java .util .List ;
40+
3841/**
3942 * Entry point of actual implementation of Impala lineage tool. It reads the lineage records in
4043 * lineage log. It then calls instance of ImpalaLineageHook to convert lineage records to
4144 * lineage notifications and send them to Atlas.
4245 */
4346public class ImpalaLineageTool {
44- private static final Logger LOG = LoggerFactory .getLogger (ImpalaLineageTool .class );
45- private static final String WAL_FILE_EXTENSION = ".wal" ;
46- private static final String WAL_FILE_PREFIX = "WAL" ;
47- private String directoryName ;
48- private String prefix ;
49-
50- public ImpalaLineageTool (String [] args ) {
51- try {
52- Options options = new Options ();
53- options .addOption ("d" , "directory" , true , "the lineage files' folder" );
54- options .addOption ("p" , "prefix" , true , "the prefix of the lineage files" );
55-
56- CommandLine cmd = new DefaultParser ().parse (options , args );
57- directoryName = cmd .getOptionValue ("d" );
58- prefix = cmd .getOptionValue ("p" );
59- } catch (ParseException e ) {
60- LOG .warn ("Failed to parse command arguments. Error: " , e .getMessage ());
61- printUsage ();
62-
63- throw new RuntimeException (e );
47+ private static final Logger LOG = LoggerFactory .getLogger (ImpalaLineageTool .class );
48+ private static final String WAL_FILE_EXTENSION = ".wal" ;
49+ private static final String WAL_FILE_PREFIX = "WAL" ;
50+ private String directoryName ;
51+ private String prefix ;
52+
53+ public ImpalaLineageTool (String [] args ) {
54+ try {
55+ Options options = new Options ();
56+ options .addOption ("d" , "directory" , true , "the lineage files' folder" );
57+ options .addOption ("p" , "prefix" , true , "the prefix of the lineage files" );
58+
59+ CommandLine cmd = new DefaultParser ().parse (options , args );
60+ directoryName = cmd .getOptionValue ("d" );
61+ prefix = cmd .getOptionValue ("p" );
62+ } catch (ParseException e ) {
63+ LOG .warn ("Failed to parse command arguments. Error: " , e .getMessage ());
64+ printUsage ();
65+
66+ throw new RuntimeException (e );
67+ }
6468 }
65- }
66-
67- public void run () {
68- ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook ();
6969
70- File [] currentFiles = getCurrentFiles ();
71- int fileNum = currentFiles . length ;
70+ public void run () {
71+ ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook () ;
7272
73- for (int i = 0 ; i < fileNum ; i ++) {
74- String filename = currentFiles [i ].getAbsolutePath ();
75- String walFilename = directoryName + WAL_FILE_PREFIX + currentFiles [i ].getName () + WAL_FILE_EXTENSION ;
73+ File [] currentFiles = getCurrentFiles ();
74+ int fileNum = currentFiles .length ;
7675
77- LOG .info ("Importing: {}" , filename );
78- importHImpalaEntities (impalaLineageHook , filename , walFilename );
76+ for (int i = 0 ; i < fileNum ; i ++) {
77+ String filename = currentFiles [i ].getAbsolutePath ();
78+ String walFilename = directoryName + WAL_FILE_PREFIX + currentFiles [i ].getName () + WAL_FILE_EXTENSION ;
79+ LOG .info ("Importing: {}" , filename );
80+ importHImpalaEntities (impalaLineageHook , filename , walFilename );
7981
80- if (i != fileNum - 1 ) {
81- deleteLineageAndWal (currentFiles [i ], walFilename );
82- }
83- }
84- LOG .info ("Impala bridge processing: Done! " );
85- }
86-
87- public static void main (String [] args ) {
88- if (args != null && args .length != 4 ) {
89- // The lineage file location and prefix should be input as the parameters
90- System .out .println ("Impala bridge: wrong number of arguments. Please try again" );
91- printUsage ();
92- return ;
82+ if (i != fileNum - 1 ) {
83+ deleteLineageAndWal (currentFiles [i ], walFilename );
84+ }
85+ }
86+ LOG .info ("Impala bridge processing: Done! " );
9387 }
9488
95- ImpalaLineageTool instance = new ImpalaLineageTool (args );
96- instance .run ();
97- }
89+ public static void main (String [] args ) {
90+ if (args != null && args .length != 4 ) {
91+ // The lineage file location and prefix should be input as the parameters
92+ System .out .println ("Impala bridge: wrong number of arguments. Please try again" );
93+ printUsage ();
94+ return ;
95+ }
96+
97+ ImpalaLineageTool instance = new ImpalaLineageTool (args );
98+ instance .run ();
99+ }
98100
99101 /**
100102 * Delete the used lineage file and wal file
101103 * @param currentFile The current file
102104 * @param wal The wal file
103105 */
104- public static void deleteLineageAndWal (File currentFile , String wal ) {
105- if (currentFile .exists () && currentFile .delete ()) {
106- LOG .info ("Lineage file {} is deleted successfully" , currentFile .getPath ());
107- } else {
108- LOG .info ("Failed to delete the lineage file {}" , currentFile .getPath ());
106+ public static void deleteLineageAndWal (File currentFile , String wal ) {
107+ if (currentFile .exists () && currentFile .delete ()) {
108+ LOG .info ("Lineage file {} is deleted successfully" , currentFile .getPath ());
109+ } else {
110+ LOG .info ("Failed to delete the lineage file {}" , currentFile .getPath ());
111+ }
112+
113+ File file = new File (wal );
114+
115+ if (file .exists () && file .delete ()) {
116+ LOG .info ("Wal file {} deleted successfully" , wal );
117+ } else {
118+ LOG .info ("Failed to delete the wal file {}" , wal );
119+ }
109120 }
110121
111- File file = new File ( wal );
112-
113- if ( file . exists () && file . delete ()) {
114- LOG . info ( "Wal file {} deleted successfully" , wal );
115- } else {
116- LOG . info ( "Failed to delete the wal file {}" , wal );
122+ private static void printUsage () {
123+ System . out . println ();
124+ System . out . println ();
125+ System . out . println ( "Usage: import-impala.sh [-d <directory>] [-p <prefix>]" );
126+ System . out . println ( "Imports specified lineage files by given directory and file prefix." );
127+ System . out . println ( );
117128 }
118- }
119-
120- private static void printUsage () {
121- System .out .println ();
122- System .out .println ();
123- System .out .println ("Usage: import-impala.sh [-d <directory>] [-p <prefix>]" );
124- System .out .println (" Imports specified lineage files by given directory and file prefix." );
125- System .out .println ();
126- }
127129
128130 /**
129131 * This function figures out the right lineage file path+name to process sorted by the last
130132 * time they are modified. (old -> new)
131133 * @return get the lineage files from given directory with given prefix.
132134 */
133- public File [] getCurrentFiles () {
134- try {
135- LOG .info ("Scanning: " + directoryName );
136- File folder = new File (directoryName );
137- File [] listOfFiles = folder .listFiles ((FileFilter ) new PrefixFileFilter (prefix , IOCase .SENSITIVE ));
138-
139- if ((listOfFiles == null ) || (listOfFiles .length == 0 )) {
140- LOG .info ("Found no lineage files." );
135+ public File [] getCurrentFiles () {
136+ try {
137+ LOG .info ("Scanning: " + directoryName );
138+ File folder = new File (directoryName );
139+ File [] listOfFiles = folder .listFiles ((FileFilter ) new PrefixFileFilter (prefix , IOCase .SENSITIVE ));
140+
141+ if ((listOfFiles == null ) || (listOfFiles .length == 0 )) {
142+ LOG .info ("Found no lineage files." );
143+ return new File [0 ];
144+ }
145+
146+ if (listOfFiles .length > 1 ) {
147+ Arrays .sort (listOfFiles , LastModifiedFileComparator .LASTMODIFIED_COMPARATOR );
148+ }
149+
150+ LOG .info ("Found {} lineage files" + listOfFiles .length );
151+ return listOfFiles ;
152+ } catch (Exception e ) {
153+ LOG .error ("Import lineage file failed." , e );
154+ }
141155 return new File [0 ];
142- }
156+ }
143157
144- if (listOfFiles .length > 1 ) {
145- Arrays .sort (listOfFiles , LastModifiedFileComparator .LASTMODIFIED_COMPARATOR );
146- }
158+ private boolean processImpalaLineageHook (ImpalaLineageHook impalaLineageHook , List <String > lineageList ) {
159+ boolean allSucceed = true ;
147160
148- LOG .info ("Found {} lineage files" + listOfFiles .length );
149- return listOfFiles ;
150- } catch (Exception e ) {
151- LOG .error ("Import lineage file failed." , e );
152- }
153- return new File [0 ];
154- }
155-
156- private boolean processImpalaLineageHook (ImpalaLineageHook impalaLineageHook , List <String > lineageList ) {
157- boolean allSucceed = true ;
158-
159- // returns true if successfully sent to Atlas
160- for (String lineageRecord : lineageList ) {
161- try {
162- impalaLineageHook .process (lineageRecord );
163- } catch (Exception ex ) {
164- String errorMessage = String .format ("Exception at query {} \n " , lineageRecord );
165- LOG .error (errorMessage , ex );
166-
167- allSucceed = false ;
168- }
169- }
161+ // returns true if successfully sent to Atlas
162+ for (String lineageRecord : lineageList ) {
163+ try {
164+ impalaLineageHook .process (lineageRecord );
165+ } catch (Exception ex ) {
166+ String errorMessage = String .format ("Exception at query {} \n " , lineageRecord );
167+ LOG .error (errorMessage , ex );
170168
171- return allSucceed ;
172- }
169+ allSucceed = false ;
170+ }
171+ }
172+
173+ return allSucceed ;
174+ }
173175
174176 /**
175177 * Create a list of lineage queries based on the lineage file and the wal file
176178 * @param name
177179 * @param walfile
178180 * @return
179181 */
180- public void importHImpalaEntities (ImpalaLineageHook impalaLineageHook , String name , String walfile ) {
181- List <String > lineageList = new ArrayList <>();
182+ public void importHImpalaEntities (ImpalaLineageHook impalaLineageHook , String name , String walfile ) {
183+ List <String > lineageList = new ArrayList <>();
182184
183- try {
184- File lineageFile = new File (name ); //use current file length to minus the offset
185- File walFile = new File (walfile );
185+ try {
186+ File lineageFile = new File (name ); //use current file length to minus the offset
187+ File walFile = new File (walfile );
186188 // if the wal file does not exist, create one with 0 byte read, else, read the number
187- if (!walFile .exists ()) {
188- BufferedWriter writer = new BufferedWriter (new FileWriter (walfile ));
189- writer .write ("0, " + name );
190- writer .close ();
191- }
189+ if (!walFile .exists ()) {
190+ BufferedWriter writer = new BufferedWriter (new FileWriter (walfile ));
191+ writer .write ("0, " + name );
192+ writer .close ();
193+ }
192194
193- LOG .debug ("Reading: " + name );
194- String lineageRecord = FileUtils .readFileToString (lineageFile , "UTF-8" );
195+ LOG .debug ("Reading: " + name );
196+ String lineageRecord = FileUtils .readFileToString (lineageFile , "UTF-8" );
195197
196- lineageList .add (lineageRecord );
198+ lineageList .add (lineageRecord );
197199
198200 // call instance of ImpalaLineageHook to process the list of Impala lineage record
199- if (processImpalaLineageHook (impalaLineageHook , lineageList )) {
201+ if (processImpalaLineageHook (impalaLineageHook , lineageList )) {
200202 // write how many bytes the current file is to the wal file
201- FileWriter newWalFile = new FileWriter (walfile , true );
202- BufferedWriter newWalFileBuf = new BufferedWriter (newWalFile );
203- newWalFileBuf .newLine ();
204- newWalFileBuf .write (String .valueOf (lineageFile .length ()) + "," + name );
205-
206- newWalFileBuf .close ();
207- newWalFile .close ();
208- } else {
209- LOG .error ("Error sending some of impala lineage records to ImpalaHook" );
210- }
211- } catch (Exception e ) {
212- LOG .error ("Error in processing lineage records. Exception: " + e .getMessage ());
203+ FileWriter newWalFile = new FileWriter (walfile , true );
204+ BufferedWriter newWalFileBuf = new BufferedWriter (newWalFile );
205+ newWalFileBuf .newLine ();
206+ newWalFileBuf .write (String .valueOf (lineageFile .length ()) + "," + name );
207+
208+ newWalFileBuf .close ();
209+ newWalFile .close ();
210+ } else {
211+ LOG .error ("Error sending some of impala lineage records to ImpalaHook" );
212+ }
213+ } catch (Exception e ) {
214+ LOG .error ("Error in processing lineage records. Exception: " + e .getMessage ());
215+ }
213216 }
214- }
215-
216- }
217+ }
0 commit comments