-
Notifications
You must be signed in to change notification settings - Fork 275
Description
Hi,
I'm building an S3 client for our university's IT center which our researchers can use to work with our RDM storage systems.
I just implemented parallel uploads (PUT requests) and technically it works just fine. To track the progress I want to use a MulitProgress struct which holds the single ProgressBars of each upload. Since the uploads run parallel, I need an additional async task for every single ProgressBar which receives the length of the sent bytes from the running upload.
With this setup, when running the parallel tasks, the single progress bars are displayed. But some except the last bar, which is cleared correctly, the other bars freeze somewhere in the middle. It seems the freeze happens when another bar should be finished and cleared:
Here should only success messages be printed, but there remain some unfinished bars.
The important code chunks are the following:
async fn copy_local_to_s3(&self, args: CopyArgs, cfg: &mut S3Config) -> anyhow::Result<()> {
let mut from = Vec::new();
for p in args.pos_args.first.clone().into_iter() {
let path = p.get_path()?.to_path_buf();
check_if_local_exists(&path)?;
from.push(path);
}
let to = args.pos_args.last.get_s3string()?.clone();
// some missing lines with unimportant code
let multi_bar = MultiProgress::new();
let mut jobs = Vec::new();
for p in from.into_iter() {
let file_size = tokio::fs::metadata(&p).await?.len();
let bar = multi_bar.add(ProgressBar::new(file_size));
let args = args.clone();
let config = cfg.clone();
let to = to.clone();
let job = async move {
Self::put_local_to_s3(args, config, p.clone(), to, file_size, bar).await?;
Ok::<(), anyhow::Error>(())
};
jobs.push(job);
}
futures::future::join_all(jobs).await;
}
// function put_local_to_s3
async fn put_local_to_s3(
args: CopyArgs,
cfg: S3Config,
from: PathBuf,
to: S3String,
file_size: u64,
bar: ProgressBar,
) -> anyhow::Result<()> {
let (tx, mut rx) = mpsc::unbounded_channel();
let tx_arc = Arc::new(Mutex::new(tx.clone()));
let file = from.to_string_lossy().into_owned();
let bucket = to.s3bucket().unwrap().to_owned();
tokio::spawn(async move {
bar.set_style(
ProgressStyle::with_template(PUT_BAR_STYLE)
.unwrap()
.progress_chars("#>-"),
);
bar.set_message(format!(
"Copying {} to bucket {}",
file.bold().color(LOCAL_COLOR),
bucket.color(BUCKET_COLOR).bold()
));
while bar.position() < file_size {
// while !rx.is_closed() {
let received = rx.recv().await.unwrap_or_default();
bar.inc(received);
}
bar.finish_and_clear();
});
// in the following code the PUT calls are executed, during upload the sender (tx_arc) sends chunks of uploaded
// bytes to the spawned task which updates the single ProgressBarMaybe the problem is with the while bar.position() < file_size condition. But every call has its own mpsc pair which send until the upload is done. Somehow the first spawned single bars seem to stop increasing another single bar is spawned.
It is more clear, if I change bar.finish_and_clear() to simply bar.finish(). Then one can see that all despite the last single bar are spawned mulitple times. Of course, there should only occur a single bar for every of the three uploaded files.
Maybe I miss something out and I know there are several async tasks which makes things complicated.

