Skip to content

Commit e359a09

Browse files
committed
Add add_nonblocking, capacity and is_full for streams
1 parent fdd2593 commit e359a09

File tree

3 files changed

+85
-0
lines changed

3 files changed

+85
-0
lines changed

lib_eio/stream.ml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
type drop_priority = Newest | Oldest
2+
13
module Locking = struct
24
type 'a t = {
35
mutex : Mutex.t;
@@ -64,6 +66,26 @@ module Locking = struct
6466
)
6567
)
6668

69+
let add_nonblocking ~drop_priority t item =
70+
Mutex.lock t.mutex;
71+
match Waiters.wake_one t.readers item with
72+
| `Ok -> Mutex.unlock t.mutex; None
73+
| `Queue_empty ->
74+
(* No-one is waiting for an item. Queue it. *)
75+
if Queue.length t.items < t.capacity then (
76+
Queue.add item t.items;
77+
Mutex.unlock t.mutex;
78+
None
79+
) else (
80+
match drop_priority with
81+
| Newest -> Mutex.unlock t.mutex; Some item
82+
| Oldest ->
83+
let dropped_item = Queue.take t.items in
84+
Queue.add item t.items;
85+
Mutex.unlock t.mutex;
86+
Some dropped_item
87+
)
88+
6789
let take t =
6890
Mutex.lock t.mutex;
6991
match Queue.take_opt t.items with
@@ -101,6 +123,8 @@ module Locking = struct
101123
let len = Queue.length t.items in
102124
Mutex.unlock t.mutex;
103125
len
126+
127+
let capacity t = t.capacity
104128

105129
let dump f t =
106130
Fmt.pf f "<Locking stream: %d/%d items>" (length t) t.capacity
@@ -123,6 +147,11 @@ let take = function
123147
| Sync x -> Sync.take x |> Result.get_ok (* todo: allow closing streams *)
124148
| Locking x -> Locking.take x
125149

150+
let add_nonblocking ~drop_priority t v =
151+
match t with
152+
| Sync _ -> Some v
153+
| Locking x -> Locking.add_nonblocking ~drop_priority x v
154+
126155
let take_nonblocking = function
127156
| Locking x -> Locking.take_nonblocking x
128157
| Sync x ->
@@ -134,8 +163,14 @@ let length = function
134163
| Sync _ -> 0
135164
| Locking x -> Locking.length x
136165

166+
let capacity = function
167+
| Sync _ -> 0
168+
| Locking x -> Locking.capacity x
169+
137170
let is_empty t = (length t = 0)
138171

172+
let is_full t = (length t = capacity t)
173+
139174
let dump f = function
140175
| Sync x -> Sync.dump f x
141176
| Locking x -> Locking.dump f x

lib_eio/stream.mli

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,23 @@ val take : 'a t -> 'a
3333
3434
If no items are available, it waits until one becomes available. *)
3535

36+
type drop_priority = Newest | Oldest
37+
38+
val add_nonblocking : drop_priority: drop_priority -> 'a t -> 'a -> 'a option
39+
(** [add_nonblocking ~drop_priority t item] is like [(add t item); None] except that
40+
it returns [Some dropped_item] if the stream is full rather than waiting, where
41+
[dropped_item] is [item] if [drop_priority = Newest], and the first element of the
42+
stream if [drop_priority = Oldest].
43+
44+
In other words, if the stream is full then:
45+
- [add_nonblocking ~drop_priority:Newest t item] is like [Some item]; and
46+
- [add_nonblocking ~drop_priority:Oldest t item] is like
47+
[let dropped_item = take t in add t item; Some dropped_item]
48+
except that no other stream operation can happen (even in other threads)
49+
between the [take] and the [add].
50+
51+
On streams of capacity [0], this always returns [Some item], even if a reader is waiting. *)
52+
3653
val take_nonblocking : 'a t -> 'a option
3754
(** [take_nonblocking t] is like [Some (take t)] except that
3855
it returns [None] if the stream is empty rather than waiting.
@@ -43,8 +60,14 @@ val take_nonblocking : 'a t -> 'a option
4360
val length : 'a t -> int
4461
(** [length t] returns the number of items currently in [t]. *)
4562

63+
val capacity : 'a t -> int
64+
(** [capacity t] returns the number of items [t] can hold without blocking writers. *)
65+
4666
val is_empty : 'a t -> bool
4767
(** [is_empty t] is [length t = 0]. *)
4868

69+
val is_full : 'a t -> bool
70+
(** [is_full t] is [length t = capacity t]. *)
71+
4972
val dump : 'a t Fmt.t
5073
(** For debugging. *)

tests/stream.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ let add t v =
2020
S.add t v;
2121
traceln "Added %d to stream" v
2222
23+
let add_nonblocking ~drop_priority t v =
24+
traceln "Adding %d to stream" v;
25+
match S.add_nonblocking ~drop_priority t v with
26+
| None -> traceln "Added %d to stream" v
27+
| Some d ->
28+
match drop_priority, S.capacity t with
29+
| Newest, _ | _, 0 -> assert (d = v); traceln "Dropped %i instead of adding it to stream" v
30+
| Oldest, _ -> traceln "Dropped %i from stream and added %i to stream" d v
31+
32+
2333
let take t =
2434
traceln "Reading from stream";
2535
traceln "Got %d from stream" (S.take t)
@@ -320,6 +330,23 @@ Cancelling writing to a stream:
320330
- : unit = ()
321331
```
322332

333+
Non-blocking add:
334+
335+
```ocaml
336+
# run @@ fun () ->
337+
let t = S.create 1 in
338+
add t 0;
339+
add_nonblocking ~drop_priority:Newest t 1;
340+
add_nonblocking ~drop_priority:Oldest t 2;;
341+
+Adding 0 to stream
342+
+Added 0 to stream
343+
+Adding 1 to stream
344+
+Dropped 1 instead of adding it to stream
345+
+Adding 2 to stream
346+
+Dropped 0 from stream and added 2 to stream
347+
- : unit = ()
348+
```
349+
323350
Non-blocking take:
324351

325352
```ocaml

0 commit comments

Comments
 (0)