Skip to content

Commit 4cbb79b

Browse files
committed
downloader: avoid parallel download
1 parent 7db6453 commit 4cbb79b

File tree

2 files changed

+98
-45
lines changed

2 files changed

+98
-45
lines changed

lib/datasets/dataset.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,13 @@ def extract_bz2(bz2)
5454
else
5555
IO.pipe do |bz2_input, bz2_output|
5656
IO.pipe do |plain_input, plain_output|
57+
bz2_stop = false
5758
bz2_thread = Thread.new do
5859
begin
5960
bz2.each do |chunk|
6061
bz2_output.write(chunk)
6162
bz2_output.flush
63+
break if bz2_stop
6264
end
6365
rescue => error
6466
message = "Failed to read bzcat input: " +
@@ -79,7 +81,8 @@ def extract_bz2(bz2)
7981
Process.waitpid(pid)
8082
end
8183
ensure
82-
bz2_thread.kill
84+
bz2_stop = true
85+
bz2_thread.join
8386
end
8487
end
8588
end

lib/datasets/downloader.rb

Lines changed: 94 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,58 +28,108 @@ def download(output_path, &block)
2828
return
2929
end
3030

31-
output_path.parent.mkpath
31+
partial_output_path = Pathname.new("#{output_path}.partial")
32+
synchronize(output_path, partial_output_path) do
33+
output_path.parent.mkpath
3234

33-
n_retries = 0
34-
n_max_retries = 5
35-
begin
36-
headers = {
37-
"Accept-Encoding" => "identity",
38-
"User-Agent" => "Red Datasets/#{VERSION}",
39-
}
40-
start = nil
41-
partial_output_path = Pathname.new("#{output_path}.partial")
42-
if partial_output_path.exist?
43-
start = partial_output_path.size
44-
headers["Range"] = "bytes=#{start}-"
45-
end
46-
47-
start_http(@url, headers) do |response|
48-
if response.is_a?(Net::HTTPPartialContent)
49-
mode = "ab"
50-
else
51-
start = nil
52-
mode = "wb"
35+
n_retries = 0
36+
n_max_retries = 5
37+
begin
38+
headers = {
39+
"Accept-Encoding" => "identity",
40+
"User-Agent" => "Red Datasets/#{VERSION}",
41+
}
42+
start = nil
43+
if partial_output_path.exist?
44+
start = partial_output_path.size
45+
headers["Range"] = "bytes=#{start}-"
5346
end
5447

55-
base_name = @url.path.split("/").last
56-
size_current = 0
57-
size_max = response.content_length
58-
if start
59-
size_current += start
60-
size_max += start
61-
if block_given? and n_retries.zero?
62-
yield_chunks(partial_output_path, &block)
48+
start_http(@url, headers) do |response|
49+
if response.is_a?(Net::HTTPPartialContent)
50+
mode = "ab"
51+
else
52+
start = nil
53+
mode = "wb"
54+
end
55+
56+
base_name = @url.path.split("/").last
57+
size_current = 0
58+
size_max = response.content_length
59+
if start
60+
size_current += start
61+
size_max += start
62+
if block_given? and n_retries.zero?
63+
yield_chunks(partial_output_path, &block)
64+
end
65+
end
66+
progress_reporter = ProgressReporter.new(base_name, size_max)
67+
partial_output_path.open(mode) do |output|
68+
response.read_body do |chunk|
69+
size_current += chunk.bytesize
70+
progress_reporter.report(size_current)
71+
output.write(chunk)
72+
yield(chunk) if block_given?
73+
end
6374
end
6475
end
65-
progress_reporter = ProgressReporter.new(base_name, size_max)
66-
partial_output_path.open(mode) do |output|
67-
response.read_body do |chunk|
68-
size_current += chunk.bytesize
69-
progress_reporter.report(size_current)
70-
output.write(chunk)
71-
yield(chunk) if block_given?
76+
FileUtils.mv(partial_output_path, output_path)
77+
rescue Net::ReadTimeout => error
78+
n_retries += 1
79+
retry if n_retries < n_max_retries
80+
raise
81+
rescue TooManyRedirects => error
82+
last_url = error.message[/\Atoo many redirections: (.+)\z/, 1]
83+
raise TooManyRedirects, "too many redirections: #{@url} .. #{last_url}"
84+
end
85+
end
86+
end
87+
88+
private def synchronize(output_path, partial_output_path)
89+
begin
90+
Process.getpgid(Process.pid)
91+
rescue NotImplementedError
92+
return yield
93+
end
94+
95+
lock_path = Pathname("#{output_path}.lock")
96+
loop do
97+
lock_path.parent.mkpath
98+
begin
99+
lock = lock_path.open(File::RDWR | File::CREAT | File::EXCL)
100+
rescue SystemCallError
101+
valid_lock_path = true
102+
begin
103+
pid = Integer(lock_path.read.chomp, 10)
104+
rescue ArgumentError
105+
# The process that acquired the lock will be exited before
106+
# it stores its process ID.
107+
valid_lock_path = (lock_path.mtime > 10)
108+
else
109+
begin
110+
Process.getpgid(pid)
111+
rescue SystemCallError
112+
# Process that acquired the lock doesn't exist
113+
valid_lock_path = false
72114
end
73115
end
116+
if valid_lock_path
117+
sleep(1 + rand(10))
118+
else
119+
lock_path.delete
120+
end
121+
retry
122+
else
123+
begin
124+
lock.puts(Process.pid.to_s)
125+
lock.flush
126+
yield
127+
ensure
128+
lock.close
129+
lock_path.delete
130+
end
131+
break
74132
end
75-
FileUtils.mv(partial_output_path, output_path)
76-
rescue Net::ReadTimeout => error
77-
n_retries += 1
78-
retry if n_retries < n_max_retries
79-
raise
80-
rescue TooManyRedirects => error
81-
last_url = error.message[/\Atoo many redirections: (.+)\z/, 1]
82-
raise TooManyRedirects, "too many redirections: #{@url} .. #{last_url}"
83133
end
84134
end
85135

0 commit comments

Comments
 (0)