18
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
19
// DEALINGS IN THE SOFTWARE.
20
20
21
- use async_io:: Async ;
22
21
use futures:: prelude:: * ;
23
- use libp2p_core:: transport:: Transport ;
24
- use libp2p_core:: upgrade:: Negotiated ;
25
- use libp2p_core:: { transport, upgrade} ;
22
+ use libp2p_core:: transport:: { MemoryTransport , Transport } ;
23
+ use libp2p_core:: { upgrade, InboundUpgrade , OutboundUpgrade } ;
26
24
use libp2p_identity as identity;
27
- use libp2p_identity:: PeerId ;
28
25
use libp2p_noise as noise;
29
- use libp2p_tcp as tcp;
30
26
use log:: info;
31
27
use quickcheck:: * ;
32
- use std:: { convert:: TryInto , io, net :: TcpStream } ;
28
+ use std:: { convert:: TryInto , io} ;
33
29
34
30
#[ allow( dead_code) ]
35
31
fn core_upgrade_compat ( ) {
36
32
// Tests API compaibility with the libp2p-core upgrade API,
37
33
// i.e. if it compiles, the "test" is considered a success.
38
34
let id_keys = identity:: Keypair :: generate_ed25519 ( ) ;
39
35
let noise = noise:: Config :: new ( & id_keys) . unwrap ( ) ;
40
- let _ = tcp :: async_io :: Transport :: default ( )
36
+ let _ = MemoryTransport :: default ( )
41
37
. upgrade ( upgrade:: Version :: V1 )
42
38
. authenticate ( noise) ;
43
39
}
@@ -50,119 +46,65 @@ fn xx() {
50
46
let server_id = identity:: Keypair :: generate_ed25519 ( ) ;
51
47
let client_id = identity:: Keypair :: generate_ed25519 ( ) ;
52
48
53
- let server_id_public = server_id. public ( ) ;
54
- let client_id_public = client_id. public ( ) ;
55
-
56
- let server_transport = tcp:: async_io:: Transport :: default ( )
57
- . and_then ( move |output, endpoint| {
58
- upgrade:: apply (
59
- output,
60
- noise:: Config :: new ( & server_id) . unwrap ( ) ,
61
- endpoint,
62
- upgrade:: Version :: V1 ,
63
- )
64
- } )
65
- . map ( move |out, _| {
66
- assert_eq ! ( out. 0 , client_id_public. to_peer_id( ) ) ;
67
-
68
- out
69
- } )
70
- . boxed ( ) ;
71
-
72
- let client_transport = tcp:: async_io:: Transport :: default ( )
73
- . and_then ( move |output, endpoint| {
74
- upgrade:: apply (
75
- output,
76
- noise:: Config :: new ( & client_id) . unwrap ( ) ,
77
- endpoint,
78
- upgrade:: Version :: V1 ,
79
- )
80
- } )
81
- . map ( move |out, _| {
82
- assert_eq ! ( out. 0 , server_id_public. to_peer_id( ) ) ;
49
+ let ( client, server) = futures_ringbuf:: Endpoint :: pair ( 100 , 100 ) ;
50
+
51
+ futures:: executor:: block_on ( async move {
52
+ let (
53
+ ( reported_client_id, mut client_session) ,
54
+ ( reported_server_id, mut server_session) ,
55
+ ) = futures:: future:: try_join (
56
+ noise:: Config :: new ( & server_id)
57
+ . unwrap ( )
58
+ . upgrade_inbound ( server, b"" ) ,
59
+ noise:: Config :: new ( & client_id)
60
+ . unwrap ( )
61
+ . upgrade_outbound ( client, b"" ) ,
62
+ )
63
+ . await
64
+ . unwrap ( ) ;
83
65
84
- out
85
- } )
86
- . boxed ( ) ;
66
+ assert_eq ! ( reported_client_id, client_id. public( ) . to_peer_id( ) ) ;
67
+ assert_eq ! ( reported_server_id, server_id. public( ) . to_peer_id( ) ) ;
68
+
69
+ let client_fut = async {
70
+ for m in & messages {
71
+ let n = ( m. 0 . len ( ) as u64 ) . to_be_bytes ( ) ;
72
+ client_session. write_all ( & n[ ..] ) . await . expect ( "len written" ) ;
73
+ client_session. write_all ( & m. 0 ) . await . expect ( "no error" )
74
+ }
75
+ client_session. flush ( ) . await . expect ( "no error" ) ;
76
+ } ;
77
+
78
+ let server_fut = async {
79
+ for m in & messages {
80
+ let len = {
81
+ let mut n = [ 0 ; 8 ] ;
82
+ match server_session. read_exact ( & mut n) . await {
83
+ Ok ( ( ) ) => u64:: from_be_bytes ( n) ,
84
+ Err ( e) if e. kind ( ) == io:: ErrorKind :: UnexpectedEof => 0 ,
85
+ Err ( e) => panic ! ( "error reading len: {e}" ) ,
86
+ }
87
+ } ;
88
+ info ! ( "server: reading message ({} bytes)" , len) ;
89
+ let mut server_buffer = vec ! [ 0 ; len. try_into( ) . unwrap( ) ] ;
90
+ server_session
91
+ . read_exact ( & mut server_buffer)
92
+ . await
93
+ . expect ( "no error" ) ;
94
+ assert_eq ! ( server_buffer, m. 0 )
95
+ }
96
+ } ;
97
+
98
+ futures:: future:: join ( client_fut, server_fut) . await ;
99
+ } ) ;
87
100
88
- run ( server_transport, client_transport, messages) ;
89
101
true
90
102
}
91
103
QuickCheck :: new ( )
92
104
. max_tests ( 30 )
93
105
. quickcheck ( prop as fn ( Vec < Message > ) -> bool )
94
106
}
95
107
96
- type Output = ( PeerId , noise:: Output < Negotiated < Async < TcpStream > > > ) ;
97
-
98
- fn run < I > ( mut server : transport:: Boxed < Output > , mut client : transport:: Boxed < Output > , messages : I )
99
- where
100
- I : IntoIterator < Item = Message > + Clone ,
101
- {
102
- futures:: executor:: block_on ( async {
103
- server
104
- . listen_on ( "/ip4/127.0.0.1/tcp/0" . parse ( ) . unwrap ( ) )
105
- . unwrap ( ) ;
106
-
107
- let server_address = server
108
- . next ( )
109
- . await
110
- . expect ( "some event" )
111
- . into_new_address ( )
112
- . expect ( "listen address" ) ;
113
-
114
- let outbound_msgs = messages. clone ( ) ;
115
- let client_fut = async {
116
- let mut client_session = client
117
- . dial ( server_address. clone ( ) )
118
- . unwrap ( )
119
- . await
120
- . map ( |( _, session) | session)
121
- . expect ( "no error" ) ;
122
-
123
- for m in outbound_msgs {
124
- let n = ( m. 0 . len ( ) as u64 ) . to_be_bytes ( ) ;
125
- client_session. write_all ( & n[ ..] ) . await . expect ( "len written" ) ;
126
- client_session. write_all ( & m. 0 ) . await . expect ( "no error" )
127
- }
128
- client_session. flush ( ) . await . expect ( "no error" ) ;
129
- } ;
130
-
131
- let server_fut = async {
132
- let mut server_session = server
133
- . next ( )
134
- . await
135
- . expect ( "some event" )
136
- . into_incoming ( )
137
- . expect ( "listener upgrade" )
138
- . 0
139
- . await
140
- . map ( |( _, session) | session)
141
- . expect ( "no error" ) ;
142
-
143
- for m in messages {
144
- let len = {
145
- let mut n = [ 0 ; 8 ] ;
146
- match server_session. read_exact ( & mut n) . await {
147
- Ok ( ( ) ) => u64:: from_be_bytes ( n) ,
148
- Err ( e) if e. kind ( ) == io:: ErrorKind :: UnexpectedEof => 0 ,
149
- Err ( e) => panic ! ( "error reading len: {e}" ) ,
150
- }
151
- } ;
152
- info ! ( "server: reading message ({} bytes)" , len) ;
153
- let mut server_buffer = vec ! [ 0 ; len. try_into( ) . unwrap( ) ] ;
154
- server_session
155
- . read_exact ( & mut server_buffer)
156
- . await
157
- . expect ( "no error" ) ;
158
- assert_eq ! ( server_buffer, m. 0 )
159
- }
160
- } ;
161
-
162
- futures:: future:: join ( server_fut, client_fut) . await ;
163
- } )
164
- }
165
-
166
108
#[ derive( Debug , Clone , PartialEq , Eq ) ]
167
109
struct Message ( Vec < u8 > ) ;
168
110
0 commit comments