98
98
*/
99
99
@ ConfigurationClass ("journal" )
100
100
//TODO: conf.xml refactoring <recovery> => <recovery><journal/></recovery>
101
- public final class Journal {
101
+ public final class Journal implements Closeable {
102
102
/**
103
103
* Logger for this class
104
104
*/
@@ -187,13 +187,7 @@ public final class Journal {
187
187
* the current output channel
188
188
* Only valid after switchFiles() was called at least once!
189
189
*/
190
- private SeekableByteChannel channel ;
191
-
192
- /**
193
- * Syncing the journal is done by a background thread
194
- */
195
- private final FileSyncRunnable fileSyncRunnable ;
196
- private final Thread fileSyncThread ;
190
+ private FileChannel channel ;
197
191
198
192
/**
199
193
* latch used to synchronize writes to the channel
@@ -263,10 +257,6 @@ public Journal(final BrokerPool pool, final Path directory) throws EXistExceptio
263
257
this .fsJournalDir = directory .resolve ("fs.journal" );
264
258
this .currentBuffer = ByteBuffer .allocateDirect (BUFFER_SIZE );
265
259
266
- this .fileSyncRunnable = new FileSyncRunnable (latch );
267
- this .fileSyncThread = newInstanceThread (pool , "file-sync-thread" , fileSyncRunnable );
268
- fileSyncThread .start (); //this makes us to use class as a final only - no inheritance allowed
269
-
270
260
this .syncOnCommit = pool .getConfiguration ().getProperty (PROPERTY_RECOVERY_SYNC_ON_COMMIT , DEFAULT_SYNC_ON_COMMIT );
271
261
if (LOG .isDebugEnabled ()) {
272
262
LOG .debug ("SyncOnCommit = " + syncOnCommit );
@@ -409,11 +399,18 @@ public synchronized void flushToLog(final boolean fsync, final boolean forceSync
409
399
if (inRecovery ) {
410
400
return ;
411
401
}
402
+
412
403
flushBuffer ();
413
- if (forceSync || (fsync && syncOnCommit && currentLsn .compareTo (lastSyncLsn ) > 0 )) {
414
- fileSyncRunnable .triggerSync ();
415
- lastSyncLsn = currentLsn ;
404
+
405
+ try {
406
+ if (forceSync || (fsync && syncOnCommit && currentLsn .compareTo (lastSyncLsn ) > 0 )) {
407
+ sync ();
408
+ lastSyncLsn = currentLsn ;
409
+ }
410
+ } catch (final IOException e ) {
411
+ LOG .error ("Could not sync Journal to disk: " + e .getMessage (), e );
416
412
}
413
+
417
414
try {
418
415
if (channel != null && channel .size () >= journalSizeLimit ) {
419
416
pool .triggerCheckpoint ();
@@ -423,6 +420,10 @@ public synchronized void flushToLog(final boolean fsync, final boolean forceSync
423
420
}
424
421
}
425
422
423
+ private void sync () throws IOException {
424
+ channel .force (true );
425
+ }
426
+
426
427
/**
427
428
* Flush the buffer to disk.
428
429
*/
@@ -541,11 +542,13 @@ public void switchFiles() throws LogException {
541
542
}
542
543
543
544
synchronized (latch ) {
544
- close ();
545
545
try {
546
- channel = Files .newByteChannel (file , CREATE_NEW , WRITE );
546
+ // close current file
547
+ close ();
548
+
549
+ // open new file
550
+ channel = (FileChannel ) Files .newByteChannel (file , CREATE_NEW , WRITE );
547
551
writeJournalHeader (channel );
548
- fileSyncRunnable .setChannel ((FileChannel ) channel );
549
552
initialised = true ;
550
553
} catch (final IOException e ) {
551
554
throw new LogException ("Failed to open new journal: " + file .toAbsolutePath ().toString (), e );
@@ -571,13 +574,15 @@ private void writeJournalHeader(final SeekableByteChannel channel) throws IOExce
571
574
/**
572
575
* Close the journal.
573
576
*/
574
- public void close () {
577
+ @ Override
578
+ public void close () throws IOException {
575
579
if (channel != null ) {
576
580
try {
577
- channel . close ();
581
+ sync ();
578
582
} catch (final IOException e ) {
579
- LOG .warn ( "Failed to close journal channel" , e );
583
+ LOG .error ( e . getMessage () , e );
580
584
}
585
+ channel .close ();
581
586
}
582
587
}
583
588
@@ -647,23 +652,23 @@ public void shutdown(final long txnId, final boolean checkpoint) {
647
652
648
653
if (!BrokerPool .FORCE_CORRUPTION ) {
649
654
if (checkpoint ) {
650
- LOG .info ("Transaction journal cleanly shutting down with checkpoint..." );
655
+ LOG .info ("Shutting down Journal with checkpoint..." );
651
656
try {
652
657
writeToLog (new Checkpoint (txnId ));
653
658
} catch (final JournalException e ) {
654
- LOG .error ("An error occurred while closing the journal file : " + e .getMessage (), e );
659
+ LOG .error ("An error occurred whilst writing a checkpoint to the Journal : " + e .getMessage (), e );
655
660
}
656
661
}
657
662
flushBuffer ();
658
663
}
659
- fileLock .release ();
660
- fileSyncRunnable .shutdown ();
661
- fileSyncThread .interrupt ();
664
+
662
665
try {
663
- fileSyncThread . join ();
664
- } catch (final InterruptedException e ) {
665
- //Nothing to do
666
+ channel . close ();
667
+ } catch (final IOException e ) {
668
+ LOG . error ( "Unable to close Journal file: " + e . getMessage (), e );
666
669
}
670
+ channel = null ;
671
+ fileLock .release ();
667
672
currentBuffer = null ;
668
673
}
669
674
0 commit comments