Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions benches/memorypool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn bench_memorypool_rc_new<const N: usize>(c: &mut Criterion, op_kcount: usize)
let instances = &mut *instances.borrow_mut();
let mut next_value: [u64; N] = [0; N];
while next_value[0] < op_count as u64 {
let n = memorypool::Rc::new(next_value, &memory).unwrap();
let n = memorypool::Rc::try_new_in(next_value, &memory).unwrap();
instances.push(n);
next_value[0] += 1;
}
Expand Down Expand Up @@ -77,7 +77,7 @@ fn bench_memorypool_rc_drop<const N: usize>(c: &mut Criterion, op_kcount: usize)
let instances = &mut *instances.borrow_mut();
let mut next_value: [u64; N] = [0; N];
while next_value[0] < op_count as u64 {
let n = memorypool::Rc::new(next_value, &memory).unwrap();
let n = memorypool::Rc::try_new_in(next_value, &memory).unwrap();
instances.push(n);
next_value[0] += 1;
}
Expand Down Expand Up @@ -120,7 +120,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut instances = Vec::new();
let mut next_value: u64 = 0;
while next_value < OP_COUNT as u64 {
let n = memorypool::Rc::new(next_value, &memory).unwrap();
let n = memorypool::Rc::try_new_in(next_value, &memory).unwrap();
instances.push(n);
next_value += 1;
}
Expand Down Expand Up @@ -173,7 +173,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut instances = Vec::new();
let mut next_value: u64 = 0;
while next_value < OP_COUNT as u64 {
let n = memorypool::Rc::new(next_value, &memory).unwrap();
let n = memorypool::Rc::try_new_in(next_value, &memory).unwrap();
instances.push(n);
next_value += 1;
}
Expand Down Expand Up @@ -226,15 +226,15 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut instances = Vec::new();
let mut next_value: u64 = 0;
while next_value < OP_COUNT as u64 {
let n = memorypool::Rc::new(next_value, &memory).unwrap();
let n = memorypool::Rc::try_new_in(next_value, &memory).unwrap();
instances.push(n);
next_value += 1;
}

