# Limiter

## Submitting jobs

**upperbound** offers a very minimal API, centred around the **Limiter** type:

```scala
trait Limiter[F[_]] {
  def submit[A](job: F[A], priority: Int = 0): F[A]
}
```
| 12 | + |
| 13 | +The `submit` method submits a job (which can be an arbitrary task) to |
| 14 | +the limiter and waits until its execution is complete and a result |
| 15 | +is available. |
| 16 | + |
| 17 | +It is designed to be called concurrently: every call submits a job, |
| 18 | +and they are started at regular intervals up to a maximum number of |
| 19 | +concurrent jobs, based on the parameters you specify when creating the |
| 20 | +limiter. |
| 21 | + |
| 22 | +In case of failure, the returned `F[A]` will fail with the same error |
| 23 | +`job` failed with. Note that in **upperbound** no errors are thrown if a job is rate limited, it simply waits to be executed in a queue. |
| 24 | +`submit` can however fail with a `LimitReachedException` if the number |
| 25 | +of enqueued jobs is past the limit you specify when creating the |
| 26 | +limiter. |
| 27 | + |
| 28 | +**upperbound** is well behaved with respect to cancelation: if you |
| 29 | +cancel the `F[A]` returned by `submit`, the submitted job will be |
| 30 | +canceled too. Two scenarios are possible: if cancelation is triggered |
| 31 | +whilst the job is still queued up for execution, it will never be |
| 32 | +executed and the rate of the limiter won't be affected. If instead |
| 33 | +cancelation is triggered while the job is running, it will be |
| 34 | +interrupted, but that slot will be considered used and the next job |
| 35 | +will only be executed after the required time interval has elapsed. |
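
For instance, cancelling the fiber that waits on `submit` also
cancels the submitted job. This is a sketch, where `longJob` is a
hypothetical slow task:

```scala
import cats.effect._
import scala.concurrent.duration._
import upperbound._

def cancelExample(limiter: Limiter[IO]): IO[Unit] = {
  val longJob: IO[Unit] = IO.sleep(10.seconds) // hypothetical slow task
  for {
    fiber <- limiter.submit(longJob).start
    _     <- IO.sleep(100.millis)
    _     <- fiber.cancel // cancels the job, whether queued or running
  } yield ()
}
```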

`submit` also takes a `priority` parameter, which lets you submit jobs
at different priorities, so that higher priority jobs can be executed
before lower priority ones.
A higher number means a higher priority, with 0 being the default.

Note that any blocking performed by `submit` is only semantic: no
actual threads are blocked by the implementation.
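
As a sketch of basic usage (the printed messages are illustrative
placeholders), jobs submitted concurrently at different priorities are
picked from the queue highest priority first:

```scala
import cats.effect._
import cats.syntax.all._
import upperbound._

def prioritised(limiter: Limiter[IO]): IO[Unit] =
  (
    limiter.submit(IO.println("routine job"), priority = 0),
    limiter.submit(IO.println("urgent job"), priority = 10)
  ).parTupled.void // higher priority jobs are picked from the queue first
```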


## Rate limiting controls

To create a `Limiter`, use the `Limiter.start` method, which creates a
new limiter and starts processing jobs submitted to it.

```scala
object Limiter {
  def start[F[_]: Temporal](
    minInterval: FiniteDuration,
    maxConcurrent: Int = Int.MaxValue,
    maxQueued: Int = Int.MaxValue
  ): Resource[F, Limiter[F]]
}
```
| 60 | + |
| 61 | +> **Note:** |
| 62 | +> It's recommended to use an explicit type ascription such as |
| 63 | +> `Limiter.start[IO]` or `Limiter.start[F]` when calling `start`, to |
| 64 | +> avoid type inference issues. |
| 65 | +
|
In order to avoid bursts, jobs submitted to the limiter are
started at regular intervals, as specified by the `minInterval`
parameter. You can pass `minInterval` as a `FiniteDuration`, or use
**upperbound**'s rate syntax (note the underscores in the imports):

```scala
import upperbound._
import upperbound.syntax.rate._
import scala.concurrent.duration._
import cats.effect._

Limiter.start[IO](minInterval = 1.second)
// or
Limiter.start[IO](minInterval = 60 every 1.minute)
```

If the duration of some jobs is longer than `minInterval`, multiple
jobs will be started concurrently.
You can limit the amount of concurrency with the `maxConcurrent`
parameter: upon reaching `maxConcurrent` running jobs, the
limiter will stop pulling new ones until old ones terminate.
Note that this means that the specified interval between jobs is
indeed a _minimum_ interval, and it could be longer if the
`maxConcurrent` bound gets hit. The default is no limit.
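
For example, the following sketch allows at most two jobs in flight,
started at least 100 milliseconds apart:

```scala
import cats.effect._
import scala.concurrent.duration._
import upperbound._

// at most 2 concurrent jobs, started at least 100ms apart
val limited: Resource[IO, Limiter[IO]] =
  Limiter.start[IO](minInterval = 100.millis, maxConcurrent = 2)
```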

Jobs that are waiting to be executed are queued up in memory, and
you can control the maximum size of this queue with the
`maxQueued` parameter.
Once this number is reached, submitting new jobs will immediately
fail with a `LimitReachedException`, so that you can in turn signal
for backpressure downstream. Submission is allowed again as soon as
the number of waiting jobs goes below `maxQueued`.
`maxQueued` must be **> 0**, and the default is no limit.
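
A sketch of shedding load when the queue is full (the `fallback`
action here is a hypothetical placeholder for whatever backpressure
signal fits your program):

```scala
import cats.effect._
import cats.syntax.all._
import upperbound._

def submitOrShed[A](limiter: Limiter[IO], job: IO[A], fallback: IO[A]): IO[A] =
  limiter.submit(job).recoverWith {
    case _: LimitReachedException => fallback // queue full: shed load
  }
```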
| 98 | + |
| 99 | +> **Notes:** |
| 100 | +> - `Limiter` accepts jobs at different priorities, with jobs at a |
| 101 | +higher priority being executed before lower priority ones. |
| 102 | +> - Jobs that fail or are interrupted do not affect processing of |
| 103 | +> other jobs. |
| 104 | +
|

## Program construction

`Limiter.start` returns a `cats.effect.Resource` so that processing
can be stopped gracefully when the limiter's lifetime is over. When
the `Resource` is finalised, all pending and running jobs are
canceled. All outstanding calls to `submit` are also canceled.

To assemble your program, make sure that all the places that need
limiting at the same rate take `Limiter` as an argument, and create
one at the end of a region of sharing (typically `main`) via a single
call to `Limiter.start(...).use`.

In particular, note that the following code creates two different
limiters:

```scala
import cats.syntax.all._
import upperbound._
import cats.effect._
import scala.concurrent.duration._

val limiter = Limiter.start[IO](1.second)

// example modules, generally classes in real code
def apiCall: IO[Unit] =
  limiter.use { limiter =>
    val call: IO[Unit] = ???
    limiter.submit(call)
  }

def otherApiCall: IO[Unit] =
  limiter.use { limiter =>
    val otherCall: IO[Unit] = ???
    limiter.submit(otherCall)
  }

// example logic
(apiCall, otherApiCall).parTupled
```

Instead, you want to ensure the same limiter is passed to both:

```scala
import cats.syntax.all._
import upperbound._
import cats.effect._
import scala.concurrent.duration._

val limiter = Limiter.start[IO](1.second)

// example modules, generally classes in real code
def apiCall(limiter: Limiter[IO]): IO[Unit] = {
  val call: IO[Unit] = ???
  limiter.submit(call)
}

def otherApiCall(limiter: Limiter[IO]): IO[Unit] = {
  val otherCall: IO[Unit] = ???
  limiter.submit(otherCall)
}

// example logic
limiter.use { limiter =>
  (
    apiCall(limiter),
    otherApiCall(limiter)
  ).parTupled
}
```

If you struggled to make sense of the examples in this section, it's
recommended to watch [this talk](https://systemfw.org/talks.html#scala-italy-2018).

## Adjusting rate and concurrency

**upperbound** lets you control both the rate of submission and the
maximum concurrency dynamically, through the following methods on
`Limiter`:

```scala
def minInterval: F[FiniteDuration]
def setMinInterval(newMinInterval: FiniteDuration): F[Unit]
def updateMinInterval(update: FiniteDuration => FiniteDuration): F[Unit]

def maxConcurrent: F[Int]
def setMaxConcurrent(newMaxConcurrent: Int): F[Unit]
def updateMaxConcurrent(update: Int => Int): F[Unit]
```

The `*minInterval` methods let you change the rate of submission by
varying the minimum time interval between two tasks. If the interval
changes while the limiter is sleeping between tasks, the duration of
the sleep is adjusted on the fly, taking into account any elapsed
time. This might mean waking up instantly if the entire new interval
has already elapsed.

The `*maxConcurrent` methods let you change the maximum number of
concurrent tasks that can be executing at any given time. If the
concurrency limit gets changed while the limiter is already blocked
waiting for some tasks to finish, the limiter will be unblocked
as soon as the number of running tasks goes below the new concurrency
limit. Note however that if the limit shrinks, the limiter will not
try to interrupt tasks that are already running, so for some time it
might be that `runningTasks > maxConcurrent`.
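
As a sketch, one could slow a limiter down at runtime by doubling the
interval (halving the rate) and capping concurrency:

```scala
import cats.effect._
import upperbound._

def slowDown(limiter: Limiter[IO]): IO[Unit] =
  for {
    _       <- limiter.updateMinInterval(_ * 2) // double the interval, halving the rate
    _       <- limiter.setMaxConcurrent(2)      // cap concurrency at 2
    current <- limiter.minInterval
    _       <- IO.println(s"new interval: $current")
  } yield ()
```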


## Test limiter

**upperbound** also provides `Limiter.noOp` for testing purposes, which is
a stub `Limiter` with no actual rate limiting and a synchronous
`submit` method.
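
For instance, a unit test can exercise code that takes a `Limiter`
without any actual throttling. This sketch assumes `noOp` returns a
`Limiter[IO]` directly, and `apiCall` is a hypothetical function
under test:

```scala
import cats.effect._
import upperbound._

def apiCall(limiter: Limiter[IO]): IO[Int] =
  limiter.submit(IO.pure(42)) // hypothetical code under test

// noOp performs no rate limiting, so the job runs immediately
val test: IO[Int] = apiCall(Limiter.noOp[IO])
```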