|
1 | | -use crate::{fetch::Arguments, transport::packetline::read::ProgressAction}; |
2 | | -use std::path::Path; |
3 | | -use std::sync::atomic::{AtomicBool, Ordering}; |
4 | | - |
5 | 1 | use crate::fetch::{ |
6 | 2 | negotiate, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, RefMap, Shallow, Tags, |
7 | 3 | }; |
| 4 | +use crate::{fetch::Arguments, transport::packetline::read::ProgressAction}; |
| 5 | +use gix_features::progress::DynNestedProgress; |
| 6 | +use std::path::Path; |
| 7 | +use std::sync::atomic::{AtomicBool, Ordering}; |
8 | 8 |
|
9 | 9 | /// Perform one fetch operation, relying on a `transport`, right after a [`ref_map`](RefMap::new()) was created so |
10 | 10 | /// it's clear what the remote has. |
@@ -32,7 +32,7 @@ use crate::fetch::{ |
32 | 32 | pub async fn fetch<P, T, E>( |
33 | 33 | ref_map: &RefMap, |
34 | 34 | negotiate: &mut impl Negotiate, |
35 | | - consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut P, &AtomicBool) -> Result<(), E>, |
| 35 | + consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<(), E>, |
36 | 36 | mut progress: P, |
37 | 37 | should_interrupt: &AtomicBool, |
38 | 38 | Context { |
@@ -114,7 +114,7 @@ where |
114 | 114 | let mut rounds = Vec::new(); |
115 | 115 | let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests()); |
116 | 116 | let mut state = negotiate::one_round::State::new(is_stateless); |
117 | | - let reader = 'negotiation: loop { |
| 117 | + let mut reader = 'negotiation: loop { |
118 | 118 | let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1); |
119 | 119 | progress.step(); |
120 | 120 | progress.set_name(format!("negotiate (round {})", rounds.len() + 1)); |
@@ -157,11 +157,37 @@ where |
157 | 157 | } |
158 | 158 |
|
159 | 159 | #[cfg(feature = "async-client")] |
160 | | - let mut reader = crate::futures_lite::io::BlockOn::new(reader); |
| 160 | + let mut rd = crate::futures_lite::io::BlockOn::new(reader); |
| 161 | + #[cfg(not(feature = "async-client"))] |
| 162 | + let mut rd = reader; |
| 163 | + consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?; |
| 164 | + #[cfg(feature = "async-client")] |
| 165 | + { |
| 166 | + reader = rd.into_inner(); |
| 167 | + } |
161 | 168 | #[cfg(not(feature = "async-client"))] |
162 | | - let mut reader = reader; |
163 | | - consume_pack(&mut reader, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?; |
| 169 | + { |
| 170 | + reader = rd; |
| 171 | + } |
| 172 | + |
| 173 | + // Assure the final flush packet is consumed. |
| 174 | + let has_read_to_end = reader.stopped_at().is_some(); |
| 175 | + #[cfg(feature = "async-client")] |
| 176 | + { |
| 177 | + if !has_read_to_end { |
| 178 | + futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink()) |
| 179 | + .await |
| 180 | + .map_err(Error::ReadRemainingBytes)?; |
| 181 | + } |
| 182 | + } |
| 183 | + #[cfg(not(feature = "async-client"))] |
| 184 | + { |
| 185 | + if !has_read_to_end { |
| 186 | + std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?; |
| 187 | + } |
| 188 | + } |
164 | 189 | drop(reader); |
| 190 | + |
165 | 191 | if let Some(shallow_lock) = shallow_lock { |
166 | 192 | if !previous_response.shallow_updates().is_empty() { |
167 | 193 | gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?; |
|
0 commit comments