Skip to content

Commit e987353

Browse files
authored
[Event Hubs] Improve prefetching (Azure#27647)
### Packages impacted by this PR @azure/event-hubs ### Issues associated with this PR Azure#27253 ### Describe the problem that is addressed by this PR Azure#27253 (comment) ### What are the possible designs available to address the problem? If there are more than one possible design, why was the one in this PR chosen? ### Are there test cases added in this PR? _(If not, why?)_ To be tested using stress testing framework. UPDATE: The results are in and it is confirmed there is no more space leak! ### Provide a list of related PRs _(if any)_ Azure#26065 ### Command used to generate this PR:**_(Applicable only to SDK release request PRs)_ ### Checklists - [x] Added impacted package name to the issue description - [ ] Does this PR needs any fixes in the SDK Generator?** _(If so, create an Issue in the [Autorest/typescript](https://github.com/Azure/autorest.typescript) repository and link it here)_ - [x] Added a changelog (if necessary)
1 parent a5db245 commit e987353

File tree

4 files changed

+14
-15
lines changed

4 files changed

+14
-15
lines changed

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Improve event prefetching to not overload the internal queue.
1011

1112
### Other Changes
1213

sdk/eventhub/event-hubs/src/partitionReceiver.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ export interface PartitionReceiver {
9494
interface ConnectOptions {
9595
abortSignal: AbortSignalLike | undefined;
9696
timeoutInMs: number;
97-
prefetchCount: number;
9897
}
9998

10099
interface ReceiverState {
@@ -156,7 +155,7 @@ export function createReceiver(
156155
logger.verbose(`is open? -> ${isOpen}`);
157156
return isOpen;
158157
},
159-
async connect({ abortSignal, timeoutInMs, prefetchCount }: ConnectOptions): Promise<void> {
158+
async connect({ abortSignal, timeoutInMs }: ConnectOptions): Promise<void> {
160159
if (state.isConnecting || obj.isOpen()) {
161160
return;
162161
}
@@ -174,7 +173,6 @@ export function createReceiver(
174173
obj,
175174
state,
176175
queue,
177-
prefetchCount,
178176
eventPosition,
179177
logger,
180178
options,
@@ -203,6 +201,7 @@ export function createReceiver(
203201
maxWaitTimeInSeconds: number = 60,
204202
abortSignal?: AbortSignalLike
205203
) => {
204+
const prefetchCount = options.prefetchCount ?? maxMessageCount * 3;
206205
const cleanupBeforeAbort = (): Promise<void> => {
207206
logger.info(abortLogMessage);
208207
return obj.close();
@@ -224,10 +223,10 @@ export function createReceiver(
224223
.connect({
225224
abortSignal,
226225
timeoutInMs: getRetryAttemptTimeoutInMs(options.retryOptions),
227-
prefetchCount: options.prefetchCount ?? maxMessageCount * 3,
228226
})
229227
.then(() => {
230-
logger.verbose(`setting the wait timer for ${maxWaitTimeInSeconds} seconds`);
228+
addCredits(state.link, Math.max(prefetchCount, maxMessageCount) - queue.length);
229+
logger.verbose(`setting the max wait time to ${maxWaitTimeInSeconds} seconds`);
231230
return waitForEvents(
232231
maxMessageCount,
233232
maxWaitTimeInSeconds * 1000,
@@ -504,7 +503,6 @@ function createRheaOptions(
504503
obj: PartitionReceiver,
505504
state: ReceiverState,
506505
queue: ReceivedEventData[],
507-
prefetchCount: number,
508506
eventPosition: EventPosition,
509507
logger: SimpleLogger,
510508
options: PartitionReceiverOptions
@@ -516,7 +514,7 @@ function createRheaOptions(
516514
source: {
517515
address,
518516
},
519-
credit_window: prefetchCount,
517+
credit_window: 0,
520518
properties: {
521519
[receiverIdPropertyName]: consumerId,
522520
},
@@ -550,7 +548,6 @@ async function setupLink(
550548
obj: PartitionReceiver,
551549
state: ReceiverState,
552550
queue: ReceivedEventData[],
553-
prefetchCount: number,
554551
eventPosition: EventPosition,
555552
logger: SimpleLogger,
556553
options: PartitionReceiverOptions,
@@ -563,7 +560,6 @@ async function setupLink(
563560
obj,
564561
state,
565562
queue,
566-
prefetchCount,
567563
eventPosition,
568564
logger,
569565
options
@@ -577,3 +573,9 @@ async function setupLink(
577573
logger.verbose("is created successfully");
578574
ctx.receivers[name] = obj;
579575
}
576+
577+
function addCredits(receiver: Link | undefined, creditsToAdd: number): void {
578+
if (creditsToAdd > 0) {
579+
receiver?.addCredit(creditsToAdd);
580+
}
581+
}

sdk/eventhub/event-hubs/test/internal/cancellation.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ testWithServiceTypes((serviceVersion) => {
9898
it(`initialize supports cancellation (${caseType})`, async () => {
9999
const abortSignal = getSignal();
100100
try {
101-
await client.connect({ abortSignal, timeoutInMs: 60000, prefetchCount: 1 });
101+
await client.connect({ abortSignal, timeoutInMs: 60000 });
102102
throw new Error(TEST_FAILURE);
103103
} catch (err: any) {
104104
should.equal(err.name, "AbortError");
@@ -119,7 +119,7 @@ testWithServiceTypes((serviceVersion) => {
119119

120120
it(`receiveBatch supports cancellation when connection already exists (${caseType})`, async () => {
121121
// Open the connection.
122-
await client.connect({ abortSignal: undefined, timeoutInMs: 60000, prefetchCount: 1 });
122+
await client.connect({ abortSignal: undefined, timeoutInMs: 60000 });
123123
try {
124124
const abortSignal = getSignal();
125125
await client.receiveBatch(10, undefined, abortSignal);

sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,10 @@ testWithServiceTypes((serviceVersion) => {
131131
await receiver1.connect({
132132
abortSignal: undefined,
133133
timeoutInMs: 60000,
134-
prefetchCount: 1,
135134
});
136135
await receiver2.connect({
137136
abortSignal: undefined,
138137
timeoutInMs: 60000,
139-
prefetchCount: 1,
140138
});
141139

142140
// We are going to override sender1's close method so that it also invokes receiver2's close method.
@@ -199,12 +197,10 @@ testWithServiceTypes((serviceVersion) => {
199197
await receiver1.connect({
200198
abortSignal: undefined,
201199
timeoutInMs: 60000,
202-
prefetchCount: 1,
203200
});
204201
await receiver2.connect({
205202
abortSignal: undefined,
206203
timeoutInMs: 60000,
207-
prefetchCount: 1,
208204
});
209205

210206
// We are going to override sender1's close method so that it also invokes receiver2's close method.

0 commit comments

Comments
 (0)