Skip to content

Commit e9338cb

Browse files
committed
feat: make sure connections are closed after abort if aborted during socket r/w
1 parent 45a4b65 commit e9338cb

File tree

2 files changed

+45
-18
lines changed

2 files changed

+45
-18
lines changed

src/cmap/connection.ts

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -704,21 +704,21 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
704704

705705
const drainEvent = once<void>(this.socket, 'drain', options);
706706
const timeout = options?.timeoutContext?.timeoutForSocketWrite;
707-
if (timeout) {
708-
try {
709-
return await Promise.race([drainEvent, timeout]);
710-
} catch (error) {
711-
let err = error;
712-
if (TimeoutError.is(error)) {
713-
err = new MongoOperationTimeoutError('Timed out at socket write');
714-
this.cleanup(err);
715-
}
716-
throw error;
717-
} finally {
718-
timeout.clear();
707+
const drained = timeout ? Promise.race([drainEvent, timeout]) : drainEvent;
708+
try {
709+
return await drained;
710+
} catch (writeError) {
711+
if (TimeoutError.is(writeError)) {
712+
const timeoutError = new MongoOperationTimeoutError('Timed out at socket write');
713+
this.onError(timeoutError);
714+
throw timeoutError;
715+
} else if (writeError === options.signal?.reason) {
716+
this.onError(writeError);
719717
}
718+
throw writeError;
719+
} finally {
720+
timeout?.clear();
720721
}
721-
return await drainEvent;
722722
}
723723

724724
/**
@@ -748,16 +748,17 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
748748
}
749749
}
750750
} catch (readError) {
751-
const err = readError;
752751
if (TimeoutError.is(readError)) {
753-
const error = new MongoOperationTimeoutError(
752+
const timeoutError = new MongoOperationTimeoutError(
754753
`Timed out during socket read (${readError.duration}ms)`
755754
);
756755
this.dataEvents = null;
757-
this.onError(error);
758-
throw error;
756+
this.onError(timeoutError);
757+
throw timeoutError;
758+
} else if (readError === options.signal?.reason) {
759+
this.onError(readError);
759760
}
760-
throw err;
761+
throw readError;
761762
} finally {
762763
this.dataEvents = null;
763764
this.messageStream.pause();

test/integration/node-specific/abort_signal.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
type AutoEncryptionOptions,
1313
ClientEncryption,
1414
type Collection,
15+
type ConnectionClosedEvent,
1516
type Db,
1617
FindCursor,
1718
ListCollectionsCursor,
@@ -400,21 +401,31 @@ describe('AbortSignal support', () => {
400401
let controller: AbortController;
401402
let signal: AbortSignal;
402403
let cursor: AbstractCursor<{ a: number }>;
404+
let checkedOutId;
405+
const waitForConnectionClosed = async () => {
406+
for await (const [ev] of events.on(client, 'connectionClosed')) {
407+
if ((ev as ConnectionClosedEvent).connectionId === checkedOutId) return ev;
408+
}
409+
};
403410

404411
beforeEach(async function () {
412+
checkedOutId = undefined;
405413
controller = new AbortController();
406414
signal = controller.signal;
407415
cursor = method(filter, { signal });
408416
});
409417

410418
afterEach(async function () {
419+
checkedOutId = undefined;
411420
sinon.restore();
412421
await cursor?.close();
413422
});
414423

415424
it(`rejects ${cursorAPI.toString()}`, async () => {
416425
await db.command({ ping: 1 }, { readPreference: 'primary' }); // fill the connection pool with 1 connection.
426+
const connectionClosed = waitForConnectionClosed();
417427

428+
client.on('connectionCheckedOut', ev => (checkedOutId = ev.connectionId));
418429
const willBeResultBlocked = iterateUntilDocumentOrError(cursor, cursorAPI, args);
419430

420431
for (const [, server] of client.topology.s.servers) {
@@ -435,6 +446,8 @@ describe('AbortSignal support', () => {
435446
const result = await willBeResultBlocked;
436447

437448
expect(result).to.be.instanceOf(DOMException);
449+
450+
await connectionClosed;
438451
});
439452
}
440453

@@ -461,25 +474,38 @@ describe('AbortSignal support', () => {
461474
}
462475
});
463476

477+
checkedOutId = undefined;
464478
controller = new AbortController();
465479
signal = controller.signal;
466480
cursor = method(filter, { signal });
467481
});
468482

483+
let checkedOutId;
484+
const waitForConnectionClosed = async () => {
485+
for await (const [ev] of events.on(client, 'connectionClosed')) {
486+
if ((ev as ConnectionClosedEvent).connectionId === checkedOutId) return ev;
487+
}
488+
};
489+
469490
afterEach(async function () {
491+
checkedOutId = undefined;
470492
await clearFailPoint(this.configuration);
471493
await cursor?.close();
472494
});
473495

474496
it(`rejects ${cursorAPI.toString()}`, async () => {
475497
await db.command({ ping: 1 }, { readPreference: 'primary' }); // fill the connection pool with 1 connection.
498+
const connectionClosed = waitForConnectionClosed();
476499

500+
client.on('connectionCheckedOut', ev => (checkedOutId = ev.connectionId));
477501
client.on('commandStarted', e => e.commandName === cursorName && controller.abort());
478502
const willBeResultBlocked = iterateUntilDocumentOrError(cursor, cursorAPI, args);
479503

480504
const result = await willBeResultBlocked;
481505

482506
expect(result).to.be.instanceOf(DOMException);
507+
508+
await connectionClosed;
483509
});
484510
}
485511

0 commit comments

Comments
 (0)