Skip to content

Commit 71aff25

Browse files
committed
wip: close sockets on instance closed
1 parent 072eb1e commit 71aff25

File tree

4 files changed

+98
-60
lines changed

4 files changed

+98
-60
lines changed

src/cloud-sql-instance.ts

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ import {RSAKeys} from './rsa-keys';
2424
import {SslCert} from './ssl-cert';
2525
import {getRefreshInterval, isExpirationTimeValid} from './time';
2626
import {AuthTypes} from './auth-types';
27+
import {CloudSQLConnectorError} from './errors';
28+
29+
// A closable socket has an "end" function that
30+
type EventFn = () => void;
31+
type ClosableSocket = {
32+
destroy: (error?: Error) => void;
33+
once: (name: string, handler: EventFn) => void;
34+
};
2735

2836
interface Fetcher {
2937
getInstanceMetadata({
@@ -45,7 +53,7 @@ interface CloudSQLInstanceOptions {
4553
ipType: IpAddressTypes;
4654
limitRateInterval?: number;
4755
sqlAdminFetcher: Fetcher;
48-
checkDomainInterval?: number
56+
checkDomainInterval?: number;
4957
}
5058

5159
interface RefreshResult {
@@ -82,7 +90,8 @@ export class CloudSQLInstance {
8290
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
8391
private throttle?: any;
8492
private closed = false;
85-
private checkDomainInterval:number;
93+
private checkDomainInterval: number;
94+
private sockets = new Set<ClosableSocket>();
8695

8796
public readonly instanceInfo: InstanceConnectionInfo;
8897
public ephemeralCert?: SslCert;
@@ -105,7 +114,7 @@ export class CloudSQLInstance {
105114
this.ipType = options.ipType || IpAddressTypes.PUBLIC;
106115
this.limitRateInterval = options.limitRateInterval || 30 * 1000; // 30 seconds
107116
this.sqlAdminFetcher = options.sqlAdminFetcher;
108-
this.checkDomainInterval = options.checkDomainInterval || 30* 1000;
117+
this.checkDomainInterval = options.checkDomainInterval || 30 * 1000;
109118
}
110119

111120
// p-throttle library has to be initialized in an async scope in order to
@@ -158,10 +167,13 @@ export class CloudSQLInstance {
158167
this.next = undefined;
159168
return Promise.reject('closed');
160169
}
161-
if(this?.instanceInfo?.domainName && ! this.checkDomainID){
162-
this.checkDomainID = setInterval(()=>{
163-
this.checkDomainChanged()
164-
}, this.checkDomainInterval || 30*1000)
170+
if (this?.instanceInfo?.domainName && !this.checkDomainID) {
171+
this.checkDomainID = setInterval(
172+
() => {
173+
this.checkDomainChanged();
174+
},
175+
this.checkDomainInterval || 30 * 1000
176+
);
165177
}
166178

167179
const currentRefreshId = this.scheduledRefreshID;
@@ -307,7 +319,7 @@ export class CloudSQLInstance {
307319
// If refresh has not yet started, then cancel the setTimeout
308320
if (this.scheduledRefreshID) {
309321
clearTimeout(this.scheduledRefreshID);
310-
this.scheduledRefreshID = null
322+
this.scheduledRefreshID = null;
311323
}
312324
}
313325

@@ -323,9 +335,17 @@ export class CloudSQLInstance {
323335
close(): void {
324336
this.closed = true;
325337
this.cancelRefresh();
326-
if(this.checkDomainID){
338+
if (this.checkDomainID) {
327339
clearInterval(this.checkDomainID);
328-
this.checkDomainID = null
340+
this.checkDomainID = null;
341+
}
342+
for (const socket of this.sockets) {
343+
socket.destroy(
344+
new CloudSQLConnectorError({
345+
code: 'ERRCLOSED',
346+
message: 'The connector was closed.',
347+
})
348+
);
329349
}
330350
}
331351

@@ -346,4 +366,17 @@ export class CloudSQLInstance {
346366
this.close();
347367
}
348368
}
369+
addSocket(socket: ClosableSocket) {
370+
if (!this.instanceInfo.domainName) {
371+
// This was not connected by domain name. Ignore all sockets.
372+
return;
373+
}
374+
375+
// Add the socket to the list
376+
this.sockets.add(socket);
377+
// When the socket is closed, remove it.
378+
socket.once('closed', () => {
379+
this.sockets.delete(socket);
380+
});
381+
}
349382
}

src/connector.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ export declare interface ConnectionOptions {
4444
ipType?: IpAddressTypes;
4545
instanceConnectionName: string;
4646
domainName?: string;
47-
checkDomainInterval?: number
48-
limitRateInterval?: number
47+
checkDomainInterval?: number;
48+
limitRateInterval?: number;
4949
}
5050

5151
export declare interface SocketConnectionOptions extends ConnectionOptions {
@@ -143,7 +143,7 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
143143
ipType: opts.ipType || IpAddressTypes.PUBLIC,
144144
limitRateInterval: opts.limitRateInterval || 30 * 1000, // 30 sec
145145
sqlAdminFetcher: this.sqlAdminFetcher,
146-
checkDomainInterval: opts.checkDomainInterval
146+
checkDomainInterval: opts.checkDomainInterval,
147147
});
148148
this.set(this.cacheKey(opts), new CacheEntry(promise));
149149

@@ -248,6 +248,9 @@ export class Connector {
248248
tlsSocket.once('secureConnect', async () => {
249249
cloudSqlInstance.setEstablishedConnection();
250250
});
251+
252+
cloudSqlInstance.addSocket(tlsSocket);
253+
251254
return tlsSocket;
252255
}
253256

test/cloud-sql-instance.ts

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ t.test('CloudSQLInstance', async t => {
6767
instanceConnectionName: 'my-project:us-east1:my-instance',
6868
sqlAdminFetcher: fetcher,
6969
});
70-
t.after(() =>instance.close())
70+
t.after(() => instance.close());
7171

7272
t.same(
7373
instance.ephemeralCert.cert,
@@ -116,7 +116,7 @@ t.test('CloudSQLInstance', async t => {
116116
limitRateInterval: 50,
117117
},
118118
});
119-
t.after(() =>instance.close())
119+
t.after(() => instance.close());
120120

121121
await t.rejects(
122122
instance.refresh(),
@@ -137,7 +137,7 @@ t.test('CloudSQLInstance', async t => {
137137
limitRateInterval: 50,
138138
},
139139
});
140-
t.after(() =>instance.close())
140+
t.after(() => instance.close());
141141
instance.refresh = () => {
142142
if (refreshCount === 2) {
143143
const end = Date.now();
@@ -180,7 +180,7 @@ t.test('CloudSQLInstance', async t => {
180180
limitRateInterval: 50,
181181
},
182182
});
183-
t.after(() =>instance.close())
183+
t.after(() => instance.close());
184184
await (() =>
185185
new Promise((res): void => {
186186
let refreshCount = 0;
@@ -237,7 +237,7 @@ t.test('CloudSQLInstance', async t => {
237237
limitRateInterval: 50,
238238
},
239239
});
240-
t.after(() =>instance.close())
240+
t.after(() => instance.close());
241241
await (() =>
242242
new Promise((res): void => {
243243
let refreshCount = 0;
@@ -268,7 +268,7 @@ t.test('CloudSQLInstance', async t => {
268268
limitRateInterval: 50,
269269
},
270270
});
271-
t.after(() =>instance.close())
271+
t.after(() => instance.close());
272272

273273
await instance.refresh();
274274

@@ -308,7 +308,7 @@ t.test('CloudSQLInstance', async t => {
308308
limitRateInterval: 50,
309309
},
310310
});
311-
t.after(() =>instance.close())
311+
t.after(() => instance.close());
312312

313313
let cancelRefreshCalled = false;
314314
let refreshCalled = false;
@@ -347,7 +347,7 @@ t.test('CloudSQLInstance', async t => {
347347
sqlAdminFetcher: fetcher,
348348
},
349349
});
350-
t.after(() =>instance.close())
350+
t.after(() => instance.close());
351351

352352
const start = Date.now();
353353
// starts regular refresh cycle
@@ -387,7 +387,7 @@ t.test('CloudSQLInstance', async t => {
387387
sqlAdminFetcher: fetcher,
388388
},
389389
});
390-
t.after(()=> instance.close())
390+
t.after(() => instance.close());
391391
const start = Date.now();
392392
// starts out refresh logic
393393
let refreshCount = 1;
@@ -435,7 +435,7 @@ t.test('CloudSQLInstance', async t => {
435435
limitRateInterval: 50,
436436
},
437437
});
438-
t.after(() =>instance.close())
438+
t.after(() => instance.close());
439439

440440
// starts a new refresh cycle but do not await on it
441441
instance.refresh();
@@ -463,7 +463,7 @@ t.test('CloudSQLInstance', async t => {
463463
limitRateInterval: 50,
464464
},
465465
});
466-
t.after(() =>instance.close())
466+
t.after(() => instance.close());
467467

468468
// simulates an ongoing instance, already has data
469469
await instance.refresh();
@@ -500,7 +500,7 @@ t.test('CloudSQLInstance', async t => {
500500
limitRateInterval: 50,
501501
},
502502
});
503-
t.after(() =>instance.close())
503+
t.after(() => instance.close());
504504

505505
await instance.refresh();
506506
instance.setEstablishedConnection();
@@ -536,7 +536,7 @@ t.test('CloudSQLInstance', async t => {
536536
limitRateInterval: 50,
537537
},
538538
});
539-
t.after(() =>instance.close())
539+
t.after(() => instance.close());
540540

541541
await instance.refresh();
542542
instance.setEstablishedConnection();
@@ -605,7 +605,7 @@ t.test('CloudSQLInstance', async t => {
605605
limitRateInterval: 0,
606606
},
607607
});
608-
t.after(() =>instance.close())
608+
t.after(() => instance.close());
609609
await (() =>
610610
new Promise((res): void => {
611611
let refreshCount = 0;
@@ -629,5 +629,6 @@ t.test('CloudSQLInstance', async t => {
629629
instance.refresh();
630630
instance.setEstablishedConnection();
631631
}))();
632-
});
632+
}
633+
);
633634
});

