Skip to content

Commit bd57bc4

Browse files
Manually track capacity of groups
1 parent 6949e9b commit bd57bc4

File tree

2 files changed

+113
-30
lines changed

2 files changed

+113
-30
lines changed

src/future/future_group.rs

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,15 @@ pub struct FutureGroup<F> {
6767
wakers: WakerVec,
6868
states: PollVec,
6969
keys: BTreeSet<usize>,
70+
capacity: usize,
7071
}
7172

7273
impl<T: Debug> Debug for FutureGroup<T> {
7374
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7475
f.debug_struct("FutureGroup")
7576
.field("slab", &"[..]")
76-
.field("len", &self.futures.len())
77-
.field("capacity", &self.futures.capacity())
77+
.field("len", &self.len())
78+
.field("capacity", &self.capacity)
7879
.finish()
7980
}
8081
}
@@ -110,6 +111,7 @@ impl<F> FutureGroup<F> {
110111
wakers: WakerVec::new(capacity),
111112
states: PollVec::new(capacity),
112113
keys: BTreeSet::new(),
114+
capacity,
113115
}
114116
}
115117

@@ -127,6 +129,7 @@ impl<F> FutureGroup<F> {
127129
/// group.insert(future::ready(12));
128130
/// assert_eq!(group.len(), 1);
129131
/// ```
132+
#[inline(always)]
130133
pub fn len(&self) -> usize {
131134
self.futures.len()
132135
}
@@ -144,7 +147,7 @@ impl<F> FutureGroup<F> {
144147
/// # let group: FutureGroup<usize> = group;
145148
/// ```
146149
pub fn capacity(&self) -> usize {
147-
self.futures.capacity()
150+
self.capacity
148151
}
149152

150153
/// Returns true if there are no futures currently active in the group.
@@ -209,6 +212,36 @@ impl<F> FutureGroup<F> {
209212
pub fn contains_key(&mut self, key: Key) -> bool {
210213
self.keys.contains(&key.0)
211214
}
215+
216+
/// Reserves capacity for `additional` more futures to be inserted.
217+
/// Does nothing if the capacity is already sufficient.
218+
///
219+
/// # Example
220+
///
221+
/// ```rust
222+
/// use futures_concurrency::future::FutureGroup;
223+
/// use std::future::Ready;
224+
/// # futures_lite::future::block_on(async {
225+
/// let mut group: FutureGroup<Ready<usize>> = FutureGroup::with_capacity(0);
226+
/// assert_eq!(group.capacity(), 0);
227+
/// group.reserve(10);
228+
/// assert_eq!(group.capacity(), 10);
229+
///
230+
/// // does nothing if capacity is sufficient
231+
/// group.reserve(5);
232+
/// assert_eq!(group.capacity(), 10);
233+
/// # })
234+
/// ```
235+
pub fn reserve(&mut self, additional: usize) {
236+
if self.len() + additional < self.capacity {
237+
return;
238+
}
239+
let new_cap = self.capacity + additional;
240+
self.wakers.resize(new_cap);
241+
self.states.resize(new_cap);
242+
self.futures.reserve_exact(additional);
243+
self.capacity = new_cap;
244+
}
212245
}
213246

214247
impl<F: Future> FutureGroup<F> {
@@ -223,26 +256,22 @@ impl<F: Future> FutureGroup<F> {
223256
/// let mut group = FutureGroup::with_capacity(2);
224257
/// group.insert(future::ready(12));
225258
/// ```
226-
pub fn insert(&mut self, stream: F) -> Key
259+
pub fn insert(&mut self, future: F) -> Key
227260
where
228261
F: Future,
229262
{
230-
let index = self.futures.insert(stream);
231-
self.keys.insert(index);
232-
let key = Key(index);
263+
if self.capacity <= self.len() {
264+
self.reserve(self.capacity * 2 + 1);
265+
}
233266

234-
// If our slab allocated more space we need to
235-
// update our tracking structures along with it.
236-
let max_len = self.capacity().max(index);
237-
self.wakers.resize(max_len);
238-
self.states.resize(max_len);
267+
let index = self.futures.insert(future);
268+
self.keys.insert(index);
239269

240270
// Set the corresponding state
241271
self.states[index].set_pending();
242-
let mut readiness = self.wakers.readiness();
243-
readiness.set_ready(index);
272+
self.wakers.readiness().set_ready(index);
244273

245-
key
274+
Key(index)
246275
}
247276

248277
/// Insert a value into a pinned `FutureGroup`
@@ -251,14 +280,14 @@ impl<F: Future> FutureGroup<F> {
251280
/// `ConcurrentStream`. We should never expose this publicly, as the entire
252281
/// point of this crate is that we abstract the futures poll machinery away
253282
/// from end-users.
254-
pub(crate) fn insert_pinned(self: Pin<&mut Self>, stream: F) -> Key
283+
pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key
255284
where
256285
F: Future,
257286
{
258287
let mut this = self.project();
259288
// SAFETY: inserting a value into the futures slab does not ever move
260289
// any of the existing values.
261-
let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(stream);
290+
let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future);
262291
this.keys.insert(index);
263292
let key = Key(index);
264293

@@ -455,4 +484,16 @@ mod test {
455484
assert!(group.is_empty());
456485
});
457486
}
487+
488+
#[test]
489+
fn capacity_grow_on_insert() {
490+
futures_lite::future::block_on(async {
491+
let mut group = FutureGroup::new();
492+
let cap = group.capacity();
493+
494+
group.insert(future::ready(1));
495+
496+
assert!(group.capacity() > cap);
497+
});
498+
}
458499
}

src/stream/stream_group.rs

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub struct StreamGroup<S> {
6666
states: PollVec,
6767
keys: BTreeSet<usize>,
6868
key_removal_queue: SmallVec<[usize; 10]>,
69+
capacity: usize,
6970
}
7071

7172
impl<T: Debug> Debug for StreamGroup<T> {
@@ -108,6 +109,7 @@ impl<S> StreamGroup<S> {
108109
states: PollVec::new(capacity),
109110
keys: BTreeSet::new(),
110111
key_removal_queue: smallvec![],
112+
capacity,
111113
}
112114
}
113115

@@ -124,6 +126,7 @@ impl<S> StreamGroup<S> {
124126
/// group.insert(stream::once(12));
125127
/// assert_eq!(group.len(), 1);
126128
/// ```
129+
#[inline(always)]
127130
pub fn len(&self) -> usize {
128131
self.streams.len()
129132
}
@@ -141,7 +144,7 @@ impl<S> StreamGroup<S> {
141144
/// # let group: StreamGroup<usize> = group;
142145
/// ```
143146
pub fn capacity(&self) -> usize {
144-
self.streams.capacity()
147+
self.capacity
145148
}
146149

147150
/// Returns true if there are no futures currently active in the group.
@@ -157,6 +160,7 @@ impl<S> StreamGroup<S> {
157160
/// group.insert(stream::once(12));
158161
/// assert!(!group.is_empty());
159162
/// ```
163+
#[inline(always)]
160164
pub fn is_empty(&self) -> bool {
161165
self.streams.is_empty()
162166
}
@@ -206,6 +210,36 @@ impl<S> StreamGroup<S> {
206210
pub fn contains_key(&mut self, key: Key) -> bool {
207211
self.keys.contains(&key.0)
208212
}
213+
214+
/// Reserves capacity for `additional` more streams to be inserted.
215+
/// Does nothing if the capacity is already sufficient.
216+
///
217+
/// # Example
218+
///
219+
/// ```rust
220+
/// use futures_concurrency::stream::StreamGroup;
221+
/// use futures_lite::stream::Once;
222+
/// # futures_lite::future::block_on(async {
223+
/// let mut group: StreamGroup<Once<usize>> = StreamGroup::with_capacity(0);
224+
/// assert_eq!(group.capacity(), 0);
225+
/// group.reserve(10);
226+
/// assert_eq!(group.capacity(), 10);
227+
///
228+
/// // does nothing if capacity is sufficient
229+
/// group.reserve(5);
230+
/// assert_eq!(group.capacity(), 10);
231+
/// # })
232+
/// ```
233+
pub fn reserve(&mut self, additional: usize) {
234+
if self.len() + additional < self.capacity {
235+
return;
236+
}
237+
let new_cap = self.capacity + additional;
238+
self.wakers.resize(new_cap);
239+
self.states.resize(new_cap);
240+
self.streams.reserve_exact(additional);
241+
self.capacity = new_cap;
242+
}
209243
}
210244

211245
impl<S: Stream> StreamGroup<S> {
@@ -224,22 +258,18 @@ impl<S: Stream> StreamGroup<S> {
224258
where
225259
S: Stream,
226260
{
261+
if self.capacity <= self.len() {
262+
self.reserve(self.capacity * 2 + 1);
263+
}
264+
227265
let index = self.streams.insert(stream);
228266
self.keys.insert(index);
229-
let key = Key(index);
230-
231-
// If our slab allocated more space we need to
232-
// update our tracking structures along with it.
233-
let max_len = self.capacity().max(index);
234-
self.wakers.resize(max_len);
235-
self.states.resize(max_len);
236267

237268
// Set the corresponding state
238269
self.states[index].set_pending();
239-
let mut readiness = self.wakers.readiness();
240-
readiness.set_ready(index);
270+
self.wakers.readiness().set_ready(index);
241271

242-
key
272+
Key(index)
243273
}
244274

245275
/// Create a stream which also yields the key of each item.
@@ -270,10 +300,10 @@ impl<S: Stream> StreamGroup<S> {
270300

271301
impl<S: Stream> StreamGroup<S> {
272302
fn poll_next_inner(
273-
self: Pin<&mut Self>,
303+
mut self: Pin<&mut Self>,
274304
cx: &Context<'_>,
275305
) -> Poll<Option<(Key, <S as Stream>::Item)>> {
276-
let mut this = self.project();
306+
let mut this = self.as_mut().project();
277307

278308
// Short-circuit if we have no streams to iterate over
279309
if this.streams.is_empty() {
@@ -441,4 +471,16 @@ mod test {
441471
assert!(group.is_empty());
442472
});
443473
}
474+
475+
#[test]
476+
fn capacity_grow_on_insert() {
477+
futures_lite::future::block_on(async {
478+
let mut group = StreamGroup::new();
479+
let cap = group.capacity();
480+
481+
group.insert(stream::once(1));
482+
483+
assert!(group.capacity() > cap);
484+
});
485+
}
444486
}

0 commit comments

Comments
 (0)