@@ -37,22 +37,21 @@ public class AliDip2BK implements Runnable {
3737 public static String KAFKAtopic_EOR = "aliecs.env_leave_state.RUNNING" ;
3838 public static String KAFKA_group_id = "AliDip" ;
3939 public static String STORE_HIST_FILE_DIR = "HistFiles" ;
40- public static boolean SIMULATE_DIP_EVENTS = false ;
40+ private static boolean simulateDipEvents = false ;
4141 public static SimpleDateFormat myDateFormat = new SimpleDateFormat ("dd-MM-yy HH:mm" );
4242 public static SimpleDateFormat logDateFormat = new SimpleDateFormat ("dd-MM HH:mm:ss" );
4343 public static double DIFF_ENERGY = 5 ;
4444 public static double DIFF_BETA = 0.001 ;
4545 public static double DIFF_CURRENT = 5 ;
4646 public static String ProgPath ;
47+ private final long startDate ;
4748 public String DipParametersFile = null ;
4849 String confFile = "AliDip2BK.properties" ;
4950 DipClient client ;
50- DipMessagesProcessor process ;
51+ DipMessagesProcessor dipMessagesProcessor ;
5152 BookkeepingClient bookkeepingClient ;
5253 StartOfRunKafkaConsumer kcs ;
5354 EndOfRunKafkaConsumer kce ;
54- private long startDate ;
55- private long stopDate ;
5655
5756 public AliDip2BK () {
5857 startDate = (new Date ()).getTime ();
@@ -66,19 +65,22 @@ public AliDip2BK() {
6665 verifyDirs ();
6766
6867 bookkeepingClient = new BookkeepingClient (bookkeepingUrl , bookkeepingToken );
69- process = new DipMessagesProcessor (bookkeepingClient );
68+ dipMessagesProcessor = new DipMessagesProcessor (bookkeepingClient );
69+ if (AliDip2BK .simulateDipEvents ) {
70+ new SimDipEventsFill (dipMessagesProcessor );
71+ }
7072
71- client = new DipClient (DipParametersFile , process );
73+ client = new DipClient (DipParametersFile , dipMessagesProcessor );
7274
7375 try {
7476 Thread .sleep (5000 );
7577 } catch (InterruptedException ex ) {
7678 Thread .currentThread ().interrupt ();
7779 }
7880
79- kcs = new StartOfRunKafkaConsumer (process );
81+ kcs = new StartOfRunKafkaConsumer (dipMessagesProcessor );
8082
81- kce = new EndOfRunKafkaConsumer (process );
83+ kce = new EndOfRunKafkaConsumer (dipMessagesProcessor );
8284
8385 shutdownProc ();
8486
@@ -95,8 +97,7 @@ static public void log(int level, String module, String mess) {
9597 }
9698
9799 public static void main (String [] args ) {
98- @ SuppressWarnings ("unused" )
99- AliDip2BK service = new AliDip2BK ();
100+ @ SuppressWarnings ("unused" ) AliDip2BK service = new AliDip2BK ();
100101 }
101102
102103 public void run () {
@@ -123,25 +124,25 @@ public void shutdownProc() {
123124 public void run () {
124125 log (4 , "AliDip2BK" , " Main class ENTERS in Shutdown hook" );
125126 client .closeSubscriptions ();
126- process .closeInputQueue ();
127- if (process . QueueSize () > 0 ) {
127+ dipMessagesProcessor .closeInputQueue ();
128+ if (dipMessagesProcessor . queueSize () > 0 ) {
128129 for (int i = 0 ; i < 5 ; i ++) {
129130 try {
130131 Thread .sleep (1000 );
131132 } catch (InterruptedException ex ) {
132133 Thread .currentThread ().interrupt ();
133134 }
134135
135- if (process . QueueSize () == 0 ) break ;
136+ if (dipMessagesProcessor . queueSize () == 0 ) break ;
136137 }
137138 }
138139
139- if (process . QueueSize () != 0 ) {
140+ if (dipMessagesProcessor . queueSize () != 0 ) {
140141 log (4 , "AliDip2BK Shutdown" , " Data Proc queue is not EMPTY ! Close it anyway " );
141142 } else {
142143 log (2 , "AliDip2BK Shutdown" , " Data Proc queue is EMPTY and it was correctly closed " );
143144 }
144- process .saveState ();
145+ dipMessagesProcessor .saveState ();
145146 writeStat ("AliDip2BK.stat" , true );
146147 }
147148 });
@@ -183,7 +184,11 @@ private void loadConf(String filename) {
183184
184185 DipParametersFile = ProgPath + para_file_name ;
185186 } else {
186- log (4 , "AliDip2BK.loadConf" , " Dip Data Providers Subscription file name is undefined in the conf file " );
187+ log (
188+ 4 ,
189+ "AliDip2BK.loadConf" ,
190+ " Dip Data Providers Subscription file name is undefined in the conf file "
191+ );
187192 }
188193
189194 String list_param = prop .getProperty ("ListDataProvidersPattern" );
@@ -193,7 +198,11 @@ private void loadConf(String filename) {
193198 LIST_PARAM = true ;
194199 LIST_PARAM_PAT = list_param ;
195200 } else {
196- log (4 , "AliDip2BK.loadConf " , " List DIP Data Providers Pattern is undefined ! The DIP broswer will not start " );
201+ log (
202+ 4 ,
203+ "AliDip2BK.loadConf " ,
204+ " List DIP Data Providers Pattern is undefined ! The DIP broswer will not start "
205+ );
197206 }
198207
199208 String debug_n = prop .getProperty ("DEBUG_LEVEL" );
@@ -211,8 +220,7 @@ private void loadConf(String filename) {
211220 String keh = prop .getProperty ("SAVE_PARAMETERS_HISTORY_PER_RUN" );
212221 if (keh != null ) {
213222 keh = keh .trim ();
214- SAVE_PARAMETERS_HISTORY_PER_RUN = false ;
215- if (keh .equalsIgnoreCase ("Y" )) SAVE_PARAMETERS_HISTORY_PER_RUN = true ;
223+ SAVE_PARAMETERS_HISTORY_PER_RUN = keh .equalsIgnoreCase ("Y" );
216224 if (keh .equalsIgnoreCase ("YES" )) SAVE_PARAMETERS_HISTORY_PER_RUN = true ;
217225 if (keh .equalsIgnoreCase ("true" )) SAVE_PARAMETERS_HISTORY_PER_RUN = true ;
218226 }
@@ -230,9 +238,9 @@ private void loadConf(String filename) {
230238 String sde = prop .getProperty ("SIMULATE_DIP_EVENTS" );
231239 if (sde != null ) {
232240
233- if (sde .equalsIgnoreCase ("Y" )) SIMULATE_DIP_EVENTS = true ;
234- if (sde .equalsIgnoreCase ("YES" )) SIMULATE_DIP_EVENTS = true ;
235- if (sde .equalsIgnoreCase ("true" )) SIMULATE_DIP_EVENTS = true ;
241+ if (sde .equalsIgnoreCase ("Y" )) simulateDipEvents = true ;
242+ if (sde .equalsIgnoreCase ("YES" )) simulateDipEvents = true ;
243+ if (sde .equalsIgnoreCase ("true" )) simulateDipEvents = true ;
236244 }
237245
238246 String kgid = prop .getProperty ("KAFKA_group_id" );
@@ -275,7 +283,7 @@ private void loadConf(String filename) {
275283 public void writeStat (String file , boolean final_report ) {
276284 String full_file = ProgPath + AliDip2BK .KEEP_STATE_DIR + file ;
277285
278- stopDate = (new Date ()).getTime ();
286+ var stopDate = (new Date ()).getTime ();
279287 double dur = (double ) (stopDate - startDate ) / (1000 * 60 * 60 );
280288
281289 Runtime rt = Runtime .getRuntime ();
@@ -288,14 +296,14 @@ public void writeStat(String file, boolean final_report) {
288296 }
289297 mess = mess + " Duration [h]=" + dur + "\n " ;
290298 mess = mess + " Memory Used [MB]=" + usedMB + "\n " ;
291- mess = mess + " No of DIP messages=" + process .statNoDipMess + "\n " ;
292- mess = mess + " No of KAFKA messages=" + process .statNoKafMess + "\n " ;
299+ mess = mess + " No of DIP messages=" + dipMessagesProcessor .statNoDipMess + "\n " ;
300+ mess = mess + " No of KAFKA messages=" + dipMessagesProcessor .statNoKafMess + "\n " ;
293301 mess = mess + " No of KAFKA SOR messages=" + kcs .NoMess + "\n " ;
294302 mess = mess + " No of KAFKA EOR messages=" + kce .NoMess + "\n " ;
295- mess = mess + " No of new Fill messgaes =" + process .statNoNewFills + "\n " ;
296- mess = mess + " No of new Run messgaes =" + process .statNoNewRuns + "\n " ;
297- mess = mess + " No of end Run messages =" + process .statNoEndRuns + "\n " ;
298- mess = mess + " No of Duplicated end Run messages =" + process .statNoDuplicateEndRuns + "\n " ;
303+ mess = mess + " No of new Fill messgaes =" + dipMessagesProcessor .statNoNewFills + "\n " ;
304+ mess = mess + " No of new Run messgaes =" + dipMessagesProcessor .statNoNewRuns + "\n " ;
305+ mess = mess + " No of end Run messages =" + dipMessagesProcessor .statNoEndRuns + "\n " ;
306+ mess = mess + " No of Duplicated end Run messages =" + dipMessagesProcessor .statNoDuplicateEndRuns + "\n " ;
299307
300308 try {
301309 File of = new File (full_file );
@@ -322,7 +330,7 @@ public void verifyDirs() {
322330 public void verifyDir (String name ) {
323331 if (name != null ) {
324332
325- File directory = new File (String . valueOf ( ProgPath + "/" + name ) );
333+ File directory = new File (ProgPath + "/" + name );
326334
327335 if (!directory .exists ()) {
328336 directory .mkdir ();
0 commit comments