c.bench_function(&format!("mp-rc-deref-x{OP_KCOUNT}k"), |b| {
b.iter(|| {
for (expected_value, r) in instances.iter().enumerate() {
assert_eq!(*r.get(), expected_value as u64);
assert_eq!(**r, expected_value as u64);
}
})
});
Expand Down
42 changes: 22 additions & 20 deletions src/connmgr/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ impl Connections {
fn batch_add(&self, ckey: usize) -> Result<(), ()> {
let items = &mut *self.items.borrow_mut();
let ci = &mut items.nodes[ckey].value;
let cshared = ci.shared.as_ref().unwrap().get();
let cshared = ci.shared.as_ref().unwrap();

// Only batch connections with known handler addresses
let addr_ref = cshared.to_addr();
Expand Down Expand Up @@ -403,7 +403,7 @@ impl Connections {
let group = {
let group = batch.take_group(|ckey| {
let ci = &nodes[ckey].value;
let cshared = ci.shared.as_ref().unwrap().get();
let cshared = ci.shared.as_ref().unwrap();

// Addr could have been removed after adding to the batch
cshared.to_addr().get()?;
Expand Down Expand Up @@ -455,7 +455,7 @@ impl Connections {

for &ckey in batch.last_group_ckeys() {
let ci = &mut nodes[ckey].value;
let cshared = ci.shared.as_ref().unwrap().get();
let cshared = ci.shared.as_ref().unwrap();

cshared.inc_out_seq();
ci.batch_key = None;
Expand Down Expand Up @@ -895,7 +895,7 @@ impl Worker {
// req_handle_recv
Select5::R5(result) => match result {
Ok((header, msg)) => {
let scratch = memorypool::Rc::new(
let scratch = memorypool::Rc::try_new_in(
RefCell::new(zhttppacket::ParseScratch::new()),
&req_scratch_mem,
)
Expand Down Expand Up @@ -942,7 +942,7 @@ impl Worker {
None
};

let zreq = memorypool::Rc::new(zreq, &req_req_mem).unwrap();
let zreq = memorypool::Rc::try_new_in(zreq, &req_req_mem).unwrap();

let (cstop, r_cstop) =
CancellationToken::new(&reactor.local_registration_memory());
Expand Down Expand Up @@ -1118,7 +1118,7 @@ impl Worker {
Ok(ret) => {
let (msg, session) = ret;

let scratch = memorypool::Rc::new(
let scratch = memorypool::Rc::try_new_in(
RefCell::new(zhttppacket::ParseScratch::new()),
&stream_scratch_mem,
)
Expand Down Expand Up @@ -1165,7 +1165,7 @@ impl Worker {
}
};

let zreq = memorypool::Rc::new(zreq, &stream_req_mem).unwrap();
let zreq = memorypool::Rc::try_new_in(zreq, &stream_req_mem).unwrap();

let (cstop, r_cstop) =
CancellationToken::new(&reactor.local_registration_memory());
Expand All @@ -1181,9 +1181,11 @@ impl Worker {
let (zstream_receiver_sender, zstream_receiver) =
zreceiver_pool.take().unwrap();

let shared =
memorypool::Rc::new(StreamSharedData::new(), &stream_shared_mem)
.unwrap();
let shared = memorypool::Rc::try_new_in(
StreamSharedData::new(),
&stream_shared_mem,
)
.unwrap();

let ckey = conns
.add(
Expand Down Expand Up @@ -1238,7 +1240,7 @@ impl Worker {
// stream_handle.recv_directed
Select6::R6(result) => match result {
Ok(msg) => {
let scratch = memorypool::Rc::new(
let scratch = memorypool::Rc::try_new_in(
RefCell::new(zhttppacket::ParseScratch::new()),
&stream_scratch_mem,
)
Expand All @@ -1252,9 +1254,9 @@ impl Worker {
}
};

let zreq = memorypool::Rc::new(zreq, &stream_req_mem).unwrap();
let zreq = memorypool::Rc::try_new_in(zreq, &stream_req_mem).unwrap();

let zreq_ref = zreq.get().get();
let zreq_ref = zreq.get();

let ids = zreq_ref.ids;

Expand Down Expand Up @@ -1453,8 +1455,7 @@ impl Worker {
shared,
&|| {
// handle task limits addr to FROM_MAX so this is guaranteed to succeed
let from: ArrayVec<u8, FROM_MAX> =
ArrayVec::try_from(zreq.get().get().from).unwrap();
let from: ArrayVec<u8, FROM_MAX> = ArrayVec::try_from(zreq.get().from).unwrap();

let cid = (from, cid.clone());
conns.set_id(ckey, Some(&cid))
Expand Down Expand Up @@ -1656,7 +1657,7 @@ impl Client {
let req_scratch_mem = Rc::new(memorypool::RcMemory::new(1));
let req_req_mem = Rc::new(memorypool::RcMemory::new(1));

let scratch = memorypool::Rc::new(
let scratch = memorypool::Rc::try_new_in(
RefCell::new(zhttppacket::ParseScratch::new()),
&req_scratch_mem,
)
Expand All @@ -1671,7 +1672,7 @@ impl Client {
let msg = Arc::new(zmq::Message::from(msg.as_bytes()));

let zreq = zhttppacket::OwnedRequest::parse(msg, 0, scratch).unwrap();
let zreq = memorypool::Rc::new(zreq, &req_req_mem).unwrap();
let zreq = memorypool::Rc::try_new_in(zreq, &req_req_mem).unwrap();

let resolver = Arc::new(Resolver::new(1, 1));
let tls_config_cache = Arc::new(TlsConfigCache::new());
Expand Down Expand Up @@ -1720,7 +1721,7 @@ impl Client {
let req_scratch_mem = Rc::new(memorypool::RcMemory::new(1));
let req_req_mem = Rc::new(memorypool::RcMemory::new(1));

let scratch = memorypool::Rc::new(
let scratch = memorypool::Rc::try_new_in(
RefCell::new(zhttppacket::ParseScratch::new()),
&req_scratch_mem,
)
Expand All @@ -1735,15 +1736,16 @@ impl Client {
let msg = Arc::new(zmq::Message::from(msg.as_bytes()));

let zreq = zhttppacket::OwnedRequest::parse(msg, 0, scratch).unwrap();
let zreq = memorypool::Rc::new(zreq, &req_req_mem).unwrap();
let zreq = memorypool::Rc::try_new_in(zreq, &req_req_mem).unwrap();

let resolver = Arc::new(Resolver::new(1, 1));
let tls_config_cache = Arc::new(TlsConfigCache::new());
let pool = Arc::new(ConnectionPool::new(0));

let stream_shared_mem = Rc::new(memorypool::RcMemory::new(1));

let shared = memorypool::Rc::new(StreamSharedData::new(), &stream_shared_mem).unwrap();
let shared =
memorypool::Rc::try_new_in(StreamSharedData::new(), &stream_shared_mem).unwrap();

let fut = Worker::stream_connection_task(
stop,
Expand Down
Loading
Loading