@@ -4,9 +4,9 @@ pub mod models;
44pub mod services;
55pub mod storage;
66pub mod traits;
7+ pub mod worker_name_store;
78
89use crate :: config:: SETTINGS ;
9- use crate :: traits:: ShareData ;
1010use log:: info;
1111use std:: sync:: Arc ;
1212use tokio:: sync:: { mpsc:: { self , error:: TrySendError } , Mutex } ;
@@ -18,6 +18,8 @@ use crate::traits::ShareStorage;
1818use crate :: storage:: clickhouse:: ClickhouseBlockStorage ;
1919use crate :: models:: BlockFound ;
2020use std:: time:: Instant ;
21+ use serde:: Serialize ;
22+ use serde:: de:: DeserializeOwned ;
2123
2224lazy_static ! {
2325 static ref GLOBAL_LOGGER : ShareLogger <ShareLog > = {
@@ -45,18 +47,18 @@ pub fn log_block(block: BlockFound) {
4547 BLOCK_LOGGER . log_share ( block) ;
4648}
4749
48- pub struct ShareLogger < T : ShareData > {
50+ pub struct ShareLogger < T : Send + Sync + Clone + Serialize + DeserializeOwned > {
4951 primary_tx : mpsc:: Sender < T > ,
5052 backup_tx : mpsc:: UnboundedSender < T > ,
5153}
5254
53- pub struct ShareLoggerBuilder < T : ShareData > {
55+ pub struct ShareLoggerBuilder < T : Send + Sync + Clone + Serialize + DeserializeOwned > {
5456 storage : Arc < Mutex < Box < dyn ShareStorage < T > > > > ,
5557 primary_channel_size : Option < usize > ,
5658 backup_check_interval : Option < Duration > ,
5759}
5860
59- impl < T : ShareData + ' static > ShareLoggerBuilder < T > {
61+ impl < T : Send + Sync + Clone + Serialize + DeserializeOwned + ' static > ShareLoggerBuilder < T > {
6062
6163 pub fn new ( storage : Box < dyn ShareStorage < T > > ) -> Self {
6264 Self {
@@ -103,7 +105,7 @@ impl<T: ShareData + 'static> ShareLoggerBuilder<T> {
103105 }
104106}
105107
106- impl < T : ShareData + ' static > ShareLogger < T > {
108+ impl < T : Send + Sync + Clone + Serialize + DeserializeOwned + ' static > ShareLogger < T > {
107109 pub fn log_share ( & self , share : T ) {
108110 match self . primary_tx . try_send ( share. clone ( ) ) {
109111 Ok ( _) => ( ) ,
@@ -116,7 +118,7 @@ impl<T: ShareData + 'static> ShareLogger<T> {
116118 }
117119}
118120
119- async fn process_shares < T : ShareData > (
121+ async fn process_shares < T : Send + Sync + Clone + Serialize + DeserializeOwned > (
120122 mut primary_rx : mpsc:: Receiver < T > ,
121123 mut backup_rx : mpsc:: UnboundedReceiver < T > ,
122124 storage : Arc < Mutex < Box < dyn ShareStorage < T > > > > ,
@@ -129,36 +131,20 @@ async fn process_shares<T: ShareData>(
129131 }
130132 let init_duration = init_start. elapsed ( ) ;
131133 info ! ( "Storage initialized in: {:?}" , init_duration) ;
132-
133134 let mut backup_interval = tokio:: time:: interval ( backup_check_interval) ;
134-
135135 loop {
136136 tokio:: select! {
137137 Some ( share) = primary_rx. recv( ) => {
138138 info!( "Processing share from primary channel" ) ;
139-
140- if share. is_block_found( ) {
141- if let Err ( e) = storage. lock( ) . await . store_share( share) . await {
142- info!( "Failed to store block: {}" , e) ;
143- }
144- } else {
145- if let Err ( e) = storage. lock( ) . await . store_share( share) . await {
146- info!( "Failed to store share: {}" , e) ;
147- }
139+ if let Err ( e) = storage. lock( ) . await . store_share( share) . await {
140+ info!( "Failed to store share: {}" , e) ;
148141 }
149142 }
150143 _ = backup_interval. tick( ) => {
151144 let mut backup_shares = Vec :: new( ) ;
152145 while let Ok ( share) = backup_rx. try_recv( ) {
153- if share. is_block_found( ) {
154- if let Err ( e) = storage. lock( ) . await . store_share( share) . await {
155- info!( "Failed to store backup block: {}" , e) ;
156- }
157- } else {
158- backup_shares. push( share) ;
159- }
146+ backup_shares. push( share) ;
160147 }
161-
162148 if !backup_shares. is_empty( ) {
163149 if let Err ( e) = storage. lock( ) . await . store_batch( backup_shares) . await {
164150 info!( "Failed to store backup shares: {}" , e) ;
0 commit comments