Skip to content

Commit 5930c26

Browse files
authored
Revert "http: limit concurrent file downloads (#548)"
This reverts commit 5e23d25.
1 parent 76c76c2 commit 5930c26

File tree

6 files changed

+59
-134
lines changed

6 files changed

+59
-134
lines changed

src/main/java/me/itzg/helpers/http/ConcurrencyLimiter.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/main/java/me/itzg/helpers/http/ConcurrencyLimiterImpl.java

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/main/java/me/itzg/helpers/http/FetchBuilderBase.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,6 @@ protected URI uri() {
147147
return state.uri;
148148
}
149149

150-
protected ConcurrencyLimiter getConcurrencyLimiter() {
151-
return state.sharedFetch != null ? state.sharedFetch.getConcurrencyLimiter()
152-
: ConcurrencyLimiter.NOOP_LIMITER;
153-
}
154-
155150
public Set<String> getAcceptContentTypes() {
156151
return state.acceptContentTypes;
157152
}

src/main/java/me/itzg/helpers/http/SharedFetch.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ public class SharedFetch implements AutoCloseable {
3333

3434
private final HttpClient reactiveClient;
3535

36-
private final ConcurrencyLimiter concurrencyLimiter;
37-
3836
public SharedFetch(String forCommand, Options options) {
3937
final String userAgent = String.format("%s/%s/%s (cmd=%s)",
4038
"itzg",
@@ -76,8 +74,6 @@ public SharedFetch(String forCommand, Options options) {
7674
);
7775

7876
headers.put("x-fetch-session", fetchSessionId);
79-
80-
concurrencyLimiter = new ConcurrencyLimiterImpl(options.getConcurrentFileDownloads());
8177
}
8278

8379
public FetchBuilderBase<?> fetch(URI uri) {
@@ -120,9 +116,6 @@ public static class Options {
120116
@Default
121117
private final Duration pendingAcquireTimeout = Duration.ofSeconds(120);
122118

123-
@Default
124-
private final int concurrentFileDownloads = 10;
125-
126119
private final Map<String,String> extraHeaders;
127120

128121
public Options withHeader(String key, String value) {
@@ -131,7 +124,7 @@ public Options withHeader(String key, String value) {
131124
newHeaders.put(key, value);
132125

133126
return new Options(
134-
responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout, concurrentFileDownloads,
127+
responseTimeout, tlsHandshakeTimeout, maxIdleTimeout, pendingAcquireTimeout,
135128
newHeaders
136129
);
137130
}

src/main/java/me/itzg/helpers/http/SharedFetchArgs.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,6 @@ public void setPendingAcquireTimeout(Duration timeout) {
4848
optionsBuilder.pendingAcquireTimeout(timeout);
4949
}
5050

51-
@Option(names = "--concurrent-file-downloads", defaultValue = "${env:FETCH_CONCURRENT_FILE_DOWNLOADS:-10}",
52-
paramLabel = "COUNT",
53-
description = "The maximum number of concurrent file downloads. Default: ${DEFAULT-VALUE}"
54-
)
55-
public void setConcurrentFileDownloads(int concurrentFileDownloads) {
56-
optionsBuilder.concurrentFileDownloads(concurrentFileDownloads);
57-
}
58-
5951
public Options options() {
6052
return optionsBuilder.build();
6153
}

src/main/java/me/itzg/helpers/http/SpecificFileFetchBuilder.java

Lines changed: 58 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static java.lang.System.currentTimeMillis;
66
import static java.util.Objects.requireNonNull;
77

8-
import io.netty.handler.codec.http.HttpHeaders;
98
import io.netty.handler.codec.http.HttpResponseStatus;
109
import java.io.IOException;
1110
import java.net.URI;
@@ -18,8 +17,6 @@
1817
import me.itzg.helpers.errors.GenericException;
1918
import me.itzg.helpers.files.ReactiveFileUtils;
2019
import reactor.core.publisher.Mono;
21-
import reactor.netty.ByteBufFlux;
22-
import reactor.netty.http.client.HttpClientResponse;
2320

2421
@Slf4j
2522
@Accessors(fluent = true)
@@ -68,78 +65,69 @@ public Mono<Path> assemble() {
6865
final boolean useIfModifiedSince = skipUpToDate && Files.exists(file);
6966

7067
return useReactiveClient(client ->
71-
getConcurrencyLimiter().limit(
72-
client
73-
.doOnRequest((httpClientRequest, connection) ->
74-
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
75-
)
76-
.headers(headers ->
77-
setupHeaders(headers, useIfModifiedSince)
78-
)
79-
.followRedirect(true)
80-
.doOnRequest(debugLogRequest(log, "file fetch"))
81-
.get()
82-
.uri(uri)
83-
.response((resp, byteBufFlux) ->
84-
processResponse(resp, byteBufFlux, useIfModifiedSince, uri)
85-
)
86-
.last()
87-
.contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
88-
)
89-
);
90-
}
91-
92-
private Mono<Path> processResponse(HttpClientResponse resp, ByteBufFlux byteBufFlux, boolean useIfModifiedSince, URI uri) {
93-
final HttpResponseStatus status = resp.status();
94-
95-
if (useIfModifiedSince && status == NOT_MODIFIED) {
96-
log.debug("The file {} is already up to date", file);
97-
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
98-
return Mono.just(file);
99-
}
100-
101-
if (notSuccess(resp)) {
102-
return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
103-
}
104-
105-
if (notExpectedContentType(resp)) {
106-
return failedContentTypeMono(resp);
107-
}
108-
109-
return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
110-
.flatMap(fileSize -> {
111-
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
112-
downloadedHandler.call(uri, file, fileSize);
113-
return Mono
114-
.deferContextual(contextView -> {
115-
if (log.isDebugEnabled()) {
116-
final long durationMillis =
117-
currentTimeMillis() - contextView.<Long>get("downloadStart");
118-
log.debug("Download of {} took {} at {}",
119-
uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
68+
client
69+
.doOnRequest((httpClientRequest, connection) ->
70+
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri, file)
71+
)
72+
.headers(headers -> {
73+
if (useIfModifiedSince) {
74+
try {
75+
final FileTime lastModifiedTime;
76+
lastModifiedTime = Files.getLastModifiedTime(file);
77+
headers.set(
78+
IF_MODIFIED_SINCE,
79+
httpDateTimeFormatter.format(lastModifiedTime.toInstant())
12080
);
81+
} catch (IOException e) {
82+
throw new GenericException("Unable to get last modified time of " + file, e);
12183
}
122-
return Mono.just(file);
123-
});
124-
});
125-
}
12684

127-
private void setupHeaders(HttpHeaders headers, boolean useIfModifiedSince) {
128-
if (useIfModifiedSince) {
129-
try {
130-
final FileTime lastModifiedTime;
131-
lastModifiedTime = Files.getLastModifiedTime(file);
132-
headers.set(
133-
IF_MODIFIED_SINCE,
134-
httpDateTimeFormatter.format(lastModifiedTime.toInstant())
135-
);
136-
} catch (IOException e) {
137-
throw new GenericException("Unable to get last modified time of " + file, e);
138-
}
85+
}
13986

140-
}
87+
applyHeaders(headers);
88+
})
89+
.followRedirect(true)
90+
.doOnRequest(debugLogRequest(log, "file fetch"))
91+
.get()
92+
.uri(uri)
93+
.response((resp, byteBufFlux) -> {
94+
final HttpResponseStatus status = resp.status();
14195

142-
applyHeaders(headers);
96+
if (useIfModifiedSince && status == NOT_MODIFIED) {
97+
log.debug("The file {} is already up to date", file);
98+
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri, file);
99+
return Mono.just(file);
100+
}
101+
102+
if (notSuccess(resp)) {
103+
return failedRequestMono(resp, byteBufFlux.aggregate(), "Trying to retrieve file");
104+
}
105+
106+
if (notExpectedContentType(resp)) {
107+
return failedContentTypeMono(resp);
108+
}
109+
110+
return ReactiveFileUtils.copyByteBufFluxToFile(byteBufFlux, file)
111+
.flatMap(fileSize -> {
112+
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri, file);
113+
downloadedHandler.call(uri, file, fileSize);
114+
return Mono
115+
.deferContextual(contextView -> {
116+
if (log.isDebugEnabled()) {
117+
final long durationMillis =
118+
currentTimeMillis() - contextView.<Long>get("downloadStart");
119+
log.debug("Download of {} took {} at {}",
120+
uri, formatDuration(durationMillis), transferRate(durationMillis, fileSize)
121+
);
122+
}
123+
return Mono.just(file);
124+
});
125+
});
126+
127+
})
128+
.last()
129+
.contextWrite(context -> context.put("downloadStart", currentTimeMillis()))
130+
);
143131
}
144132

145133
}

0 commit comments

Comments
 (0)