Skip to content

Commit 41e2828

Browse files
committed
wip: close sockets on instance closed
1 parent 31ef100 commit 41e2828

File tree

4 files changed

+110
-63
lines changed

4 files changed

+110
-63
lines changed

src/cloud-sql-instance.ts

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ import {RSAKeys} from './rsa-keys';
2424
import {SslCert} from './ssl-cert';
2525
import {getRefreshInterval, isExpirationTimeValid} from './time';
2626
import {AuthTypes} from './auth-types';
27+
import {CloudSQLConnectorError} from './errors';
28+
29+
// Private types that describe exactly the methods
30+
// needed from tls.Socket to be able to close
31+
// sockets when the DNS Name changes.
32+
type EventFn = () => void;
33+
type ClosableSocket = {
34+
destroy: (error?: Error) => void;
35+
once: (name: string, handler: EventFn) => void;
36+
};
2737

2838
interface Fetcher {
2939
getInstanceMetadata({
@@ -45,7 +55,7 @@ interface CloudSQLInstanceOptions {
4555
ipType: IpAddressTypes;
4656
limitRateInterval?: number;
4757
sqlAdminFetcher: Fetcher;
48-
checkDomainInterval?: number
58+
checkDomainInterval?: number;
4959
}
5060

5161
interface RefreshResult {
@@ -78,11 +88,12 @@ export class CloudSQLInstance {
7888
// The ongoing refresh promise is referenced by the `next` property
7989
private next?: Promise<RefreshResult>;
8090
private scheduledRefreshID?: ReturnType<typeof setTimeout> | null = undefined;
81-
private checkDomainID?: ReturnType<typeof setTimeout> | null = undefined;
91+
private checkDomainID?: ReturnType<typeof setInterval> | null = undefined;
8292
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
8393
private throttle?: any;
8494
private closed = false;
85-
private checkDomainInterval:number;
95+
private checkDomainInterval: number;
96+
private sockets = new Set<ClosableSocket>();
8697

8798
public readonly instanceInfo: InstanceConnectionInfo;
8899
public ephemeralCert?: SslCert;
@@ -105,7 +116,7 @@ export class CloudSQLInstance {
105116
this.ipType = options.ipType || IpAddressTypes.PUBLIC;
106117
this.limitRateInterval = options.limitRateInterval || 30 * 1000; // 30 seconds
107118
this.sqlAdminFetcher = options.sqlAdminFetcher;
108-
this.checkDomainInterval = options.checkDomainInterval || 30* 1000;
119+
this.checkDomainInterval = options.checkDomainInterval || 30 * 1000;
109120
}
110121

111122
// p-throttle library has to be initialized in an async scope in order to
@@ -150,7 +161,7 @@ export class CloudSQLInstance {
150161
// Else resolve immediately.
151162
resolve();
152163
}
153-
}, 0);
164+
});
154165
});
155166
}
156167

@@ -160,10 +171,13 @@ export class CloudSQLInstance {
160171
this.next = undefined;
161172
return Promise.reject('closed');
162173
}
163-
if(this?.instanceInfo?.domainName && ! this.checkDomainID){
164-
this.checkDomainID = setInterval(()=>{
165-
this.checkDomainChanged()
166-
}, this.checkDomainInterval || 30*1000)
174+
if (this?.instanceInfo?.domainName && !this.checkDomainID) {
175+
this.checkDomainID = setInterval(
176+
() => {
177+
this.checkDomainChanged();
178+
},
179+
this.checkDomainInterval || 30 * 1000
180+
);
167181
}
168182

169183
const currentRefreshId = this.scheduledRefreshID;
@@ -309,7 +323,7 @@ export class CloudSQLInstance {
309323
// If refresh has not yet started, then cancel the setTimeout
310324
if (this.scheduledRefreshID) {
311325
clearTimeout(this.scheduledRefreshID);
312-
this.scheduledRefreshID = null
326+
this.scheduledRefreshID = null;
313327
}
314328
}
315329

@@ -325,9 +339,17 @@ export class CloudSQLInstance {
325339
close(): void {
326340
this.closed = true;
327341
this.cancelRefresh();
328-
if(this.checkDomainID){
342+
if (this.checkDomainID) {
329343
clearInterval(this.checkDomainID);
330-
this.checkDomainID = null
344+
this.checkDomainID = null;
345+
}
346+
for (const socket of this.sockets) {
347+
socket.destroy(
348+
new CloudSQLConnectorError({
349+
code: 'ERRCLOSED',
350+
message: 'The connector was closed.',
351+
})
352+
);
331353
}
332354
}
333355

@@ -348,4 +370,17 @@ export class CloudSQLInstance {
348370
this.close();
349371
}
350372
}
373+
addSocket(socket: ClosableSocket) {
374+
if (!this.instanceInfo.domainName) {
375+
// This was not connected by domain name. Ignore all sockets.
376+
return;
377+
}
378+
379+
// Add the socket to the list
380+
this.sockets.add(socket);
381+
// When the socket is closed, remove it.
382+
socket.once('closed', () => {
383+
this.sockets.delete(socket);
384+
});
385+
}
351386
}

src/connector.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ export declare interface ConnectionOptions {
4444
ipType?: IpAddressTypes;
4545
instanceConnectionName: string;
4646
domainName?: string;
47-
checkDomainInterval?: number
48-
limitRateInterval?: number
47+
checkDomainInterval?: number;
48+
limitRateInterval?: number;
4949
}
5050

5151
export declare interface SocketConnectionOptions extends ConnectionOptions {
@@ -143,7 +143,7 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
143143
ipType: opts.ipType || IpAddressTypes.PUBLIC,
144144
limitRateInterval: opts.limitRateInterval || 30 * 1000, // 30 sec
145145
sqlAdminFetcher: this.sqlAdminFetcher,
146-
checkDomainInterval: opts.checkDomainInterval
146+
checkDomainInterval: opts.checkDomainInterval,
147147
});
148148
this.set(this.cacheKey(opts), new CacheEntry(promise));
149149

