Skip to content

Commit e66d41a

Browse files
committed
Merge master into tsasvla
2 parents 7fe249b + f32f4e9 commit e66d41a

24 files changed

+1745
-1741
lines changed

CHANGES.md

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1-
### 0.1.9-alpha5
1+
### 0.2.0-SNAPSHOT
22

3-
Contributions by Erik Assum, Matthew Davidson, led, Dominic Monroe, Justin Sonntag
3+
Contributions by Matthew Davidson, Ryan Smith
44

5+
* Switch to `bound-fn` in `let-flow` to fix bug where dynamic vars were incorrect for other threads
6+
* Modernized indentation to match current Clojure styles and fix misalignments
7+
8+
### 0.1.9
9+
10+
Contributions by Erik Assum, Reynald Borer, Matthew Davidson, Alexey Kachayev, led, Dominic Monroe, Pierre-Yves Ritschard, Ryan Smith, Justin Sonntag, Zach Tellman, Luo Tian, and Philip van Heerden.
11+
12+
* Updated docs to use cljdoc.org by default
13+
* Minor doc improvements
14+
* Bumped up dependencies to modern versions
515
* Convert to CircleCI for testing and remove `jammin`
616
* Set up for clj-commons
717
* Fix bug where excessive pending takes return wrong deferred
8-
* Clean up timed-out pending takes and exponses vars to control clean up behavior
18+
* Clean up timed-out pending takes and exposes vars to control clean-up behavior
919
* Remove Travis CI
1020
* Allow functions passed to `time/in` to return a deferred
11-
12-
### 0.1.9-alpha4
13-
14-
Contributions by Reynald Borer, Alexey Kachayev, Pierre-Yves Ritschard, Ryan Smith, Zach Tellman, Luo Tian, and Philip van Heerden
15-
1621
* Make `time/in` cancellable
1722
* Extend thread-factory builder to create non-daemon threads
1823
* Prevent `let-flow` body from executing on last deferred thread
@@ -58,7 +63,6 @@ Thanks to Tsutomu Yano and Joshua Griffith
5863

5964
* Target latest Dirigiste, which is no longer compiled using JDK 8 byte code.
6065

61-
6266
### 0.1.2
6367

6468
* fix lifecycle for `batch` and `throttle` when the source is a permanent stream

