Skip to content

Commit 3424b6f

Browse files
authored
Cleanup and docs (#34)
* adding more tests * pushing last touches --------- Co-authored-by: Frank Lee <>
1 parent fc9bc8d commit 3424b6f

File tree

4 files changed

+197
-5
lines changed

4 files changed

+197
-5
lines changed

src/net/client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ impl BluefinClient {
4141
}
4242
}
4343

44+
#[inline]
45+
pub fn set_num_reader_workers(&mut self, num_reader_workers: u16) -> BluefinResult<()> {
46+
if num_reader_workers == 0 {
47+
return Err(BluefinError::Unexpected(
48+
"Cannot have zero reader values".to_string(),
49+
));
50+
}
51+
self.num_reader_workers = num_reader_workers;
52+
Ok(())
53+
}
54+
4455
pub async fn connect(&mut self, dst_addr: SocketAddr) -> BluefinResult<BluefinConnection> {
4556
let socket = Arc::new(UdpSocket::bind(self.src_addr).await?);
4657
self.socket = Some(Arc::clone(&socket));

src/net/ordered_bytes.rs

Lines changed: 184 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ impl OrderedBytes {
204204
}
205205
}
206206

207-
let mut ix = 0;
208207
let base = self.smallest_packet_number_index;
209208
let base_packet_number = {
210209
if let Some(ref _p) = self.packets[base] {
@@ -214,6 +213,7 @@ impl OrderedBytes {
214213
}
215214
};
216215

216+
let mut ix = 0;
217217
while ix < MAX_BUFFER_SIZE
218218
&& self.packets[(base + ix) % MAX_BUFFER_SIZE].is_some()
219219
&& num_bytes < len
@@ -257,3 +257,186 @@ impl OrderedBytes {
257257
Ok(ConsumeResult::new(ix, base_packet_number, num_bytes as u64))
258258
}
259259
}
260+
261+
#[cfg(test)]
262+
mod tests {
263+
use crate::{
264+
core::{
265+
error::BluefinError,
266+
header::{BluefinHeader, BluefinSecurityFields, PacketType},
267+
packet::BluefinPacket,
268+
},
269+
net::MAX_BLUEFIN_PAYLOAD_SIZE_BYTES,
270+
};
271+
272+
use super::OrderedBytes;
273+
274+
#[test]
275+
fn ordered_bytes_carry_over_behaves_as_expected() {
276+
let start_packet_num = rand::random();
277+
let mut ordered_bytes = OrderedBytes::new(0x0, start_packet_num);
278+
279+
assert!(ordered_bytes
280+
.peek()
281+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
282+
283+
// Buffer in one packet with payload of 1500 bytes
284+
let mut payload = vec![];
285+
while payload.len() != MAX_BLUEFIN_PAYLOAD_SIZE_BYTES {
286+
let r: [u8; 15] = rand::random();
287+
payload.extend(r);
288+
}
289+
290+
let security_fields = BluefinSecurityFields::new(false, 0x0);
291+
let mut header =
292+
BluefinHeader::new(0x0, 0x0, PacketType::UnencryptedData, 0, security_fields);
293+
header.packet_number = start_packet_num;
294+
let packet = BluefinPacket::builder()
295+
.header(header)
296+
.payload(payload.clone())
297+
.build();
298+
assert!(ordered_bytes.buffer_in_packet(packet).is_ok());
299+
300+
let mut buf = [0u8; 100];
301+
let consume_res = ordered_bytes.consume(100, &mut buf);
302+
assert!(consume_res.is_ok());
303+
304+
// Consumed 100 bytes. This means 1500 - 100 = 1400 bytes are buffered in the left-over
305+
// bytes buffer
306+
let consume = consume_res.unwrap();
307+
assert_eq!(consume.base_packet_number, start_packet_num);
308+
assert_eq!(consume.num_packets_consumed, 1);
309+
assert_eq!(consume.bytes_consumed, 100);
310+
assert_eq!(payload[..100], buf[..100]);
311+
312+
// Insert another packet with 1500 bytes
313+
let mut second_payload = vec![];
314+
while second_payload.len() != MAX_BLUEFIN_PAYLOAD_SIZE_BYTES {
315+
let r: [u8; 15] = rand::random();
316+
second_payload.extend(r);
317+
}
318+
header.packet_number = start_packet_num + 1;
319+
let packet = BluefinPacket::builder()
320+
.header(header)
321+
.payload(second_payload.clone())
322+
.build();
323+
assert!(ordered_bytes.buffer_in_packet(packet).is_ok());
324+
325+
// Consume another 100 bytes. These 100 bytes should still come from the first payload.
326+
let consume_res = ordered_bytes.consume(100, &mut buf);
327+
assert!(consume_res.is_ok());
328+
329+
// We now have 1400 - 100 = 1300 bytes left in the carry over.
330+
let consume = consume_res.unwrap();
331+
// Base packet number should be zero since it's all coming from the carry over
332+
assert_eq!(consume.base_packet_number, 0);
333+
assert_eq!(consume.num_packets_consumed, 0);
334+
assert_eq!(consume.bytes_consumed, 100);
335+
assert_eq!(payload[100..200], buf[..100]);
336+
337+
// Concume 1400 bytes.
338+
let mut buf = [0u8; 1400];
339+
let consume_res = ordered_bytes.consume(1400, &mut buf);
340+
assert!(consume_res.is_ok());
341+
342+
// 1300 of these bytes come from the carry over. The remaining 100 bytes are from the second
343+
// packet we inserted
344+
let consume = consume_res.unwrap();
345+
assert_eq!(consume.base_packet_number, start_packet_num + 1);
346+
assert_eq!(consume.num_packets_consumed, 1);
347+
assert_eq!(consume.bytes_consumed, 1400);
348+
assert_eq!(payload[200..], buf[..1300]);
349+
assert_eq!(second_payload[..100], buf[1300..]);
350+
}
351+
352+
#[test]
353+
fn ordered_bytes_consume_behaves_as_expected() {
354+
let start_packet_num = rand::random();
355+
let mut ordered_bytes = OrderedBytes::new(0x0, start_packet_num);
356+
357+
assert!(ordered_bytes
358+
.peek()
359+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
360+
361+
let security_fields = BluefinSecurityFields::new(false, 0x0);
362+
let mut header =
363+
BluefinHeader::new(0x0, 0x0, PacketType::UnencryptedData, 0, security_fields);
364+
header.packet_number = start_packet_num + 1;
365+
let mut packet = BluefinPacket::builder()
366+
.header(header)
367+
.payload([1, 2, 3].to_vec())
368+
.build();
369+
370+
assert!(ordered_bytes.buffer_in_packet(packet.clone()).is_ok());
371+
assert!(ordered_bytes
372+
.peek()
373+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
374+
375+
packet.header.packet_number = start_packet_num + 2;
376+
assert!(ordered_bytes.buffer_in_packet(packet.clone()).is_ok());
377+
assert!(ordered_bytes
378+
.peek()
379+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
380+
381+
packet.header.packet_number = start_packet_num + 3;
382+
assert!(ordered_bytes.buffer_in_packet(packet.clone()).is_ok());
383+
assert!(ordered_bytes
384+
.peek()
385+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
386+
387+
packet.header.packet_number = start_packet_num + 5;
388+
assert!(ordered_bytes.buffer_in_packet(packet.clone()).is_ok());
389+
assert!(ordered_bytes
390+
.peek()
391+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
392+
393+
packet.header.packet_number = start_packet_num;
394+
assert!(ordered_bytes.buffer_in_packet(packet.clone()).is_ok());
395+
assert!(ordered_bytes.peek().is_ok());
396+
397+
let mut buf = [0u8; 10];
398+
let consume_res = ordered_bytes.consume(1, &mut buf);
399+
assert!(consume_res.is_ok());
400+
401+
let consume = consume_res.unwrap();
402+
assert_eq!(consume.base_packet_number, start_packet_num);
403+
assert_eq!(consume.num_packets_consumed, 1);
404+
assert_eq!(consume.bytes_consumed, 1);
405+
assert_eq!(buf, [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
406+
407+
// From carry over, 0 packets
408+
let consume_res = ordered_bytes.consume(1, &mut buf);
409+
assert!(consume_res.is_ok());
410+
let consume = consume_res.unwrap();
411+
assert_eq!(consume.num_packets_consumed, 0);
412+
assert_eq!(consume.bytes_consumed, 1);
413+
assert_eq!(buf, [2, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
414+
415+
let consume_res = ordered_bytes.consume(3, &mut buf);
416+
assert!(consume_res.is_ok());
417+
let consume = consume_res.unwrap();
418+
assert_eq!(consume.num_packets_consumed, 1);
419+
assert_eq!(consume.bytes_consumed, 3);
420+
assert_eq!(buf, [3, 1, 2, 0, 0, 0, 0, 0, 0, 0]);
421+
422+
let consume_res = ordered_bytes.consume(4, &mut buf);
423+
assert!(consume_res.is_ok());
424+
let consume = consume_res.unwrap();
425+
assert_eq!(consume.num_packets_consumed, 1);
426+
assert_eq!(consume.bytes_consumed, 4);
427+
assert_eq!(buf, [3, 1, 2, 3, 0, 0, 0, 0, 0, 0]);
428+
429+
let mut buf = [0u8; 10];
430+
let consume_res = ordered_bytes.consume(10, &mut buf);
431+
assert!(consume_res.is_ok());
432+
let consume = consume_res.unwrap();
433+
assert_eq!(consume.num_packets_consumed, 1);
434+
assert_eq!(consume.bytes_consumed, 3);
435+
assert_eq!(buf, [1, 2, 3, 0, 0, 0, 0, 0, 0, 0]);
436+
437+
assert!(ordered_bytes
438+
.peek()
439+
.is_err_and(|e| e == BluefinError::BufferEmptyError));
440+
assert!(ordered_bytes.consume(1, &mut buf).is_err());
441+
}
442+
}

src/worker/reader.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl ReaderRxChannel {
9595
let base_packet_num = consume_res.get_base_packet_number();
9696

9797
// We need to send an ack.
98-
if num_packets_consumed > 0 {
98+
if num_packets_consumed > 0 && base_packet_num != 0 {
9999
if let Err(e) = self
100100
.writer_tx_channel
101101
.send_ack(base_packet_num, num_packets_consumed)
@@ -232,11 +232,9 @@ impl ReaderTxChannel {
232232
if !is_client_ack && !is_hello && packet.header.type_field == PacketType::Ack {
233233
let mut ack_buff = buffers.ack_buff.lock().unwrap();
234234
Self::buffer_to_ack_buffer(&mut ack_buff, packet)?;
235-
drop(ack_buff);
236235
} else {
237236
let mut conn_buff = buffers.conn_buff.lock().unwrap();
238237
Self::buffer_to_conn_buffer(&mut conn_buff, packet, addr, is_hello, is_client_ack)?;
239-
drop(conn_buff);
240238
}
241239
Ok(())
242240
}

src/worker/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ impl WriterTxChannel {
248248
}
249249

250250
self.num_runs_without_sleep += 1;
251-
if self.num_runs_without_sleep >= 137 {
251+
if self.num_runs_without_sleep >= 100 {
252252
sleep(Duration::from_nanos(10)).await;
253253
self.num_runs_without_sleep = 0;
254254
}

0 commit comments

Comments
 (0)