|
| 1 | +# Async Usage |
| 2 | + |
| 3 | +Async is a Ruby library that provides asynchronous programming capabilities using fibers and a fiber scheduler. It allows you to write non-blocking, concurrent code that's easy to understand and maintain. |
| 4 | + |
| 5 | +## Tasks |
| 6 | + |
| 7 | +Async uses tasks to represent units of concurrency. Those tasks are backed by fibers and exist in an execution tree. The main way to create a task is to use `Async{...}`: |
| 8 | + |
| 9 | +``` ruby |
| 10 | +Async do |
| 11 | + # Internally non-blocking write: |
| 12 | + puts "Hello World" |
| 13 | +end |
| 14 | +``` |
| 15 | + |
| 16 | +Conceptually, `Async{...}` means execute the given block of code sequentially, but it's execution is asynchronous to the outside world. For example: |
| 17 | + |
| 18 | +``` ruby |
| 19 | +Async do |task| |
| 20 | + # Start two tasks that will run asynchronously: |
| 21 | + child1 = Async{sleep 1; puts "Hello"} |
| 22 | + # Using task.async is the same as Async, but is slightly more efficient: |
| 23 | + child2 = task.async{sleep 2; puts "World"} |
| 24 | + |
| 25 | + # Wait for both tasks to complete: |
| 26 | + child1.wait |
| 27 | + child2.wait |
| 28 | +end |
| 29 | +``` |
| 30 | + |
| 31 | +Waiting on a task returns the result of the block: |
| 32 | + |
| 33 | +```ruby |
| 34 | +Async do |task| |
| 35 | + # Run some computation: |
| 36 | + child = task.async{computation} |
| 37 | + |
| 38 | + # Get the result of the computation: |
| 39 | + result = child.wait |
| 40 | +end |
| 41 | +``` |
| 42 | + |
| 43 | +### Sync |
| 44 | + |
| 45 | +`Async{}` has two uses: it creates an event loop if one doesn't exist, and it creates a task which runs asynchronously with respect to the parent scope. However, the top level `Async{}` block will be synchronous because it creates the event loop. In some programs, you do not care about executing asynchronously, but you still want your code to run in an event loop. `Sync{}` exists to do this efficiently. |
| 46 | + |
| 47 | +```ruby |
| 48 | +# At the top level, this is equivalent to Async{}.wait |
| 49 | +Sync do |
| 50 | +end |
| 51 | + |
| 52 | +Sync do |
| 53 | + # This is a no-op, as it's already in an event loop: |
| 54 | + Sync{...} |
| 55 | + |
| 56 | + # It's semantically equivalent to: |
| 57 | + Async{...}.wait |
| 58 | + # but it is more efficient. |
| 59 | +end |
| 60 | +``` |
| 61 | + |
| 62 | +The main use case for `Sync` is to embed `Async` in methods, e.g. |
| 63 | + |
| 64 | +```ruby |
| 65 | +def fetch_data |
| 66 | + Sync do |
| 67 | + # No matter what, this will happen asynchronously: |
| 68 | + 3.times do |
| 69 | + Async{Net::HTTP.get(...)} |
| 70 | + end |
| 71 | + end |
| 72 | +end |
| 73 | +``` |
| 74 | + |
| 75 | +There are two options for the above code - either it's called from within an event loop, in which case `Sync do ... end` directly executes the block, OR it's invoked without an event loop, in which case it creates an event loop, executes the block, and returns the result (or raises the exception). |
| 76 | + |
| 77 | +### Current Task |
| 78 | + |
| 79 | +It is possible to get the current task using `Async::Task.current`. If you call this methoud without a task, it will raise an exception. If you want a method which returns the current task OR nil, use `Async::Task.current?`. Generally speaking you should not use these methods and instead use the task yielded to the `Async{|task| ...}` block. However, there is one scenario where it can be useful: |
| 80 | + |
| 81 | +```ruby |
| 82 | +def fetch_data(parent: Async::Task.current) |
| 83 | + 3.times do |
| 84 | + Async{Net::HTTP.get(...)} |
| 85 | + end |
| 86 | +end |
| 87 | +``` |
| 88 | + |
| 89 | +If `fetch_data` is called outside of an Async block, it will raise an exception. So, it expresses the intent to the caller that this method should only be invoked from within an asynchonous task. |
| 90 | + |
| 91 | +## Timeouts |
| 92 | + |
| 93 | +General timeouts can be imposed by using `task.with_timeout(duration)`. |
| 94 | + |
| 95 | +```ruby |
| 96 | +Async do |task| |
| 97 | + # This will raise an Async::TimeoutError after 1 second: |
| 98 | + task.with_timeout(1) do |timeout| |
| 99 | + # Timeout#duration= can be used to adjust the duration of the timeout. |
| 100 | + # Timeout#cancel can be used to cancel the timeout completely. |
| 101 | + |
| 102 | + sleep 10 |
| 103 | + end |
| 104 | +end |
| 105 | +``` |
| 106 | + |
| 107 | +## Barriers |
| 108 | + |
| 109 | +Barriers provide a way to manage an unbounded number of tasks. |
| 110 | + |
| 111 | +```ruby |
| 112 | +Async do |
| 113 | + barrier = Async::Barrier.new |
| 114 | + |
| 115 | + items.each do |item| |
| 116 | + barrier.async do |
| 117 | + process(item) |
| 118 | + end |
| 119 | + end |
| 120 | + |
| 121 | + # Process the tasks in order of completion: |
| 122 | + barrier.wait do |task| |
| 123 | + result = task.wait |
| 124 | + # Do something with result. |
| 125 | + |
| 126 | + # If you don't want to wait for any more tasks you can break: |
| 127 | + break |
| 128 | + end |
| 129 | + |
| 130 | + # Or just wait for all tasks to finish: |
| 131 | + barrier.wait # May raise an exception if a task failed. |
| 132 | +ensure |
| 133 | + # Stop all outstanding tasks in the barrier: |
| 134 | + barrier&.stop |
| 135 | +end |
| 136 | +``` |
| 137 | + |
| 138 | +## Semaphores |
| 139 | + |
| 140 | +Semaphores allow you to limit the level of concurrency to a fixed number of tasks: |
| 141 | + |
| 142 | +```ruby |
| 143 | +Async do |task| |
| 144 | + barrier = Async::Barrier.new |
| 145 | + semaphore = Async::Semaphore.new(4, parent: barrier) |
| 146 | + |
| 147 | + # Since the semaphore.async may block, we need to run the work scheduling in a child task: |
| 148 | + task.async do |
| 149 | + items.each do |item| |
| 150 | + semaphore.async do |
| 151 | + process(item) |
| 152 | + end |
| 153 | + end |
| 154 | + end |
| 155 | + |
| 156 | + # Wait for all the work to complete: |
| 157 | + barrier.wait |
| 158 | +ensure |
| 159 | + # Stop all outstanding tasks in the barrier: |
| 160 | + barrier&.stop |
| 161 | +end |
| 162 | +``` |
| 163 | + |
| 164 | +### Idler |
| 165 | + |
| 166 | +Idlers are like semaphores but with a limit defined by current processor utilization. In other words, an idler will do work up to a specific ratio of idle/busy time in the scheduler, and try to maintain that. |
| 167 | + |
| 168 | +```ruby |
| 169 | +Async do |
| 170 | + # Create an idler that will aim for a load average of 80%: |
| 171 | + idler = Async::Idler.new(0.8) |
| 172 | + |
| 173 | + # Some list of work to be done: |
| 174 | + work.each do |work| |
| 175 | + idler.async do |
| 176 | + # Do the work: |
| 177 | + work.call |
| 178 | + end |
| 179 | + end |
| 180 | +end |
| 181 | +``` |
| 182 | + |
| 183 | +The idler will try to schedule as much work such that the load of the scheduler stays at around 80% saturation. |
| 184 | + |
| 185 | +## Queues |
| 186 | + |
| 187 | +Queues allow you to share data between disconnected tasks: |
| 188 | + |
| 189 | +```ruby |
| 190 | +Async do |task| |
| 191 | + queue = Async::Queue.new |
| 192 | + |
| 193 | + reader = task.async do |
| 194 | + while chunk = socket.gets |
| 195 | + queue.push(chunk) |
| 196 | + end |
| 197 | + |
| 198 | + # After this point, we won't be able to add items to the queue, and popping items will eventually result in nil once all items are dequeued: |
| 199 | + queue.close |
| 200 | + end |
| 201 | + |
| 202 | + # Process items from the queue: |
| 203 | + while line = queue.pop |
| 204 | + process(line) |
| 205 | + end |
| 206 | +end |
| 207 | +``` |
| 208 | + |
| 209 | +The above program may have unbounded memory use, so it can be a good idea to use a limited queue with back-pressure: |
| 210 | + |
| 211 | +```ruby |
| 212 | +Async do |task| |
| 213 | + queue = Async::LimitedQueue.new(8) |
| 214 | + |
| 215 | + # Everything else is the same from the queue example, except that the pushing onto the queue will block once 8 items are buffered. |
| 216 | +end |
| 217 | +``` |
0 commit comments