@@ -41,6 +41,7 @@ use log::debug;
41
41
use log:: info;
42
42
use maplit:: btreeset;
43
43
use pretty_assertions:: assert_eq;
44
+ use raft_log:: DumpApi ;
44
45
use test_harness:: test;
45
46
46
47
use crate :: testing:: meta_service_test_harness;
@@ -67,6 +68,70 @@ async fn test_impl_raft_storage() -> anyhow::Result<()> {
67
68
Ok ( ( ) )
68
69
}
69
70
71
+ /// Ensure purged logs to be removed from the cache
72
+ #[ test( harness = meta_service_test_harness) ]
73
+ #[ fastrace:: trace]
74
+ async fn test_meta_store_purge_cache ( ) -> anyhow:: Result < ( ) > {
75
+ let id = 3 ;
76
+ let mut tc = MetaSrvTestContext :: new ( id) ;
77
+ tc. config . raft_config . log_cache_max_items = 100 ;
78
+ // Build with small chunk, because all entries in the last open chunk will be cached.
79
+ tc. config . raft_config . log_wal_chunk_max_records = 5 ;
80
+
81
+ {
82
+ let mut sto = RaftStore :: open ( & tc. config . raft_config ) . await ?;
83
+
84
+ sto. save_vote ( & Vote :: new ( 10 , 5 ) ) . await ?;
85
+
86
+ sto. blocking_append ( [
87
+ Entry :: new_blank ( log_id ( 1 , 2 , 1 ) ) ,
88
+ Entry :: new_blank ( log_id ( 1 , 2 , 2 ) ) ,
89
+ Entry :: new_blank ( log_id ( 1 , 2 , 3 ) ) ,
90
+ Entry :: new_blank ( log_id ( 1 , 2 , 4 ) ) ,
91
+ Entry :: new_blank ( log_id ( 1 , 2 , 5 ) ) ,
92
+ ] )
93
+ . await ?;
94
+
95
+ let stat = sto. log . read ( ) . await . stat ( ) ;
96
+ assert_eq ! ( stat. payload_cache_item_count, 5 ) ;
97
+
98
+ {
99
+ let r = sto. log . read ( ) . await ;
100
+ let got = r. dump ( ) . write_to_string ( ) ?;
101
+ println ! ( "dump: {}" , got) ;
102
+ let want_dumped = r#"RaftLog:
103
+ ChunkId(00_000_000_000_000_000_000)
104
+ R-00000: [000_000_000, 000_000_018) Size(18): State(RaftLogState { vote: None, last: None, committed: None, purged: None, user_data: None })
105
+ R-00001: [000_000_018, 000_000_046) Size(28): State(RaftLogState { vote: None, last: None, committed: None, purged: None, user_data: Some(LogStoreMeta { node_id: Some(3) }) })
106
+ R-00002: [000_000_046, 000_000_096) Size(50): SaveVote(Cw(Vote { leader_id: LeaderId { term: 10, node_id: 5 }, committed: false }))
107
+ R-00003: [000_000_096, 000_000_148) Size(52): Append(Cw(LogId { leader_id: LeaderId { term: 1, node_id: 2 }, index: 1 }), Cw(blank))
108
+ R-00004: [000_000_148, 000_000_200) Size(52): Append(Cw(LogId { leader_id: LeaderId { term: 1, node_id: 2 }, index: 2 }), Cw(blank))
109
+ ChunkId(00_000_000_000_000_000_200)
110
+ R-00000: [000_000_000, 000_000_100) Size(100): State(RaftLogState { vote: Some(Cw(Vote { leader_id: LeaderId { term: 10, node_id: 5 }, committed: false })), last: Some(Cw(LogId { leader_id: LeaderId { term: 1, node_id: 2 }, index: 2 })), committed: None, purged: None, user_data: Some(LogStoreMeta { node_id: Some(3) }) })
111
+ R-00001: [000_000_100, 000_000_152) Size(52): Append(Cw(LogId { leader_id: LeaderId { term: 1, node_id: 2 }, index: 3 }), Cw(blank))
112
+ R-00002: [000_000_152, 000_000_204) Size(52): Append(Cw(LogId { leader_id: LeaderId { term: 1, node_id: 2 }, index: 4 }), Cw(blank))
113
+ R-00003: [000_000_204, 000_000_256) Size(52): Append(Cw(LogId { leader_id: LeaderId { term: 1, node_id: 2 }, index: 5 }), Cw(blank))
114
+ "# ;
115
+ assert_eq ! ( want_dumped, got) ;
116
+ }
117
+
118
+ // When purging up to index=4, all entries in the last open chunk will still be cached.
119
+ // All previous entries are purge, although the cache is not full.
120
+
121
+ sto. purge ( log_id ( 1 , 2 , 4 ) ) . await ?;
122
+
123
+ let r = sto. log . read ( ) . await ;
124
+ let got = r. dump ( ) . write_to_string ( ) ?;
125
+ println ! ( "dump: {}" , got) ;
126
+
127
+ let stat = sto. log . read ( ) . await . stat ( ) ;
128
+ println ! ( "stat: {:#}" , stat) ;
129
+ assert_eq ! ( stat. payload_cache_item_count, 3 ) ;
130
+ }
131
+
132
+ Ok ( ( ) )
133
+ }
134
+
70
135
#[ test( harness = meta_service_test_harness) ]
71
136
#[ fastrace:: trace]
72
137
async fn test_meta_store_restart ( ) -> anyhow:: Result < ( ) > {
0 commit comments