19
19
*/
20
20
package org .exist .storage .journal ;
21
21
22
+ import net .jpountz .xxhash .StreamingXXHash64 ;
23
+ import net .jpountz .xxhash .XXHashFactory ;
22
24
import org .apache .logging .log4j .LogManager ;
23
25
import org .apache .logging .log4j .Logger ;
24
26
import org .exist .storage .DBBroker ;
@@ -52,6 +54,8 @@ public class JournalReader implements AutoCloseable {
52
54
@ Nullable
53
55
private SeekableByteChannel fc ;
54
56
57
+ private final StreamingXXHash64 xxHash64 = XXHashFactory .fastestInstance ().newStreamingHash64 (Journal .XXHASH64_SEED );
58
+
55
59
/**
56
60
* Opens the specified file for reading.
57
61
*
@@ -74,7 +78,7 @@ public JournalReader(final DBBroker broker, final Path file, final int fileNumbe
74
78
75
79
private void validateJournalHeader (final Path file , final SeekableByteChannel fc ) throws IOException , LogException {
76
80
// read the magic number
77
- final ByteBuffer buf = ByteBuffer .allocate (JOURNAL_HEADER_LEN );
81
+ final ByteBuffer buf = ByteBuffer .allocateDirect (JOURNAL_HEADER_LEN );
78
82
fc .read (buf );
79
83
buf .flip ();
80
84
@@ -137,18 +141,18 @@ Loggable previousEntry() throws LogException {
137
141
return null ;
138
142
}
139
143
140
- // go back two bytes and read the back-link of the last entry
141
- fc .position (fc .position () - LOG_ENTRY_BACK_LINK_LEN );
144
+ // go back 8 bytes (checksum length) + 2 bytes (backLink length) and read the backLink (2 bytes) of the last entry
145
+ fc .position (fc .position () - LOG_ENTRY_CHECKSUM_LEN - LOG_ENTRY_BACK_LINK_LEN );
142
146
header .clear ().limit (LOG_ENTRY_BACK_LINK_LEN );
143
147
final int read = fc .read (header );
144
148
if (read != LOG_ENTRY_BACK_LINK_LEN ) {
145
149
throw new LogException ("Unable to read journal entry back-link!" );
146
150
}
147
151
header .flip ();
148
- final short prevLink = header .getShort ();
152
+ final short backLink = header .getShort ();
149
153
150
154
// position the channel to the start of the previous entry and mark it
151
- final long prevStart = fc .position () - LOG_ENTRY_BACK_LINK_LEN - prevLink ;
155
+ final long prevStart = fc .position () - LOG_ENTRY_BACK_LINK_LEN - backLink ;
152
156
fc .position (prevStart );
153
157
final Loggable loggable = readEntry ();
154
158
@@ -204,6 +208,19 @@ Loggable readEntry() throws LogException {
204
208
}
205
209
header .flip ();
206
210
211
+ // prepare the checksum for the header
212
+ xxHash64 .reset ();
213
+ if (header .hasArray ()) {
214
+ xxHash64 .update (header .array (), 0 , LOG_ENTRY_HEADER_LEN );
215
+ } else {
216
+ final int mark = header .position ();
217
+ header .position (0 );
218
+ final byte buf [] = new byte [LOG_ENTRY_HEADER_LEN ];
219
+ header .get (buf );
220
+ xxHash64 .update (buf , 0 , LOG_ENTRY_HEADER_LEN );
221
+ header .position (mark );
222
+ }
223
+
207
224
final byte entryType = header .get ();
208
225
final long transactId = header .getLong ();
209
226
final short size = header .getShort ();
@@ -218,23 +235,51 @@ Loggable readEntry() throws LogException {
218
235
}
219
236
loggable .setLsn (lsn );
220
237
221
- if (size + LOG_ENTRY_BACK_LINK_LEN > payload .capacity ()) {
238
+ final int remainingEntryBytes = size + LOG_ENTRY_BACK_LINK_LEN + LOG_ENTRY_CHECKSUM_LEN ;
239
+
240
+ if (remainingEntryBytes > payload .capacity ()) {
222
241
// resize the payload buffer
223
- payload = ByteBuffer .allocate ( size + LOG_ENTRY_BACK_LINK_LEN );
242
+ payload = ByteBuffer .allocateDirect ( remainingEntryBytes );
224
243
}
225
- payload .clear ().limit (size + LOG_ENTRY_BACK_LINK_LEN );
244
+ payload .clear ().limit (remainingEntryBytes );
226
245
read = fc .read (payload );
227
- if (read < size + LOG_ENTRY_BACK_LINK_LEN ) {
246
+ if (read < remainingEntryBytes ) {
228
247
throw new LogException ("Incomplete log entry found!" );
229
248
}
230
249
payload .flip ();
250
+
251
+ // read entry data
231
252
loggable .read (payload );
232
- final short prevLink = payload .getShort ();
233
- if (prevLink != size + LOG_ENTRY_HEADER_LEN ) {
234
- LOG .error ("Bad pointer to previous: prevLink = " + prevLink + "; size = " + size +
253
+
254
+ // read entry backLink
255
+ final short backLink = payload .getShort ();
256
+ if (backLink != size + LOG_ENTRY_HEADER_LEN ) {
257
+ LOG .error ("Bad pointer to previous: backLink = " + backLink + "; size = " + size +
235
258
"; transactId = " + transactId );
236
259
throw new LogException ("Bad pointer to previous in entry: " + loggable .dump ());
237
260
}
261
+
262
+ // update the checksum for the entry data and backLink
263
+ if (payload .hasArray ()) {
264
+ xxHash64 .update (payload .array (), 0 , size + LOG_ENTRY_BACK_LINK_LEN );
265
+ } else {
266
+ final int mark = payload .position ();
267
+ payload .position (0 );
268
+ final byte buf [] = new byte [size + LOG_ENTRY_BACK_LINK_LEN ];
269
+ payload .get (buf );
270
+ xxHash64 .update (buf , 0 , size + LOG_ENTRY_BACK_LINK_LEN );
271
+ payload .position (mark );
272
+ }
273
+
274
+ // read the entry checksum
275
+ final long checksum = payload .getLong ();
276
+
277
+ // verify the checksum
278
+ final long calculatedChecksum = xxHash64 .getValue ();
279
+ if (checksum != calculatedChecksum ) {
280
+ throw new LogException ("Checksum mismatch whilst reading log entry. read=" + checksum + " calculated=" + calculatedChecksum );
281
+ }
282
+
238
283
return loggable ;
239
284
} catch (final IOException e ) {
240
285
throw new LogException (e .getMessage (), e );
0 commit comments