3333 } ,
3434 std:: {
3535 borrow:: Cow ,
36- collections:: { BTreeMap , HashMap } ,
36+ collections:: { BTreeMap , BTreeSet , HashMap } ,
3737 sync:: {
3838 atomic:: { AtomicBool , Ordering } ,
3939 Arc ,
@@ -783,12 +783,19 @@ impl SolanaRpc {
783783 hash,
784784 time,
785785 height,
786- parent_slot : _parent_slot ,
786+ parent_slot,
787787 parent_hash : _parent_hash,
788788 transactions,
789789 } => {
790- let info =
791- StreamsSlotInfo :: new ( leader, slot, hash, time, height, transactions) ;
790+ let info = StreamsSlotInfo :: new (
791+ leader,
792+ slot,
793+ parent_slot,
794+ hash,
795+ time,
796+ height,
797+ transactions,
798+ ) ;
792799 let _ = streams_tx. send ( Arc :: new ( StreamsUpdateMessage :: Slot { info } ) ) ;
793800 }
794801 } ,
@@ -832,13 +839,13 @@ impl SolanaRpc {
832839 hash,
833840 time,
834841 height,
835- parent_slot: _parent_slot ,
842+ parent_slot,
836843 parent_hash: _parent_hash,
837844 transactions,
838845 } => {
839- latest_blockhash_storage. push_block( slot, height, hash) ;
846+ latest_blockhash_storage. push_block( slot, parent_slot , height, hash) ;
840847
841- let info = StreamsSlotInfo :: new( leader, slot, hash, time, height, transactions) ;
848+ let info = StreamsSlotInfo :: new( leader, slot, parent_slot , hash, time, height, transactions) ;
842849 slots_info. insert( slot, info. clone( ) ) ;
843850 while slots_info. len( ) > MAX_NUM_RECENT_SLOT_INFO {
844851 slots_info. pop_first( ) ;
@@ -923,37 +930,17 @@ impl SolanaRpc {
923930 SolanaRpc :: internal_error_with_data ( "no slot" ) ,
924931 ) ;
925932 } ;
926- if rollback > 0 {
927- let Some ( first_available_slot) =
928- latest_blockhash_storage. slots . keys ( ) . next ( ) . copied ( )
933+ for _ in 0 ..rollback {
934+ let Some ( parent_value) = latest_blockhash_storage. slots . get ( & value. parent )
929935 else {
930936 return Self :: create_failure (
931937 jsonrpc,
932938 id,
933- JsonrpcError :: invalid_params ( "empty slots storage" ) ,
939+ JsonrpcError :: invalid_params ( "not enought slots in the storage" ) ,
934940 ) ;
935941 } ;
936-
937- for _ in 0 ..rollback {
938- loop {
939- slot -= 1 ;
940-
941- if let Some ( prev_value) = latest_blockhash_storage. slots . get ( & slot) {
942- if prev_value. height + 1 == value. height {
943- value = prev_value;
944- break ;
945- }
946- } else if slot < first_available_slot {
947- return Self :: create_failure (
948- jsonrpc,
949- id,
950- JsonrpcError :: invalid_params (
951- "not enought slots in the storage" ,
952- ) ,
953- ) ;
954- }
955- }
956- }
942+ slot = value. parent ;
943+ value = parent_value;
957944 }
958945
959946 Self :: create_success2 (
@@ -1154,60 +1141,138 @@ enum RpcRequest {
11541141
11551142#[ derive( Debug , Default ) ]
11561143struct LatestBlockhashStorage {
1157- slots : BTreeMap < Slot , LatestBlockhashSlot > ,
1158- finalized_total : usize ,
11591144 slot_processed : Slot ,
11601145 slot_confirmed : Slot ,
11611146 slot_finalized : Slot ,
1147+ confirmed : BTreeSet < Slot > ,
1148+ finalized : BTreeSet < Slot > ,
1149+ slots : BTreeMap < Slot , LatestBlockhashSlot > ,
11621150}
11631151
11641152impl LatestBlockhashStorage {
1165- fn push_block ( & mut self , slot : Slot , height : Slot , hash : Hash ) {
1153+ fn update_slots ( & mut self ) {
1154+ for ( slot, entry) in self . slots . iter ( ) . rev ( ) {
1155+ match entry. commitment {
1156+ CommitmentLevel :: Processed => {
1157+ self . slot_processed = self . slot_processed . max ( * slot) ;
1158+ }
1159+ CommitmentLevel :: Confirmed => {
1160+ self . slot_confirmed = self . slot_confirmed . max ( * slot) ;
1161+ }
1162+ CommitmentLevel :: Finalized => {
1163+ self . slot_finalized = self . slot_finalized . max ( * slot) ;
1164+ break ;
1165+ }
1166+ }
1167+ }
1168+
1169+ // in case of epoch change with a lot of forks we can receive confirmed directly
1170+ // and processed would be behind, so we take `max(processed, confirmed)`
1171+ self . slot_processed = self . slot_processed . max ( self . slot_confirmed ) ;
1172+ }
1173+
1174+ fn push_block ( & mut self , slot : Slot , parent : Slot , height : Slot , hash : Hash ) {
1175+ // in case if slot status was received before block
1176+ let mut commitment = CommitmentLevel :: Processed ;
1177+ if self . confirmed . contains ( & slot) {
1178+ commitment = CommitmentLevel :: Confirmed ;
1179+ }
1180+ if self . finalized . contains ( & slot) {
1181+ commitment = CommitmentLevel :: Finalized ;
1182+ }
11661183 self . slots . insert (
11671184 slot,
11681185 LatestBlockhashSlot {
1169- hash,
1186+ commitment,
1187+ parent,
11701188 height,
1171- commitment : CommitmentLevel :: Processed ,
1189+ hash ,
11721190 } ,
11731191 ) ;
1192+
1193+ // keep slots under the limit (based only on finalized count)
1194+ let slots_to_remove = self
1195+ . slots
1196+ . values ( )
1197+ . filter ( |entry| entry. commitment == CommitmentLevel :: Finalized )
1198+ . count ( )
1199+ . checked_sub ( MAX_PROCESSING_AGE + 10 )
1200+ . unwrap_or_default ( ) ;
1201+ for _ in 0 ..slots_to_remove {
1202+ while let Some ( ( _slot, value) ) = self . slots . pop_first ( ) {
1203+ if value. commitment == CommitmentLevel :: Finalized {
1204+ break ;
1205+ }
1206+ }
1207+ }
1208+
1209+ // update tips
1210+ self . update_slots ( ) ;
11741211 }
11751212
11761213 fn update_commitment ( & mut self , slot : Slot , commitment : CommitmentLevel ) {
1177- if let Some ( value) = self . slots . get_mut ( & slot) {
1178- value. commitment = commitment;
1179-
1180- if commitment == CommitmentLevel :: Processed && slot > self . slot_processed {
1181- self . slot_processed = slot;
1182- } else if commitment == CommitmentLevel :: Confirmed {
1183- self . slot_confirmed = slot;
1184- } else if commitment == CommitmentLevel :: Finalized {
1185- self . finalized_total += 1 ;
1186- self . slot_finalized = slot;
1187- }
1214+ // save commitment
1215+ if commitment == CommitmentLevel :: Confirmed {
1216+ self . confirmed . insert ( slot) ;
1217+ } else if commitment == CommitmentLevel :: Finalized {
1218+ self . confirmed . insert ( slot) ;
1219+ self . finalized . insert ( slot) ;
1220+ }
1221+ while self . confirmed . len ( ) > MAX_PROCESSING_AGE {
1222+ self . confirmed . pop_first ( ) ;
1223+ }
1224+ while self . finalized . len ( ) > MAX_PROCESSING_AGE {
1225+ self . finalized . pop_first ( ) ;
11881226 }
11891227
1190- while self . finalized_total > MAX_PROCESSING_AGE + 10 {
1191- if let Some ( ( _slot, value) ) = self . slots . pop_first ( ) {
1192- if value. commitment == CommitmentLevel :: Finalized {
1193- self . finalized_total -= 1 ;
1228+ // update current and all slots before
1229+ if let Some ( value) = self . slots . get_mut ( & slot) {
1230+ value. commitment = value. commitment . max ( commitment) ;
1231+
1232+ let mut parent_slot = value. parent ;
1233+ if commitment == CommitmentLevel :: Confirmed {
1234+ loop {
1235+ if let Some ( value) = self . slots . get_mut ( & parent_slot) {
1236+ if value. commitment == CommitmentLevel :: Processed {
1237+ value. commitment = CommitmentLevel :: Confirmed ;
1238+ parent_slot = value. parent ;
1239+ continue ;
1240+ }
1241+ }
1242+ break ;
1243+ }
1244+ } else if commitment == CommitmentLevel :: Finalized {
1245+ loop {
1246+ if let Some ( value) = self . slots . get_mut ( & parent_slot) {
1247+ if value. commitment != CommitmentLevel :: Finalized {
1248+ value. commitment = CommitmentLevel :: Finalized ;
1249+ parent_slot = value. parent ;
1250+ continue ;
1251+ }
1252+ }
1253+ break ;
11941254 }
11951255 }
11961256 }
1257+
1258+ // update tips
1259+ self . update_slots ( ) ;
11971260 }
11981261}
11991262
12001263#[ derive( Debug ) ]
12011264struct LatestBlockhashSlot {
1202- hash : Hash ,
1203- height : Slot ,
12041265 commitment : CommitmentLevel ,
1266+ parent : Slot ,
1267+ height : Slot ,
1268+ hash : Hash ,
12051269}
12061270
12071271#[ derive( Debug , Clone ) ]
12081272struct StreamsSlotInfo {
12091273 leader : Option < Pubkey > ,
12101274 slot : Slot ,
1275+ parent_slot : Slot ,
12111276 commitment : CommitmentLevel ,
12121277 hash : Hash ,
12131278 time : UnixTimestamp ,
@@ -1223,6 +1288,7 @@ impl StreamsSlotInfo {
12231288 fn new (
12241289 leader : Option < Pubkey > ,
12251290 slot : Slot ,
1291+ parent_slot : Slot ,
12261292 hash : Hash ,
12271293 time : UnixTimestamp ,
12281294 height : Slot ,
@@ -1242,6 +1308,7 @@ impl StreamsSlotInfo {
12421308 Self {
12431309 leader,
12441310 slot,
1311+ parent_slot,
12451312 commitment : CommitmentLevel :: Processed ,
12461313 hash,
12471314 time,
@@ -1292,6 +1359,7 @@ impl StreamsSlotInfo {
12921359 SlotsSubscribeOutput :: Slot {
12931360 leader : self . leader . map ( |pk| pk. to_string ( ) ) . unwrap_or_default ( ) ,
12941361 slot : self . slot ,
1362+ parent_slot : self . parent_slot ,
12951363 commitment : self . commitment ,
12961364 hash : self . hash . to_string ( ) ,
12971365 time : self . time ,
@@ -1379,14 +1447,14 @@ enum StreamsUpdateMessage {
13791447 } ,
13801448}
13811449
1382- #[ derive( Debug , Deserialize ) ]
1450+ #[ derive( Debug , Default , Deserialize ) ]
1451+ #[ serde( default ) ]
13831452struct ReqParamsSlotsSubscribe {
1384- #[ serde( default ) ]
13851453 config : Option < ReqParamsSlotsSubscribeConfig > ,
13861454}
13871455
13881456#[ derive( Debug , Default , Deserialize ) ]
1389- #[ serde( rename_all = "camelCase" ) ]
1457+ #[ serde( default , rename_all = "camelCase" ) ]
13901458struct ReqParamsSlotsSubscribeConfig {
13911459 read_write : Vec < String > ,
13921460 read_only : Vec < String > ,
@@ -1467,6 +1535,7 @@ enum SlotsSubscribeOutput {
14671535 Slot {
14681536 leader : String ,
14691537 slot : Slot ,
1538+ parent_slot : Slot ,
14701539 commitment : CommitmentLevel ,
14711540 hash : String ,
14721541 time : UnixTimestamp ,
0 commit comments