1515import java .util .Set ;
1616import java .util .concurrent .Callable ;
1717import java .util .concurrent .ExecutionException ;
18- import java .util .concurrent .ExecutorService ;
19- import java .util .concurrent .Executors ;
20- import java .util .concurrent .Future ;
18+ import java .util .concurrent .FutureTask ;
2119import java .util .concurrent .TimeUnit ;
2220import java .util .concurrent .TimeoutException ;
2321import java .util .concurrent .atomic .AtomicInteger ;
22+ import lombok .AllArgsConstructor ;
23+ import lombok .Data ;
2424import lombok .extern .slf4j .Slf4j ;
2525import org .itech .ahb .lib .astm .servlet .ASTMServlet .ASTMVersion ;
2626import org .itech .ahb .lib .common .ASTMFrame ;
@@ -80,6 +80,8 @@ public enum FrameError {
8080 DC4
8181 );
8282 private static final char NON_COMPLIANT_START_CHARACTER = 'H' ;
83+ private static final String TERMINATION_RECORD_END = "L|1|N" ;
84+ private static final int NON_COMPLIANT_RECEIVE_TIMEOUT = 60 ; // in seconds
8385
8486 public static final int OVERHEAD_CHARACTER_COUNT = 7 ;
8587 public static final int MAX_FRAME_SIZE = 64000 ;
@@ -107,7 +109,6 @@ private final int incrementAndGetId() {
107109 );
108110 }
109111
110- private final ExecutorService executor = Executors .newSingleThreadExecutor ();
111112 private final ASTMInterpreterFactory astmInterpreterFactory ;
112113 private final String communicatorId ; // only used for debug messages
113114
@@ -146,7 +147,7 @@ public ASTMMessage receiveProtocol(boolean lineWasContentious)
146147 throws FrameParsingException , ASTMCommunicationException , IOException {
147148 log .trace ("starting receive protocol for ASTM message" );
148149 if (astmVersion == ASTMVersion .LIS01_A ) {
149- final Future <Boolean > establishedFuture = executor . submit (establishmentTaskReceive ());
150+ final FutureTask <Boolean > establishedFuture = new FutureTask <> (establishmentTaskReceive ());
150151 Boolean established = false ;
151152 try {
152153 established = establishedFuture .get (
@@ -163,20 +164,17 @@ public ASTMMessage receiveProtocol(boolean lineWasContentious)
163164 );
164165 // attempt sending information?
165166 establishedFuture .cancel (true );
166- executor .shutdown ();
167167 throw new ASTMCommunicationException (
168168 "a timeout occured during the establishment phase of the receive protocol" ,
169169 e
170170 );
171171 } catch (InterruptedException | ExecutionException e ) {
172- executor .shutdown ();
173172 throw new ASTMCommunicationException (
174173 "the establishment phase of the receive protocol was interrupted or had an error in execution" ,
175174 e
176175 );
177176 }
178177 if (!established ) {
179- executor .shutdown ();
180178 throw new ASTMCommunicationException (
181179 "something went wrong in the establishment phase of the receive protocol, possibly the wrong start character was received"
182180 );
@@ -185,7 +183,7 @@ public ASTMMessage receiveProtocol(boolean lineWasContentious)
185183 log .trace ("astm LIS01-A receive protocol: established" );
186184 return receiveInCompliantMode ();
187185 }
188- log .trace ("astm 1381-95 being received " );
186+ log .trace ("astm transmission protocol not being used " );
189187 return receiveInNonCompliantMode ();
190188 }
191189
@@ -231,96 +229,100 @@ public Boolean call() throws IOException {
231229
232230 private ASTMMessage receiveInNonCompliantMode ()
233231 throws IOException , ASTMCommunicationException , FrameParsingException {
234- List <ASTMRecord > records = new ArrayList <>();
235- boolean messageTerminationRecordReceived = false ;
236- int i = 0 ;
237- List <Exception > exceptions = new ArrayList <>();
238- while (!messageTerminationRecordReceived && exceptions .size () <= MAX_RECEIVE_RETRY_ATTEMPTS ) {
239- if (exceptions .size () > 0 ) {
240- log .debug ("attempting retry of record " + i );
241- }
242- try {
243- Set <FrameError > frameErrors = readNextIncompliantRecord (records );
244- if (frameErrors .isEmpty ()) {
245- log .debug ("record successfully received" );
246- exceptions = new ArrayList <>(); // reset as retry mechanism is per record
247- ++i ;
248- } else {
249- log .debug ("frame unsuccessfully received due to: " + frameErrors );
250- exceptions .add (new ASTMCommunicationException ("frame unsuccessfully received due to: " + frameErrors ));
251- }
252- } catch (Exception e ) {
253- log .error ("the receiving phase had an error in exeuction" , e );
254- exceptions .add (e );
255- }
232+ final FutureTask <ASTMMessage > recievedMessageFuture = new FutureTask <>(receiveMessage ());
233+ try {
234+ return recievedMessageFuture .get (NON_COMPLIANT_RECEIVE_TIMEOUT , TimeUnit .SECONDS );
235+ } catch (TimeoutException e ) {
236+ recievedMessageFuture .cancel (true );
237+ log .error ("a timeout occured during the reveiving message in non-compliant mode" , e );
238+ } catch (InterruptedException | ExecutionException e ) {
239+ log .error ("the thread was interrupted or had an error in exeuction" , e );
256240 }
241+ throw new ASTMCommunicationException ("non compliant mode could not return a valid ASTM message" );
242+ }
257243
258- if (exceptions .size () > MAX_RECEIVE_RETRY_ATTEMPTS ) {
259- executor .shutdown ();
260- throw new ASTMCommunicationException (
261- "the receiving phase failed or had exceptions exceeding the number of retries"
262- );
263- }
244+ private Callable <ASTMMessage > receiveMessage () {
245+ return new Callable <ASTMMessage >() {
246+ @ Override
247+ public ASTMMessage call () throws IOException , ASTMCommunicationException {
248+ List <ASTMRecord > records = new ArrayList <>();
249+ boolean messageTerminationRecordReceived = false ;
250+ int i = 0 ;
251+ List <Exception > exceptions = new ArrayList <>();
252+ while (!messageTerminationRecordReceived && exceptions .size () <= MAX_RECEIVE_RETRY_ATTEMPTS ) {
253+ if (exceptions .size () > 0 ) {
254+ log .debug ("attempting retry of record " + i );
255+ }
256+ try {
257+ Set <FrameError > frameErrors = readNextIncompliantRecord (records );
258+ if (frameErrors .isEmpty ()) {
259+ log .debug ("record successfully received" );
260+ exceptions = new ArrayList <>(); // reset as retry mechanism is per record
261+ if (records .get (i ).getRecord ().trim ().endsWith (TERMINATION_RECORD_END )) {
262+ messageTerminationRecordReceived = true ;
263+ }
264+ ++i ;
265+ } else {
266+ log .debug ("frame unsuccessfully received due to: " + frameErrors );
267+ exceptions .add (new ASTMCommunicationException ("frame unsuccessfully received due to: " + frameErrors ));
268+ }
269+ } catch (Exception e ) {
270+ log .error ("the receiving phase had an error in exeuction" , e );
271+ exceptions .add (e );
272+ }
273+ }
274+
275+ if (exceptions .size () > MAX_RECEIVE_RETRY_ATTEMPTS ) {
276+ throw new ASTMCommunicationException (
277+ "the receiving phase failed or had exceptions exceeding the number of retries"
278+ );
279+ }
264280
265- return astmInterpreterFactory .createInterpreterForRecords (records ).interpretASTMRecordsToMessage (records );
281+ return astmInterpreterFactory .createInterpreterForRecords (records ).interpretASTMRecordsToMessage (records );
282+ }
283+ };
266284 }
267285
268286 private ASTMMessage receiveInCompliantMode () throws IOException , ASTMCommunicationException , FrameParsingException {
269287 List <ASTMFrame > frames = new ArrayList <>();
270- boolean eotDetected = false ;
271288 int i = 0 ;
272289 List <Exception > exceptions = new ArrayList <>();
273- while (! eotDetected && exceptions .size () <= MAX_RECEIVE_RETRY_ATTEMPTS ) {
290+ while (exceptions .size () <= MAX_RECEIVE_RETRY_ATTEMPTS ) {
274291 if (exceptions .size () > 0 ) {
275292 log .debug ("attempting retry of frame " + i );
276293 }
277- char startChar = (char ) reader .read ();
278- log .trace (
279- "received: '" +
280- LogUtil .convertForDisplay (startChar ) +
281- "'. Expecting start of frame ['" +
282- LogUtil .convertForDisplay (STX ) +
283- "'] aka [0x02]"
284- );
285- if (startChar == EOT ) {
286- eotDetected = true ;
287- log .debug ("'" + LogUtil .convertForDisplay (EOT ) + "' detected" );
288- } else {
289- final Future <Set <FrameError >> recievedFrameFuture = executor .submit (receiveNextFrameTask (frames ));
290- try {
291- Set <FrameError > frameErrors = recievedFrameFuture .get (RECIEVE_FRAME_TIMEOUT , TimeUnit .SECONDS );
292-
293- if (startChar != STX ) {
294- frames .remove (frames .size () - 1 );
295- frameErrors .add (FrameError .ILLEGAL_START );
296- }
297- if (frameErrors .isEmpty ()) {
298- log .debug ("frame successfully received" );
299- log .trace ("sending: '" + LogUtil .convertForDisplay (ACK ) + "' to indicate received frame correctly" );
300- writer .append (ACK );
301- writer .flush ();
302- exceptions = new ArrayList <>(); // reset as retry mechanism is per frame
303- ++i ;
304- } else {
305- log .debug ("frame unsuccessfully received due to: " + frameErrors );
306- log .trace ("sending: '" + LogUtil .convertForDisplay (NAK ) + "' to indicate received frame incorrectly" );
307- writer .append (NAK );
308- writer .flush ();
309- exceptions .add (new ASTMCommunicationException ("frame unsuccessfully received due to: " + frameErrors ));
310- }
311- } catch (TimeoutException e ) {
312- recievedFrameFuture .cancel (true );
313- exceptions .add (e );
314- log .error ("a timeout occured during the receiving phase" , e );
315- } catch (InterruptedException | ExecutionException e ) {
316- log .error ("the receiving phase was interrupted or had an error in exeuction" , e );
317- exceptions .add (e );
294+ final FutureTask <ReadFrameInfo > recievedFrameFuture = new FutureTask <>(receiveNextFrameTask (frames ));
295+ try {
296+ ReadFrameInfo frameInfo = recievedFrameFuture .get (RECIEVE_FRAME_TIMEOUT , TimeUnit .SECONDS );
297+ if (frameInfo .getStartChar () != EOT ) {
298+ break ;
318299 }
300+ Set <FrameError > frameErrors = frameInfo .getFrameErrors ();
301+ if (frameErrors .isEmpty ()) {
302+ log .debug ("frame successfully received" );
303+ log .trace ("sending: '" + LogUtil .convertForDisplay (ACK ) + "' to indicate received frame correctly" );
304+ writer .append (ACK );
305+ writer .flush ();
306+ exceptions = new ArrayList <>(); // reset as retry mechanism is per frame
307+ ++i ;
308+ } else {
309+ log .debug ("frame unsuccessfully received due to: " + frameErrors );
310+ log .trace ("sending: '" + LogUtil .convertForDisplay (NAK ) + "' to indicate received frame incorrectly" );
311+ writer .append (NAK );
312+ writer .flush ();
313+ exceptions .add (new ASTMCommunicationException ("frame unsuccessfully received due to: " + frameErrors ));
314+ }
315+ } catch (TimeoutException e ) {
316+ recievedFrameFuture .cancel (true );
317+ exceptions .add (e );
318+ log .error ("a timeout occured during the receiving phase" , e );
319+ } catch (InterruptedException | ExecutionException e ) {
320+ log .error ("the receiving phase was interrupted or had an error in exeuction" , e );
321+ exceptions .add (e );
319322 }
320323 }
321324
322325 if (exceptions .size () > MAX_RECEIVE_RETRY_ATTEMPTS ) {
323- executor .shutdown ();
324326 throw new ASTMCommunicationException (
325327 "the receiving phase failed or had exceptions exceeding the number of retries"
326328 );
@@ -329,11 +331,27 @@ private ASTMMessage receiveInCompliantMode() throws IOException, ASTMCommunicati
329331 return astmInterpreterFactory .createInterpreterForFrames (frames ).interpretFramesToASTMMessage (frames );
330332 }
331333
332- private Callable <Set < FrameError > > receiveNextFrameTask (List <ASTMFrame > frames ) throws IOException {
333- return new Callable <Set < FrameError > >() {
334+ private Callable <ReadFrameInfo > receiveNextFrameTask (List <ASTMFrame > frames ) throws IOException {
335+ return new Callable <ReadFrameInfo >() {
334336 @ Override
335- public Set <FrameError > call () throws IOException {
336- return readNextCompliantFrame (frames , (frames .size () + 1 ) % 8 );
337+ public ReadFrameInfo call () throws IOException {
338+ char startChar = (char ) reader .read ();
339+ log .trace (
340+ "received: '" +
341+ LogUtil .convertForDisplay (startChar ) +
342+ "'. Expecting start of frame ['" +
343+ LogUtil .convertForDisplay (STX ) +
344+ "'] aka [0x02]"
345+ );
346+ if (startChar == EOT ) {
347+ log .debug ("'" + LogUtil .convertForDisplay (EOT ) + "' detected" );
348+ return new ReadFrameInfo (new HashSet <>(), startChar );
349+ } else if (startChar == STX ) {
350+ return new ReadFrameInfo (readNextCompliantFrame (frames , (frames .size () + 1 ) % 8 ), startChar );
351+ } else {
352+ log .error ("illegal start character '" + LogUtil .convertForDisplay (startChar ) + "' detected" );
353+ return new ReadFrameInfo (Set .of (FrameError .ILLEGAL_START ), startChar );
354+ }
337355 }
338356 };
339357 }
@@ -419,30 +437,30 @@ private Set<FrameError> readNextCompliantFrame(List<ASTMFrame> frames, int expec
419437
420438 private Set <FrameError > readNextIncompliantRecord (List <ASTMRecord > records ) throws IOException {
421439 log .debug ("reading incompliant record..." );
422- Set <FrameError > frameErrors = new HashSet <>();
440+ Set <FrameError > recordErrors = new HashSet <>();
423441 StringBuilder textBuilder = new StringBuilder ();
424442 char curChar = ' ' ;
425443 while (curChar != CR ) {
426444 curChar = (char ) reader .read ();
427445 if (RESTRICTED_CHARACTERS .contains (curChar )) {
428- frameErrors .add (FrameError .ILLEGAL_CHAR );
446+ recordErrors .add (FrameError .ILLEGAL_CHAR );
429447 }
430448 textBuilder .append (curChar );
431449 }
432450 String text = textBuilder .toString ();
433451 log .debug ("record text received" );
434452 log .trace (
435- "received frame : '" +
453+ "received record : '" +
436454 LogUtil .convertForDisplay (text ) +
437- "'. Expecting ASTM record . Illegal characters [0x00-0x06, 0x08, 0x0A, 0x0E-0x1F, 0x7F, 0xFF]"
455+ "'. Expecting ASTM frame . Illegal characters [0x00-0x06, 0x08, 0x0A, 0x0E-0x1F, 0x7F, 0xFF]"
438456 );
439457
440- if (frameErrors .isEmpty ()) {
458+ if (recordErrors .isEmpty ()) {
441459 ASTMRecord record = astmInterpreterFactory .createInterpreterForText (text ).interpretASTMTextToRecord (text );
442460 records .add (record );
443- log .debug ("frame added to list of frames " );
461+ log .debug ("record added to list of record " );
444462 }
445- return frameErrors ;
463+ return recordErrors ;
446464 }
447465
448466 @ Override
@@ -454,7 +472,7 @@ public boolean sendProtocol(ASTMMessage message) throws ASTMCommunicationExcepti
454472 Boolean established = false ;
455473 Boolean lineContention = false ;
456474 for (int i = 0 ; i <= MAX_SEND_ESTABLISH_RETRY_ATTEMPTS ; i ++) {
457- final Future <Character > establishedFuture = executor . submit (establishmentTaskSend ());
475+ final FutureTask <Character > establishedFuture = new FutureTask <> (establishmentTaskSend ());
458476 try {
459477 Character validResponseChar = establishedFuture .get (ESTABLISHMENT_SEND_TIMEOUT , TimeUnit .SECONDS );
460478 lineContention = Character .compare (validResponseChar , ENQ ) == 0 ;
@@ -482,7 +500,6 @@ public boolean sendProtocol(ASTMMessage message) throws ASTMCommunicationExcepti
482500 }
483501
484502 if (!established ) {
485- executor .shutdown ();
486503 terminationSignal ();
487504 throw new ASTMCommunicationException (
488505 "the establishment phase failed or had exceptions exceeding the number of retries"
@@ -491,7 +508,7 @@ public boolean sendProtocol(ASTMMessage message) throws ASTMCommunicationExcepti
491508
492509 List <Exception > exceptions = new ArrayList <>();
493510 for (int i = 0 ; i < frames .size (); i ++) {
494- final Future <Boolean > sendFrameFuture = executor . submit (sendNextFrameTask (frames .get (i )));
511+ final FutureTask <Boolean > sendFrameFuture = new FutureTask <> (sendNextFrameTask (frames .get (i )));
495512 try {
496513 established = sendFrameFuture .get (SEND_FRAME_TIMEOUT , TimeUnit .SECONDS );
497514 } catch (TimeoutException e ) {
@@ -622,4 +639,13 @@ private String checksumCalc(char frameNumber, String frame, char frameTerminator
622639 log .debug ("frame number " + frameNumber + " calculated checksum: " + checksum );
623640 return checksum ;
624641 }
642+
643+ @ Data
644+ @ AllArgsConstructor
645+ private class ReadFrameInfo {
646+
647+ private Set <FrameError > frameErrors ;
648+
649+ private char startChar ;
650+ }
625651}
0 commit comments