@@ -247,6 +247,9 @@ export class Connector {
247247
tlsSocket.once('secureConnect', async () => {
248248
cloudSqlInstance.setEstablishedConnection();
249249
});
250+
251+
cloudSqlInstance.addSocket(tlsSocket);
252+
250253
return tlsSocket;
251254
}
252255

test/cloud-sql-instance.ts

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ t.test('CloudSQLInstance', async t => {
6767
instanceConnectionName: 'my-project:us-east1:my-instance',
6868
sqlAdminFetcher: fetcher,
6969
});
70-
t.after(() =>instance.close())
70+
t.after(() => instance.close());
7171

7272
t.same(
7373
instance.ephemeralCert.cert,
@@ -116,7 +116,7 @@ t.test('CloudSQLInstance', async t => {
116116
limitRateInterval: 50,
117117
},
118118
});
119-
t.after(() =>instance.close())
119+
t.after(() => instance.close());
120120

121121
await t.rejects(
122122
instance.refresh(),
@@ -137,7 +137,7 @@ t.test('CloudSQLInstance', async t => {
137137
limitRateInterval: 50,
138138
},
139139
});
140-
t.after(() =>instance.close())
140+
t.after(() => instance.close());
141141
instance.refresh = () => {
142142
if (refreshCount === 2) {
143143
const end = Date.now();
@@ -180,7 +180,7 @@ t.test('CloudSQLInstance', async t => {
180180
limitRateInterval: 50,
181181
},
182182
});
183-
t.after(() =>instance.close())
183+
t.after(() => instance.close());
184184
await (() =>
185185
new Promise((res): void => {
186186
let refreshCount = 0;
@@ -237,7 +237,7 @@ t.test('CloudSQLInstance', async t => {
237237
limitRateInterval: 50,
238238
},
239239
});
240-
t.after(() =>instance.close())
240+
t.after(() => instance.close());
241241
await (() =>
242242
new Promise((res): void => {
243243
let refreshCount = 0;
@@ -268,7 +268,7 @@ t.test('CloudSQLInstance', async t => {
268268
limitRateInterval: 50,
269269
},
270270
});
271-
t.after(() =>instance.close())
271+
t.after(() => instance.close());
272272

273273
await instance.refresh();
274274

@@ -307,7 +307,7 @@ t.test('CloudSQLInstance', async t => {
307307
limitRateInterval: 50,
308308
},
309309
});
310-
t.after(() =>instance.close())
310+
t.after(() => instance.close());
311311

312312
let cancelRefreshCalled = false;
313313
let refreshCalled = false;
@@ -345,7 +345,7 @@ t.test('CloudSQLInstance', async t => {
345345
sqlAdminFetcher: fetcher,
346346
},
347347
});
348-
t.after(() =>instance.close())
348+
t.after(() => instance.close());
349349

350350
const start = Date.now();
351351
// starts regular refresh cycle
@@ -385,7 +385,7 @@ t.test('CloudSQLInstance', async t => {
385385
sqlAdminFetcher: fetcher,
386386
},
387387
});
388-
t.after(()=> instance.close())
388+
t.after(() => instance.close());
389389
const start = Date.now();
390390
// starts out refresh logic
391391
let refreshCount = 1;
@@ -433,7 +433,7 @@ t.test('CloudSQLInstance', async t => {
433433
limitRateInterval: 50,
434434
},
435435
});
436-
t.after(() =>instance.close())
436+
t.after(() => instance.close());
437437

438438
// starts a new refresh cycle but do not await on it
439439
instance.refresh();
@@ -461,7 +461,7 @@ t.test('CloudSQLInstance', async t => {
461461
limitRateInterval: 50,
462462
},
463463
});
464-
t.after(() =>instance.close())
464+
t.after(() => instance.close());
465465

466466
// simulates an ongoing instance, already has data
467467
await instance.refresh();
@@ -498,7 +498,7 @@ t.test('CloudSQLInstance', async t => {
498498
limitRateInterval: 50,
499499
},
500500
});
501-
t.after(() =>instance.close())
501+
t.after(() => instance.close());
502502

503503
await instance.refresh();
504504
instance.setEstablishedConnection();
@@ -534,7 +534,7 @@ t.test('CloudSQLInstance', async t => {
534534
limitRateInterval: 50,
535535
},
536536
});
537-
t.after(() =>instance.close())
537+
t.after(() => instance.close());
538538

539539
await instance.refresh();
540540
instance.setEstablishedConnection();
@@ -602,7 +602,7 @@ t.test('CloudSQLInstance', async t => {
602602
limitRateInterval: 0,
603603
},
604604
});
605-
t.after(() =>instance.close())
605+
t.after(() => instance.close());
606606
await (() =>
607607
new Promise((res): void => {
608608
let refreshCount = 0;
@@ -626,5 +626,6 @@ t.test('CloudSQLInstance', async t => {
626626
instance.refresh();
627627
instance.setEstablishedConnection();
628628
}))();
629-
});
629+
}
630+
);
630631
});

test/connector.ts

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,6 @@ t.test('Connector by domain resolves and creates instance', async t => {
639639
connector.close();
640640
});
641641

642-
643642
t.test('Connector by domain resolves and creates instance', async t => {
644643
const th = setupConnectorModule(t);
645644
const connector = new th.Connector();
@@ -681,38 +680,47 @@ t.test('Connector by domain resolves and creates instance', async t => {
681680
connector.close();
682681
});
683682

683+
t.test(
684+
'Connector checks if name changes in background and closes connector',
685+
async t => {
686+
const th = setupConnectorModule(t);
687+
const connector = new th.Connector();
688+
t.after(() => {
689+
connector.close();
690+
});
684691

685-
t.test('Connector checks if name changes in background and closes connector', async t => {
686-
const th = setupConnectorModule(t);
687-
const connector = new th.Connector();
688-
t.after(() => {
689-
connector.close();
690-
});
691-
692-
// Get options loads the instance
693-
await connector.getOptions({
694-
ipType: 'PUBLIC',
695-
authType: 'PASSWORD',
696-
domainName: 'db.example.com',
697-
checkDomainInterval: 10, // 10ms for testing
698-
});
699-
700-
// Ensure there is only one entry.
701-
t.same(connector.instances.size, 1);
702-
const oldInstance = connector.instances.get(
703-
'db.example.com-PASSWORD-PUBLIC'
704-
).instance;
705-
t.same(oldInstance.instanceInfo.domainName, 'db.example.com');
706-
t.same(oldInstance.instanceInfo.instanceId, 'instance');
707-
708-
// getOptions after DNS response changes closes the old instance
709-
// and loads a new one.
710-
th.resolveTxtResponse = 'project:region2:instance2';
711-
await new Promise((res) =>{
712-
setTimeout(res, 50);
713-
})
692+
// Get options loads the instance
693+
await connector.getOptions({
694+
ipType: 'PUBLIC',
695+
authType: 'PASSWORD',
696+
domainName: 'db.example.com',
697+
checkDomainInterval: 10, // 10ms for testing
698+
});
714699

715-
t.same(oldInstance.isClosed(), true, 'old instance is closed');
700+
// Ensure there is only one entry.
701+
t.same(connector.instances.size, 1);
702+
const oldInstance = connector.instances.get(
703+
'db.example.com-PASSWORD-PUBLIC'
704+
).instance;
705+
t.same(oldInstance.instanceInfo.domainName, 'db.example.com');
706+
t.same(oldInstance.instanceInfo.instanceId, 'instance');
707+
708+
// add a mock socket to the old instance
709+
const mockSocket = {
710+
destroyed: false,
711+
once(e:string, fn:()=>void) {},
712+
destroy(){this.destroyed = true}
713+
}
714+
oldInstance.addSocket(mockSocket);
715+
716+
// getOptions after DNS response changes closes the old instance
717+
// and loads a new one.
718+
th.resolveTxtResponse = 'project:region2:instance2';
719+
await new Promise(res => {
720+
setTimeout(res, 50);
721+
});
716722

717-
connector.close();
718-
});
723+
t.same(oldInstance.isClosed(), true, 'old instance is closed');
724+
t.same(mockSocket.destroyed, true, 'old instance closed its sockets');
725+
}
726+
);

0 commit comments

Comments
 (0)