Skip to content

Commit b97db24

Browse files
apietroni51magne
andauthored
260 refactoring: manage connections inside pool (#264)
* 260 refactoring: manage connections inside pool * refactor: unique map for connections and clarification of connection purpose * fixed arguments in pool method * chore: renaming variable --------- Co-authored-by: magne <[email protected]>
1 parent 3a25656 commit b97db24

File tree

4 files changed

+88
-94
lines changed

4 files changed

+88
-94
lines changed

src/client.ts

Lines changed: 51 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export class Client {
6767
private consumers = new Map<string, ConsumerMappedValue>()
6868
private publishers = new Map<string, PublisherMappedValue>()
6969
private compressions = new Map<CompressionType, Compression>()
70-
private connection: Connection
70+
private locatorConnection: Connection
7171
private pool: ConnectionPool = new ConnectionPool()
7272

7373
private constructor(
@@ -76,20 +76,19 @@ export class Client {
7676
) {
7777
this.compressions.set(CompressionType.None, NoneCompression.create())
7878
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
79-
this.connection = this.getLocatorConnection()
80-
this.connection.incrRefCount()
79+
this.locatorConnection = this.getLocatorConnection()
8180
}
8281

8382
getCompression(compressionType: CompressionType) {
84-
return this.connection.getCompression(compressionType)
83+
return this.locatorConnection.getCompression(compressionType)
8584
}
8685

8786
registerCompression(compression: Compression) {
88-
this.connection.registerCompression(compression)
87+
this.locatorConnection.registerCompression(compression)
8988
}
9089

9190
public start(): Promise<Client> {
92-
return this.connection.start().then(
91+
return this.locatorConnection.start().then(
9392
(_res) => {
9493
return this
9594
},
@@ -104,26 +103,18 @@ export class Client {
104103
this.logger.info(`${this.id} Closing client...`)
105104
if (this.publisherCounts()) {
106105
this.logger.info(`Stopping all producers...`)
107-
await this.closeAllPublishers(true)
106+
await this.closeAllPublishers()
108107
}
109108
if (this.consumerCounts()) {
110109
this.logger.info(`Stopping all consumers...`)
111-
await this.closeAllConsumers(true)
112-
}
113-
this.connection.decrRefCount()
114-
await this.closeConnectionIfUnused(this.connection, params)
115-
}
116-
117-
private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) {
118-
if (connection.refCount <= 0) {
119-
this.pool.removeCachedConnection(this.connection)
120-
await this.connection.close({ ...params, manuallyClose: true })
110+
await this.closeAllConsumers()
121111
}
112+
await this.locatorConnection.close({ ...params, manuallyClose: true })
122113
}
123114

124115
public async queryMetadata(params: QueryMetadataParams): Promise<StreamMetadata[]> {
125116
const { streams } = params
126-
const res = await this.connection.sendAndWait<MetadataResponse>(new MetadataRequest({ streams }))
117+
const res = await this.locatorConnection.sendAndWait<MetadataResponse>(new MetadataRequest({ streams }))
127118
if (!res.ok) {
128119
throw new Error(`Query Metadata command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
129120
}
@@ -135,7 +126,7 @@ export class Client {
135126

136127
public async queryPartitions(params: QueryPartitionsParams): Promise<string[]> {
137128
const { superStream } = params
138-
const res = await this.connection.sendAndWait<PartitionsResponse>(new PartitionsQuery({ superStream }))
129+
const res = await this.locatorConnection.sendAndWait<PartitionsResponse>(new PartitionsQuery({ superStream }))
139130
if (!res.ok) {
140131
throw new Error(`Query Partitions command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
141132
}
@@ -158,7 +149,7 @@ export class Client {
158149
}
159150
let lastPublishingId = 0n
160151
if (streamPublisherParams.publisherRef) {
161-
lastPublishingId = await this.connection.queryPublisherSequence({
152+
lastPublishingId = await this.locatorConnection.queryPublisherSequence({
162153
stream: streamPublisherParams.stream,
163154
publisherRef: streamPublisherParams.publisherRef,
164155
})
@@ -178,7 +169,7 @@ export class Client {
178169
public async deletePublisher(extendedPublisherId: string) {
179170
const { publisher, connection } = this.publishers.get(extendedPublisherId) ?? {
180171
publisher: undefined,
181-
connection: this.connection,
172+
connection: this.locatorConnection,
182173
}
183174
const publisherId = extractPublisherId(extendedPublisherId)
184175
const res = await connection.sendAndWait<DeletePublisherResponse>(new DeletePublisherRequest(publisherId))
@@ -241,7 +232,7 @@ export class Client {
241232
}
242233

243234
const consumerId = extractConsumerId(extendedConsumerId)
244-
const { streamInfos } = await this.connection.sendAndWait<MetadataResponse>(
235+
const { streamInfos } = await this.locatorConnection.sendAndWait<MetadataResponse>(
245236
new MetadataRequest({ streams: [activeConsumer.consumer.streamName] })
246237
)
247238
if (streamInfos.length > 0 && streamExists(streamInfos[0])) {
@@ -280,16 +271,16 @@ export class Client {
280271
}
281272

282273
public queryOffset(params: QueryOffsetParams) {
283-
return this.connection.queryOffset(params)
274+
return this.locatorConnection.queryOffset(params)
284275
}
285276

286-
private async closeAllConsumers(manuallyClose: boolean) {
287-
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose)))
277+
private async closeAllConsumers() {
278+
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close()))
288279
this.consumers = new Map<string, ConsumerMappedValue>()
289280
}
290281

291-
private async closeAllPublishers(manuallyClose: boolean) {
292-
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose)))
282+
private async closeAllPublishers() {
283+
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(true)))
293284
this.publishers = new Map<string, PublisherMappedValue>()
294285
}
295286

@@ -307,7 +298,7 @@ export class Client {
307298

308299
public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise<true> {
309300
this.logger.debug(`Create Stream...`)
310-
const res = await this.connection.sendAndWait<CreateStreamResponse>(new CreateStreamRequest(params))
301+
const res = await this.locatorConnection.sendAndWait<CreateStreamResponse>(new CreateStreamRequest(params))
311302
if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) {
312303
return true
313304
}
@@ -321,7 +312,7 @@ export class Client {
321312

322313
public async deleteStream(params: { stream: string }): Promise<true> {
323314
this.logger.debug(`Delete Stream...`)
324-
const res = await this.connection.sendAndWait<DeleteStreamResponse>(new DeleteStreamRequest(params.stream))
315+
const res = await this.locatorConnection.sendAndWait<DeleteStreamResponse>(new DeleteStreamRequest(params.stream))
325316
if (!res.ok) {
326317
throw new Error(`Delete Stream command returned error with code ${res.code}`)
327318
}
@@ -349,7 +340,7 @@ export class Client {
349340
numberOfPartitions,
350341
bindingKeys
351342
)
352-
const res = await this.connection.sendAndWait<CreateSuperStreamResponse>(
343+
const res = await this.locatorConnection.sendAndWait<CreateSuperStreamResponse>(
353344
new CreateSuperStreamRequest({ ...params, partitions, bindingKeys: streamBindingKeys })
354345
)
355346
if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) {
@@ -371,7 +362,7 @@ export class Client {
371362
}
372363

373364
this.logger.debug(`Delete Super Stream...`)
374-
const res = await this.connection.sendAndWait<DeleteSuperStreamResponse>(
365+
const res = await this.locatorConnection.sendAndWait<DeleteSuperStreamResponse>(
375366
new DeleteSuperStreamRequest(params.streamName)
376367
)
377368
if (!res.ok) {
@@ -382,7 +373,7 @@ export class Client {
382373
}
383374

384375
public async streamStatsRequest(streamName: string) {
385-
const res = await this.connection.sendAndWait<StreamStatsResponse>(new StreamStatsRequest(streamName))
376+
const res = await this.locatorConnection.sendAndWait<StreamStatsResponse>(new StreamStatsRequest(streamName))
386377
if (!res.ok) {
387378
throw new Error(`Stream Stats command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
388379
}
@@ -391,24 +382,24 @@ export class Client {
391382
}
392383

393384
public getConnectionInfo(): ConnectionInfo {
394-
return this.connection.getConnectionInfo()
385+
return this.locatorConnection.getConnectionInfo()
395386
}
396387

397388
public async subscribe(params: SubscribeParams): Promise<SubscribeResponse> {
398-
const res = await this.connection.sendAndWait<SubscribeResponse>(new SubscribeRequest({ ...params }))
389+
const res = await this.locatorConnection.sendAndWait<SubscribeResponse>(new SubscribeRequest({ ...params }))
399390
if (!res.ok) {
400391
throw new Error(`Subscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
401392
}
402393
return res
403394
}
404395

405396
public async restart() {
406-
this.logger.info(`Restarting client connection ${this.connection.connectionId}`)
397+
this.logger.info(`Restarting client connection ${this.locatorConnection.connectionId}`)
407398
const uniqueConnectionIds = new Set<string>()
408-
uniqueConnectionIds.add(this.connection.connectionId)
399+
uniqueConnectionIds.add(this.locatorConnection.connectionId)
409400

410401
await wait(5000)
411-
await this.connection.restart()
402+
await this.locatorConnection.restart()
412403

413404
for (const { consumer, connection, params } of this.consumers.values()) {
414405
if (!uniqueConnectionIds.has(connection.connectionId)) {
@@ -431,19 +422,19 @@ export class Client {
431422
}
432423

433424
public get maxFrameSize() {
434-
return this.connection.maxFrameSize ?? DEFAULT_FRAME_MAX
425+
return this.locatorConnection.maxFrameSize ?? DEFAULT_FRAME_MAX
435426
}
436427

437428
public get serverVersions() {
438-
return this.connection.serverVersions
429+
return this.locatorConnection.serverVersions
439430
}
440431

441432
public get rabbitManagementVersion() {
442-
return this.connection.rabbitManagementVersion
433+
return this.locatorConnection.rabbitManagementVersion
443434
}
444435

445436
public async routeQuery(params: { routingKey: string; superStream: string }) {
446-
const res = await this.connection.sendAndWait<RouteResponse>(new RouteQuery(params))
437+
const res = await this.locatorConnection.sendAndWait<RouteResponse>(new RouteQuery(params))
447438
if (!res.ok) {
448439
throw new Error(`Route Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
449440
}
@@ -452,7 +443,7 @@ export class Client {
452443
}
453444

454445
public async partitionsQuery(params: { superStream: string }) {
455-
const res = await this.connection.sendAndWait<PartitionsResponse>(new PartitionsQuery(params))
446+
const res = await this.locatorConnection.sendAndWait<PartitionsResponse>(new PartitionsQuery(params))
456447
if (!res.ok) {
457448
throw new Error(`Partitions Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
458449
}
@@ -629,23 +620,27 @@ export class Client {
629620
connectionClosedListener?: ConnectionClosedListener
630621
): Promise<Connection> {
631622
const [metadata] = await this.queryMetadata({ streams: [streamName] })
632-
const chosenNode = chooseNode(metadata, purpose === "publisher")
623+
const isPublisher = purpose === "publisher"
624+
const chosenNode = chooseNode(metadata, isPublisher)
633625
if (!chosenNode) {
634626
throw new Error(`Stream was not found on any node`)
635627
}
636-
const cachedConnection = this.pool.getCachedConnection(purpose, streamName, this.connection.vhost, chosenNode.host)
637-
if (cachedConnection) return cachedConnection
638-
639-
const newConnection = await this.getConnectionOnChosenNode(
628+
const connection = await this.pool.getConnection(
640629
purpose,
641630
streamName,
642-
chosenNode,
643-
metadata,
644-
connectionClosedListener
631+
this.locatorConnection.vhost,
632+
chosenNode.host,
633+
async () => {
634+
return await this.getConnectionOnChosenNode(
635+
isPublisher,
636+
streamName,
637+
chosenNode,
638+
metadata,
639+
connectionClosedListener
640+
)
641+
}
645642
)
646-
647-
this.pool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection)
648-
return newConnection
643+
return connection
649644
}
650645

651646
private createSuperStreamPartitionsAndBindingKeys(
@@ -688,13 +683,13 @@ export class Client {
688683
}
689684

690685
private async getConnectionOnChosenNode(
691-
purpose: ConnectionPurpose,
686+
isPublisher: boolean,
692687
streamName: string,
693688
chosenNode: { host: string; port: number },
694689
metadata: StreamMetadata,
695690
connectionClosedListener?: ConnectionClosedListener
696691
): Promise<Connection> {
697-
const connectionParams = this.buildConnectionParams(purpose === "publisher", streamName, connectionClosedListener)
692+
const connectionParams = this.buildConnectionParams(isPublisher, streamName, connectionClosedListener)
698693
if (this.params.addressResolver && this.params.addressResolver.enabled) {
699694
const maxAttempts = computeMaxAttempts(metadata)
700695
const resolver = this.params.addressResolver
@@ -727,7 +722,7 @@ export class Client {
727722
}
728723

729724
private async closing(consumer: StreamConsumer, extendedConsumerId: string) {
730-
await consumer.close(true)
725+
await consumer.close()
731726
this.consumers.delete(extendedConsumerId)
732727
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
733728
}

src/connection_pool.ts

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,53 +5,58 @@ type InstanceKey = string
55
export type ConnectionPurpose = "consumer" | "publisher"
66

77
export class ConnectionPool {
8-
private consumerConnectionProxies: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
9-
private publisherConnectionProxies: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
10-
11-
public getCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) {
12-
const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies
13-
const key = this.getCacheKey(streamName, vhost, host)
14-
const proxies = map.get(key) || []
15-
const connection = proxies.at(-1)
16-
const refCount = connection?.refCount
17-
return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined
18-
}
8+
private connectionsMap: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
199

20-
public cacheConnection(
21-
purpose: ConnectionPurpose,
10+
public async getConnection(
11+
entityType: ConnectionPurpose,
2212
streamName: string,
2313
vhost: string,
2414
host: string,
25-
client: Connection
15+
connectionCreator: () => Promise<Connection>
2616
) {
27-
const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies
28-
const key = this.getCacheKey(streamName, vhost, host)
29-
const currentlyCached = map.get(key) || []
30-
currentlyCached.push(client)
31-
map.set(key, currentlyCached)
17+
const key = this.getCacheKey(streamName, vhost, host, entityType)
18+
const connections = this.connectionsMap.get(key) || []
19+
const connection = connections.at(-1)
20+
const refCount = connection?.refCount
21+
const cachedConnection =
22+
refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined
23+
24+
if (cachedConnection) {
25+
return cachedConnection
26+
} else {
27+
const newConnection = await connectionCreator()
28+
this.cacheConnection(key, newConnection)
29+
return newConnection
30+
}
3231
}
3332

34-
public removeIfUnused(connection: Connection) {
33+
public async releaseConnection(connection: Connection, manuallyClose = true): Promise<void> {
34+
connection.decrRefCount()
3535
if (connection.refCount <= 0) {
36+
await connection.close({ closingCode: 0, closingReason: "", manuallyClose })
3637
this.removeCachedConnection(connection)
37-
return true
3838
}
39-
return false
4039
}
4140

42-
public removeCachedConnection(connection: Connection) {
41+
private cacheConnection(key: string, connection: Connection) {
42+
const currentlyCached = this.connectionsMap.get(key) || []
43+
currentlyCached.push(connection)
44+
this.connectionsMap.set(key, currentlyCached)
45+
}
46+
47+
private removeCachedConnection(connection: Connection) {
4348
const { leader, streamName, hostname: host, vhost } = connection
4449
if (streamName === undefined) return
45-
const m = leader ? this.publisherConnectionProxies : this.consumerConnectionProxies
46-
const k = this.getCacheKey(streamName, vhost, host)
47-
const mappedClientList = m.get(k)
50+
const entityType = leader ? "publisher" : "consumer"
51+
const k = this.getCacheKey(streamName, vhost, host, entityType)
52+
const mappedClientList = this.connectionsMap.get(k)
4853
if (mappedClientList) {
4954
const filtered = mappedClientList.filter((c) => c !== connection)
50-
m.set(k, filtered)
55+
this.connectionsMap.set(k, filtered)
5156
}
5257
}
5358

54-
private getCacheKey(streamName: string, vhost: string, host: string) {
55-
return `${streamName}@${vhost}@${host}`
59+
private getCacheKey(streamName: string, vhost: string, host: string, entityType: ConnectionPurpose) {
60+
return `${streamName}@${vhost}@${host}@${entityType}`
5661
}
5762
}

src/consumer.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,9 @@ export class StreamConsumer implements Consumer {
9797
this.singleActive = params.singleActive ?? false
9898
}
9999

100-
async close(manuallyClose: boolean): Promise<void> {
100+
async close(): Promise<void> {
101101
this.closed = true
102-
this.connection.decrRefCount()
103-
if (this.pool.removeIfUnused(this.connection)) {
104-
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
105-
}
102+
await this.pool.releaseConnection(this.connection)
106103
}
107104

108105
public storeOffset(offsetValue: bigint): Promise<void> {

0 commit comments

Comments
 (0)