1
1
use crate :: {
2
- Download , DownloadError , DownloadEvent , DownloadID , DownloadManager , DownloadResult , Progress ,
2
+ Download , DownloadEvent , DownloadID , DownloadManager , Progress , scheduler :: SchedulerCmd ,
3
3
} ;
4
4
use derive_builder:: Builder ;
5
- use reqwest:: Url ;
6
- use std:: {
7
- path:: { Path , PathBuf } ,
8
- time:: Duration ,
5
+ use reqwest:: {
6
+ Url ,
7
+ header:: { HeaderMap , IntoHeaderName } ,
9
8
} ;
9
+ use std:: path:: { Path , PathBuf } ;
10
10
use tokio:: sync:: { broadcast, oneshot, watch} ;
11
11
use tokio_util:: sync:: CancellationToken ;
12
12
13
+ #[ derive( Debug , Clone ) ]
13
14
pub struct Request {
14
15
id : DownloadID ,
15
16
url : Url ,
@@ -18,28 +19,27 @@ pub struct Request {
18
19
19
20
progress : watch:: Sender < Progress > ,
20
21
events : broadcast:: Sender < DownloadEvent > ,
21
- result : oneshot:: Sender < Result < DownloadResult , DownloadError > > ,
22
22
23
23
pub cancel_token : CancellationToken ,
24
24
}
25
25
26
- #[ derive( Debug , Builder ) ]
26
+ #[ derive( Debug , Builder , Clone ) ]
27
27
#[ builder( pattern = "owned" ) ]
28
28
pub struct DownloadConfig {
29
29
#[ builder( default = "3" ) ]
30
30
retries : u32 ,
31
- #[ builder( default , setter( strip_option) ) ]
32
- user_agent : Option < String > ,
33
31
#[ builder( default = "false" ) ]
34
32
overwrite : bool ,
33
+ #[ builder( default = "HeaderMap::new()" , setter( skip) ) ]
34
+ headers : HeaderMap ,
35
35
}
36
36
37
37
impl Default for DownloadConfig {
38
38
fn default ( ) -> Self {
39
39
DownloadConfig {
40
40
retries : 3 ,
41
- user_agent : None ,
42
41
overwrite : false ,
42
+ headers : HeaderMap :: new ( ) ,
43
43
}
44
44
}
45
45
}
@@ -49,13 +49,13 @@ impl DownloadConfig {
49
49
self . retries
50
50
}
51
51
52
- pub fn user_agent ( & self ) -> Option < & str > {
53
- self . user_agent . as_deref ( )
54
- }
55
-
56
52
pub fn overwrite ( & self ) -> bool {
57
53
self . overwrite
58
54
}
55
+
56
+ pub fn headers ( & self ) -> & HeaderMap {
57
+ & self . headers
58
+ }
59
59
}
60
60
61
61
impl Request {
@@ -64,6 +64,7 @@ impl Request {
64
64
url : None ,
65
65
destination : None ,
66
66
config : DownloadConfigBuilder :: default ( ) ,
67
+ headers : HeaderMap :: new ( ) ,
67
68
manager,
68
69
}
69
70
}
@@ -87,20 +88,11 @@ impl Request {
87
88
& self . config
88
89
}
89
90
90
- pub fn is_cancelled ( & self ) -> bool {
91
- self . cancel_token . is_cancelled ( )
92
- }
93
-
94
- fn emit ( & self , event : DownloadEvent ) {
91
+ pub fn emit ( & self , event : DownloadEvent ) {
95
92
// TODO: Log the error
96
93
let _ = self . events . send ( event) ;
97
94
}
98
95
99
- fn send_result ( self , result : Result < DownloadResult , DownloadError > ) {
100
- // TODO: Log the error
101
- let _ = self . result . send ( result) ;
102
- }
103
-
104
96
pub fn update_progress ( & self , progress : Progress ) {
105
97
// TODO: Log the error
106
98
let _ = self . progress . send ( progress) ;
@@ -110,42 +102,17 @@ impl Request {
110
102
self . emit ( DownloadEvent :: Started {
111
103
id : self . id ( ) ,
112
104
url : self . url ( ) . clone ( ) ,
113
- destination : self . destination . clone ( ) ,
105
+ destination : self . destination ( ) . to_path_buf ( ) ,
114
106
total_bytes : None ,
115
107
} ) ;
116
108
}
117
-
118
- pub fn fail ( self , error : DownloadError ) {
119
- self . send_result ( Err ( error) ) ;
120
- }
121
-
122
- pub fn finish ( self , result : DownloadResult ) {
123
- self . emit ( DownloadEvent :: Completed {
124
- id : self . id ( ) ,
125
- path : result. path . clone ( ) ,
126
- bytes_downloaded : result. bytes_downloaded ,
127
- } ) ;
128
- self . send_result ( Ok ( result) )
129
- }
130
-
131
- pub fn retry ( & self , attempt : u32 , delay : Duration ) {
132
- self . emit ( DownloadEvent :: Retrying {
133
- id : self . id ( ) ,
134
- attempt,
135
- next_delay_ms : delay. as_millis ( ) as u64 ,
136
- } ) ;
137
- }
138
-
139
- pub fn cancel ( self ) {
140
- self . emit ( DownloadEvent :: Cancelled { id : self . id ( ) } ) ;
141
- self . send_result ( Err ( DownloadError :: Cancelled ) )
142
- }
143
109
}
144
110
145
111
pub struct RequestBuilder < ' a > {
146
112
url : Option < Url > ,
147
113
destination : Option < PathBuf > ,
148
114
config : DownloadConfigBuilder ,
115
+ headers : HeaderMap ,
149
116
150
117
manager : & ' a DownloadManager ,
151
118
}
@@ -166,47 +133,48 @@ impl RequestBuilder<'_> {
166
133
self
167
134
}
168
135
169
- pub fn user_agent ( mut self , user_agent : impl AsRef < str > ) -> Self {
170
- self . config = self . config . user_agent ( user_agent. as_ref ( ) . into ( ) ) ;
171
- self
136
+ pub fn user_agent ( self , user_agent : impl AsRef < str > ) -> Self {
137
+ self . header ( reqwest:: header:: USER_AGENT , user_agent)
172
138
}
173
139
174
140
pub fn overwrite ( mut self , overwrite : bool ) -> Self {
175
141
self . config = self . config . overwrite ( overwrite) ;
176
142
self
177
143
}
178
144
145
+ pub fn header ( mut self , header : impl IntoHeaderName , value : impl AsRef < str > ) -> Self {
146
+ self . headers . insert ( header, value. as_ref ( ) . parse ( ) . unwrap ( ) ) ;
147
+ self
148
+ }
149
+
179
150
pub fn start ( self ) -> anyhow:: Result < Download > {
180
151
let url = self . url . ok_or_else ( || anyhow:: anyhow!( "URL must be set" ) ) ?;
181
152
let destination = self
182
153
. destination
183
154
. ok_or_else ( || anyhow:: anyhow!( "Destination must be set" ) ) ?;
184
155
let config = self . config . build ( ) ?;
185
156
186
- let ( progress_tx , progress_rx ) = watch :: channel ( Progress :: new ( None ) ) ;
157
+ let id = self . manager . ctx . next_id ( ) ;
187
158
let ( result_tx, result_rx) = oneshot:: channel ( ) ;
159
+ let ( progress_tx, progress_rx) = watch:: channel ( Progress :: new ( None ) ) ;
188
160
let cancel_token = self . manager . child_token ( ) ;
189
-
190
161
let event_tx = self . manager . ctx . events . clone ( ) ;
191
162
let event_rx = event_tx. subscribe ( ) ;
192
- let id = self . manager . ctx . next_id ( ) ;
163
+
193
164
let request = Request {
194
165
id,
195
166
url : url. clone ( ) ,
196
167
destination : destination. clone ( ) ,
197
168
config,
169
+
170
+ events : event_tx,
198
171
progress : progress_tx,
199
- events : event_tx. clone ( ) ,
200
- result : result_tx,
201
172
cancel_token : cancel_token. clone ( ) ,
202
173
} ;
203
174
204
- self . manager . queue_request ( request) ?;
205
- event_tx. send ( DownloadEvent :: Queued {
206
- id,
207
- url,
208
- destination,
209
- } ) ?;
175
+ self . manager
176
+ . scheduler_tx
177
+ . try_send ( SchedulerCmd :: Enqueue { request, result_tx } ) ;
210
178
211
179
Ok ( Download :: new (
212
180
id,
0 commit comments