Skip to content

Commit 3f2d1cd

Browse files
authored
[Event Hubs] Minor edits (Azure#25040)
Simplifies some code and threads the abort signal to more promises in link opening. It is part of my work to simplify link opening logic to get ready to add receiver redirect.
1 parent 83d9b65 commit 3f2d1cd

File tree

5 files changed

+52
-64
lines changed

5 files changed

+52
-64
lines changed

sdk/eventhub/event-hubs/src/connectionContext.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import { EventHubSender } from "./eventHubSender";
3333
import { getRuntimeInfo } from "./util/runtimeInfo";
3434
import { isCredential } from "./util/typeGuards";
3535
import { packageJsonInfo } from "./util/constants";
36+
import { AbortSignalLike } from "@azure/abort-controller";
37+
import { createAbortablePromise } from "@azure/core-util";
3638

3739
/**
3840
* @internal
@@ -77,7 +79,7 @@ export interface ConnectionContext extends ConnectionContextBase {
7779
* An AMQP link cannot be opened if the AMQP connection
7880
* is in the process of closing or disconnecting.
7981
*/
80-
readyToOpenLink(): Promise<void>;
82+
readyToOpenLink(options?: { abortSignal?: AbortSignalLike }): Promise<void>;
8183
/**
8284
* Closes all AMQP links, sessions and connection.
8385
*/
@@ -100,7 +102,7 @@ export interface ConnectionContextInternalMembers extends ConnectionContext {
100102
/**
101103
* Resolves once the context's connection emits a `disconnected` event.
102104
*/
103-
waitForDisconnectedEvent(): Promise<void>;
105+
waitForDisconnectedEvent(options?: { abortSignal?: AbortSignalLike }): Promise<void>;
104106
/**
105107
* Resolves once the connection has finished being reset.
106108
* Connections are reset as part of reacting to a `disconnected` event.
@@ -203,27 +205,27 @@ export namespace ConnectionContext {
203205
// then the rhea connection is in the process of terminating.
204206
return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen());
205207
},
206-
async readyToOpenLink() {
208+
async readyToOpenLink(optionsArg?: { abortSignal?: AbortSignalLike }) {
207209
// Check that the connection isn't in the process of closing.
208210
// This can happen when the idle timeout has been reached but
209211
// the underlying socket is waiting to be destroyed.
210212
if (this.isConnectionClosing()) {
211213
// Wait for the disconnected event that indicates the underlying socket has closed.
212-
await this.waitForDisconnectedEvent();
214+
await this.waitForDisconnectedEvent(optionsArg);
213215
}
214216

215217
// Wait for the connection to be reset.
216218
await this.waitForConnectionReset();
217219
},
218-
waitForDisconnectedEvent() {
219-
return new Promise((resolve) => {
220+
waitForDisconnectedEvent(optionsArg?: { abortSignal?: AbortSignalLike }) {
221+
return createAbortablePromise((resolve) => {
220222
logger.verbose(
221223
`[${this.connectionId}] Attempting to reinitialize connection` +
222224
` but the connection is in the process of closing.` +
223225
` Waiting for the disconnect event before continuing.`
224226
);
225227
this.connection.once(ConnectionEvents.disconnected, resolve);
226-
});
228+
}, optionsArg);
227229
},
228230
waitForConnectionReset() {
229231
// Check if the connection is currently in the process of disconnecting.

sdk/eventhub/event-hubs/src/eventHubReceiver.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ export class EventHubReceiver extends LinkEntity {
415415
);
416416

417417
// Wait for the connectionContext to be ready to open the link.
418-
await this._context.readyToOpenLink();
418+
await this._context.readyToOpenLink({ abortSignal });
419419
await this._negotiateClaim({ setTokenRenewal: false, abortSignal, timeoutInMs });
420420

421421
const receiverOptions: CreateReceiverOptions = {
@@ -466,7 +466,7 @@ export class EventHubReceiver extends LinkEntity {
466466
* Creates the options that need to be specified while creating an AMQP receiver link.
467467
*/
468468
private _createReceiverOptions(options: CreateReceiverOptions): RheaReceiverOptions {
469-
const rcvrOptions: RheaReceiverOptions = {
469+
const receiverOptions: RheaReceiverOptions = {
470470
name: this.name,
471471
autoaccept: true,
472472
source: {
@@ -481,26 +481,26 @@ export class EventHubReceiver extends LinkEntity {
481481
};
482482

483483
if (typeof this.ownerLevel === "number") {
484-
rcvrOptions.properties = {
484+
receiverOptions.properties = {
485485
[Constants.attachEpoch]: types.wrap_long(this.ownerLevel),
486486
};
487487
}
488488

489489
if (this.options.trackLastEnqueuedEventProperties) {
490-
rcvrOptions.desired_capabilities = Constants.enableReceiverRuntimeMetricName;
490+
receiverOptions.desired_capabilities = Constants.enableReceiverRuntimeMetricName;
491491
}
492492

493493
const eventPosition = options.eventPosition || this.eventPosition;
494494
if (eventPosition) {
495495
// Set filter on the receiver if event position is specified.
496496
const filterClause = getEventPositionFilter(eventPosition);
497497
if (filterClause) {
498-
(rcvrOptions.source as any).filter = {
498+
(receiverOptions.source as any).filter = {
499499
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004),
500500
};
501501
}
502502
}
503-
return rcvrOptions;
503+
return receiverOptions;
504504
}
505505

506506
/**
@@ -740,7 +740,5 @@ export function waitForEvents(
740740
.then(() => delay(readIntervalWaitTimeInMs, updatedOptions))
741741
.then(receivedAfterWait),
742742
delay(maxWaitTimeInMs, updatedOptions).then(receivedNone),
743-
]).finally(() => {
744-
aborter.abort();
745-
});
743+
]).finally(() => aborter.abort());
746744
}

sdk/eventhub/event-hubs/src/linkEntity.ts

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ export class LinkEntity {
116116
setTokenRenewal,
117117
timeoutInMs,
118118
}: {
119-
setTokenRenewal: boolean | undefined;
120-
abortSignal: AbortSignalLike | undefined;
119+
setTokenRenewal?: boolean;
120+
abortSignal?: AbortSignalLike;
121121
timeoutInMs: number;
122122
}): Promise<void> {
123123
// Acquire the lock and establish a cbs session if it does not exist on the connection.
@@ -137,12 +137,11 @@ export class LinkEntity {
137137
if (!this._context.cbsSession.isOpen()) {
138138
await defaultCancellableLock.acquire(
139139
this._context.cbsSession.cbsLock,
140-
() => {
141-
return this._context.cbsSession.init({
140+
() =>
141+
this._context.cbsSession.init({
142142
abortSignal,
143143
timeoutInMs: timeoutInMs - (Date.now() - startTime),
144-
});
145-
},
144+
}),
146145
{
147146
abortSignal,
148147
timeoutInMs,
@@ -184,14 +183,11 @@ export class LinkEntity {
184183
);
185184
await defaultCancellableLock.acquire(
186185
this._context.negotiateClaimLock,
187-
() => {
188-
return this._context.cbsSession.negotiateClaim(
189-
this.audience,
190-
tokenObject.token,
191-
tokenType,
192-
{ abortSignal, timeoutInMs: timeoutInMs - (Date.now() - startTime) }
193-
);
194-
},
186+
() =>
187+
this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType, {
188+
abortSignal,
189+
timeoutInMs: timeoutInMs - (Date.now() - startTime),
190+
}),
195191
{
196192
abortSignal,
197193
timeoutInMs: timeoutInMs - (Date.now() - startTime),
@@ -205,7 +201,7 @@ export class LinkEntity {
205201
this.address
206202
);
207203
if (setTokenRenewal) {
208-
await this._ensureTokenRenewal();
204+
this._ensureTokenRenewal();
209205
}
210206
}
211207

@@ -222,24 +218,23 @@ export class LinkEntity {
222218
if (this._tokenRenewalTimer) {
223219
clearTimeout(this._tokenRenewalTimer);
224220
}
225-
this._tokenRenewalTimer = setTimeout(async () => {
226-
try {
227-
await this._negotiateClaim({
221+
this._tokenRenewalTimer = setTimeout(
222+
() =>
223+
this._negotiateClaim({
228224
setTokenRenewal: true,
229-
abortSignal: undefined,
230-
timeoutInMs: getRetryAttemptTimeoutInMs(undefined),
231-
});
232-
} catch (err: any) {
233-
logger.verbose(
234-
"[%s] %s '%s' with address %s, an error occurred while renewing the token: %O",
235-
this._context.connectionId,
236-
this._type,
237-
this.name,
238-
this.address,
239-
err
240-
);
241-
}
242-
}, this._tokenTimeoutInMs);
225+
timeoutInMs: getRetryAttemptTimeoutInMs(),
226+
}).catch((err) =>
227+
logger.verbose(
228+
"[%s] %s '%s' with address %s, an error occurred while renewing the token: %O",
229+
this._context.connectionId,
230+
this._type,
231+
this.name,
232+
this.address,
233+
err
234+
)
235+
),
236+
this._tokenTimeoutInMs
237+
);
243238
logger.verbose(
244239
"[%s] %s '%s' with address %s, has next token renewal in %d milliseconds @(%s).",
245240
this._context.connectionId,
@@ -289,10 +284,6 @@ export class LinkEntity {
289284
* @returns The entity type.
290285
*/
291286
private get _type(): string {
292-
let result = "LinkEntity";
293-
if ((this as any).constructor && (this as any).constructor.name) {
294-
result = (this as any).constructor.name;
295-
}
296-
return result;
287+
return this.constructor.name ?? "LinkEntity";
297288
}
298289
}

sdk/eventhub/event-hubs/src/managementClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ export class ManagementClient extends LinkEntity {
359359
this._mgmtReqResLink.sender.name,
360360
this._mgmtReqResLink.receiver.name
361361
);
362-
await this._ensureTokenRenewal();
362+
this._ensureTokenRenewal();
363363
}
364364
} catch (err: any) {
365365
const translatedError = translate(err);

sdk/eventhub/event-hubs/src/util/retries.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,15 @@
22
// Licensed under the MIT license.
33

44
import { Constants, RetryOptions } from "@azure/core-amqp";
5-
import { isDefined } from "@azure/core-util";
65

76
/**
87
* @internal
98
*/
10-
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number {
11-
const timeoutInMs =
12-
!isDefined(retryOptions) ||
13-
typeof retryOptions.timeoutInMs !== "number" ||
14-
!isFinite(retryOptions.timeoutInMs) ||
15-
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs
16-
? Constants.defaultOperationTimeoutInMs
17-
: retryOptions.timeoutInMs;
18-
return timeoutInMs;
9+
export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions = {}): number {
10+
const { timeoutInMs } = retryOptions;
11+
return typeof timeoutInMs !== "number" ||
12+
!isFinite(timeoutInMs) ||
13+
timeoutInMs < Constants.defaultOperationTimeoutInMs
14+
? Constants.defaultOperationTimeoutInMs
15+
: timeoutInMs;
1916
}

0 commit comments

Comments
 (0)