README.md

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
11
[![Clojars Project](https://img.shields.io/clojars/v/manifold.svg)](https://clojars.org/manifold)
2-
[![cljdoc badge](https://cljdoc.org/badge/manifold)](https://cljdoc.org/d/manifold)
2+
[![cljdoc badge](https://cljdoc.org/badge/manifold/manifold)](https://cljdoc.org/d/manifold/manifold)
33
[![CircleCI](https://circleci.com/gh/clj-commons/manifold.svg?style=svg)](https://circleci.com/gh/clj-commons/manifold)
44
![](docs/manifold.png)
55

66
This library provides basic building blocks for asynchronous programming, and can be used as a translation layer between libraries which use similar but incompatible abstractions.
77

88
Manifold provides two core abstractions: **deferreds**, which represent a single asynchronous value, and **streams**, which represent an ordered sequence of asynchronous values.
99

10-
A detailed discussion of Manifold's rationale can be found [here](http://aleph.io/manifold/rationale.html). Full documentation can be found [here](http://aleph.io/codox/manifold/).
10+
A detailed discussion of Manifold's rationale can be found [here](http://aleph.io/manifold/rationale.html). Full documentation can be found [here](https://cljdoc.org/d/manifold/manifold).
1111

1212

13-
```clj
14-
[manifold "0.1.9-alpha5"]
13+
```clojure
14+
[manifold "0.1.9"]
1515
```
1616

17-
### deferreds
17+
### Deferreds
1818

1919
A deferred in Manifold is similar to a Clojure promise:
2020

21-
```clj
21+
```clojure
2222
> (require '[manifold.deferred :as d])
2323
nil
2424

@@ -34,7 +34,7 @@ true
3434

3535
However, similar to Clojure's futures, deferreds in Manifold can also represent errors. Crucially, they also allow for callbacks to be registered, rather than simply blocking on dereferencing.
3636

37-
```clj
37+
```clojure
3838
> (def d (d/deferred))
3939
#'d
4040

@@ -45,7 +45,7 @@ true
4545
Exception: boom
4646
```
4747

48-
```clj
48+
```clojure
4949
> (def d (d/deferred))
5050
#'d
5151

@@ -61,15 +61,15 @@ true
6161

6262
Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. In practice, **no one should ever need to use `on-realized`**. Manifold provides a number of operators for composing over deferred values, [which can be read about here](/docs/deferred.md).
6363

64-
### streams
64+
### Streams
6565

6666
Manifold's streams provide mechanisms for asynchronous puts and takes, timeouts, and backpressure. They are compatible with Java's `BlockingQueues`, Clojure's lazy sequences, and core.async's channels. Methods for converting to and from each are provided.
6767

6868
Manifold differentiates between **sources**, which emit messages, and **sinks**, which consume messages. We can interact with sources using `take!` and `try-take!`, which return deferred values representing the next message. We can interact with sinks using `put!` and `try-put!`, which return a deferred values which will yield `true` if the put is successful, or `false` otherwise.
6969

7070
We can create a stream using `(manifold.stream/stream)`:
7171

72-
```clj
72+
```clojure
7373
> (require '[manifold.stream :as s])
7474
nil
7575
> (def s (s/stream))
@@ -82,7 +82,7 @@ nil
8282

8383
A stream is both a sink and a source; any message sent via `put!` can be received via `take!`. We can also create sinks and sources from other stream representations using `->sink` and `->source`:
8484

85-
```clj
85+
```clojure
8686
> (require '[clojure.core.async :as a])
8787
nil
8888
> (def c (a/chan))
@@ -97,7 +97,7 @@ nil
9797

9898
We can also turn a Manifold stream into a different representation by using `connect` to join them together:
9999

100-
```clj
100+
```clojure
101101
> (def s (s/stream))
102102
#'s
103103
> (def c (a/chan))
@@ -112,13 +112,17 @@ nil
112112

113113
Manifold can use any transducer, which are applied via `transform`. It also provides stream-specific transforms, including `zip`, `reduce`, `buffer`, `batch`, and `throttle`. [To learn more about streams, go here](/docs/stream.md).
114114

115-
### Java 8 extensions
115+
### Clojurescript
116+
117+
A Clojurescript implementation of Manifold can be found here: [dm3/manifold-cljs](https://github.com/dm3/manifold-cljs).
118+
119+
### Older Java support
116120

117121
Manifold includes support for a few classes introduced in Java 8:
118122
`java.util.concurrent.CompletableFuture` and `java.util.stream.BaseStream`.
119-
Support for Java 8 is detected automatically at compile time; if you are
120-
AOT compiling Manifold on Java 8 or newer, and will be running the compiled
121-
jar with a Java 7 or older JRE, you will need to disable this feature, by
123+
Support for Java 8+ is detected automatically at compile time; if you are
124+
AOT compiling Manifold on Java 8 or newer, but will be running the compiled
125+
jar with a Java 7 or older JRE, you will need to disable them, by
122126
setting the JVM option `"manifold.disable-jvm8-primitives"`, either at the
123127
command line with
124128

@@ -130,12 +134,9 @@ or by adding
130134

131135
to your application's project.clj.
132136

133-
### Clojurescript
134-
135-
A Clojurescript implementation of Manifold can be found here: [dm3/manifold-cljs](https://github.com/dm3/manifold-cljs).
136137

137-
### license
138+
### License
138139

139-
Copyright © 2014-2018 Zach Tellman
140+
Copyright © 2014-2021 Zach Tellman
140141

141142
Distributed under the MIT License.

docs/deferred.md

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
### deferreds
1+
### Deferreds
22

33
A deferred in Manifold is similar to a Clojure promise:
44

5-
```clj
5+
```clojure
66
> (require '[manifold.deferred :as d])
77
nil
88

@@ -18,7 +18,7 @@ true
1818

1919
However, similar to Clojure's futures, deferreds in Manifold can also represent errors. Crucially, they also allow for callbacks to be registered, rather than simply blocking on dereferencing.
2020

21-
```clj
21+
```clojure
2222
> (def d (d/deferred))
2323
#'d
2424

@@ -29,7 +29,7 @@ true
2929
Exception: boom
3030
```
3131

32-
```clj
32+
```clojure
3333
> (def d (d/deferred))
3434
#'d
3535

@@ -43,13 +43,13 @@ success! :foo
4343
true
4444
```
4545

46-
### composing with deferreds
46+
### Composing with deferreds
4747

4848
Callbacks are a useful building block, but they're a painful way to create asynchronous workflows. In practice, no one should ever use `on-realized`.
4949

5050
Instead, they should use `manifold.deferred/chain`, which chains together callbacks, left to right:
5151

52-
```clj
52+
```clojure
5353
> (def d (d/deferred))
5454
#'d
5555

@@ -65,7 +65,7 @@ true
6565

6666
Values that can be coerced into a deferred include Clojure futures, Java futures, and Clojure promises.
6767

68-
```clj
68+
```clojure
6969
> (def d (d/deferred))
7070
#'d
7171

@@ -81,7 +81,7 @@ true
8181

8282
If any stage in `chain` throws an exception or returns a deferred that yields an error, all subsequent stages are skipped, and the deferred returned by `chain` yields that same error. To handle these cases, you can use `manifold.deferred/catch`:
8383

84-
```clj
84+
```clojure
8585
> (def d (d/deferred))
8686
#p
8787

@@ -99,14 +99,14 @@ Using the `->` threading operator, `chain` and `catch` can be easily and arbitra
9999

100100
To combine multiple deferrable values into a single deferred that yields all their results, we can use `manifold.deferred/zip`:
101101

102-
```clj
102+
```clojure
103103
> @(d/zip (future 1) (future 2) (future 3))
104104
(1 2 3)
105105
```
106106

107107
Finally, we can use `manifold.deferred/timeout!` to register a timeout on the deferred which will yield either a specified timeout value or a `TimeoutException` if the deferred is not realized within `n` milliseconds.
108108

109-
```clj
109+
```clojure
110110
> @(d/timeout!
111111
(d/future (Thread/sleep 1000) :foo)
112112
100
@@ -126,7 +126,7 @@ Wherever possible, use `manifold.deferred/deferred` instead of `promise`, and `m
126126

127127
Let's say that we have two services which provide us numbers, and want to get their sum. By using `zip` and `chain` together, this is relatively straightforward:
128128

129-
```clj
129+
```clojure
130130
(defn deferred-sum []
131131
(let [a (call-service-a)
132132
b (call-service-b)]
@@ -137,7 +137,7 @@ Let's say that we have two services which provide us numbers, and want to get th
137137

138138
However, this isn't a very direct expression of what we're doing. For more complex relationships between deferred values, our code will become even more difficult to understand. In these cases, it's often best to use `let-flow`.
139139

140-
```clj
140+
```clojure
141141
(defn deferred-sum []
142142
(let-flow [a (call-service-a)
143143
b (call-service-b)]
@@ -146,7 +146,7 @@ However, this isn't a very direct expression of what we're doing. For more comp
146146

147147
In `let-flow`, we can treat deferred values as if they're realized. This is only true of values declared within or closed over by `let-flow`, however. So we can do this:
148148

149-
```clj
149+
```clojure
150150
(let [a (future 1)]
151151
(let-flow [b (future (+ a 1))
152152
c (+ b 1)]
@@ -155,7 +155,7 @@ In `let-flow`, we can treat deferred values as if they're realized. This is onl
155155

156156
but not this:
157157

158-
```clj
158+
```clojure
159159
(let-flow [a (future 1)
160160
b (let [c (future 1)]
161161
(+ a c))]
@@ -191,7 +191,7 @@ The benefit of this macro over `let-flow` is that it gives complete control of w
191191

192192
Manifold also provides a `loop` macro, which allows for asynchronous loops to be defined. Consider `manifold.stream/consume`, which allows a function to be invoked with each new message from a stream. We can implement similar behavior like so:
193193

194-
```clj
194+
```clojure
195195
(require
196196
'[manifold.deferred :as d]
197197
'[manifold.stream :as s])
@@ -217,6 +217,6 @@ Here we define a loop which takes messages one at a time from `stream`, and pass
217217

218218
While Manifold doesn't provide anything as general purpose as core.async's `go` macro, the combination of `loop` and `let-flow` can allow for the specification of highly intricate asynchronous workflows.
219219

220-
### custom execution models
220+
### Custom execution models
221221

222222
Both deferreds and streams allow for custom execution models to be specified. To learn more, [go here](/docs/execution.md).

docs/execution.md

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,32 @@
1-
Concurrent systems separate **what** happens from **when** it happens. This is typically accomplished by specifying what the programmers wants to happen (e.g. callbacks), and layering atop an execution model that decides when and where the code should be run (e.g. one or more threads reading from a queue of callbacks to be invoked). Often, this execution model is hardcoded, making interop between different stream representations much harder than necessary.
1+
Concurrent systems separate **what** happens from **when** it happens. This is typically accomplished by specifying what the programmers wants to happen (e.g. callbacks), and layering atop an execution model that decides when and where the code should be run (e.g. one or more threads reading from a queue of callbacks to be invoked). Often, this execution model is hard-coded, making interop between different stream representations much harder than necessary.
22

33
Manifold tries to make its execution model as configurable as possible, while still remaining functional for users who don't want to fiddle with the low-level details. Under different circumstances, Manifold will lazily construct three different pools:
44

5-
* *wait-pool* - Used solely to wait on blocking operations. Only created when `manifold.stream/connect` is used on blocking stream abstractions like `java.util.BlockingQueue` or Clojure seqs, or `manifold.deferred/chain` is used with abstractions like `java.util.concurrent.Future` or Clojure promises. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-wait-pool-stats-callback`.
5+
* *wait-pool* - Used solely to wait on blocking operations. Only created when `manifold.stream/connect` is used on blocking stream abstractions like `java.util.BlockingQueue` or Clojure seqs, or when `manifold.deferred/chain` is used with abstractions like `java.util.concurrent.Future` or Clojure promises. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-wait-pool-stats-callback`.
66
* *execute-pool* - Used to execute `manifold.deferred/future` bodies, and only created if that macro is used. This is an instrumented pool, and statistics can be consumed via `manifold.executor/register-execute-pool-stats-callback`.
77
* *scheduler-pool* - Used to execute delayed tasks, periodic tasks, or timeouts. Only created when `manifold.time/in`, `manifold.time/every`, `manifold.stream/periodically`, or take/put timeouts are used.
88

9-
However, by default messages are processed on whatever thread they were originally `put!` on. This can get more complicated if multiple threads are calling `put!` on the same stream at the same time, in which case one thread may propagate messages from the other thread. In general, this means that Manifold conforms to whatever the surrounding execution model is, and users can safely use it in concert with other frameworks.
9+
However, by default, messages are processed on whatever thread they were originally `put!` on. This can get more complicated if multiple threads are calling `put!` on the same stream at the same time, in which case one thread may propagate messages from the other thread. In general, this means that Manifold conforms to whatever the surrounding execution model is, and users can safely use it in concert with other frameworks.
1010

11-
However, this also means that `put!` will only return once the message has been completely propagated through the downstream topology, which is not always the desired behavior. The same is also true for a deferred with a long chain of methods waiting on it to be realized. Conversely, in core.async each hand-off between goroutines is a new task enqueued onto the main thread pool. This gives better guarantees as to how long an enqueue operation will take before it returns, which can be useful in some situations.
11+
This also means that `put!` will only return once the message has been completely propagated through the downstream topology, which is not always the desired behavior. The same is also true for a deferred with a long chain of methods waiting on it to be realized. Conversely, in core.async each hand-off between goroutines is a new task enqueued onto the main thread pool. This gives better guarantees as to how long an enqueue operation will take before it returns, which can be useful in some situations.
1212

1313
In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor.
1414

15-
```clj
16-
(require
17-
'[manifold.deferred :as d]
18-
'[manifold.stream :as s])
15+
```clojure
16+
(require '[manifold.deferred :as d]
17+
'[manifold.stream :as s])
1918

2019
(def executor (fixed-thread-executor 42))
2120

2221
(-> (d/future 1)
23-
(d/onto executor)
24-
(d/chain inc inc inc))
22+
(d/onto executor)
23+
(d/chain inc inc inc))
2524

2625
(->> (s/->source (range 1e3))
27-
(s/onto executor)
28-
(s/map inc))
26+
(s/onto executor)
27+
(s/map inc))
2928
```
3029

31-
If you want to specify your own thread pool, it's important to note that such thread pools in practice either need to have an unbounded queue or an unbounded number of threads. This is because thread pools with bounded queues and threads will throw a `RejectedExecutionException` when they're full, which can leave our message processing in an undefined state if we're only halfway through the message topology. It's important to note, though, that the maximum number of enqueued actions is **not** equal to the number of messages we need to process, but rather to the number of nodes in our topology. This number is usually either fixed, or proportional to something else we can control, such as the number of open connections. In either case, it is not something that a single external actor can artifically inflate (or at least it shouldn't be).
30+
If you want to specify your own thread pool, it's important to note that such thread pools in practice either need to have an unbounded queue or an unbounded number of threads. This is because thread pools with bounded queues and threads will throw a `RejectedExecutionException` when they're full, which can leave our message processing in an undefined state if we're only halfway through the message topology. It's important to note, though, that the maximum number of enqueued actions is **not** equal to the number of messages we need to process, but rather to the number of nodes in our topology. This number is usually either fixed, or proportional to something else we can control, such as the number of open connections. In either case, it is not something that a single external actor can artificially inflate (or at least it shouldn't be).
3231

3332
This configurability is necessary given Manifold's goal of interop with other stream representations, but is only meant to be used by those who need it. Most can, and should, ignore it.

0 commit comments

Comments
 (0)