1
1
use std:: collections:: BTreeMap ;
2
- use std:: io:: BufWriter ;
2
+ use std:: hash:: Hasher ;
3
+ use std:: io:: { BufWriter , ErrorKind , Write } ;
3
4
use std:: mem:: size_of;
4
5
use std:: ops:: Deref ;
5
6
use std:: path:: { Path , PathBuf } ;
@@ -15,6 +16,7 @@ use crate::error::Result;
15
16
use crate :: io:: buf:: { IoBufMut , ZeroCopyBuf } ;
16
17
use crate :: io:: file:: { BufCopy , FileExt } ;
17
18
use crate :: LIBSQL_MAGIC ;
19
+ use crate :: io:: Inspect ;
18
20
19
21
use super :: compacted:: { CompactedSegmentDataFooter , CompactedSegmentDataHeader } ;
20
22
use super :: { frame_offset, page_offset, Frame , FrameHeader , Segment , SegmentHeader , SegmentFlags } ;
@@ -198,7 +200,8 @@ impl<F: FileExt> SealedSegment<F> {
198
200
199
201
// This happens in case of crash: the segment is not empty, but it wasn't sealed. We need to
200
202
// recover the index, and seal the segment.
201
- if index_offset == 0 {
203
+ if !header. flags ( ) . contains ( SegmentFlags :: SEALED ) {
204
+ assert_eq ! ( header. index_offset. get( ) , 0 ) ;
202
205
return Self :: recover ( file, path, header) . map ( Some ) ;
203
206
}
204
207
@@ -218,32 +221,79 @@ impl<F: FileExt> SealedSegment<F> {
218
221
}
219
222
220
223
fn recover ( file : Arc < F > , path : PathBuf , mut header : SegmentHeader ) -> Result < Self > {
224
+ assert ! ( !header. is_empty( ) ) ;
225
+ assert_eq ! ( header. index_size. get( ) , 0 ) ;
226
+ assert_eq ! ( header. index_offset. get( ) , 0 ) ;
227
+ assert ! ( !header. flags( ) . contains( SegmentFlags :: SEALED ) ) ;
228
+ // recovery for a replica log should take a different path (i.e., resync with the primary)
229
+ assert ! ( !header. flags( ) . contains( SegmentFlags :: FRAME_UNORDERED ) ) ;
230
+
231
+ let mut current_checksum = header. salt . get ( ) ;
221
232
tracing:: trace!( "recovering unsealed segment at {path:?}" ) ;
222
233
let mut index = BTreeMap :: new ( ) ;
223
- assert ! ( !header. is_empty( ) ) ;
224
- let mut frame_header = FrameHeader :: new_zeroed ( ) ;
225
- for i in 0 ..header. count_committed ( ) {
226
- let offset = frame_offset ( i as u32 ) ;
227
- file. read_exact_at ( frame_header. as_bytes_mut ( ) , offset) ?;
228
- index. insert ( frame_header. page_no . get ( ) , i as u32 ) ;
234
+ let mut frame: Box < CheckedFrame > = CheckedFrame :: new_box_zeroed ( ) ;
235
+ let mut current_tx = Vec :: new ( ) ;
236
+ let mut last_committed = 0 ;
237
+ let mut size_after = 0 ;
238
+ let mut frame_count = 0 ;
239
+ for i in 0 .. {
240
+ let offset = checked_frame_offset ( i as u32 ) ;
241
+ match file. read_exact_at ( frame. as_bytes_mut ( ) , offset) {
242
+ Ok ( _) => {
243
+ let new_checksum = frame. frame . checksum ( current_checksum) ;
244
+ // this is the first checksum that doesn't match the checksum chain, drop the
245
+ // transaction and any frame after that.
246
+ if new_checksum != frame. checksum . get ( ) {
247
+ tracing:: warn!(
248
+ "found invalid checksum in segment, dropping {} frames" ,
249
+ header. last_committed( ) - last_committed
250
+ ) ;
251
+ break ;
252
+ }
253
+ current_checksum = new_checksum;
254
+ frame_count += 1 ;
255
+
256
+ current_tx. push ( frame. frame . header ( ) . page_no ( ) ) ;
257
+ if frame. frame . header . is_commit ( ) {
258
+ last_committed = frame. frame . header ( ) . frame_no ( ) ;
259
+ size_after = frame. frame . header ( ) . size_after ( ) ;
260
+ let base_offset = ( i + 1 ) - current_tx. len ( ) ;
261
+ for ( frame_offset, page_no) in current_tx. drain ( ..) . enumerate ( ) {
262
+ index. insert ( page_no, ( base_offset + frame_offset) as u32 ) ;
263
+ }
264
+ }
265
+ }
266
+ Err ( e) if e. kind ( ) == ErrorKind :: UnexpectedEof => break ,
267
+ Err ( e) => return Err ( e. into ( ) ) ,
268
+ }
229
269
}
230
270
231
- let index_offset = header . count_committed ( ) as u32 ;
232
- let index_byte_offset = frame_offset ( index_offset) ;
271
+ let index_offset = frame_count as u32 ;
272
+ let index_byte_offset = checked_frame_offset ( index_offset) ;
233
273
let cursor = file. cursor ( index_byte_offset) ;
234
274
let writer = BufCopy :: new ( cursor) ;
235
- let mut writer = BufWriter :: new ( writer) ;
275
+ let writer = BufWriter :: new ( writer) ;
276
+ let mut digest = crc32fast:: Hasher :: new_with_initial ( current_checksum) ;
277
+ let mut writer = Inspect :: new ( writer, |data : & [ u8 ] | {
278
+ digest. write ( data) ;
279
+ } ) ;
236
280
let mut builder = MapBuilder :: new ( & mut writer) ?;
237
281
for ( k, v) in index. into_iter ( ) {
238
282
builder. insert ( k. to_be_bytes ( ) , v as u64 ) . unwrap ( ) ;
239
283
}
240
284
builder. finish ( ) . unwrap ( ) ;
241
- let ( cursor, index_bytes) = writer
285
+ let mut writer = writer. into_inner ( ) ;
286
+ let index_size = writer. get_ref ( ) . get_ref ( ) . count ( ) ;
287
+ let index_checksum = digest. finalize ( ) ;
288
+ writer. write_all ( & index_checksum. to_le_bytes ( ) ) ?;
289
+ let ( _, index_bytes) = writer
242
290
. into_inner ( )
243
291
. map_err ( |e| e. into_parts ( ) . 0 ) ?
244
292
. into_parts ( ) ;
245
293
header. index_offset = index_byte_offset. into ( ) ;
246
- header. index_size = cursor. count ( ) . into ( ) ;
294
+ header. index_size = index_size. into ( ) ;
295
+ header. last_commited_frame_no = last_committed. into ( ) ;
296
+ header. size_after = size_after. into ( ) ;
247
297
header. recompute_checksum ( ) ;
248
298
file. write_all_at ( header. as_bytes ( ) , 0 ) ?;
249
299
let index = Map :: new ( index_bytes. into ( ) ) . unwrap ( ) ;
0 commit comments