1
- use crate :: p2p:: { BroadcastMessage , Peer , PeerEvent } ;
1
+ use std:: { path:: Path , sync:: Arc } ;
2
+
2
3
use anyhow:: anyhow;
3
4
use libp2p:: Multiaddr ;
5
+ use mithril_dmq:: { DmqConsumerServer , DmqConsumerServerPallas , DmqMessage } ;
6
+ use reqwest:: StatusCode ;
7
+ use slog:: { Logger , error, info} ;
8
+ use tokio:: sync:: {
9
+ mpsc:: { UnboundedReceiver , UnboundedSender , unbounded_channel} ,
10
+ watch:: { self , Receiver } ,
11
+ } ;
12
+
4
13
use mithril_common:: {
5
- StdResult ,
14
+ CardanoNetwork , StdResult ,
6
15
logging:: LoggerExtensions ,
7
16
messages:: { RegisterSignatureMessageHttp , RegisterSignerMessage } ,
8
17
} ;
9
- use reqwest :: StatusCode ;
10
- use slog :: { Logger , error , info } ;
18
+
19
+ use crate :: p2p :: { BroadcastMessage , Peer , PeerEvent } ;
11
20
12
21
/// A relay for a Mithril aggregator
13
22
pub struct AggregatorRelay {
14
23
aggregator_endpoint : String ,
15
24
peer : Peer ,
25
+ signature_dmq_tx : UnboundedSender < DmqMessage > ,
16
26
logger : Logger ,
17
27
}
18
28
19
29
impl AggregatorRelay {
20
30
/// Start a relay for a Mithril aggregator
21
31
pub async fn start (
22
32
addr : & Multiaddr ,
33
+ dmq_node_socket_path : & Path ,
34
+ cardano_network : & CardanoNetwork ,
23
35
aggregator_endpoint : & str ,
24
36
logger : & Logger ,
25
37
) -> StdResult < Self > {
38
+ let ( _stop_tx, stop_rx) = watch:: channel ( ( ) ) ;
39
+ let ( signature_dmq_tx, signature_dmq_rx) = unbounded_channel :: < DmqMessage > ( ) ;
40
+ let _dmq_consumer_server = Self :: start_dmq_consumer_server (
41
+ dmq_node_socket_path,
42
+ cardano_network,
43
+ signature_dmq_rx,
44
+ stop_rx,
45
+ logger. clone ( ) ,
46
+ )
47
+ . await ?;
48
+
26
49
Ok ( Self {
27
50
aggregator_endpoint : aggregator_endpoint. to_owned ( ) ,
28
51
peer : Peer :: new ( addr) . with_logger ( logger) . start ( ) . await ?,
52
+ signature_dmq_tx,
29
53
logger : logger. new_with_component_name :: < Self > ( ) ,
30
54
} )
31
55
}
32
56
57
+ async fn start_dmq_consumer_server (
58
+ socket : & Path ,
59
+ cardano_network : & CardanoNetwork ,
60
+ signature_dmq_rx : UnboundedReceiver < DmqMessage > ,
61
+ stop_rx : Receiver < ( ) > ,
62
+ logger : Logger ,
63
+ ) -> StdResult < Arc < DmqConsumerServerPallas > > {
64
+ let dmq_consumer_server = Arc :: new ( DmqConsumerServerPallas :: new (
65
+ socket. to_path_buf ( ) ,
66
+ cardano_network. to_owned ( ) ,
67
+ stop_rx,
68
+ logger. clone ( ) ,
69
+ ) ) ;
70
+ dmq_consumer_server. register_receiver ( signature_dmq_rx) . await ?;
71
+ let dmq_consumer_server_clone = dmq_consumer_server. clone ( ) ;
72
+ tokio:: spawn ( async move {
73
+ if let Err ( err) = dmq_consumer_server_clone. run ( ) . await {
74
+ error ! ( logger. to_owned( ) , "DMQ Consumer server failed" ; "error" => ?err) ;
75
+ }
76
+ } ) ;
77
+
78
+ Ok ( dmq_consumer_server)
79
+ }
80
+
33
81
async fn notify_signature_to_aggregator (
34
82
& self ,
35
83
signature_message : & RegisterSignatureMessageHttp ,
@@ -100,7 +148,7 @@ impl AggregatorRelay {
100
148
pub async fn tick ( & mut self ) -> StdResult < ( ) > {
101
149
if let Some ( peer_event) = self . peer . tick_swarm ( ) . await ? {
102
150
match self . peer . convert_peer_event_to_message ( peer_event) {
103
- Ok ( Some ( BroadcastMessage :: RegisterSigner ( signer_message_received) ) ) => {
151
+ Ok ( Some ( BroadcastMessage :: RegisterSignerHttp ( signer_message_received) ) ) => {
104
152
let retry_max = 3 ;
105
153
let mut retry_count = 0 ;
106
154
while let Err ( e) =
@@ -113,7 +161,7 @@ impl AggregatorRelay {
113
161
}
114
162
}
115
163
}
116
- Ok ( Some ( BroadcastMessage :: RegisterSignature ( signature_message_received) ) ) => {
164
+ Ok ( Some ( BroadcastMessage :: RegisterSignatureHttp ( signature_message_received) ) ) => {
117
165
let retry_max = 3 ;
118
166
let mut retry_count = 0 ;
119
167
while let Err ( e) =
@@ -126,6 +174,11 @@ impl AggregatorRelay {
126
174
}
127
175
}
128
176
}
177
+ Ok ( Some ( BroadcastMessage :: RegisterSignatureDmq ( signature_message_received) ) ) => {
178
+ self . signature_dmq_tx . send ( signature_message_received) . map_err ( |e| {
179
+ anyhow ! ( "Failed to send signature message to DMQ consumer server: {e}" )
180
+ } ) ?;
181
+ }
129
182
Ok ( None ) => { }
130
183
Err ( e) => return Err ( e) ,
131
184
}
@@ -179,9 +232,15 @@ mod tests {
179
232
then. status ( 201 ) . body ( "ok" ) ;
180
233
} ) ;
181
234
let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0" . parse ( ) . unwrap ( ) ;
182
- let relay = AggregatorRelay :: start ( & addr, & server. url ( "" ) , & TestLogger :: stdout ( ) )
183
- . await
184
- . unwrap ( ) ;
235
+ let relay = AggregatorRelay :: start (
236
+ & addr,
237
+ & Path :: new ( "test" ) ,
238
+ & CardanoNetwork :: TestNet ( 123 ) ,
239
+ & server. url ( "" ) ,
240
+ & TestLogger :: stdout ( ) ,
241
+ )
242
+ . await
243
+ . unwrap ( ) ;
185
244
186
245
relay
187
246
. notify_signature_to_aggregator ( & RegisterSignatureMessageHttp :: dummy ( ) )
0 commit comments