44require "downloadable"
55require "concurrent/promises"
66require "concurrent/executors"
7+ require "concurrent/atomic/atomic_boolean"
78require "retryable_download"
89require "resource"
910require "utils/output"
1011
1112module Homebrew
13+ # Raised when a download is cancelled cooperatively.
14+ class CancelledDownloadError < StandardError ; end
15+
16+ # Manages a queue of concurrent downloads with cooperative cancellation support.
1217 class DownloadQueue
1318 include Utils ::Output ::Mixin
1419
@@ -31,6 +36,7 @@ def initialize(retries: 1, force: false, pour: false)
3136 @spinner = T . let ( nil , T . nilable ( Spinner ) )
3237 @symlink_targets = T . let ( { } , T ::Hash [ Pathname , T ::Set [ Downloadable ] ] )
3338 @downloads_by_location = T . let ( { } , T ::Hash [ Pathname , Concurrent ::Promises ::Future ] )
39+ @cancelled = T . let ( Concurrent ::AtomicBoolean . new ( false ) , Concurrent ::AtomicBoolean )
3440 end
3541
3642 sig {
@@ -40,6 +46,7 @@ def initialize(retries: 1, force: false, pour: false)
4046 ) . void
4147 }
4248 def enqueue ( downloadable , check_attestation : false )
49+ @cancelled . make_false
4350 cached_location = downloadable . cached_download
4451
4552 @symlink_targets [ cached_location ] ||= Set . new
@@ -48,10 +55,14 @@ def enqueue(downloadable, check_attestation: false)
4855
4956 @downloads_by_location [ cached_location ] ||= Concurrent ::Promises . future_on (
5057 pool , RetryableDownload . new ( downloadable , tries :, pour :) ,
51- force , quiet , check_attestation
52- ) do |download , force , quiet , check_attestation |
58+ @cancelled , force , quiet , check_attestation
59+ ) do |download , cancelled , force , quiet , check_attestation |
60+ raise CancelledDownloadError if cancelled . true?
61+
5362 download . clear_cache if force
5463 download . fetch ( quiet :)
64+ raise CancelledDownloadError if cancelled . true?
65+
5566 if check_attestation && downloadable . is_a? ( Bottle )
5667 Utils ::Attestation . check_attestation ( downloadable , quiet : true )
5768 end
@@ -70,6 +81,8 @@ def fetch
7081 if concurrency == 1
7182 downloads . each do |downloadable , promise |
7283 promise . wait!
84+ rescue CancelledDownloadError
85+ next
7386 rescue ChecksumMismatchError => e
7487 ofail "#{ downloadable . download_queue_type } reports different checksum: #{ e . expected } "
7588 rescue => e
@@ -86,6 +99,7 @@ def fetch
8699 output_message = lambda do |downloadable , future , last |
87100 status = status_from_future ( future )
88101 exception = future . reason if future . rejected?
102+ next 1 if exception . is_a? ( CancelledDownloadError )
89103 next 1 if bottle_manifest_error? ( downloadable , exception )
90104
91105 message = downloadable . download_queue_message
@@ -162,10 +176,6 @@ def fetch
162176 # We want to catch all exceptions to ensure we can cancel any
163177 # running downloads and flush the TTY.
164178 rescue Exception # rubocop:disable Lint/RescueException
165- remaining_downloads . each do |_ , future |
166- # FIXME: Implement cancellation of running downloads.
167- end
168-
169179 cancel
170180
171181 if previous_pending_line_count . positive?
@@ -230,10 +240,10 @@ def bottle_manifest_error?(downloadable, exception)
230240
231241 sig { void }
232242 def cancel
233- # FIXME: Implement graceful cancellation of running downloads based on
234- # https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Cancellation.html
235- # instead of killing the whole thread pool .
236- pool . kill
243+ # Signal cooperative cancellation to all running downloads.
244+ # Downloads check the cancelled flag at key points and will raise
245+ # CancelledDownloadError when cancelled .
246+ @cancelled . make_true
237247 end
238248
239249 sig { returns ( Concurrent ::FixedThreadPool ) }
@@ -345,6 +355,7 @@ def message_with_progress(downloadable, future, message, message_length_max)
345355 "#{ message [ 0 , message_length ] . to_s . ljust ( message_length ) } #{ progress } "
346356 end
347357
358+ # Animated spinner for download progress display.
348359 class Spinner
349360 FRAMES = [
350361 "⠋" ,
0 commit comments