31
31
import java .util .Optional ;
32
32
import java .util .stream .Stream ;
33
33
34
+ import net .jpountz .xxhash .XXHash64 ;
35
+ import net .jpountz .xxhash .XXHashFactory ;
34
36
import org .apache .logging .log4j .LogManager ;
35
37
import org .apache .logging .log4j .Logger ;
36
38
import org .exist .EXistException ;
49
51
import static org .exist .util .ThreadUtils .newInstanceThread ;
50
52
51
53
/**
52
- * Manages the journalling log. The database uses one central journal for
54
+ * Manages the journal log. The database uses one central journal for
53
55
* all data files. If the journal exceeds the predefined maximum size, a new file is created.
54
56
* Every journal file has a unique number, which keeps growing during the lifetime of the db.
55
57
* The name of the file corresponds to the file number. The file with the highest
56
58
* number will be used for recovery.
57
- * <p>
59
+ *
58
60
* A buffer is used to temporarily buffer journal entries. To guarantee consistency, the buffer will be flushed
59
- * and the journal is synched after every commit or whenever a db page is written to disk.
60
- * <p>
61
- * Each entry has the structure:
61
+ * and the journal is synced after every commit or whenever a db page is written to disk.
62
+ *
63
+ * Each journal file has the following format:
64
+ *
65
+ * <pre>{@code
66
+ * [magicNumber, version, entry*]
67
+ * }</pre>
68
+ *
69
+ * {@code magicNumber} 4 bytes with the value {@link #JOURNAL_MAGIC_NUMBER}.
70
+ * {@code version} 2 bytes (java.lang.short) with the value {@link #JOURNAL_VERSION}.
71
+ * {@code entry} one or more variable length journal {@code entry} records.
72
+ *
73
+ * Each {@code entry} record has the format:
74
+ *
75
+ * <pre>{@code
76
+ * [entryHeader, data, backLink, checksum]
77
+ * }</pre>
78
+ *
79
+ * {@code entryHeader} 11 bytes describes the entry (see below).
80
+ * {@code data} {@code entryHeader->length} bytes of data for the entry.
81
+ * {@code backLink} 2 bytes (java.lang.short) offset to the start of the entry record, calculated by {@code entryHeader.length + dataLength}.
82
+ * The offset for the start of the entry record can be calculated as {@code endOfRecordOffset - 8 - 2 - backLink}.
83
+ * This is used when scanning the log file backwards for recovery.
84
+ * {@code checksum} 8 bytes for a 64 bit checksum. The checksum includes the {@code entryHeader}, {@code data}, and {@code backLink}.
62
85
*
63
- * <pre>[byte: entryType, long: transactionId, short length, byte[] data, short backLink]</pre>
86
+ * The {@code entryHeader} has the format:
64
87
*
65
- * <ul>
66
- * <li>entryType is a unique id that identifies the log record. Entry types are registered via the
67
- * {@link org.exist.storage.journal.LogEntryTypes} class.</li>
68
- * <li>transactionId: the id of the transaction that created the record.</li>
69
- * <li>length: the length of the log entry data.</li>
70
- * <li>data: the payload data provided by the {@link org.exist.storage.journal.Loggable} object.</li>
71
- * <li>backLink: offset to the start of the record. Used when scanning the log file backwards.</li>
72
- * </ul>
88
+ * <pre>{@code
89
+ * [entryType, transactionId, dataLength]
90
+ * }</pre>
91
+ *
92
+ * {@code entryType} 1 byte indicates the type of the entry.
93
+ * {@code transactionId} 8 bytes (java.lang.long) the id of the transaction that created the record.
94
+ * {@code dataLength} 2 bytes (java.lang.short) the length of the log entry {@code data}.
73
95
*
74
96
* @author wolf
97
+ * @author aretter
75
98
*/
76
99
@ ConfigurationClass ("journal" )
77
100
//TODO: conf.xml refactoring <recovery> => <recovery><journal/></recovery>
@@ -88,7 +111,7 @@ public final class Journal {
88
111
*/
89
112
public static final int JOURNAL_HEADER_LEN = 6 ;
90
113
public static final byte [] JOURNAL_MAGIC_NUMBER = {0x0E , 0x0D , 0x0B , 0x01 };
91
- public static final short JOURNAL_VERSION = 2 ;
114
+ public static final short JOURNAL_VERSION = 3 ;
92
115
93
116
public static final String RECOVERY_SYNC_ON_COMMIT_ATTRIBUTE = "sync-on-commit" ;
94
117
public static final String RECOVERY_JOURNAL_DIR_ATTRIBUTE = "journal-dir" ;
@@ -115,19 +138,35 @@ public final class Journal {
115
138
public static final int LOG_ENTRY_BACK_LINK_LEN = 2 ;
116
139
117
140
/**
118
- * header length + trailing back link
141
+ * the length of the checkum in a log entry
142
+ */
143
+ public static final int LOG_ENTRY_CHECKSUM_LEN = 8 ;
144
+
145
+ /**
146
+ * header length + trailing back link length + checksum length
119
147
*/
120
- public static final int LOG_ENTRY_BASE_LEN = LOG_ENTRY_HEADER_LEN + LOG_ENTRY_BACK_LINK_LEN ;
148
+ public static final int LOG_ENTRY_BASE_LEN = LOG_ENTRY_HEADER_LEN + LOG_ENTRY_BACK_LINK_LEN + LOG_ENTRY_CHECKSUM_LEN ;
121
149
122
150
/**
123
151
* default maximum journal size
124
152
*/
125
- public static final int DEFAULT_MAX_SIZE = 100 ; //MB
153
+ public static final int DEFAULT_MAX_SIZE = 100 ; //MB
126
154
127
155
/**
128
156
* minimal size the journal needs to have to be replaced by a new file during a checkpoint
129
157
*/
130
- private static final int DEFAULT_MIN_SIZE = 1 ; // MB
158
+ private static final int DEFAULT_MIN_SIZE = 1 ; // MB
159
+
160
+ /**
161
+ * We use a 1 megabyte buffer.
162
+ */
163
+ public static final int BUFFER_SIZE = 1024 * 1024 ; // bytes
164
+
165
+ /**
166
+ * Seed used for xxhash-64 checksums calculated
167
+ * by the journal.
168
+ */
169
+ public static final long XXHASH64_SEED = 0x9747b28c ;
131
170
132
171
/**
133
172
* Minimum size limit for the journal file before it is replaced by a new file.
@@ -217,11 +256,12 @@ public final class Journal {
217
256
218
257
private volatile boolean initialised = false ;
219
258
259
+ private final XXHash64 xxHash64 = XXHashFactory .fastestInstance ().hash64 ();
260
+
220
261
public Journal (final BrokerPool pool , final Path directory ) throws EXistException {
221
262
this .pool = pool ;
222
263
this .fsJournalDir = directory .resolve ("fs.journal" );
223
- // we use a 1 megabyte buffer:
224
- this .currentBuffer = ByteBuffer .allocateDirect (1024 * 1024 );
264
+ this .currentBuffer = ByteBuffer .allocateDirect (BUFFER_SIZE );
225
265
226
266
this .fileSyncRunnable = new FileSyncRunnable (latch );
227
267
this .fileSyncThread = newInstanceThread (pool , "file-sync-thread" , fileSyncRunnable );
@@ -296,6 +336,11 @@ public synchronized void writeToLog(final Loggable entry) throws JournalExceptio
296
336
297
337
SanityCheck .ASSERT (!inRecovery , "Write to log during recovery. Should not happen!" );
298
338
final int size = entry .getLogSize ();
339
+
340
+ if (size > Short .MAX_VALUE ) {
341
+ throw new JournalException ("Journal can only write log entries of less that 32KB" );
342
+ }
343
+
299
344
final int required = size + LOG_ENTRY_BASE_LEN ;
300
345
if (required > currentBuffer .remaining ()) {
301
346
flushToLog (false );
@@ -313,11 +358,22 @@ public synchronized void writeToLog(final Loggable entry) throws JournalExceptio
313
358
entry .setLsn (currentLsn );
314
359
315
360
try {
361
+ final int currentBufferEntryOffset = currentBuffer .position ();
362
+
363
+ // write entryHeader
316
364
currentBuffer .put (entry .getLogType ());
317
365
currentBuffer .putLong (entry .getTransactionId ());
318
- currentBuffer .putShort ((short ) entry .getLogSize ());
366
+ currentBuffer .putShort ((short ) size );
367
+
368
+ // write entry data
319
369
entry .write (currentBuffer );
370
+
371
+ // write backlink
320
372
currentBuffer .putShort ((short ) (size + LOG_ENTRY_HEADER_LEN ));
373
+
374
+ // write checksum
375
+ final long checksum = xxHash64 .hash (currentBuffer , currentBufferEntryOffset , currentBuffer .position () - currentBufferEntryOffset , XXHASH64_SEED );
376
+ currentBuffer .putLong (checksum );
321
377
} catch (final BufferOverflowException e ) {
322
378
throw new JournalException ("Buffer overflow while writing log record: " + entry .dump (), e );
323
379
}
@@ -499,7 +555,7 @@ public void switchFiles() throws LogException {
499
555
}
500
556
501
557
private void writeJournalHeader (final SeekableByteChannel channel ) throws IOException {
502
- final ByteBuffer buf = ByteBuffer .allocate (JOURNAL_HEADER_LEN );
558
+ final ByteBuffer buf = ByteBuffer .allocateDirect (JOURNAL_HEADER_LEN );
503
559
504
560
// write the magic number
505
561
buf .put (JOURNAL_MAGIC_NUMBER );
0 commit comments