@@ -27,8 +27,8 @@ const CMD_COMPACT: u8 = 0x02;
2727
2828const DEFAULT_LOG_ITEM_BATCH_CAP : usize = 64 ;
2929const MAX_LOG_BATCH_BUFFER_CAP : usize = 8 * 1024 * 1024 ;
30- // 2GiB, The maximum content length accepted by lz4 compression.
31- const MAX_LOG_ENTRIES_SIZE_PER_BATCH : usize = i32 :: MAX as usize ;
30+ // LZ4_MAX_INPUT_SIZE = 0x7E000000
31+ const MAX_LOG_ENTRIES_SIZE_PER_BATCH : usize = 0x7E000000 ;
3232
3333/// `MessageExt` trait allows for probing log index from a specific type of
3434/// protobuf messages.
@@ -421,15 +421,15 @@ impl LogItemBatch {
421421
422422 #[ inline]
423423 pub fn pop_save_point ( & mut self ) {
424- self . save_points . pop ( ) ;
424+ self . save_points . pop ( ) . unwrap ( ) ;
425425 }
426426
427427 #[ inline]
428- pub fn rollback ( & mut self ) {
429- let ( a, b, c) = self . save_points . last ( ) . unwrap ( ) ;
430- self . items . truncate ( * a) ;
431- self . item_size = * b;
432- self . entries_size = * c;
428+ pub fn rollback_to_save_point ( & mut self ) {
429+ let ( a, b, c) = self . save_points . pop ( ) . unwrap ( ) ;
430+ self . items . truncate ( a) ;
431+ self . item_size = b;
432+ self . entries_size = c;
433433 }
434434
435435 pub ( crate ) fn finish_populate ( & mut self , compression_type : CompressionType ) {
@@ -675,15 +675,16 @@ impl LogBatch {
675675 #[ inline]
676676 pub fn pop_save_point ( & mut self ) {
677677 assert ! ( self . buf_state == BufState :: Open ) ;
678+ self . item_batch . pop_save_point ( ) ;
678679 self . save_points . pop ( ) ;
679680 }
680681
681682 /// No-op if there's no save point to rollback to.
682683 #[ inline]
683- pub fn rollback ( & mut self ) {
684+ pub fn rollback_to_save_point ( & mut self ) {
684685 assert ! ( self . buf_state == BufState :: Open ) ;
685- self . item_batch . rollback ( ) ;
686- self . buf . truncate ( * self . save_points . last ( ) . unwrap ( ) ) ;
686+ self . item_batch . rollback_to_save_point ( ) ;
687+ self . buf . truncate ( self . save_points . pop ( ) . unwrap ( ) ) ;
687688 }
688689
689690 /// Adds some protobuf log entries into the log batch.
@@ -1001,7 +1002,9 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<
10011002mod tests {
10021003 use super :: * ;
10031004 use crate :: pipe_log:: { LogQueue , Version } ;
1004- use crate :: test_util:: { catch_unwind_silent, generate_entries, generate_entry_indexes_opt} ;
1005+ use crate :: test_util:: {
1006+ catch_unwind_silent, generate_entries, generate_entry_indexes_opt, PanicGuard ,
1007+ } ;
10051008 use protobuf:: parse_from_bytes;
10061009 use raft:: eraftpb:: Entry ;
10071010 use strum:: IntoEnumIterator ;
@@ -1438,6 +1441,57 @@ mod tests {
14381441
14391442 #[ test]
14401443 fn test_save_point ( ) {
1444+ let ops = [
1445+ |b : & mut LogBatch | {
1446+ b. add_entries :: < Entry > ( 1 , & generate_entries ( 1 , 11 , None ) )
1447+ . unwrap ( )
1448+ } ,
1449+ |b : & mut LogBatch | {
1450+ b. add_entries :: < Entry > ( 7 , & generate_entries ( 1 , 11 , Some ( & vec ! [ b'x' ; 1024 ] ) ) )
1451+ . unwrap ( )
1452+ } ,
1453+ |b : & mut LogBatch | b. add_command ( 17 , Command :: Clean ) ,
1454+ |b : & mut LogBatch | b. put ( 27 , b"key27" . to_vec ( ) , b"value27" . to_vec ( ) ) ,
1455+ |b : & mut LogBatch | b. delete ( 37 , b"key37" . to_vec ( ) ) ,
1456+ |b : & mut LogBatch | b. add_command ( 47 , Command :: Compact { index : 777 } ) ,
1457+ |b : & mut LogBatch | {
1458+ b. add_entries :: < Entry > ( 57 , & generate_entries ( 1 , 51 , None ) )
1459+ . unwrap ( )
1460+ } ,
1461+ ] ;
1462+ for start in 0 ..ops. len ( ) {
1463+ for stripe in 1 ..=5 {
1464+ for num in ops. len ( ) ..ops. len ( ) * 5 {
1465+ for repeat in 1 ..=5 {
1466+ let _guard = PanicGuard :: with_prompt ( format ! (
1467+ "case: [{}, {}, {}, {}]" ,
1468+ start, stripe, num, repeat
1469+ ) ) ;
1470+ let mut to_verify = Vec :: new ( ) ;
1471+ let mut batch = LogBatch :: default ( ) ;
1472+ let mut op_idx = start;
1473+ let mut total_op = 0 ;
1474+ for _ in 0 ..num {
1475+ for _ in 0 ..repeat {
1476+ ops[ op_idx % ops. len ( ) ] ( & mut batch) ;
1477+ to_verify. push ( batch. clone ( ) ) ;
1478+ batch. set_save_point ( ) ;
1479+ total_op += 1 ;
1480+ if total_op % 5 == 0 {
1481+ to_verify. pop ( ) . unwrap ( ) ;
1482+ batch. pop_save_point ( ) ;
1483+ }
1484+ }
1485+ op_idx += stripe;
1486+ }
1487+ while let Some ( b) = to_verify. pop ( ) {
1488+ batch. rollback_to_save_point ( ) ;
1489+ assert_eq ! ( batch, b) ;
1490+ }
1491+ }
1492+ }
1493+ }
1494+ }
14411495 }
14421496
14431497 #[ cfg( feature = "nightly" ) ]
0 commit comments