@@ -3,6 +3,7 @@ use super::store::Store;
3
3
use crate :: bee_msg:: misc:: AuthenticateChannel ;
4
4
use crate :: bee_msg:: { Header , Msg , deserialize_body, deserialize_header, serialize} ;
5
5
use crate :: bee_serde:: { Deserializable , Serializable } ;
6
+ use crate :: conn:: TCP_BUF_LEN ;
6
7
use crate :: conn:: store:: StoredStream ;
7
8
use crate :: conn:: stream:: Stream ;
8
9
use crate :: types:: { AuthSecret , Uid } ;
@@ -52,9 +53,7 @@ impl Pool {
52
53
let mut buf = self . store . pop_buf_or_create ( ) ;
53
54
54
55
let msg_len = serialize ( msg, & mut buf) ?;
55
- let resp_header = self
56
- . comm_stream ( node_uid, & mut buf[ 0 ..msg_len] , true )
57
- . await ?;
56
+ let resp_header = self . comm_stream ( node_uid, & mut buf, msg_len, true ) . await ?;
58
57
let resp_msg = deserialize_body ( & resp_header, & buf[ Header :: LEN ..] ) ?;
59
58
60
59
self . store . push_buf ( buf) ;
@@ -71,8 +70,7 @@ impl Pool {
71
70
let mut buf = self . store . pop_buf_or_create ( ) ;
72
71
73
72
let msg_len = serialize ( msg, & mut buf) ?;
74
- self . comm_stream ( node_uid, & mut buf[ 0 ..msg_len] , false )
75
- . await ?;
73
+ self . comm_stream ( node_uid, & mut buf, msg_len, false ) . await ?;
76
74
77
75
self . store . push_buf ( buf) ;
78
76
@@ -95,12 +93,15 @@ impl Pool {
95
93
& self ,
96
94
node_uid : Uid ,
97
95
buf : & mut [ u8 ] ,
96
+ send_len : usize ,
98
97
expect_response : bool ,
99
98
) -> Result < Header > {
99
+ debug_assert_eq ! ( buf. len( ) , TCP_BUF_LEN ) ;
100
+
100
101
// 1. Pop open streams until communication succeeds or none are left
101
102
while let Some ( stream) = self . store . try_pop_stream ( node_uid) {
102
103
match self
103
- . write_and_read_stream ( buf, stream, expect_response)
104
+ . write_and_read_stream ( buf, stream, send_len , expect_response)
104
105
. await
105
106
{
106
107
Ok ( header) => return Ok ( header) ,
@@ -147,7 +148,7 @@ impl Pool {
147
148
// Communication using the newly opened stream should usually not fail. If
148
149
// it does, abort. It might be better to just try the next address though.
149
150
let resp_header = self
150
- . write_and_read_stream ( buf, stream, expect_response)
151
+ . write_and_read_stream ( buf, stream, send_len , expect_response)
151
152
. await
152
153
. with_context ( err_context) ?;
153
154
@@ -166,7 +167,7 @@ impl Pool {
166
167
let stream = self . store . pop_stream ( node_uid) . await ?;
167
168
168
169
let resp_header = self
169
- . write_and_read_stream ( buf, stream, expect_response)
170
+ . write_and_read_stream ( buf, stream, send_len , expect_response)
170
171
. await
171
172
. with_context ( || {
172
173
format ! ( "Communication using existing stream to {node_uid:?} failed" )
@@ -181,9 +182,10 @@ impl Pool {
181
182
& self ,
182
183
buf : & mut [ u8 ] ,
183
184
mut stream : StoredStream ,
185
+ send_len : usize ,
184
186
expect_response : bool ,
185
187
) -> Result < Header > {
186
- stream. as_mut ( ) . write_all ( buf) . await ?;
188
+ stream. as_mut ( ) . write_all ( & buf[ 0 ..send_len ] ) . await ?;
187
189
188
190
let header = if expect_response {
189
191
// Read header
0 commit comments