Skip to content

Commit 2f65a7b

Browse files
committed
refactor: Load connection info asynchronously.
wip: make it work with mysql wip: connector test fixes.
1 parent ae5d4d4 commit 2f65a7b

13 files changed

+1574
-201
lines changed

package-lock.json

Lines changed: 830 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
"mysql2": "^3.2.0",
7171
"nock": "^13.3.0",
7272
"pg": "^8.10.0",
73+
"prisma": "^6.4.1",
7374
"tap": "^21.0.0",
7475
"tedious": "^16.1.0",
7576
"typeorm": "^0.3.19",
@@ -84,8 +85,9 @@
8485
},
8586
"dependencies": {
8687
"@googleapis/sqladmin": "^24.0.0",
88+
"@prisma/client": "^6.4.1",
8789
"gaxios": "^6.1.1",
8890
"google-auth-library": "^9.2.0",
8991
"p-throttle": "^7.0.0"
9092
}
91-
}
93+
}

src/cloud-sql-instance.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,12 @@ interface RefreshResult {
6767

6868
export class CloudSQLInstance {
6969
static async getCloudSQLInstance(
70+
instanceName: InstanceConnectionInfo,
7071
options: CloudSQLInstanceOptions
7172
): Promise<CloudSQLInstance> {
7273
const instance = new CloudSQLInstance({
7374
options: options,
74-
instanceInfo: await resolveInstanceName(
75-
options.instanceConnectionName,
76-
options.domainName
77-
),
75+
instanceInfo: instanceName,
7876
});
7977
await instance.refresh();
8078
return instance;

src/connector.ts

Lines changed: 130 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
import {createServer, Server, Socket} from 'node:net';
15+
import {createServer, Server} from 'node:net';
1616
import tls from 'node:tls';
1717
import {promisify} from 'node:util';
1818
import {AuthClient, GoogleAuth} from 'google-auth-library';
@@ -22,6 +22,10 @@ import {IpAddressTypes} from './ip-addresses';
2222
import {AuthTypes} from './auth-types';
2323
import {SQLAdminFetcher} from './sqladmin-fetcher';
2424
import {CloudSQLConnectorError} from './errors';
25+
import {SocketWrapper, SocketWrapperOptions} from './socket-wrapper';
26+
import stream from 'node:stream';
27+
import {resolveInstanceName} from './parse-instance-connection-name';
28+
import {InstanceConnectionInfo} from './instance-connection-info';
2529

2630
// These Socket types are subsets from nodejs definitely typed repo, ref:
2731
// https://github.com/DefinitelyTyped/DefinitelyTyped/blob/ae0fe42ff0e6e820e8ae324acf4f8e944aa1b2b7/types/node/v18/net.d.ts#L437
@@ -53,11 +57,13 @@ export declare interface SocketConnectionOptions extends ConnectionOptions {
5357
}
5458

5559
interface StreamFunction {
56-
(): tls.TLSSocket;
60+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
61+
(...opts: any | undefined): stream.Duplex;
5762
}
5863

5964
interface PromisedStreamFunction {
60-
(): Promise<tls.TLSSocket>;
65+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
66+
(...opts: any | undefined): Promise<stream.Duplex>;
6167
}
6268

6369
// DriverOptions is the interface describing the object returned by
@@ -108,33 +114,42 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
108114
this.sqlAdminFetcher = sqlAdminFetcher;
109115
}
110116

111-
private cacheKey(opts: ConnectionOptions): string {
112-
//TODO: for now, the cache key function must be synchronous.
113-
// When we implement the async connection info from
114-
// https://github.com/GoogleCloudPlatform/cloud-sql-nodejs-connector/pull/426
115-
// then the cache key should contain both the domain name
116-
// and the resolved instance name.
117-
return (
118-
(opts.instanceConnectionName || opts.domainName) +
119-
'-' +
120-
opts.authType +
121-
'-' +
122-
opts.ipType
123-
);
117+
private async cacheKey(
118+
instanceName: InstanceConnectionInfo,
119+
opts: ConnectionOptions
120+
): Promise<string> {
121+
let key: Array<string>;
122+
if (instanceName.domainName) {
123+
key = [instanceName.domainName];
124+
} else {
125+
key = [
126+
instanceName.projectId,
127+
instanceName.regionId,
128+
instanceName.instanceId,
129+
];
130+
}
131+
key.push(String(opts.authType));
132+
key.push(String(opts.ipType));
133+
134+
return key.join('-');
124135
}
125136

126-
async loadInstance(opts: ConnectionOptions): Promise<void> {
137+
async loadInstance(opts: ConnectionOptions): Promise<CloudSQLInstance> {
127138
// in case an instance to that connection name has already
128139
// been setup there's no need to set it up again
129-
const key = this.cacheKey(opts);
140+
const instanceName = await resolveInstanceName(
141+
opts.instanceConnectionName,
142+
opts.domainName
143+
);
144+
const key = await this.cacheKey(instanceName, opts);
130145
const entry = this.get(key);
131146
if (entry) {
132147
if (entry.isResolved()) {
133148
await entry.instance?.checkDomainChanged();
134149
if (!entry.instance?.isClosed()) {
135150
// The instance is open and the domain has not changed.
136151
// use the cached instance.
137-
return;
152+
return entry.promise;
138153
}
139154
} else if (entry.isError()) {
140155
// The instance failed it's initial refresh. Remove it from the
@@ -143,36 +158,28 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
143158
throw entry.err;
144159
} else {
145160
// The instance initial refresh is in progress.
146-
await entry.promise;
147-
return;
161+
return entry.promise;
148162
}
149163
}
150164

151165
// Start the refresh and add a cache entry.
152-
const promise = CloudSQLInstance.getCloudSQLInstance({
166+
const instanceOpts = {
153167
instanceConnectionName: opts.instanceConnectionName,
154168
domainName: opts.domainName,
155169
authType: opts.authType || AuthTypes.PASSWORD,
156170
ipType: opts.ipType || IpAddressTypes.PUBLIC,
157171
limitRateInterval: opts.limitRateInterval || 30 * 1000, // 30 sec
158172
sqlAdminFetcher: this.sqlAdminFetcher,
159173
checkDomainInterval: opts.checkDomainInterval,
160-
});
174+
};
175+
const promise = CloudSQLInstance.getCloudSQLInstance(
176+
instanceName,
177+
instanceOpts
178+
);
161179
this.set(key, new CacheEntry(promise));
162180

163181
// Wait for the cache entry to resolve.
164-
await promise;
165-
}
166-
167-
getInstance(opts: ConnectionOptions): CloudSQLInstance {
168-
const connectionInstance = this.get(this.cacheKey(opts));
169-
if (!connectionInstance || !connectionInstance.instance) {
170-
throw new CloudSQLConnectorError({
171-
message: `Cannot find info for instance: ${opts.instanceConnectionName}`,
172-
code: 'ENOINSTANCEINFO',
173-
});
174-
}
175-
return connectionInstance.instance;
182+
return promise;
176183
}
177184
}
178185

@@ -193,7 +200,7 @@ export class Connector {
193200
private readonly instances: CloudSQLInstanceMap;
194201
private readonly sqlAdminFetcher: SQLAdminFetcher;
195202
private readonly localProxies: Set<Server>;
196-
private readonly sockets: Set<Socket>;
203+
private readonly sockets: Set<stream.Duplex>;
197204

198205
constructor(opts: ConnectorOptions = {}) {
199206
this.sqlAdminFetcher = new SQLAdminFetcher({
@@ -207,69 +214,95 @@ export class Connector {
207214
this.sockets = new Set();
208215
}
209216

210-
// Connector.getOptions is a method that accepts a Cloud SQL instance
211-
// connection name along with the connection type and returns an object
212-
// that can be used to configure a driver to be used with Cloud SQL. e.g:
213-
//
214-
// const connector = new Connector()
215-
// const opts = await connector.getOptions({
216-
// ipType: 'PUBLIC',
217-
// instanceConnectionName: 'PROJECT:REGION:INSTANCE',
218-
// });
219-
// const pool = new Pool(opts)
220-
// const res = await pool.query('SELECT * FROM pg_catalog.pg_tables;')
221-
async getOptions(opts: ConnectionOptions): Promise<DriverOptions> {
222-
const {instances} = this;
223-
await instances.loadInstance(opts);
217+
async connect(opts: ConnectionOptions): Promise<tls.TLSSocket> {
218+
const cloudSqlInstance = await this.instances.loadInstance(opts);
219+
220+
const {
221+
instanceInfo,
222+
ephemeralCert,
223+
host,
224+
port,
225+
privateKey,
226+
serverCaCert,
227+
serverCaMode,
228+
dnsName,
229+
} = cloudSqlInstance;
230+
231+
if (
232+
instanceInfo &&
233+
ephemeralCert &&
234+
host &&
235+
port &&
236+
privateKey &&
237+
serverCaCert
238+
) {
239+
const tlsSocket = getSocket({
240+
instanceInfo,
241+
ephemeralCert,
242+
host,
243+
port,
244+
privateKey,
245+
serverCaCert,
246+
serverCaMode,
247+
dnsName: instanceInfo.domainName || dnsName, // use the configured domain name, or the instance dnsName.
248+
});
249+
tlsSocket.once('error', () => {
250+
cloudSqlInstance.forceRefresh();
251+
});
252+
tlsSocket.once('secureConnect', async () => {
253+
cloudSqlInstance.setEstablishedConnection();
254+
});
255+
return tlsSocket;
256+
}
257+
throw new CloudSQLConnectorError({
258+
message: 'Invalid Cloud SQL Instance info',
259+
code: 'EBADINSTANCEINFO',
260+
});
261+
}
224262

263+
getOptions({
264+
authType = AuthTypes.PASSWORD,
265+
ipType = IpAddressTypes.PUBLIC,
266+
instanceConnectionName,
267+
}: ConnectionOptions): DriverOptions {
268+
// bring 'this' into a closure-scope variable.
269+
//eslint-disable-next-line @typescript-eslint/no-this-alias
270+
const connector = this;
225271
return {
226-
stream() {
227-
const cloudSqlInstance = instances.getInstance(opts);
228-
const {
229-
instanceInfo,
230-
ephemeralCert,
231-
host,
232-
port,
233-
privateKey,
234-
serverCaCert,
235-
serverCaMode,
236-
dnsName,
237-
} = cloudSqlInstance;
238-
239-
if (
240-
instanceInfo &&
241-
ephemeralCert &&
242-
host &&
243-
port &&
244-
privateKey &&
245-
serverCaCert
246-
) {
247-
const tlsSocket = getSocket({
248-
instanceInfo,
249-
ephemeralCert,
250-
host,
251-
port,
252-
privateKey,
253-
serverCaCert,
254-
serverCaMode,
255-
dnsName: instanceInfo.domainName || dnsName, // use the configured domain name, or the instance dnsName.
256-
});
257-
tlsSocket.once('error', () => {
258-
cloudSqlInstance.forceRefresh();
259-
});
260-
tlsSocket.once('secureConnect', async () => {
261-
cloudSqlInstance.setEstablishedConnection();
262-
});
263-
264-
cloudSqlInstance.addSocket(tlsSocket);
265-
266-
return tlsSocket;
272+
stream(opts) {
273+
let host;
274+
let startConnection = false;
275+
if (opts) {
276+
if (opts?.config?.host) {
277+
// Mysql driver passes the host in the options, and expects
278+
// this to start the connection.
279+
host = opts?.config?.host;
280+
startConnection = true;
281+
}
282+
if (opts?.host) {
283+
// Sql Server (Tedious) driver passes host in the options
284+
// this to start the connection.
285+
host = opts?.host;
286+
startConnection = true;
287+
}
288+
} else {
289+
// Postgres driver does not pass options.
290+
// Postgres will call Socket.connect(port,host).
291+
startConnection = false;
267292
}
268293

269-
throw new CloudSQLConnectorError({
270-
message: 'Invalid Cloud SQL Instance info',
271-
code: 'EBADINSTANCEINFO',
272-
});
294+
return new SocketWrapper(
295+
new SocketWrapperOptions({
296+
connector,
297+
host,
298+
startConnection,
299+
connectionConfig: {
300+
authType,
301+
ipType,
302+
instanceConnectionName,
303+
},
304+
})
305+
);
273306
},
274307
};
275308
}
@@ -291,8 +324,8 @@ export class Connector {
291324
instanceConnectionName,
292325
});
293326
return {
294-
async connector() {
295-
return driverOptions.stream();
327+
async connector(opts) {
328+
return driverOptions.stream(opts);
296329
},
297330
// note: the connector handles a secured encrypted connection
298331
// with that in mind, the driver encryption is disabled here

0 commit comments

Comments
 (0)