Skip to content

How to prevent deadlock using Async? #107

@romiras

Description

@romiras

I wrote a simple demo producer-consumer using Thread(s) and Queue and it works.
Then I rewrote it using Async, it fails due to deadlock. Tried to use Async::Reactor instead while loop and no luck.

I, [2021-03-27T14:36:31.695540 #30532]  INFO -- : Consumer
Traceback (most recent call last):
	2: from /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
	1: from /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x0000555f03693840 main thread:0x0000555f03693840
* #<Thread:0x0000555f036f4b60 sleep_forever>
   rb_thread_t:0x0000555f03693840 native:0x00007f9440267740 int:0
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `pop'
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
   /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'

Can you advise how to fix it?

require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    # consumer
    t1 = Thread.new do
        log.info('Consumer')
        while (has_jobs(job = q.deq))
            log.info("consume #{job}")
        end
        log.info('Consumer exited')
    end

    # producer
    t2 = Thread.new do
        log.info('Producer')
        (1..times).each do |i|
            log.info("produce #{i}")
            q.enq(i)
            sleep(delay)
        end
        q.enq(nil)
        q.close
        log.info('Producer exited')
    end

    # Ensure we wait for all tasks to complete before continuing:
    [t1, t2].each(&:join)
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')
require 'async'
require 'async/barrier'
require 'async/semaphore'
require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    Async do
        barrier = Async::Barrier.new
        semaphore = Async::Semaphore.new(2, parent: barrier)

        # consumer
        semaphore.async do
            Async do |task|
                log.info('Consumer')
                while (has_jobs(job = q.deq))
                    log.info("consume #{job}")
                end
                # Async::Reactor.run do
                #     job = q.deq
                #     if has_jobs(job)
                #         log.info("consume #{job}")
                #     end
                # end
                log.info('Consumer exited')
            end
        end

        # producer
        semaphore.async do
            Async do |task|
                log.info('Producer')
                (1..times).each do |i|
                    log.info("produce #{i}")
                    q.enq(i)
                    task.sleep(delay)
                end
                q.enq(nil)
                q.close
                log.info('Producer exited')
            end
        end
    
        # Ensure we wait for all tasks to complete before continuing:
        barrier.wait
    end
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions