Skip to content

Commit 6f67147

Browse files
HarshaNallurujeremymengdeyaaeldeen
authored
[Event Hubs] Fix memory leak in EHBufferedProducerClient (Azure#26748)
## Issue First reported at Azure#25426 `EventHubBufferedProducerClient` leaks memory when there are no packets to send to event-hubs. ### `EventHubBufferedProducerClient` - EventHubBufferedProducerClient is used to publish events to Event Hub - It does not publish events immediately. Instead, events are buffered so they can be efficiently batched and published when the batch is full or the `maxWaitTimeInMs` has elapsed with no new events enqueued. ## Stress Testing The test sends about 5000 events and stops for a long duration, revealing the memory blowing up. <img src="https://github.com/Azure/azure-sdk-for-js/assets/10452642/b34ef5e5-3fad-467b-80fe-503d2fd34d92" width=500> ## Investigation Studying the heap snapshots, and comparing them at various timestamps during the test run hinted at the issue. The problem is at the `BatchingPartitionChannel#_startPublishLoop` and the `AwaitableQueue#shift` implementations. ### `BatchingPartitionChannel#_startPublishLoop` - Starts the loop that creates batches and sends them to the Event Hub - Runs forever or until the `abortSignal` is invoked ### `AwaitableQueue#shift` - Returns a Promise that will resolve with the next item in the queue. - If there are no items in the queue, the pending Promise stays forever. ## Problem `BatchingPartitionChannel#_startPublishLoop` relies on a while loop to run forever. - The root cause of the leak in the `#_startPublishLoop` is a race between two promises(`AwaitableQueue#shift()` and `delay()`) - `AwaitableQueue#shift()` never settles because there is no activity. - `Promise.race` resolves as soon as one of the promises is fulfilled. - `Promise.race` though resolves, keeps a reference for the pending promise from `AwaitableQueue#shift()`, references get accumulated because of the while loop causing the leak. ## Solution The fix is to abort the pending promise from `AwaitableQueue#shift()` once the race has been won by the other `delay()` promise. ## What's in the PR? ### `@azure/core-util` - Added a helper method `racePromisesAndAbortLosers`, an abstraction that leverages `"Promise.race()"` and aborts the losers of the race as soon as the first promise fulfills. - Changelog ### `@azure/event-hubs` - Updated `BatchingPartitionChannel#_startPublishLoop` to use `racePromisesAndAbortLosers` instead of `Promise.race`. - Updated `AwaitableQueue#shift()` to return a promise that is cancellable so that the pending promise cancels once the first promise from `Promise.race` resolves . ### Moved the stress testing to Azure#27184 --------- Co-authored-by: Jeremy Meng <[email protected]> Co-authored-by: Deyaaeldeen Almahallawi <[email protected]>
1 parent e2d75d2 commit 6f67147

File tree

13 files changed

+262
-98
lines changed

13 files changed

+262
-98
lines changed

sdk/core/core-util/CHANGELOG.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
# Release History
22

3-
## 1.4.1 (Unreleased)
3+
## 1.5.0 (2023-09-21)
44

55
### Features Added
66

7-
### Breaking Changes
8-
9-
### Bugs Fixed
10-
11-
### Other Changes
7+
- Adds helper method `cancelablePromiseRace`, an abstraction that leverages `"promise.race()"` and aborts the losers of the race as soon as the first promise settles.
8+
[PR #26748](https://github.com/Azure/azure-sdk-for-js/pull/26748)
129

1310
## 1.4.0 (2023-08-03)
1411

sdk/core/core-util/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@azure/core-util",
3-
"version": "1.4.1",
3+
"version": "1.5.0",
44
"description": "Core library for shared utility methods",
55
"sdk-type": "client",
66
"main": "dist/index.js",

sdk/core/core-util/review/core-util.api.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@
66

77
import { AbortSignalLike } from '@azure/abort-controller';
88

9+
// @public
10+
export type AbortablePromiseBuilder<T> = (abortOptions: {
11+
abortSignal?: AbortSignalLike;
12+
}) => Promise<T>;
13+
14+
// @public
15+
export interface AbortOptions {
16+
abortErrorMsg?: string;
17+
abortSignal?: AbortSignalLike;
18+
}
19+
20+
// @public
21+
export function cancelablePromiseRace<T extends unknown[]>(abortablePromiseBuilders: AbortablePromiseBuilder<T[number]>[], options?: {
22+
abortSignal?: AbortSignalLike;
23+
}): Promise<T[number]>;
24+
925
// @public
1026
export function computeSha256Hash(content: string, encoding: "base64" | "hex"): Promise<string>;
1127

@@ -16,19 +32,15 @@ export function computeSha256Hmac(key: string, stringToSign: string, encoding: "
1632
export function createAbortablePromise<T>(buildPromise: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void, options?: CreateAbortablePromiseOptions): Promise<T>;
1733

1834
// @public
19-
export interface CreateAbortablePromiseOptions {
20-
abortErrorMsg?: string;
21-
abortSignal?: AbortSignalLike;
35+
export interface CreateAbortablePromiseOptions extends AbortOptions {
2236
cleanupBeforeAbort?: () => void;
2337
}
2438

2539
// @public
2640
export function delay(timeInMs: number, options?: DelayOptions): Promise<void>;
2741

2842
// @public
29-
export interface DelayOptions {
30-
abortErrorMsg?: string;
31-
abortSignal?: AbortSignalLike;
43+
export interface DelayOptions extends AbortOptions {
3244
}
3345

3446
// @public
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
import { AbortSignalLike, AbortController } from "@azure/abort-controller";
5+
6+
/**
7+
* Options related to abort controller.
8+
*/
9+
export interface AbortOptions {
10+
/**
11+
* The abortSignal associated with containing operation.
12+
*/
13+
abortSignal?: AbortSignalLike;
14+
/**
15+
* The abort error message associated with containing operation.
16+
*/
17+
abortErrorMsg?: string;
18+
}
19+
20+
/**
21+
* Represents a function that returns a promise that can be aborted.
22+
*/
23+
export type AbortablePromiseBuilder<T> = (abortOptions: {
24+
abortSignal?: AbortSignalLike;
25+
}) => Promise<T>;
26+
27+
/**
28+
* promise.race() wrapper that aborts rest of promises as soon as the first promise settles.
29+
*/
30+
export async function cancelablePromiseRace<T extends unknown[]>(
31+
abortablePromiseBuilders: AbortablePromiseBuilder<T[number]>[],
32+
options?: { abortSignal?: AbortSignalLike }
33+
): Promise<T[number]> {
34+
const aborter = new AbortController();
35+
function abortHandler(): void {
36+
aborter.abort();
37+
}
38+
options?.abortSignal?.addEventListener("abort", abortHandler);
39+
try {
40+
return await Promise.race(
41+
abortablePromiseBuilders.map((p) => p({ abortSignal: aborter.signal }))
42+
);
43+
} finally {
44+
aborter.abort();
45+
options?.abortSignal?.removeEventListener("abort", abortHandler);
46+
}
47+
}

sdk/core/core-util/src/createAbortablePromise.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
4+
import { AbortError } from "@azure/abort-controller";
5+
import { AbortOptions } from "./aborterUtils";
56

67
/**
78
* Options for the createAbortablePromise function.
89
*/
9-
export interface CreateAbortablePromiseOptions {
10+
export interface CreateAbortablePromiseOptions extends AbortOptions {
1011
/** A function to be called if the promise was aborted */
1112
cleanupBeforeAbort?: () => void;
12-
/** An abort signal */
13-
abortSignal?: AbortSignalLike;
14-
/** An abort error message */
15-
abortErrorMsg?: string;
1613
}
1714

1815
/**

sdk/core/core-util/src/delay.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,15 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import { AbortSignalLike } from "@azure/abort-controller";
4+
import { AbortOptions } from "./aborterUtils";
55
import { createAbortablePromise } from "./createAbortablePromise";
66

77
const StandardAbortMessage = "The delay was aborted.";
88

99
/**
1010
* Options for support abort functionality for the delay method
1111
*/
12-
export interface DelayOptions {
13-
/**
14-
* The abortSignal associated with containing operation.
15-
*/
16-
abortSignal?: AbortSignalLike;
17-
/**
18-
* The abort error message associated with containing operation.
19-
*/
20-
abortErrorMsg?: string;
21-
}
12+
export interface DelayOptions extends AbortOptions {}
2213

2314
/**
2415
* A wrapper for setTimeout that resolves a promise after timeInMs milliseconds.

sdk/core/core-util/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT license.
33

44
export { delay, DelayOptions } from "./delay";
5+
export { AbortOptions, cancelablePromiseRace, AbortablePromiseBuilder } from "./aborterUtils";
56
export { createAbortablePromise, CreateAbortablePromiseOptions } from "./createAbortablePromise";
67
export { getRandomIntegerInclusive } from "./random";
78
export { isObject, UnknownObject } from "./object";
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
import * as sinon from "sinon";
5+
import { AbortController, AbortSignalLike } from "@azure/abort-controller";
6+
import chai from "chai";
7+
import chaiAsPromised from "chai-as-promised";
8+
import { cancelablePromiseRace, createAbortablePromise } from "../../src";
9+
10+
chai.use(chaiAsPromised);
11+
const { assert } = chai;
12+
13+
describe("createAbortablePromise", function () {
14+
let token: ReturnType<typeof setTimeout>;
15+
const delayTime = 2500;
16+
const createPromise = ({
17+
abortSignal,
18+
abortErrorMsg,
19+
}: { abortSignal?: AbortSignalLike; abortErrorMsg?: string } = {}): Promise<unknown> =>
20+
createAbortablePromise(
21+
(resolve) => {
22+
token = setTimeout(resolve, delayTime);
23+
},
24+
{
25+
cleanupBeforeAbort: () => clearTimeout(token),
26+
abortSignal,
27+
abortErrorMsg,
28+
}
29+
);
30+
afterEach(function () {
31+
sinon.restore();
32+
});
33+
34+
it("should resolve if not aborted nor rejected", async function () {
35+
const clock = sinon.useFakeTimers();
36+
const promise = createPromise();
37+
const time = await clock.nextAsync();
38+
clock.restore();
39+
assert.strictEqual(time, delayTime);
40+
await assert.isFulfilled(promise);
41+
});
42+
43+
it("should reject when aborted", async function () {
44+
const aborter = new AbortController();
45+
const abortErrorMsg = "The test operation was aborted.";
46+
const promise = createPromise({
47+
abortSignal: aborter.signal,
48+
abortErrorMsg,
49+
});
50+
aborter.abort();
51+
await assert.isRejected(promise, abortErrorMsg);
52+
});
53+
});
54+
55+
describe("cancelablePromiseRace", function () {
56+
let function1Aborted = false;
57+
let function2Aborted = false;
58+
let function3Aborted = false;
59+
const function1Delay = 100;
60+
let function2Delay = 200;
61+
const function3Delay = 2000; // Default: function1Delay < function2Delay < function3Delay
62+
const function2Message = "function 2 is rejected";
63+
const function3Message = "function 3 is rejected";
64+
65+
const function1 = async (abortOptions: { abortSignal?: AbortSignalLike }): Promise<number> => {
66+
let token: ReturnType<typeof setTimeout>;
67+
return createAbortablePromise(
68+
(resolve) => {
69+
token = setTimeout(resolve, function1Delay);
70+
},
71+
{
72+
cleanupBeforeAbort: () => {
73+
clearTimeout(token);
74+
function1Aborted = true;
75+
},
76+
abortSignal: abortOptions.abortSignal,
77+
}
78+
);
79+
};
80+
81+
const function2 = async (abortOptions: { abortSignal?: AbortSignalLike }): Promise<string> => {
82+
let token: ReturnType<typeof setTimeout>;
83+
return createAbortablePromise(
84+
(reject) => {
85+
token = setTimeout(() => reject(function2Message), function2Delay);
86+
},
87+
{
88+
cleanupBeforeAbort: () => {
89+
clearTimeout(token);
90+
function2Aborted = true;
91+
},
92+
abortSignal: abortOptions.abortSignal,
93+
}
94+
);
95+
};
96+
97+
const function3 = async (abortOptions: { abortSignal?: AbortSignalLike }): Promise<void> => {
98+
let token: ReturnType<typeof setTimeout>;
99+
return createAbortablePromise(
100+
(resolve, reject) => {
101+
token =
102+
Math.random() < 0.5
103+
? setTimeout(resolve, function3Delay)
104+
: setTimeout(() => reject(function3Message), function3Delay);
105+
},
106+
{
107+
cleanupBeforeAbort: () => {
108+
clearTimeout(token);
109+
function3Aborted = true;
110+
},
111+
abortSignal: abortOptions.abortSignal,
112+
}
113+
);
114+
};
115+
116+
afterEach(function () {
117+
// reset to default values
118+
function1Aborted = false;
119+
function2Aborted = false;
120+
function2Delay = 200;
121+
function3Aborted = false;
122+
});
123+
124+
it("should resolve with the first promise that resolves, abort the rest", async function () {
125+
await cancelablePromiseRace<[number, string, void]>([function1, function2, function3]); // 1 finishes first, 2&3 are aborted
126+
assert.isFalse(function1Aborted); // checks 1 is not aborted
127+
assert.isTrue(function2Aborted); // checks 2 is aborted
128+
assert.isTrue(function3Aborted); // checks 3 is aborted
129+
});
130+
131+
it("should reject with the first promise that rejects, abort the rest", async function () {
132+
function2Delay = function1Delay / 2;
133+
assert.strictEqual(
134+
await cancelablePromiseRace<[number, string, void]>([function1, function2, function3]),
135+
function2Message
136+
); // 2 rejects and finishes first, 1&3 are aborted
137+
assert.isTrue(function1Aborted); // checks 1 is aborted
138+
assert.isFalse(function2Aborted); // checks 2 is not aborted
139+
assert.isTrue(function3Aborted); // checks 3 is aborted
140+
});
141+
142+
it("should respect the abort signal supplied", async function () {
143+
const aborter = new AbortController();
144+
setTimeout(() => aborter.abort(), function1Delay / 2);
145+
let errorThrown = false;
146+
try {
147+
await cancelablePromiseRace<[number, string, void]>([function1, function2, function3], {
148+
abortSignal: aborter.signal,
149+
}); // all are aborted
150+
} catch (error) {
151+
errorThrown = true;
152+
assert.strictEqual((error as { message: string }).message, "The operation was aborted.");
153+
}
154+
assert.isTrue(errorThrown);
155+
assert.isTrue(function1Aborted); // checks 1 is aborted
156+
assert.isTrue(function2Aborted); // checks 2 is aborted
157+
assert.isTrue(function3Aborted); // checks 3 is aborted
158+
});
159+
});

sdk/core/core-util/test/public/createAbortablePromise.spec.ts

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)