Skip to content

Commit 1fc2023

Browse files
fix: support abort() before stream is started and support quick abort when retrying (#23)
1 parent c8b2641 commit 1fc2023

File tree

4 files changed

+416
-454
lines changed

4 files changed

+416
-454
lines changed

index.js

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
183183
const requestNo = lastRequestNo++;
184184
const traceId = (contextOptions.traceId) ? `${contextOptions.traceId}:request=${requestNo}` : `request=${requestNo}`;
185185

186+
const abortListener = () => {
187+
resolver.cancel();
188+
};
186189
requestOptions.lookup = (hostname, options, cb) => {
187190
if (typeof options === 'function') {
188191
cb = options;
@@ -222,6 +225,9 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
222225

223226
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_NAME_RESOLVING, {traceId, hostname, family});
224227
resolver[fn](hostname, {ttl: true}, (err, records) => {
228+
if (requestOptions.signal !== undefined) {
229+
requestOptions.signal.removeEventListener('abort', abortListener);
230+
}
225231
if (err) {
226232
cb(err);
227233
} else {
@@ -245,9 +251,7 @@ function request(nodemodule, requestOptions, body, timeoutOptions, contextOption
245251
});
246252
};
247253
if (requestOptions.signal !== undefined) {
248-
requestOptions.signal.addEventListener('abort', () => {
249-
resolver.cancel();
250-
}, { once: true });
254+
requestOptions.signal.addEventListener('abort', abortListener, { once: true });
251255
}
252256
if (timeoutOptions.connectionTimeoutInMilliseconds > 0) {
253257
requestOptions.timeout = timeoutOptions.connectionTimeoutInMilliseconds;
@@ -371,17 +375,19 @@ function retryrequest(nodemodule, requestOptions, body, retryOptions, timeoutOpt
371375
const maxRetryDelayInSeconds = ('maxRetryDelayInSeconds' in retryOptions) ? retryOptions.maxRetryDelayInSeconds : MAX_RETRY_DELAY_IN_SECONDS;
372376
const delayInMilliseconds = Math.min(Math.random() * Math.pow(2, attempt-1), maxRetryDelayInSeconds) * 1000;
373377
contextOptions.emitter?.emit(EVENT_NAME_REQUEST_RETRYING, {traceId: getTraceId(attempt), attempt, delayInMilliseconds, err});
374-
setTimeout(() => {
375-
if (requestOptions.signal) {
376-
if (requestOptions.signal.aborted === true) {
377-
cb(requestOptions.signal.reason);
378-
} else {
379-
req(attempt);
380-
}
381-
} else {
382-
req(attempt);
378+
const abortListener = () => {
379+
clearTimeout(timeoutId);
380+
cb(requestOptions.signal.reason);
381+
};
382+
const timeoutId = setTimeout(() => {
383+
if (requestOptions.signal !== undefined) {
384+
requestOptions.signal.removeEventListener('abort', abortListener);
383385
}
386+
req(attempt);
384387
}, delayInMilliseconds);
388+
if (requestOptions.signal !== undefined) {
389+
requestOptions.signal.addEventListener('abort', abortListener, { once: true });
390+
}
385391
} else {
386392
cb(err);
387393
}
@@ -809,7 +815,9 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
809815
if (aborted === false) {
810816
aborted = true;
811817
Object.values(partsDownloading).forEach(req => req.abort());
812-
stream.destroy(err);
818+
if (stream !== null) {
819+
stream.destroy(err);
820+
}
813821
}
814822
}
815823

@@ -1004,13 +1012,15 @@ exports.download = ({bucket, key, version}, {partSizeInMegabytes, concurrency, r
10041012
},
10051013
readStream: () => {
10061014
if (started === false) {
1015+
started = true;
10071016
stream = new PassThrough();
10081017
start();
10091018
}
10101019
return stream;
10111020
},
10121021
file: (path, cb) => {
10131022
if (started === false) {
1023+
started = true;
10141024
stream = createWriteStream(path);
10151025
start();
10161026
}

0 commit comments

Comments
 (0)