Skip to content

Commit 8796cc6

Browse files
committed
refactor: Load connection info asynchronously.
wip: make it work with mysql wip: connector test fixes.
1 parent 8ac675a commit 8796cc6

13 files changed

+1568
-201
lines changed

package-lock.json

Lines changed: 830 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
"mysql2": "^3.2.0",
7171
"nock": "^13.3.0",
7272
"pg": "^8.10.0",
73+
"prisma": "^6.4.1",
7374
"tap": "^21.0.0",
7475
"tedious": "^16.1.0",
7576
"typeorm": "^0.3.19",
@@ -84,8 +85,9 @@
8485
},
8586
"dependencies": {
8687
"@googleapis/sqladmin": "^24.0.0",
88+
"@prisma/client": "^6.4.1",
8789
"gaxios": "^6.1.1",
8890
"google-auth-library": "^9.2.0",
8991
"p-throttle": "^7.0.0"
9092
}
91-
}
93+
}

src/cloud-sql-instance.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,12 @@ interface RefreshResult {
6767

6868
export class CloudSQLInstance {
6969
static async getCloudSQLInstance(
70+
instanceName: InstanceConnectionInfo,
7071
options: CloudSQLInstanceOptions
7172
): Promise<CloudSQLInstance> {
7273
const instance = new CloudSQLInstance({
7374
options: options,
74-
instanceInfo: await resolveInstanceName(
75-
options.instanceConnectionName,
76-
options.domainName
77-
),
75+
instanceInfo: instanceName,
7876
});
7977
await instance.refresh();
8078
return instance;

src/connector.ts

Lines changed: 123 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
import {createServer, Server, Socket} from 'node:net';
15+
import {createServer, Server} from 'node:net';
1616
import tls from 'node:tls';
1717
import {promisify} from 'node:util';
1818
import {AuthClient, GoogleAuth} from 'google-auth-library';
@@ -22,6 +22,11 @@ import {IpAddressTypes} from './ip-addresses';
2222
import {AuthTypes} from './auth-types';
2323
import {SQLAdminFetcher} from './sqladmin-fetcher';
2424
import {CloudSQLConnectorError} from './errors';
25+
import {SocketWrapper, SocketWrapperOptions} from './socket-wrapper';
26+
import stream from 'node:stream';
27+
import {resolveInstanceName} from './parse-instance-connection-name';
28+
import {promise} from 'tap';
29+
import {InstanceConnectionInfo} from './instance-connection-info';
2530

2631
// These Socket types are subsets from nodejs definitely typed repo, ref:
2732
// https://github.com/DefinitelyTyped/DefinitelyTyped/blob/ae0fe42ff0e6e820e8ae324acf4f8e944aa1b2b7/types/node/v18/net.d.ts#L437
@@ -53,11 +58,13 @@ export declare interface SocketConnectionOptions extends ConnectionOptions {
5358
}
5459

5560
interface StreamFunction {
56-
(): tls.TLSSocket;
61+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
62+
(...opts: any | undefined): stream.Duplex;
5763
}
5864

5965
interface PromisedStreamFunction {
60-
(): Promise<tls.TLSSocket>;
66+
//eslint-disable-next-line @typescript-eslint/no-explicit-any
67+
(...opts: any | undefined): Promise<stream.Duplex>;
6168
}
6269

6370
// DriverOptions is the interface describing the object returned by
@@ -108,33 +115,36 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
108115
this.sqlAdminFetcher = sqlAdminFetcher;
109116
}
110117

111-
private cacheKey(opts: ConnectionOptions): string {
112-
//TODO: for now, the cache key function must be synchronous.
113-
// When we implement the async connection info from
114-
// https://github.com/GoogleCloudPlatform/cloud-sql-nodejs-connector/pull/426
115-
// then the cache key should contain both the domain name
116-
// and the resolved instance name.
117-
return (
118-
(opts.instanceConnectionName || opts.domainName) +
119-
'-' +
120-
opts.authType +
121-
'-' +
122-
opts.ipType
123-
);
118+
private async cacheKey(instanceName:InstanceConnectionInfo,
119+
opts: ConnectionOptions): Promise<string> {
120+
let key:Array<string>
121+
if(instanceName.domainName) {
122+
key = [instanceName.domainName]
123+
} else {
124+
key = [instanceName.projectId,
125+
instanceName.regionId,
126+
instanceName.instanceId];
127+
128+
}
129+
key.push(String(opts.authType))
130+
key.push(String(opts.ipType))
131+
132+
return key.join('-')
124133
}
125134

126-
async loadInstance(opts: ConnectionOptions): Promise<void> {
135+
async loadInstance(opts: ConnectionOptions): Promise<CloudSQLInstance> {
127136
// in case an instance to that connection name has already
128137
// been setup there's no need to set it up again
129-
const key = this.cacheKey(opts);
138+
const instanceName = await resolveInstanceName(opts.instanceConnectionName, opts.domainName)
139+
const key = await this.cacheKey(instanceName, opts);
130140
const entry = this.get(key);
131141
if (entry) {
132142
if (entry.isResolved()) {
133143
await entry.instance?.checkDomainChanged();
134144
if (!entry.instance?.isClosed()) {
135145
// The instance is open and the domain has not changed.
136146
// use the cached instance.
137-
return;
147+
return entry.promise;
138148
}
139149
} else if (entry.isError()) {
140150
// The instance failed it's initial refresh. Remove it from the
@@ -143,36 +153,26 @@ class CloudSQLInstanceMap extends Map<string, CacheEntry> {
143153
throw entry.err;
144154
} else {
145155
// The instance initial refresh is in progress.
146-
await entry.promise;
147-
return;
156+
return entry.promise;
148157
}
149158
}
150159

160+
151161
// Start the refresh and add a cache entry.
152-
const promise = CloudSQLInstance.getCloudSQLInstance({
162+
const instanceOpts = {
153163
instanceConnectionName: opts.instanceConnectionName,
154164
domainName: opts.domainName,
155165
authType: opts.authType || AuthTypes.PASSWORD,
156166
ipType: opts.ipType || IpAddressTypes.PUBLIC,
157167
limitRateInterval: opts.limitRateInterval || 30 * 1000, // 30 sec
158168
sqlAdminFetcher: this.sqlAdminFetcher,
159169
checkDomainInterval: opts.checkDomainInterval,
160-
});
170+
}
171+
const promise = CloudSQLInstance.getCloudSQLInstance(instanceName, instanceOpts);
161172
this.set(key, new CacheEntry(promise));
162173

163174
// Wait for the cache entry to resolve.
164-
await promise;
165-
}
166-
167-
getInstance(opts: ConnectionOptions): CloudSQLInstance {
168-
const connectionInstance = this.get(this.cacheKey(opts));
169-
if (!connectionInstance || !connectionInstance.instance) {
170-
throw new CloudSQLConnectorError({
171-
message: `Cannot find info for instance: ${opts.instanceConnectionName}`,
172-
code: 'ENOINSTANCEINFO',
173-
});
174-
}
175-
return connectionInstance.instance;
175+
return promise;
176176
}
177177
}
178178

@@ -193,7 +193,7 @@ export class Connector {
193193
private readonly instances: CloudSQLInstanceMap;
194194
private readonly sqlAdminFetcher: SQLAdminFetcher;
195195
private readonly localProxies: Set<Server>;
196-
private readonly sockets: Set<Socket>;
196+
private readonly sockets: Set<stream.Duplex>;
197197

198198
constructor(opts: ConnectorOptions = {}) {
199199
this.sqlAdminFetcher = new SQLAdminFetcher({
@@ -207,69 +207,95 @@ export class Connector {
207207
this.sockets = new Set();
208208
}
209209

210-
// Connector.getOptions is a method that accepts a Cloud SQL instance
211-
// connection name along with the connection type and returns an object
212-
// that can be used to configure a driver to be used with Cloud SQL. e.g:
213-
//
214-
// const connector = new Connector()
215-
// const opts = await connector.getOptions({
216-
// ipType: 'PUBLIC',
217-
// instanceConnectionName: 'PROJECT:REGION:INSTANCE',
218-
// });
219-
// const pool = new Pool(opts)
220-
// const res = await pool.query('SELECT * FROM pg_catalog.pg_tables;')
221-
async getOptions(opts: ConnectionOptions): Promise<DriverOptions> {
222-
const {instances} = this;
223-
await instances.loadInstance(opts);
210+
async connect(opts: ConnectionOptions): Promise<tls.TLSSocket> {
211+
const cloudSqlInstance = await this.instances.loadInstance(opts);
212+
213+
const {
214+
instanceInfo,
215+
ephemeralCert,
216+
host,
217+
port,
218+
privateKey,
219+
serverCaCert,
220+
serverCaMode,
221+
dnsName,
222+
} = cloudSqlInstance;
223+
224+
if (
225+
instanceInfo &&
226+
ephemeralCert &&
227+
host &&
228+
port &&
229+
privateKey &&
230+
serverCaCert
231+
) {
232+
const tlsSocket = getSocket({
233+
instanceInfo,
234+
ephemeralCert,
235+
host,
236+
port,
237+
privateKey,
238+
serverCaCert,
239+
serverCaMode,
240+
dnsName: instanceInfo.domainName || dnsName, // use the configured domain name, or the instance dnsName.
241+
});
242+
tlsSocket.once('error', () => {
243+
cloudSqlInstance.forceRefresh();
244+
});
245+
tlsSocket.once('secureConnect', async () => {
246+
cloudSqlInstance.setEstablishedConnection();
247+
});
248+
return tlsSocket;
249+
}
250+
throw new CloudSQLConnectorError({
251+
message: 'Invalid Cloud SQL Instance info',
252+
code: 'EBADINSTANCEINFO',
253+
});
254+
}
224255

256+
getOptions({
257+
authType = AuthTypes.PASSWORD,
258+
ipType = IpAddressTypes.PUBLIC,
259+
instanceConnectionName,
260+
}: ConnectionOptions): DriverOptions {
261+
// bring 'this' into a closure-scope variable.
262+
//eslint-disable-next-line @typescript-eslint/no-this-alias
263+
const connector = this;
225264
return {
226-
stream() {
227-
const cloudSqlInstance = instances.getInstance(opts);
228-
const {
229-
instanceInfo,
230-
ephemeralCert,
231-
host,
232-
port,
233-
privateKey,
234-
serverCaCert,
235-
serverCaMode,
236-
dnsName,
237-
} = cloudSqlInstance;
238-
239-
if (
240-
instanceInfo &&
241-
ephemeralCert &&
242-
host &&
243-
port &&
244-
privateKey &&
245-
serverCaCert
246-
) {
247-
const tlsSocket = getSocket({
248-
instanceInfo,
249-
ephemeralCert,
250-
host,
251-
port,
252-
privateKey,
253-
serverCaCert,
254-
serverCaMode,
255-
dnsName: instanceInfo.domainName || dnsName, // use the configured domain name, or the instance dnsName.
256-
});
257-
tlsSocket.once('error', () => {
258-
cloudSqlInstance.forceRefresh();
259-
});
260-
tlsSocket.once('secureConnect', async () => {
261-
cloudSqlInstance.setEstablishedConnection();
262-
});
263-
264-
cloudSqlInstance.addSocket(tlsSocket);
265-
266-
return tlsSocket;
265+
stream(opts) {
266+
let host;
267+
let startConnection = false;
268+
if (opts) {
269+
if (opts?.config?.host) {
270+
// Mysql driver passes the host in the options, and expects
271+
// this to start the connection.
272+
host = opts?.config?.host;
273+
startConnection = true;
274+
}
275+
if (opts?.host) {
276+
// Sql Server (Tedious) driver passes host in the options
277+
// this to start the connection.
278+
host = opts?.host;
279+
startConnection = true;
280+
}
281+
} else {
282+
// Postgres driver does not pass options.
283+
// Postgres will call Socket.connect(port,host).
284+
startConnection = false;
267285
}
268286

269-
throw new CloudSQLConnectorError({
270-
message: 'Invalid Cloud SQL Instance info',
271-
code: 'EBADINSTANCEINFO',
272-
});
287+
return new SocketWrapper(
288+
new SocketWrapperOptions({
289+
connector,
290+
host,
291+
startConnection,
292+
connectionConfig: {
293+
authType,
294+
ipType,
295+
instanceConnectionName,
296+
},
297+
})
298+
);
273299
},
274300
};
275301
}
@@ -291,8 +317,8 @@ export class Connector {
291317
instanceConnectionName,
292318
});
293319
return {
294-
async connector() {
295-
return driverOptions.stream();
320+
async connector(opts) {
321+
return driverOptions.stream(opts);
296322
},
297323
// note: the connector handles a secured encrypted connection
298324
// with that in mind, the driver encryption is disabled here

0 commit comments

Comments
 (0)