Skip to content

Commit c624619

Browse files
authored
fix: memory leak and deadlock due to error event in multiplexed session (#2477)
* fix: memory leak and deadlock due to error event in multiplexed session
* review comments
1 parent 28b0b05 commit c624619

File tree

6 files changed

+372
-250
lines changed

6 files changed

+372
-250
lines changed

observability-test/helper.ts

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -36,10 +36,7 @@ export const waitingSessionsEvents = [
3636
'Using Session',
3737
];
3838

39-
export const cacheSessionEvents = [
40-
'Cache hit: has usable multiplexed session',
41-
'Acquired multiplexed session',
42-
];
39+
export const cacheSessionEvents = ['Cache hit: has usable multiplexed session'];
4340

4441
/**
4542
* This utility exists as a test helper because mocha has builtin "context"

src/multiplexed-session.ts

Lines changed: 80 additions & 90 deletions
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,6 @@ import {
2525
startTrace,
2626
} from './instrument';
2727

28-
export const MUX_SESSION_AVAILABLE = 'mux-session-available';
29-
export const MUX_SESSION_CREATE_ERROR = 'mux-session-create-error';
30-
3128
/**
3229
* Interface for implementing multiplexed session logic; it should extend the
3330
* {@link https://nodejs.org/api/events.html|EventEmitter} class
@@ -72,30 +69,35 @@ export class MultiplexedSession
7269
// frequency to create new mux session
7370
refreshRate: number;
7471
_multiplexedSession: Session | null;
75-
_refreshHandle!: NodeJS.Timer;
72+
_refreshHandle!: NodeJS.Timeout;
7673
_observabilityOptions?: ObservabilityOptions;
74+
// shared promise lock to handle concurrent session creation requests
75+
_sharedMuxSessionWaitPromise: Promise<void> | null;
7776
constructor(database: Database) {
7877
super();
7978
this.database = database;
8079
// default frequency is 7 days
8180
this.refreshRate = 7;
8281
this._multiplexedSession = null;
8382
this._observabilityOptions = database._observabilityOptions;
83+
this._sharedMuxSessionWaitPromise = null;
8484
}
8585

8686
/**
8787
* Creates a new multiplexed session and manages its maintenance.
8888
*
8989
* This method initiates the session creation process by calling the `_createSession` method, which returns a Promise.
90+
*
91+
* Errors are silently swallowed here to prevent unhandled promise rejections or application crashes during background operations.
9092
*/
9193
createSession(): void {
9294
this._createSession()
9395
.then(() => {
9496
this._maintain();
9597
})
96-
.catch(err => {
97-
this.emit('error', err);
98-
});
98+
// Ignore errors here. If this fails, the next user request will
99+
// automatically trigger a retry via `_getSession`.
100+
.catch(err => {});
99101
}
100102

101103
/**
@@ -104,39 +106,51 @@ export class MultiplexedSession
104106
* This method sends a request to the database to create a new session with multiplexing enabled.
105107
* The response from the database is an array whose first element contains the multiplexed session.
106108
*
107-
* @returns {Promise<void>} A Promise that resolves when the session has been successfully created and assigned, an event
108-
* `mux-session-available` will be emitted to signal that the session is ready.
109-
*
110-
* In case of error, an error will get emitted along with the error event.
109+
* @returns {Promise<void>} Resolves when the session is successfully created.
110+
* @throws {Error} If the request fails, the error is thrown to the caller.
111111
*
112112
* @private
113113
*/
114114
async _createSession(): Promise<void> {
115-
const traceConfig = {
116-
opts: this._observabilityOptions,
117-
dbName: this.database.formattedName_,
115+
// If a session is already being created, just wait for it to complete.
116+
if (this._sharedMuxSessionWaitPromise) {
117+
return this._sharedMuxSessionWaitPromise;
118+
}
119+
120+
// Define the async task that performs the actual session creation and tracing.
121+
const task = async () => {
122+
const traceConfig = {
123+
opts: this._observabilityOptions,
124+
dbName: this.database.formattedName_,
125+
};
126+
return startTrace(
127+
'MultiplexedSession.createSession',
128+
traceConfig,
129+
async span => {
130+
span.addEvent('Requesting a multiplexed session');
131+
try {
132+
const [createSessionResponse] = await this.database.createSession({
133+
multiplexed: true,
134+
});
135+
this._multiplexedSession = createSessionResponse;
136+
span.addEvent('Created a multiplexed session');
137+
} catch (e) {
138+
setSpanError(span, e as Error);
139+
throw e;
140+
} finally {
141+
span.end();
142+
}
143+
},
144+
);
118145
};
119-
return startTrace(
120-
'MultiplexedSession.createSession',
121-
traceConfig,
122-
async span => {
123-
span.addEvent('Requesting a multiplexed session');
124-
try {
125-
const [createSessionResponse] = await this.database.createSession({
126-
multiplexed: true,
127-
});
128-
this._multiplexedSession = createSessionResponse;
129-
span.addEvent('Created a multiplexed session');
130-
this.emit(MUX_SESSION_AVAILABLE);
131-
} catch (e) {
132-
setSpanError(span, e as Error);
133-
this.emit(MUX_SESSION_CREATE_ERROR, e);
134-
throw e;
135-
} finally {
136-
span.end();
137-
}
138-
},
139-
);
146+
147+
// Assign the running task to the shared promise variable, and ensure
148+
// the lock is released when it completes.
149+
this._sharedMuxSessionWaitPromise = task().finally(() => {
150+
this._sharedMuxSessionWaitPromise = null;
151+
});
152+
153+
return this._sharedMuxSessionWaitPromise;
140154
}
141155

142156
/**
@@ -150,18 +164,26 @@ export class MultiplexedSession
150164
* and ignored. This is because the currently active multiplexed session has a 30-day expiry, providing
151165
* the maintainer with four opportunities (one every 7 days) to refresh the active session.
152166
*
167+
* Hence, if the `_createSession` fails here, the system will either simply retry at the next interval or
168+
* upon the next user request if the session expires.
169+
*
153170
* @returns {void} This method does not return any value.
154171
*
155172
*/
156173
_maintain(): void {
174+
// If a maintenance loop is already running, stop it first.
175+
// This prevents creating duplicate intervals if _maintain is called multiple times.
176+
if (this._refreshHandle) {
177+
clearInterval(this._refreshHandle);
178+
}
157179
const refreshRate = this.refreshRate! * 24 * 60 * 60000;
158180
this._refreshHandle = setInterval(async () => {
159-
try {
160-
await this._createSession();
161-
} catch (err) {
162-
return;
163-
}
181+
await this._createSession().catch(() => {});
164182
}, refreshRate);
183+
184+
// Unreference the timer so it does not prevent the Node.js process from exiting.
185+
// If the application has finished all other work, this background timer shouldn't
186+
// force the process to stay open.
165187
this._refreshHandle.unref();
166188
}
167189

@@ -174,7 +196,7 @@ export class MultiplexedSession
174196
*
175197
*/
176198
getSession(callback: GetSessionCallback): void {
177-
this._acquire().then(
199+
this._getSession().then(
178200
session =>
179201
callback(
180202
null,
@@ -185,34 +207,17 @@ export class MultiplexedSession
185207
);
186208
}
187209

188-
/**
189-
* Acquires a session asynchronously.
190-
*
191-
* Once a session is successfully acquired, it returns the session object (which may be `null` if unsuccessful).
192-
*
193-
* @returns {Promise<Session | null>}
194-
* A Promise that resolves with the acquired session (or `null` if no session is available after retries).
195-
*
196-
*/
197-
async _acquire(): Promise<Session | null> {
198-
const span = getActiveOrNoopSpan();
199-
const session = await this._getSession();
200-
span.addEvent('Acquired multiplexed session');
201-
return session;
202-
}
203-
204210
/**
205211
* Attempts to get a session, waiting for it to become available if necessary.
206212
*
207-
* Waits for the `MUX_SESSION_AVAILABLE` event or for the `MUX_SESSION_CREATE_ERROR`
208-
* to be emitted if the multiplexed session is not yet available. The method listens
209-
* for these events, and once `mux-session-available` is emitted, it resolves and returns
210-
* the session.
213+
* Logic Flow:
214+
* 1. Cache Hit: If a session exists, return it immediately.
215+
* 2. Join Wait: If another request is currently creating the session (`_sharedMuxSessionWaitPromise` exists), await it.
216+
* 3. Create: If neither, initiate a new creation request (`_createSession`).
211217
*
212-
* In case of an error, the promise will get rejected and the error will get bubble up to the parent method.
218+
* @throws {Error} In case of an error, the promise is rejected and the error bubbles up to the parent method.
213219
*
214-
* @returns {Promise<Session | null>} A promise that resolves with the current multiplexed session if available,
215-
* or `null` if the session is not available.
220+
* @returns {Promise<Session | null>} A promise that resolves with the active multiplexed session.
216221
*
217222
* @private
218223
*
@@ -225,35 +230,20 @@ export class MultiplexedSession
225230
return this._multiplexedSession;
226231
}
227232

228-
// Define event and promises to wait for the session to become available or for the error
229233
span.addEvent('Waiting for a multiplexed session to become available');
230-
let removeAvailableListener: Function;
231-
let removeErrorListener: Function;
232-
const promises = [
233-
new Promise((_, reject) => {
234-
this.once(MUX_SESSION_CREATE_ERROR, reject);
235-
removeErrorListener = this.removeListener.bind(
236-
this,
237-
MUX_SESSION_CREATE_ERROR,
238-
reject,
239-
);
240-
}),
241-
new Promise(resolve => {
242-
this.once(MUX_SESSION_AVAILABLE, resolve);
243-
removeAvailableListener = this.removeListener.bind(
244-
this,
245-
MUX_SESSION_AVAILABLE,
246-
resolve,
247-
);
248-
}),
249-
];
250234

251-
try {
252-
await Promise.race(promises);
253-
} finally {
254-
removeAvailableListener!();
255-
removeErrorListener!();
235+
// If initialization is ALREADY in progress, join the existing line!
236+
if (this._sharedMuxSessionWaitPromise) {
237+
await this._sharedMuxSessionWaitPromise;
238+
} else {
239+
// If the session is null and nobody is currently initializing it,
240+
// it means a previous attempt failed and we are in a "dead" state,
241+
// so we must kick-start the process again.
242+
await this._createSession();
256243
}
244+
245+
span.addEvent('Acquired multiplexed session');
246+
257247
// Return the multiplexed session after it becomes available
258248
return this._multiplexedSession;
259249
}

src/session-factory.ts

Lines changed: 18 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -131,29 +131,34 @@ export class SessionFactory
131131
parent: database,
132132
id: name,
133133
} as {} as ServiceObjectConfig);
134+
// initialize regular pool
134135
this.pool_ =
135136
typeof poolOptions === 'function'
136137
? new (poolOptions as SessionPoolConstructor)(database, null)
137138
: new SessionPool(database, poolOptions);
139+
140+
// initialize multiplexed session
138141
this.multiplexedSession_ = new MultiplexedSession(database);
139-
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
140142
this.multiplexedSession_.createSession();
143+
141144
// set the isMultiplexed property to false if multiplexed session is disabled, otherwise set the property to true
142-
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'false'
143-
? (this.isMultiplexed = false)
144-
: (this.isMultiplexed = true);
145+
this.isMultiplexed = !(
146+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'false'
147+
);
148+
145149
// set the isMultiplexedPartitionOps property to false if multiplexed session is disabled for partitioned ops, otherwise set the property to true
146-
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'false' &&
147-
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS ===
148-
'false'
149-
? (this.isMultiplexedPartitionOps = false)
150-
: (this.isMultiplexedPartitionOps = true);
150+
this.isMultiplexedPartitionOps = !(
151+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'false' &&
152+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS ===
153+
'false'
154+
);
151155

152156
// set the isMultiplexedRW property to false if multiplexed session is disabled for read/write, otherwise set the property to true
153-
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'false' &&
154-
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW === 'false'
155-
? (this.isMultiplexedRW = false)
156-
: (this.isMultiplexedRW = true);
157+
this.isMultiplexedRW = !(
158+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'false' &&
159+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW === 'false'
160+
);
161+
157162
// Regular sessions should only be created if mux is disabled.
158163
if (!this.isMultiplexed) {
159164
this.pool_.on('error', this.emit.bind(database, 'error'));

test/database.ts

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -375,18 +375,6 @@ describe('Database', () => {
375375
});
376376
});
377377

378-
it('should re-emit Multiplexed Session errors', done => {
379-
const error = new Error('err');
380-
381-
const sessionFactory = new SessionFactory(database, NAME);
382-
383-
database.on('error', err => {
384-
assert.strictEqual(err, error);
385-
done();
386-
});
387-
sessionFactory.multiplexedSession_?.emit('error', error);
388-
});
389-
390378
it('should inherit from ServiceObject', done => {
391379
const options = {};
392380

0 commit comments

Comments
 (0)