@@ -736,42 +736,57 @@ namespace azure { namespace storage {
736
736
737
737
auto smallest_offset = std::make_shared<utility::size64_t >(target_offset);
738
738
auto condition_variable = std::make_shared<std::condition_variable>();
739
- std::mutex condition_variable_mutex;
739
+ std::mutex condition_variable_mutex;
740
+ std::vector<pplx::task<void >> parallel_tasks;
740
741
for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::transactional_md5_block_size)
741
742
{
742
743
utility::size64_t current_length = protocol::transactional_md5_block_size;
743
744
if (current_offset + current_length > target_offset + target_length)
744
745
{
745
746
current_length = target_offset + target_length - current_offset;
746
747
}
747
- semaphore->lock_async ().then ([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, offset, target, smallest_offset, current_offset, current_length, modified_condition, options, context, timer_handler]()
748
+ auto parallel_task = semaphore->lock_async ().then ([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, offset, target, smallest_offset, current_offset, current_length, modified_condition, options, context, timer_handler]()
748
749
{
750
+ auto sem_unlocker = std::make_shared<std::unique_lock<core::async_semaphore>>(*semaphore, std::adopt_lock);
751
+
749
752
concurrency::streams::container_buffer<std::vector<uint8_t >> buffer;
750
753
auto segment_ostream = buffer.create_ostream ();
751
754
// if transaction MD5 is enabled, it will be checked inside each download_single_range_to_stream_async.
752
- instance->download_single_range_to_stream_async (segment_ostream, current_offset, current_length, modified_condition, options, context, false , timer_handler->get_cancellation_token (), timer_handler)
753
- .then ([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task<void > download_task)
755
+ return instance->download_single_range_to_stream_async (segment_ostream, current_offset, current_length, modified_condition, options, context, false , timer_handler->get_cancellation_token (), timer_handler)
756
+ .then ([buffer, segment_ostream, semaphore, sem_unlocker, condition_variable, &condition_variable_mutex, smallest_offset, offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task<void > download_task)
754
757
{
755
758
segment_ostream.close ().then ([download_task](pplx::task<void > close_task)
756
759
{
757
- download_task.wait ();
760
+ try
761
+ {
762
+ download_task.wait ();
763
+ }
764
+ catch (const std::exception&)
765
+ {
766
+ try
767
+ {
768
+ close_task.wait ();
769
+ }
770
+ catch (...)
771
+ {
772
+ }
773
+ throw ;
774
+ }
758
775
close_task.wait ();
759
776
}).wait ();
760
777
761
- // status of current semaphore.
762
- bool released = false ;
763
778
// target stream is seekable, could write to target stream once the download finished.
764
779
if (target.can_seek ())
765
780
{
766
781
pplx::extensibility::scoped_rw_lock_t guard (mutex);
767
782
target.streambuf ().seekpos (current_offset - offset, std::ios_base::out);
768
783
target.streambuf ().putn_nocopy (buffer.collection ().data (), buffer.collection ().size ()).wait ();
769
784
*smallest_offset += protocol::transactional_md5_block_size;
770
- released = true ;
771
- semaphore->unlock ();
772
785
}
773
786
else
774
787
{
788
+ // status of current semaphore.
789
+ bool released = false ;
775
790
{
776
791
pplx::extensibility::scoped_rw_lock_t guard (mutex);
777
792
if (*smallest_offset == current_offset)
@@ -781,7 +796,7 @@ namespace azure { namespace storage {
781
796
*smallest_offset += protocol::transactional_md5_block_size;
782
797
condition_variable->notify_all ();
783
798
released = true ;
784
- semaphore ->unlock ();
799
+ sem_unlocker ->unlock ();
785
800
}
786
801
}
787
802
if (!released)
@@ -790,7 +805,7 @@ namespace azure { namespace storage {
790
805
if (writer < options.parallelism_factor ())
791
806
{
792
807
released = true ;
793
- semaphore ->unlock ();
808
+ sem_unlocker ->unlock ();
794
809
}
795
810
std::unique_lock<std::mutex> locker (condition_variable_mutex);
796
811
condition_variable->wait (locker, [smallest_offset, current_offset, &mutex]()
@@ -813,15 +828,13 @@ namespace azure { namespace storage {
813
828
}
814
829
condition_variable->notify_all ();
815
830
pplx::details::atomic_decrement (writer);
816
- if (!released)
817
- {
818
- semaphore->unlock ();
819
- }
820
831
}
821
832
}
822
833
});
823
834
});
835
+ parallel_tasks.emplace_back (std::move (parallel_task));
824
836
}
837
+ // Code below is nonsense, becasuse exceptions won't be thrown from wait_all_async.
825
838
// If the cancellation token is canceled, the lock will be in lock status when the exception is thrown, so need to unlock it in case it blocks other async processes
826
839
try
827
840
{
@@ -835,6 +848,32 @@ namespace azure { namespace storage {
835
848
}
836
849
throw ex;
837
850
}
851
+
852
+ pplx::when_all (parallel_tasks.begin (), parallel_tasks.end ()).then ([parallel_tasks](pplx::task<void > wait_all_task)
853
+ {
854
+ try
855
+ {
856
+ wait_all_task.wait ();
857
+ }
858
+ catch (const std::exception&)
859
+ {
860
+ std::for_each (parallel_tasks.begin (), parallel_tasks.end (), [](pplx::task<void > task)
861
+ {
862
+ task.then ([](pplx::task<void > t)
863
+ {
864
+ try
865
+ {
866
+ t.wait ();
867
+ }
868
+ catch (...)
869
+ {
870
+ }
871
+ });
872
+ });
873
+ throw ;
874
+ }
875
+ }).wait ();
876
+
838
877
std::unique_lock<std::mutex> locker (condition_variable_mutex);
839
878
condition_variable->wait (locker, [smallest_offset, &mutex, target_offset, target_length]()
840
879
{
0 commit comments