test/connector.ts

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,6 @@ t.test('Connector by domain resolves and creates instance', async t => {
639639
connector.close();
640640
});
641641

642-
643642
t.test('Connector by domain resolves and creates instance', async t => {
644643
const th = setupConnectorModule(t);
645644
const connector = new th.Connector();
@@ -681,38 +680,40 @@ t.test('Connector by domain resolves and creates instance', async t => {
681680
connector.close();
682681
});
683682

683+
t.test(
684+
'Connector checks if name changes in background and closes connector',
685+
async t => {
686+
const th = setupConnectorModule(t);
687+
const connector = new th.Connector();
688+
t.after(() => {
689+
connector.close();
690+
});
684691

685-
t.test('Connector checks if name changes in background and closes connector', async t => {
686-
const th = setupConnectorModule(t);
687-
const connector = new th.Connector();
688-
t.after(() => {
689-
connector.close();
690-
});
691-
692-
// Get options loads the instance
693-
await connector.getOptions({
694-
ipType: 'PUBLIC',
695-
authType: 'PASSWORD',
696-
domainName: 'db.example.com',
697-
checkDomainInterval: 10, // 10ms for testing
698-
});
699-
700-
// Ensure there is only one entry.
701-
t.same(connector.instances.size, 1);
702-
const oldInstance = connector.instances.get(
703-
'db.example.com-PASSWORD-PUBLIC'
704-
).instance;
705-
t.same(oldInstance.instanceInfo.domainName, 'db.example.com');
706-
t.same(oldInstance.instanceInfo.instanceId, 'instance');
692+
// Get options loads the instance
693+
await connector.getOptions({
694+
ipType: 'PUBLIC',
695+
authType: 'PASSWORD',
696+
domainName: 'db.example.com',
697+
checkDomainInterval: 10, // 10ms for testing
698+
});
707699

708-
// getOptions after DNS response changes closes the old instance
709-
// and loads a new one.
710-
th.resolveTxtResponse = 'project:region2:instance2';
711-
await new Promise((res) =>{
712-
setTimeout(res, 50);
713-
})
700+
// Ensure there is only one entry.
701+
t.same(connector.instances.size, 1);
702+
const oldInstance = connector.instances.get(
703+
'db.example.com-PASSWORD-PUBLIC'
704+
).instance;
705+
t.same(oldInstance.instanceInfo.domainName, 'db.example.com');
706+
t.same(oldInstance.instanceInfo.instanceId, 'instance');
707+
708+
// getOptions after DNS response changes closes the old instance
709+
// and loads a new one.
710+
th.resolveTxtResponse = 'project:region2:instance2';
711+
await new Promise(res => {
712+
setTimeout(res, 50);
713+
});
714714

715-
t.same(oldInstance.isClosed(), true, 'old instance is closed');
715+
t.same(oldInstance.isClosed(), true, 'old instance is closed');
716716

717-
connector.close();
718-
});
717+
connector.close();
718+
}
719+
);

0 commit comments

Comments
 (0)