@@ -411,33 +411,33 @@ public void setChmod(int chmod) {
411411 final Set <PosixFilePermission > permissions = new HashSet <>();
412412 bits .stream ().forEach (b -> {
413413 switch (b ) {
414- case 0 :
415- permissions .add (PosixFilePermission .OTHERS_EXECUTE );
416- break ;
417- case 1 :
418- permissions .add (PosixFilePermission .OTHERS_WRITE );
419- break ;
420- case 2 :
421- permissions .add (PosixFilePermission .OTHERS_READ );
422- break ;
423- case 3 :
424- permissions .add (PosixFilePermission .GROUP_EXECUTE );
425- break ;
426- case 4 :
427- permissions .add (PosixFilePermission .GROUP_WRITE );
428- break ;
429- case 5 :
430- permissions .add (PosixFilePermission .GROUP_READ );
431- break ;
432- case 6 :
433- permissions .add (PosixFilePermission .OWNER_EXECUTE );
434- break ;
435- case 7 :
436- permissions .add (PosixFilePermission .OWNER_WRITE );
437- break ;
438- case 8 :
439- permissions .add (PosixFilePermission .OWNER_READ );
440- break ;
414+ case 0 :
415+ permissions .add (PosixFilePermission .OTHERS_EXECUTE );
416+ break ;
417+ case 1 :
418+ permissions .add (PosixFilePermission .OTHERS_WRITE );
419+ break ;
420+ case 2 :
421+ permissions .add (PosixFilePermission .OTHERS_READ );
422+ break ;
423+ case 3 :
424+ permissions .add (PosixFilePermission .GROUP_EXECUTE );
425+ break ;
426+ case 4 :
427+ permissions .add (PosixFilePermission .GROUP_WRITE );
428+ break ;
429+ case 5 :
430+ permissions .add (PosixFilePermission .GROUP_READ );
431+ break ;
432+ case 6 :
433+ permissions .add (PosixFilePermission .OWNER_EXECUTE );
434+ break ;
435+ case 7 :
436+ permissions .add (PosixFilePermission .OWNER_WRITE );
437+ break ;
438+ case 8 :
439+ permissions .add (PosixFilePermission .OWNER_READ );
440+ break ;
441441 }
442442 });
443443 this .permissions = permissions ;
@@ -474,12 +474,32 @@ public void start() {
474474 }
475475
476476 @ Override
477- public synchronized void stop () {
478- if (this .flushTask != null ) {
479- this .flushTask .cancel (true );
480- this .flushTask = null ;
477+ public void stop () {
478+ synchronized (this ) {
479+ if (this .flushTask != null ) {
480+ this .flushTask .cancel (true );
481+ this .flushTask = null ;
482+ }
483+ }
484+ Flusher flusher = new Flusher ();
485+ flusher .run ();
486+ boolean needInterrupt = this .fileStates .size () > 0 ;
487+ int n = 0 ;
488+ while (n ++ < 10 && this .fileStates .size () > 0 ) {
489+ try {
490+ Thread .sleep (1 );
491+ }
492+ catch (InterruptedException e ) {
493+ // cancel the interrupt
494+ }
495+ flusher .run ();
496+ }
497+ if (this .fileStates .size () > 0 ) {
498+ this .logger .error ("Failed to flush after multiple attempts, while stopping: " + this .fileStates .keySet ());
499+ }
500+ if (needInterrupt ) {
501+ Thread .currentThread ().interrupt ();
481502 }
482- new Flusher ().run ();
483503 }
484504
485505 @ Override
@@ -524,10 +544,10 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
524544 timestamp = ((File ) payload ).lastModified ();
525545 }
526546 boolean ignore = (FileExistsMode .IGNORE .equals (this .fileExistsMode )
527- && (exists || (StringUtils .hasText (this .temporaryFileSuffix ) && tempFile .exists ())))
547+ && (exists || (StringUtils .hasText (this .temporaryFileSuffix ) && tempFile .exists ())))
528548 || ((exists && FileExistsMode .REPLACE_IF_MODIFIED .equals (this .fileExistsMode ))
529- && (timestamp instanceof Number
530- && ((Number ) timestamp ).longValue () == resultFile .lastModified ()));
549+ && (timestamp instanceof Number
550+ && ((Number ) timestamp ).longValue () == resultFile .lastModified ()));
531551 if (!ignore ) {
532552 try {
533553 if (!exists &&
@@ -862,10 +882,10 @@ private File evaluateDestinationDirectoryExpression(Message<?> message) {
862882
863883 private synchronized FileState getFileState (final File fileToWriteTo , boolean isString )
864884 throws FileNotFoundException {
865- String absolutePath = fileToWriteTo .getAbsolutePath ();
866885 FileState state ;
867886 boolean appendNoFlush = FileExistsMode .APPEND_NO_FLUSH .equals (this .fileExistsMode );
868887 if (appendNoFlush ) {
888+ String absolutePath = fileToWriteTo .getAbsolutePath ();
869889 state = this .fileStates .get (absolutePath );
870890 if (state != null && ((isString && state .stream != null ) || (!isString && state .writer != null ))) {
871891 state .close ();
@@ -938,16 +958,10 @@ public void trigger(Message<?> message) {
938958 * @param flushPredicate the {@link FlushPredicate}.
939959 * @since 4.3
940960 */
941- public synchronized void flushIfNeeded (FlushPredicate flushPredicate ) {
942- Iterator <Entry <String , FileState >> iterator = this .fileStates .entrySet ().iterator ();
943- while (iterator .hasNext ()) {
944- Entry <String , FileState > entry = iterator .next ();
945- FileState state = entry .getValue ();
946- if (flushPredicate .shouldFlush (entry .getKey (), state .firstWrite , state .lastWrite )) {
947- iterator .remove ();
948- state .close ();
949- }
950- }
961+ public void flushIfNeeded (FlushPredicate flushPredicate ) {
962+ flushIfNeeded ((fileAbsolutePath , firstWrite , lastWrite , filterMessage ) ->
963+ flushPredicate .shouldFlush (fileAbsolutePath , firstWrite , lastWrite ),
964+ null );
951965 }
952966
953967 /**
@@ -959,16 +973,24 @@ public synchronized void flushIfNeeded(FlushPredicate flushPredicate) {
959973 * @param filterMessage an optional message passed into the predicate.
960974 * @since 4.3
961975 */
962- public synchronized void flushIfNeeded (MessageFlushPredicate flushPredicate , Message <?> filterMessage ) {
963- Iterator <Entry <String , FileState >> iterator = this .fileStates .entrySet ().iterator ();
964- while (iterator .hasNext ()) {
965- Entry <String , FileState > entry = iterator .next ();
966- FileState state = entry .getValue ();
967- if (flushPredicate .shouldFlush (entry .getKey (), state .firstWrite , state .lastWrite , filterMessage )) {
968- iterator .remove ();
969- state .close ();
976+ public void flushIfNeeded (MessageFlushPredicate flushPredicate , Message <?> filterMessage ) {
977+ doFlush (findFilesToFlush (flushPredicate , filterMessage ));
978+ }
979+
980+ private Map <String , FileState > findFilesToFlush (MessageFlushPredicate flushPredicate , Message <?> filterMessage ) {
981+ Map <String , FileState > toRemove = new HashMap <>();
982+ synchronized (this ) {
983+ Iterator <Entry <String , FileState >> iterator = this .fileStates .entrySet ().iterator ();
984+ while (iterator .hasNext ()) {
985+ Entry <String , FileState > entry = iterator .next ();
986+ FileState state = entry .getValue ();
987+ if (flushPredicate .shouldFlush (entry .getKey (), state .firstWrite , state .lastWrite , filterMessage )) {
988+ iterator .remove ();
989+ toRemove .put (entry .getKey (), state );
990+ }
970991 }
971992 }
993+ return toRemove ;
972994 }
973995
974996 private synchronized void clearState (final File fileToWriteTo , final FileState state ) {
@@ -977,6 +999,33 @@ private synchronized void clearState(final File fileToWriteTo, final FileState s
977999 }
9781000 }
9791001
1002+ private void doFlush (Map <String , FileState > toRemove ) {
1003+ Map <String , FileState > toRestore = new HashMap <>();
1004+ boolean interrupted = false ;
1005+ for (Entry <String , FileState > entry : toRemove .entrySet ()) {
1006+ if (!interrupted && entry .getValue ().close ()) {
1007+ if (FileWritingMessageHandler .this .logger .isDebugEnabled ()) {
1008+ FileWritingMessageHandler .this .logger .debug ("Flushed: " + entry .getKey ());
1009+ }
1010+ }
1011+ else { // interrupted (stop), re-add
1012+ interrupted = true ;
1013+ toRestore .put (entry .getKey (), entry .getValue ());
1014+ }
1015+ }
1016+ if (interrupted ) {
1017+ if (FileWritingMessageHandler .this .logger .isDebugEnabled ()) {
1018+ FileWritingMessageHandler .this .logger
1019+ .debug ("Interrupted during flush; not flushed: " + toRestore .keySet ());
1020+ }
1021+ synchronized (this ) {
1022+ for (Entry <String , FileState > entry : toRestore .entrySet ()) {
1023+ this .fileStates .putIfAbsent (entry .getKey (), entry .getValue ());
1024+ }
1025+ }
1026+ }
1027+ }
1028+
9801029 private static void rename (File source , File target ) throws IOException {
9811030 Files .move (source .toPath (), target .toPath (), StandardCopyOption .REPLACE_EXISTING );
9821031 }
@@ -1039,6 +1088,7 @@ private final class Flusher implements Runnable {
10391088
10401089 @ Override
10411090 public void run () {
1091+ Map <String , FileState > toRemove = new HashMap <>();
10421092 synchronized (FileWritingMessageHandler .this ) {
10431093 long expired = FileWritingMessageHandler .this .flushTask == null ? Long .MAX_VALUE
10441094 : (System .currentTimeMillis () - FileWritingMessageHandler .this .flushInterval );
@@ -1049,18 +1099,12 @@ public void run() {
10491099 FileState state = entry .getValue ();
10501100 if (state .lastWrite < expired ||
10511101 (!FileWritingMessageHandler .this .flushWhenIdle && state .firstWrite < expired )) {
1052- if (state .close ()) {
1053- if (FileWritingMessageHandler .this .logger .isDebugEnabled ()) {
1054- FileWritingMessageHandler .this .logger .debug ("Flushed: " + entry .getKey ());
1055- }
1056- iterator .remove ();
1057- }
1058- else {
1059- break ; // interrupted
1060- }
1102+ toRemove .put (entry .getKey (), state );
1103+ iterator .remove ();
10611104 }
10621105 }
10631106 }
1107+ doFlush (toRemove );
10641108 }
10651109
10661110 }
0 commit comments