Skip to content

Commit c85d973

Browse files
authored
[ServiceBus] wait and throw error if sender link is not sendable (Azure#26927)
We already have the logic in `MessageSender._trySend()`. This PR adds the same to `ManagementClient._makeManagementRequest()` so that when the link is not sendable, we wait for one second then check again. A `SendBusyError` is thrown if we still cannot send messages. ### Packages impacted by this PR `@azure/service-bus` ### Issues associated with this PR Azure#26855
1 parent f686957 commit c85d973

File tree

4 files changed

+85
-43
lines changed

4 files changed

+85
-43
lines changed

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
### Other Changes
1212

13+
- Check whether we can send messages before making management requests. [PR #26927](https://github.com/Azure/azure-sdk-for-js/pull/26927)
14+
1315
## 7.9.0 (2023-04-11)
1416

1517
### Bugs Fixed

sdk/servicebus/service-bus/src/core/managementClient.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import {
3737
} from "../serviceBusMessage";
3838
import { LinkEntity, RequestResponseLinkOptions } from "./linkEntity";
3939
import { managementClientLogger, receiverLogger, senderLogger, ServiceBusLogger } from "../log";
40-
import { toBuffer } from "../util/utils";
40+
import { toBuffer, waitForSendable } from "../util/utils";
4141
import {
4242
InvalidMaxMessageCountError,
4343
throwErrorIfConnectionClosed,
@@ -257,12 +257,12 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
257257
this._initLock,
258258
async () => {
259259
managementClientLogger.verbose(
260-
`{this._logPrefix} lock acquired for initializing replyTo address and link`
260+
`${this.logPrefix} lock acquired for initializing replyTo address and link`
261261
);
262262
if (!this.isOpen()) {
263263
this.replyTo = generate_uuid();
264264
managementClientLogger.verbose(
265-
`{this._logPrefix} new replyTo address: ${this.replyTo} generated`
265+
`${this.logPrefix} new replyTo address: ${this.replyTo} generated`
266266
);
267267
}
268268
const { abortSignal } = options ?? {};
@@ -403,6 +403,16 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
403403
}
404404

405405
try {
406+
const { timeoutInMs } = sendRequestOptions;
407+
await waitForSendable(
408+
internalLogger,
409+
this.logPrefix,
410+
this.name,
411+
timeoutInMs ?? Constants.defaultOperationTimeoutInMs,
412+
this.link?.sender,
413+
this.link?.session?.outgoing?.available()
414+
);
415+
406416
return await this.link!.sendRequest(request, sendRequestOptions);
407417
} catch (err: any) {
408418
const translatedError = translateServiceBusError(err);

sdk/servicebus/service-bus/src/core/messageSender.ts

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@ import {
1717
RetryConfig,
1818
RetryOperationType,
1919
RetryOptions,
20-
delay,
2120
retry,
2221
AmqpAnnotatedMessage,
2322
} from "@azure/core-amqp";
2423
import { ServiceBusMessage, toRheaMessage } from "../serviceBusMessage";
2524
import { ConnectionContext } from "../connectionContext";
2625
import { LinkEntity } from "./linkEntity";
27-
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
26+
import { getUniqueName, waitForSendable, waitForTimeoutOrAbortOrResolve } from "../util/utils";
2827
import { throwErrorIfConnectionClosed } from "../util/errors";
2928
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
3029
import { CreateMessageBatchOptions } from "../models";
@@ -205,41 +204,16 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
205204
this.link?.session?.outgoing?.available()
206205
);
207206

208-
let waitTimeForSendable = 1000;
209-
if (!this.link?.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
210-
logger.verbose(
211-
"%s Sender '%s', waiting for 1 second for sender to become sendable",
212-
this.logPrefix,
213-
this.name
214-
);
215-
216-
await delay(waitTimeForSendable);
217-
218-
logger.verbose(
219-
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
220-
this.logPrefix,
221-
this.name,
222-
this.link?.credit,
223-
this.link?.session?.outgoing?.available()
224-
);
225-
} else {
226-
waitTimeForSendable = 0;
227-
}
228-
229-
if (!this.link?.sendable()) {
230-
// let us retry to send the message after some time.
231-
const msg =
232-
`[${this.logPrefix}] Sender "${this.name}", ` +
233-
`cannot send the message right now. Please try later.`;
234-
logger.warning(msg);
235-
const amqpError: AmqpError = {
236-
condition: ErrorNameConditionMapper.SenderBusyError,
237-
description: msg,
238-
};
239-
throw translateServiceBusError(amqpError);
240-
}
207+
const waitingTime = await waitForSendable(
208+
logger,
209+
this.logPrefix,
210+
this.name,
211+
timeoutInMs - timeTakenByInit,
212+
this.link,
213+
this.link?.session?.outgoing?.available()
214+
);
241215

242-
if (timeoutInMs <= timeTakenByInit + waitTimeForSendable) {
216+
if (timeoutInMs <= timeTakenByInit + waitingTime) {
243217
const desc: string =
244218
`${this.logPrefix} Sender "${this.name}" ` +
245219
`with address "${this.address}", was not able to send the message right now, due ` +
@@ -255,7 +229,7 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
255229
try {
256230
const delivery = await this.link!.send(encodedMessage, {
257231
format: sendBatch ? 0x80013700 : 0,
258-
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000,
232+
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitingTime) / 1000,
259233
abortSignal,
260234
});
261235
logger.verbose(

sdk/servicebus/service-bus/src/util/utils.ts

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22
// Licensed under the MIT license.
33

44
import Long from "long";
5-
import { logger, receiverLogger, messageLogger } from "../log";
6-
import { OperationTimeoutError, generate_uuid } from "rhea-promise";
5+
import { logger, receiverLogger, messageLogger, ServiceBusLogger } from "../log";
6+
import { AmqpError, OperationTimeoutError, generate_uuid } from "rhea-promise";
77
import isBuffer from "is-buffer";
88
import { Buffer } from "buffer";
99
import * as Constants from "../util/constants";
1010
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
1111
import { PipelineResponse } from "@azure/core-rest-pipeline";
1212
import { isDefined } from "@azure/core-util";
1313
import { HttpResponse, toHttpResponse } from "./compat";
14-
import { StandardAbortMessage } from "@azure/core-amqp";
14+
import { ErrorNameConditionMapper, StandardAbortMessage, delay } from "@azure/core-amqp";
15+
import { translateServiceBusError } from "../serviceBusError";
1516

1617
// This is the only dependency we have on DOM types, so rather than require
1718
// the DOM lib we can just shim this in.
@@ -654,3 +655,58 @@ export const getHttpResponseOnly = (pipelineResponse: PipelineResponse): HttpRes
654655
* Type with the service versions for the ATOM API.
655656
*/
656657
export type ServiceBusAtomAPIVersion = "2021-05" | "2017-04";
658+
659+
/**
660+
* @internal
661+
* Waits for one second if a sender is not sendable then check again. Throws
662+
* SenderBusyError if it is still not sendable.
663+
* Only waits when operation timeout is greater than one second.
664+
* @returns the actual waiting time.
665+
*/
666+
export async function waitForSendable(
667+
sendLogger: ServiceBusLogger,
668+
logPrefix: string,
669+
name: string,
670+
timeout: number,
671+
sender:
672+
| {
673+
sendable: () => boolean;
674+
credit: number;
675+
}
676+
| undefined,
677+
outgoingAvaiable: number
678+
): Promise<number> {
679+
let waitTimeForSendable = 1000;
680+
if (!sender?.sendable() && timeout > waitTimeForSendable) {
681+
sendLogger.verbose(
682+
"%s Sender '%s', waiting for 1 second for sender to become sendable",
683+
logPrefix,
684+
name
685+
);
686+
687+
await delay(waitTimeForSendable);
688+
689+
sendLogger.verbose(
690+
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
691+
logPrefix,
692+
name,
693+
sender?.credit,
694+
outgoingAvaiable
695+
);
696+
} else {
697+
waitTimeForSendable = 0;
698+
}
699+
700+
if (!sender?.sendable()) {
701+
// let us retry to send the message after some time.
702+
const msg =
703+
`[${logPrefix}] Sender "${name}", ` + `cannot send the message right now. Please try later.`;
704+
sendLogger.warning(msg);
705+
const amqpError: AmqpError = {
706+
condition: ErrorNameConditionMapper.SenderBusyError,
707+
description: msg,
708+
};
709+
throw translateServiceBusError(amqpError);
710+
}
711+
return waitTimeForSendable;
712+
}

0 commit comments

Comments
 (0)