
Commit 473cac3

Merge pull request #16 from SystemFw/series/0.2
Series/0.2
2 parents 563f1bd + ecd6380 commit 473cac3

18 files changed: 694 additions, 714 deletions

.scalafmt.conf

Lines changed: 1 addition & 4 deletions

@@ -1,4 +1 @@
-project.includeFilters = [
-  "./src/*"
-  "build.sbt"
-]
+align=none

README.md

Lines changed: 152 additions & 114 deletions
@@ -14,138 +14,176 @@ To get **upperbound**, add the following line to your `build.sbt`
 libraryDependencies += "org.systemfw" %% "upperbound" % "version"
 ```
 
-## Purity
+You can find the latest version in the [releases](https://github.com/SystemFw/upperbound/releases) tab.
+**upperbound** depends on `fs2`, `cats`, `cats-effect` and `cats-collections`.
 
+**Note:**
+
+For the time being binary compatibility is **not**
+guaranteed. This is not a problem for usage in applications (which is
+where you would mostly use a rate limiter anyway), but risky if used
+in libraries. Binary compatibility will be guaranteed in the future.
+
+## Design principles
+
+**upperbound** is an interval based limiter, which means jobs are
+started at a _constant rate_. This strategy prevents spikes in
+throughput, and makes it a very good fit for client side limiting,
+e.g. calling a rate limited API.
 **upperbound** is completely pure, which allows for ease of reasoning
 and composability. On a practical level, this means that some
 familiarity with cats, cats-effect and fs2 is required.
 
 ## Usage
 
-**upperbound**'s main datatypes are two: `Worker` and `Limiter`.
-`Worker` is the central entity in the library, it's defined as:
+### Limiter
+
+The main entity of the library is a `Limiter`, which is defined as:
 
 ``` scala
-trait Worker[F[_]] {
+trait Limiter[F[_]] {
   def submit[A](job: F[A], priority: Int = 0): F[Unit]
-  def await[A](job: F[A], priority: Int = 0): F[A]
+
+  def interval: SignallingRef[F, FiniteDuration]
+
+  def initial: FiniteDuration
+
+  def pending: F[Int]
 }
 ```
+
 The `submit` method takes an `F[A]`, which can represent any
 program, and returns an `F[Unit]` that represents the action of
 submitting it to the limiter, with the given priority. The semantics
 of `submit` are fire-and-forget: the returned `F[Unit]` immediately
 returns, without waiting for the input `F[A]` to complete its
-execution.
-
-The `await` method takes an `F[A]`, which can represent any program,
-and returns another `F[A]` that represents the action of submitting
-the input `F` to the limiter with the given priority, and waiting
-for its result. The semantics of `await` are blocking: the returned
-`F[A]` only completes when the input `F` has finished its
-execution. However, the blocking is only semantic, no actual threads
-are blocked by the implementation.
-
-Both `Worker.submit` and `Worker.await` are designed to be called
-concurrently: every concurrent call submits a job to `Limiter`, and
-they are then started (in order of priority) at a rate which is
-no higher then the maximum rate you specify on construction.
-A higher number indicates higher priority, and FIFO order is used in
-case there are multiple jobs with the same priority being throttled.
-
-Naturally, you need to ensure that all the places in your code that
-need rate limiting share the same instance of `Worker`. It might be
-not obvious at first how to do this purely functionally, but it's in
-fact very easy: just pass a `Worker` as a parameter. For example, if
-you have a class with two methods that need rate limiting, have the
-constructor of your class accept a `Worker`.
-
-Following this approach, _your whole program_ will end up having type
-`Worker => F[Whatever]`, and all you need now is creating a
-`Worker`. This is what `Limiter` is for:
+execution.
+`Limiter.submit` is designed to be called concurrently: every
+concurrent call submits a job to `Limiter`, and they are then started
+(in order of priority) at a rate which is no higher than the maximum
+rate you specify on construction. A higher number indicates higher
+priority, and FIFO order is used in case there are multiple jobs with
+the same priority being throttled.
+`interval` is an `fs2.concurrent.SignallingRef` that allows you to
+sample, change or react to changes to the current interval between two
+tasks. Finally, `initial` and `pending` return the initial interval
+specified on creation, and the number of jobs that are queued up
+waiting to start, respectively.
+
+The `Limiter` algebra is the basic building block of the library;
+additional functionality is expressed as combinators over it.
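The semantics described in the added README text above can be sketched as follows. This is an editorial illustration, not part of the diff: it assumes the `Limiter` interface shown earlier, and `callApi` is a hypothetical job.

``` scala
import scala.concurrent.duration._
import cats.effect.IO
import upperbound._

// a sketch: submit is fire-and-forget, pending inspects the queue,
// and interval tweaks the current rate at runtime
def example(limiter: Limiter[IO], callApi: IO[Unit]): IO[Unit] =
  for {
    _      <- limiter.submit(callApi, priority = 10) // returns once the job is enqueued
    queued <- limiter.pending                        // jobs still waiting to start
    _      <- limiter.interval.set(1.second)         // change the rate on the fly
  } yield ()
```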
+
+### Program construction
+
+To create a `Limiter`, use the `start` method:
 
 ``` scala
-import cats.effect.Effect
-
-trait Limiter[F[_]] {
-  def worker: Worker[F]
-  def shutDown: F[Unit])
-}
+case class Rate(n: Int, t: FiniteDuration)
 
 object Limiter {
-  def start[F[_]: Effect](maxRate: Rate)(implicit ec: ExecutionContext): F[Limiter[F]]
-  def stream[F[_]: Effect](maxRate: Rate)(implicit ec: ExecutionContext): Stream[F, Limiter[F]]
+  def start[F[_]: Concurrent: Timer](maxRate: Rate, n: Int = Int.MaxValue): Resource[F, Limiter[F]]
 }
 ```
-You should only need `Limiter` at the end of your program, to assemble
-all the parts together. Imagine your program is defined as:
 
-``` scala
-import upperbound._
-import cats.effect.IO
-
-case class YourWholeProgram(w: Worker[IO]) {
-  def doStuff: IO[Unit] = {
-    def yourLogic: IO[Whatever] = ???
-    w.submit(yourLogic)
-  }
-}
-```
-you can then do:
+`start` creates a new `Limiter` and starts processing the jobs
+submitted to it, which are started at a rate no higher than `maxRate`.
+`import upperbound.syntax.rate._` exposes the `every` syntax for creating `Rate`s:
 
 ``` scala
-import upperbound._, syntax.rate._
-import fs2.Stream
-import cats.effect.{IO, IOApp, ExitCode}
-import scala.concurrent.ExecutionContext.Implicits.global
+import upperbound.syntax.rate._
 import scala.concurrent.duration._
 
-object Main extends IOApp {
-  override def run(args: List[String]): IO[ExitCode] =
-    for {
-      limiter <- Limiter.stream[IO](100 every 1.minute)
-      res <- Stream.eval(YourWholeProgram(limiter.worker).doStuff).as(ExitCode.Success)
-    } yield res
-}
+Limiter.start[F](100 every 1.minute)
 ```
 
-Note: the `every` syntax for declaring `Rate`s requires
+Additionally, `n` enforces a bound on the maximum number of jobs
+allowed to queue up while waiting for execution. Once this number is
+reached, calling `submit` will fail the resulting `F[Unit]` with a
+`LimitReachedException`, so that you can in turn signal for
+backpressure downstream. Processing restarts as soon as the number of
+jobs waiting goes below `n` again.
+
+The reason `start` returns a `cats.effect.Resource` is so that
+processing can be stopped gracefully when the `Limiter`'s lifetime is
+over.
+To assemble your program, all the places that need limiting at the
+same rate should take a `Limiter` as an argument, which is then
+created at the end of a region of sharing (typically `main`) and
+injected via `Limiter.start(...).use` or
+`Stream.resource(Limiter.start(...)).flatMap`. If this sentence didn't
+make sense to you, it's recommended to watch [this talk](https://github.com/SystemFw/scala-italy-201).
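A minimal assembly sketch along the lines the README describes (editorial, not part of the diff; `MyService` is hypothetical, and the usual cats-effect implicits for `IO` are assumed to be in scope):

``` scala
import scala.concurrent.duration._
import cats.effect.IO
import upperbound._, upperbound.syntax.rate._

// code that needs limiting takes the Limiter as a plain argument
class MyService(limiter: Limiter[IO]) {
  def fetch: IO[Unit] = limiter.submit(IO(println("rate limited call")))
}

// the Limiter is created once, at the edge of the program,
// and injected via Resource.use
val program: IO[Unit] =
  Limiter.start[IO](100 every 1.minute).use { limiter =>
    new MyService(limiter).fetch
  }
```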
+
+
+**Note:**
+
+It's up to you whether you want to pass the `Limiter` algebra
+implicitly (as an `F[_]: Limiter` bound) or explicitly.
+My position is that it's ok to pass algebras implicitly _as long as
+the instance is made implicit at call site_, as close as possible to
+where it's actually injected. This avoids any problems related to
+mixing things up, and is essentially equivalent to having an instance
+of your algebra for a newtype over Kleisli.
+
+Reasonable people might disagree however, and I myself pass algebras
+around both ways, in different codebases.
+**upperbound** is slightly skewed towards the `F[_]: Limiter` style:
+internal combinators are expressed that way, and `Limiter` has a
+summoner method to allow `Limiter[F].submit`.
+
+### Await
+
+As mentioned above, `submit` has fire-and-forget semantics.
+When this is not sufficient, you can use `await`:
 
 ``` scala
-import upperbound.syntax.rate._
-import scala.concurrent.duration._
+object Limiter {
+  def await[F[_]: Concurrent: Limiter, A](job: F[A], priority: Int = 0): F[A]
+}
 ```
 
-## Testing
-One further advantage of the architecture outlined above is testability.
-In particular, you normally don't care about rate limiting in unit
-tests, but the logic you are testing might require a `Worker` when
-it's actually running. In this case, it's enough to pass in a stub
-implementation of `Worker` that contains whatever logic is needed for
-your tests. In particular, you can use `upperbound.testWorker` to get an
-instance that does no rate limiting.
+`await` looks very similar to `submit`, except its semantics are
+blocking: the returned `F[A]` only completes when `job` has
+finished its execution. Note however, that the blocking is only semantic,
+no actual threads are blocked by the implementation.
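For example (an editorial sketch against the `await` signature shown in the diff; `fetchUser` is a hypothetical job whose result the caller needs):

``` scala
import cats.effect.Concurrent
import upperbound._

// await only completes once the job itself has run,
// but the waiting is purely semantic
def getUser[F[_]: Concurrent: Limiter](fetchUser: F[String]): F[String] =
  Limiter.await(fetchUser, priority = 1)
```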
 
-## Backpressure
+### Backpressure
 
-`upperbound` gives you a mechanism for applying backpressure to the
-`Limiter` based on the result of a specific job submitted by the
-corresponding `Worker` (e.g. a REST call that got rejected upstream).
-In particular, both `submit` and `await` take an extra (optional)
-argument:
+`Limiter[F].interval` offers flexible control over the rate, which can
+be used as a mechanism for applying backpressure based on the result
+of a specific job (e.g. a REST call that got rejected upstream).
+Although this can be implemented entirely in user land, **upperbound**
+provides some backpressure helpers and combinators out of the box.
 
 ``` scala
-def submit[A](job: F[A], priority: Int, ack: BackPressure.Ack[A])
-def await[A](job: F[A], priority: Int, ack: BackPressure.Ack[A])
+class BackPressure[F[_]: Limiter, A](job: F[A]) {
+  def withBackoff(
+    backOff: FiniteDuration => FiniteDuration,
+    ack: BackPressure.Ack[A]
+  ): F[A]
+}
+object BackPressure {
+  case class Ack[-A](slowDown: Either[Throwable, A] => Boolean)
+}
 ```
 
-`BackPressure.Ack[A]` is an alias for `Either[Throwable, A] => BackPressure`,
-where `case class BackPressure(slowDown: Boolean)` is used to assert
+`withBackoff` enriches an `F[A]` (given a `Limiter` constraint) with the ability to apply backpressure to the `Limiter`:
+every time a job signals backpressure is needed through `ack`, the `Limiter` will
+adjust its current rate by applying `backOff` to it. This means the
+rate will be adjusted by calling `backOff` repeatedly whenever
+multiple consecutive jobs signal for backpressure, and reset to its
+original value when a job signals backpressure is no longer needed.
+
+Note that since jobs submitted to the Limiter are processed
+asynchronously, rate changes will not propagate instantly when the
+rate is smaller than the job completion time. However, the rate will
+eventually converge to its most up-to-date value.
+
+`BackPressure.Ack[A]` is a wrapper over `Either[Throwable, A] => Boolean`,
+and it's used to assert
 that backpressure is needed based on a specific result (or error) of
-the submitted job. You can write your own `Ack`s, but `upperbound` provides
-some for you:
+the submitted job. You can write your own `Ack`s, but the library provides
+some for you, including:
 
-- `BackPressure.never`: never signal backpressure. If you don't
-  specify an `ack`, this is passed as a default.
 - `BackPressure.onAllErrors`: signal backpressure every time a job
   fails with any error.
 - `BackPressure.onError[E <: Throwable]`: signal backpressure if a job
@@ -154,33 +192,33 @@ some for you:
 - `BackPressure.onResult(cond: A => Boolean)`: signal backpressure
   when the result of a job satisfies the given condition.
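Given the `Ack` definition shown earlier in the diff, a custom acknowledgement is just a predicate on the job's outcome. An editorial sketch (the `Response` type is hypothetical) that signals backpressure on any error, or when the upstream answers with HTTP 429:

``` scala
import upperbound._

// hypothetical response type carrying a status code
case class Response(status: Int)

// slow down on any error, or when the upstream rate limits us
val onTooManyRequests: BackPressure.Ack[Response] =
  BackPressure.Ack {
    case Left(_)    => true
    case Right(res) => res.status == 429
  }
```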
 
-To deal with backpressure, `Limiter.start` takes extra optional parameters:
-
+Note that `withBackoff` only transforms the input job, you still need
+to actually `submit` or `await` yourself. This is done to allow
+further combinators to operate on a job as a chain of `F[A] => F[A]`
+functions before actually submitting to the `Limiter`.
+It's also available as syntax:
+
 ``` scala
-def start(maxRate: Rate,
-          backOff: FiniteDuration => FiniteDuration = identity,
-          n: Int = Int.MaxValue): F[Limiter]
-```
+import scala.concurrent.duration._
+import upperbound._
+import upperbound.syntax.backpressure._
 
-Every time a job signals backpressure is needed, the Limiter will
-adjust its current rate by applying `backOff` to it. This means the
-rate will be adjusted by calling `backOff` repeatedly whenever
-multiple consecutive jobs signal for backpressure, and reset to its
-original value when a job signals backpressure is no longer needed.
 
-Note that since jobs submitted to the Limiter are processed
-asynchronously, rate changes will not propagate instantly when the
-rate is smaller than the job completion time. However, the rate will
-eventually converge to its most up-to-date value.
+def prog[F[_]: Limiter, A](fa: F[A]): F[Unit] =
+  Limiter[F].submit {
+    fa.withBackoff(_ + 1.second, BackPressure.onAllErrors)
+  }
+```
 
-Similarly, `n` allows you to place a bound on the maximum number of
-jobs allowed to queue up while waiting for execution. Once this number
-is reached, the `F` returned by any call to the corresponding
-`Worker` will immediately fail with a `LimitReachedException`, so
-that you can in turn signal for backpressure downstream. Processing
-restarts as soon as the number of jobs waiting goes below `n` again.
 
-Please be aware that the backpressure support doesn't interfere with
+Finally, please be aware that the backpressure support doesn't interfere with
 your own error handling, nor does any error handling (e.g. retrying)
-for you. This is an application specific concern and should be handled
-in application code.
+for you.
+
+
+### Test limiter
+
+If you need to satisfy a `Limiter` constraint in test code, where you
+don't actually care about rate limiting, you can use `Limiter.noOp`,
+which gives you a stub `Limiter` with no actual rate limiting and a
+synchronous `submit` method.
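For instance (an editorial sketch; the exact signature of `Limiter.noOp` isn't shown in the diff, so the instantiation below is an assumption):

``` scala
import cats.effect.IO
import cats.implicits._
import upperbound._

// code under test needs a Limiter, but the test doesn't care about rates
def submitAll(jobs: List[IO[Unit]])(implicit limiter: Limiter[IO]): IO[Unit] =
  jobs.traverse_(job => limiter.submit(job))

// in tests, satisfy the constraint with the no-op stub
implicit val testLimiter: Limiter[IO] = Limiter.noOp
```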

build.sbt

Lines changed: 13 additions & 22 deletions
@@ -12,8 +12,9 @@ lazy val root = (project in file(".")).settings(
 lazy val commonSettings = Seq(
   organization := "org.systemfw",
   name := "upperbound",
-  scalaVersion := "2.11.11",
-  crossScalaVersions := Seq("2.11.11", "2.12.1")
+  scalaVersion := "2.12.6",
+  crossScalaVersions := Seq("2.11.12", scalaVersion.value),
+  scalafmtOnCompile := true
 )
 
 lazy val consoleSettings = Seq(
@@ -38,7 +39,7 @@ lazy val compilerOptions =
 )
 
 lazy val typeSystemEnhancements =
-  addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")
+  addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.8")
 
 def dep(org: String)(version: String)(modules: String*) =
   Seq(modules: _*) map { name =>
@@ -47,32 +48,22 @@ def dep(org: String)(version: String)(modules: String*) =
 
 lazy val dependencies =
   libraryDependencies ++= Seq(
-    "co.fs2" %% "fs2-core" % "1.0.0",
+    "co.fs2" %% "fs2-core" % "1.0.2",
     "org.typelevel" %% "cats-core" % "1.4.0",
-    "org.typelevel" %% "dogs-core" % "0.6.10",
-    "org.typelevel" %% "cats-effect" % "1.0.0"
+    "org.typelevel" %% "cats-effect" % "1.1.0",
+    "org.typelevel" %% "cats-collections-core" % "0.7.0"
  )
 
 lazy val tests = {
-  val dependencies = {
-    val specs2 = dep("org.specs2")("3.8.9")(
-      "specs2-core",
-      "specs2-scalacheck"
-    )
-
-    val mixed = Seq(
-      "org.scalacheck" %% "scalacheck" % "1.13.4",
-      "org.scalactic" %% "scalactic" % "3.0.1"
-    )
-
+  val dependencies =
     libraryDependencies ++= Seq(
-      specs2,
-      mixed
-    ).flatten.map(_ % "test")
-  }
+      "org.scalacheck" %% "scalacheck" % "1.13.4",
+      "org.scalatest" %% "scalatest" % "3.0.5",
+      "org.typelevel" %% "cats-effect-laws" % "1.0.0"
+    ).map(_ % "test")
 
   val frameworks =
-    testFrameworks := Seq(TestFrameworks.Specs2)
+    testFrameworks := Seq(TestFrameworks.ScalaTest)
 
   Seq(dependencies, frameworks)
 }

project/build.properties

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-sbt.version=0.13.17
+sbt.version=1.1.6
