Skip to content

Commit d3a3489

Browse files
committed
chore: hoist timeout logic into its own method
1 parent 36e5361 commit d3a3489

File tree

1 file changed

+20
-26
lines changed

1 file changed

+20
-26
lines changed

src/subscriber.ts

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,23 @@ export class Subscriber extends EventEmitter {
932932
return AckResponses.Success;
933933
}
934934

935+
async #awaitTimeoutAndCheck(
936+
promise: Promise<void>,
937+
timeout: Duration,
938+
): Promise<void> {
939+
const result = await awaitWithTimeout(promise, timeout);
940+
if (result.exception || result.timedOut) {
941+
// Don't try to deal with errors at this point, just warn-log.
942+
if (result.timedOut === false) {
943+
// This wasn't a timeout.
944+
logs.debug.warn(
945+
'Error during Subscriber.close(): %j',
946+
result.exception,
947+
);
948+
}
949+
}
950+
}
951+
935952
/**
936953
* Closes the subscriber, stopping the reception of new messages and shutting
937954
* down the underlying stream. The behavior of the returned Promise will depend
@@ -987,24 +1004,11 @@ export class Subscriber extends EventEmitter {
9871004
) {
9881005
const waitTimeout = timeout.subtract(finalNackTimeout);
9891006

990-
const emptyPromise = new Promise(r => {
1007+
const emptyPromise = new Promise<void>(r => {
9911008
this._inventory.on('empty', r);
9921009
});
9931010

994-
const resultCompletion = await awaitWithTimeout(
995-
emptyPromise,
996-
waitTimeout,
997-
);
998-
if (resultCompletion.exception || resultCompletion.timedOut) {
999-
// Don't try to deal with errors at this point, just warn-log.
1000-
if (resultCompletion.timedOut === false) {
1001-
// This wasn't a timeout.
1002-
logs.debug.warn(
1003-
'Error during Subscriber.close(): %j',
1004-
resultCompletion.exception,
1005-
);
1006-
}
1007-
}
1011+
await this.#awaitTimeoutAndCheck(emptyPromise, waitTimeout);
10081012
}
10091013

10101014
// Now we head into immediate shutdown mode with what time is left.
@@ -1023,17 +1027,7 @@ export class Subscriber extends EventEmitter {
10231027

10241028
// Wait for user callbacks to complete.
10251029
const flushCompleted = this._waitForFlush();
1026-
const flushResult = await awaitWithTimeout(flushCompleted, timeout);
1027-
if (flushResult.exception || flushResult.timedOut) {
1028-
// Don't try to deal with errors at this point, just warn-log.
1029-
if (flushResult.timedOut === false) {
1030-
// This wasn't a timeout.
1031-
logs.debug.warn(
1032-
'Error during Subscriber.close(): %j',
1033-
flushResult.exception,
1034-
);
1035-
}
1036-
}
1030+
await this.#awaitTimeoutAndCheck(flushCompleted, timeout);
10371031

10381032
// Clean up OTel spans for any remaining messages.
10391033
remaining.forEach(m => {

0 commit comments

Comments
 (0)