@@ -19,10 +19,8 @@ async fn create_fake_msg(bytes: &[u8]) -> DmqMessage {
19
19
let dmq_builder = DmqMessageBuilder :: new (
20
20
{
21
21
let ( kes_signature, operational_certificate) = KesSignerFake :: dummy_signature ( ) ;
22
- let kes_signer = KesSignerFake :: new ( vec ! [
23
- Ok ( ( kes_signature, operational_certificate. clone( ) ) ) ,
24
- Ok ( ( kes_signature, operational_certificate. clone( ) ) ) , // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
25
- ] ) ;
22
+ let kes_signer =
23
+ KesSignerFake :: new ( vec ! [ Ok ( ( kes_signature, operational_certificate. clone( ) ) ) ] ) ;
26
24
27
25
Arc :: new ( kes_signer)
28
26
} ,
@@ -41,6 +39,7 @@ async fn dmq_publisher_client_server() {
41
39
. join ( "node.socket" ) ;
42
40
let ( stop_tx, stop_rx) = watch:: channel ( ( ) ) ;
43
41
42
+ // Start the server
44
43
let ( signature_dmq_tx, signature_dmq_rx) = unbounded_channel :: < DmqMessage > ( ) ;
45
44
let server = tokio:: spawn ( {
46
45
let socket_path = socket_path. clone ( ) ;
@@ -59,6 +58,7 @@ async fn dmq_publisher_client_server() {
59
58
}
60
59
} ) ;
61
60
61
+ // Start a first client, publish messages and wait for its deconnection
62
62
let client = tokio:: spawn ( {
63
63
let socket_path = socket_path. clone ( ) ;
64
64
async move {
@@ -68,8 +68,6 @@ async fn dmq_publisher_client_server() {
68
68
let kes_signer = KesSignerFake :: new ( vec ! [
69
69
Ok ( ( kes_signature, operational_certificate. clone( ) ) ) ,
70
70
Ok ( ( kes_signature, operational_certificate. clone( ) ) ) ,
71
- Ok ( ( kes_signature, operational_certificate. clone( ) ) ) , // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
72
- Ok ( ( kes_signature, operational_certificate. clone( ) ) ) , // TODO: remove this line once the hack of KES signature is removed in DMQ message builder
73
71
] ) ;
74
72
75
73
Arc :: new ( kes_signer)
@@ -94,13 +92,49 @@ async fn dmq_publisher_client_server() {
94
92
. publish_message ( DmqMessageTestPayload :: new ( b"msg_2" ) )
95
93
. await
96
94
. unwrap ( ) ;
95
+ }
96
+ } ) ;
97
+ client. await . unwrap ( ) ;
98
+
99
+ // Sleep to avoid refused connection from the server
100
+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) . await ;
101
+
102
+ // Start a second client and publish messages
103
+ let client = tokio:: spawn ( {
104
+ let socket_path = socket_path. clone ( ) ;
105
+ async move {
106
+ let dmq_builder = DmqMessageBuilder :: new (
107
+ {
108
+ let ( kes_signature, operational_certificate) = KesSignerFake :: dummy_signature ( ) ;
109
+ let kes_signer = KesSignerFake :: new ( vec ! [
110
+ Ok ( ( kes_signature, operational_certificate. clone( ) ) ) ,
111
+ Ok ( ( kes_signature, operational_certificate. clone( ) ) ) ,
112
+ ] ) ;
113
+
114
+ Arc :: new ( kes_signer)
115
+ } ,
116
+ Arc :: new ( FakeChainObserver :: default ( ) ) ,
117
+ )
118
+ . set_ttl ( 100 ) ;
119
+ let publisher_client = DmqPublisherClientPallas :: < DmqMessageTestPayload > :: new (
120
+ socket_path,
121
+ cardano_network,
122
+ dmq_builder,
123
+ slog_scope:: logger ( ) ,
124
+ ) ;
125
+
126
+ publisher_client
127
+ . publish_message ( DmqMessageTestPayload :: new ( b"msg_3" ) )
128
+ . await
129
+ . unwrap ( ) ;
97
130
98
131
stop_tx
99
132
. send ( ( ) )
100
133
. expect ( "Failed to send stop signal to DMQ publisher server" ) ;
101
134
}
102
135
} ) ;
103
136
137
+ // Record messages received by the server
104
138
let recorder = tokio:: spawn ( async move {
105
139
let messages: Vec < DmqMessage > = {
106
140
let mut messages = vec ! [ ] ;
@@ -115,9 +149,14 @@ async fn dmq_publisher_client_server() {
115
149
messages
116
150
} ) ;
117
151
152
+ // Check that all messages have been correctly received
118
153
let ( _, _, messages) = tokio:: try_join!( server, client, recorder) . unwrap ( ) ;
119
154
assert_eq ! (
120
- vec![ create_fake_msg( b"msg_1" ) . await , create_fake_msg( b"msg_2" ) . await ] ,
155
+ vec![
156
+ create_fake_msg( b"msg_1" ) . await ,
157
+ create_fake_msg( b"msg_2" ) . await ,
158
+ create_fake_msg( b"msg_3" ) . await ,
159
+ ] ,
121
160
messages
122
161
) ;
123
162
}
0 commit comments