Skip to content

Commit 6ac0eb0

Browse files
authored
Fix bug when connecting to the same stream name different vhosts (#246)
* feat: add vhost prop to connection class * fix: include vhost in cache key to avoid collision when using multiple connections to different vhosts * fix: vhost names don't have to include / * feat: add vhost to ConnectionInfo * chore: lint * chore: format * fix: encode vhost when using it in an URL
1 parent 4611398 commit 6ac0eb0

File tree

7 files changed

+111
-17
lines changed

7 files changed

+111
-17
lines changed

src/client.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,12 @@ export class Client {
611611
if (!chosenNode) {
612612
throw new Error(`Stream was not found on any node`)
613613
}
614-
const cachedConnection = ConnectionPool.getUsableCachedConnection(purpose, streamName, chosenNode.host)
614+
const cachedConnection = ConnectionPool.getUsableCachedConnection(
615+
purpose,
616+
streamName,
617+
this.connection.vhost,
618+
chosenNode.host
619+
)
615620
if (cachedConnection) return cachedConnection
616621

617622
const newConnection = await this.getConnectionOnChosenNode(
@@ -622,7 +627,7 @@ export class Client {
622627
connectionClosedListener
623628
)
624629

625-
ConnectionPool.cacheConnection(purpose, streamName, newConnection.hostname, newConnection)
630+
ConnectionPool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection)
626631
return newConnection
627632
}
628633

src/connection.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ export type ConnectionInfo = {
6262
port: number
6363
id: string
6464
ready: boolean
65+
vhost: string
6566
readable?: boolean
6667
writable?: boolean
6768
localPort?: number
@@ -78,6 +79,7 @@ type ListenerEntry = {
7879

7980
export class Connection {
8081
public readonly hostname: string
82+
public readonly vhost: string
8183
public readonly leader: boolean
8284
public readonly streamName: string | undefined
8385
private socket: Socket
@@ -109,6 +111,7 @@ export class Connection {
109111
private readonly logger: Logger
110112
) {
111113
this.hostname = params.hostname
114+
this.vhost = params.vhost
112115
this.leader = params.leader ?? false
113116
this.streamName = params.streamName
114117
if (params.frameMax) this.frameMax = params.frameMax
@@ -374,6 +377,7 @@ export class Connection {
374377
writable: this.socket.writable,
375378
localPort: this.socket.localPort,
376379
ready: this.ready,
380+
vhost: this.vhost,
377381
}
378382
}
379383

@@ -484,7 +488,7 @@ export class Connection {
484488
}
485489

486490
private virtualHostIsNotValid(virtualHost: string) {
487-
if (!virtualHost || virtualHost.split("/").length !== 2) {
491+
if (!virtualHost) {
488492
return true
489493
}
490494

src/connection_pool.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,26 @@ export class ConnectionPool {
88
private static consumerConnectionProxies = new Map<InstanceKey, Connection[]>()
99
private static publisherConnectionProxies = new Map<InstanceKey, Connection[]>()
1010

11-
public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, host: string) {
11+
public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) {
1212
const map =
1313
purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
14-
const key = ConnectionPool.getCacheKey(streamName, host)
14+
const key = ConnectionPool.getCacheKey(streamName, vhost, host)
1515
const proxies = map.get(key) || []
1616
const connection = proxies.at(-1)
1717
const refCount = connection?.refCount
1818
return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined
1919
}
2020

21-
public static cacheConnection(purpose: ConnectionPurpose, streamName: string, host: string, client: Connection) {
21+
public static cacheConnection(
22+
purpose: ConnectionPurpose,
23+
streamName: string,
24+
vhost: string,
25+
host: string,
26+
client: Connection
27+
) {
2228
const map =
2329
purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
24-
const key = ConnectionPool.getCacheKey(streamName, host)
30+
const key = ConnectionPool.getCacheKey(streamName, vhost, host)
2531
const currentlyCached = map.get(key) || []
2632
currentlyCached.push(client)
2733
map.set(key, currentlyCached)
@@ -36,18 +42,18 @@ export class ConnectionPool {
3642
}
3743

3844
public static removeCachedConnection(connection: Connection) {
39-
const { leader, streamName, hostname: host } = connection
45+
const { leader, streamName, hostname: host, vhost } = connection
4046
if (streamName === undefined) return
4147
const m = leader ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
42-
const k = ConnectionPool.getCacheKey(streamName, host)
48+
const k = ConnectionPool.getCacheKey(streamName, vhost, host)
4349
const mappedClientList = m.get(k)
4450
if (mappedClientList) {
4551
const filtered = mappedClientList.filter((c) => c !== connection)
4652
m.set(k, filtered)
4753
}
4854
}
4955

50-
private static getCacheKey(streamName: string, host: string) {
51-
return `${streamName}@${host}`
56+
private static getCacheKey(streamName: string, vhost: string, host: string) {
57+
return `${streamName}@${vhost}@${host}`
5258
}
5359
}

src/consumer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ export class StreamConsumer implements Consumer {
7676
}
7777

7878
public getConnectionInfo(): ConnectionInfo {
79-
const { host, port, id, readable, localPort, ready } = this.connection.getConnectionInfo()
80-
return { host, port, id, readable, localPort, ready }
79+
const { host, port, id, readable, localPort, ready, vhost } = this.connection.getConnectionInfo()
80+
return { host, port, id, readable, localPort, ready, vhost }
8181
}
8282

8383
public get localOffset() {

src/publisher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ export class StreamPublisher implements Publisher {
180180
}
181181

182182
public getConnectionInfo(): ConnectionInfo {
183-
const { host, port, id, writable, localPort, ready } = this.connection.getConnectionInfo()
184-
return { host, port, id, writable, localPort, ready }
183+
const { host, port, id, writable, localPort, ready, vhost } = this.connection.getConnectionInfo()
184+
return { host, port, id, writable, localPort, ready, vhost }
185185
}
186186

187187
public on(event: "metadata_update", listener: MetadataUpdateListener): void

test/e2e/stream_cache.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { expect } from "chai"
2+
import got from "got"
3+
import { Client } from "../../src"
4+
import { createClient, createStreamName } from "../support/fake_data"
5+
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
6+
import { getTestNodesFromEnv, password, username } from "../support/util"
7+
8+
async function createVhost(vhost: string): Promise<undefined> {
9+
const uriVhost = encodeURIComponent(vhost)
10+
const port = process.env.RABBIT_MQ_MANAGEMENT_PORT || 15672
11+
const firstNode = getTestNodesFromEnv().shift()!
12+
await got.put<RabbitConnectionResponse>(`http://${firstNode.host}:${port}/api/vhosts/${uriVhost}`, {
13+
username: username,
14+
password: password,
15+
})
16+
await got
17+
.put<RabbitConnectionResponse>(`http://${firstNode.host}:${port}/api/permissions/${uriVhost}/${username}`, {
18+
json: {
19+
read: ".*",
20+
write: ".*",
21+
configure: ".*",
22+
},
23+
username: username,
24+
password: password,
25+
})
26+
.json()
27+
}
28+
29+
async function deleteVhost(vhost: string): Promise<RabbitConnectionResponse> {
30+
const uriVhost = encodeURIComponent(vhost)
31+
const port = process.env.RABBIT_MQ_MANAGEMENT_PORT || 15672
32+
const firstNode = getTestNodesFromEnv().shift()!
33+
const r = await got.delete<RabbitConnectionResponse>(`http://${firstNode.host}:${port}/api/vhosts/${uriVhost}`, {
34+
username: username,
35+
password: password,
36+
})
37+
38+
return r.body
39+
}
40+
41+
describe("cache", () => {
42+
const vhost1 = "vhost1"
43+
let streamName: string
44+
const rabbit = new Rabbit(username, password)
45+
let client: Client
46+
let client2: Client
47+
before(async () => {
48+
await createVhost(vhost1)
49+
})
50+
beforeEach(async () => {
51+
client = await createClient(username, password)
52+
client2 = await createClient(username, password, undefined, undefined, undefined, undefined, undefined, vhost1)
53+
streamName = createStreamName()
54+
await client.createStream({ stream: streamName })
55+
await client2.createStream({ stream: streamName })
56+
})
57+
afterEach(async () => {
58+
try {
59+
await client.close()
60+
await client2.close()
61+
await deleteVhost(vhost1)
62+
await rabbit.deleteStream(streamName)
63+
await rabbit.closeAllConnections()
64+
await rabbit.deleteAllQueues({ match: /my-stream-/ })
65+
} catch (_e) {}
66+
})
67+
68+
it("should cache using the vhost as well as the stream name", async () => {
69+
const publisher1 = await client.declarePublisher({
70+
stream: streamName,
71+
})
72+
expect(publisher1.getConnectionInfo().vhost).eql("/")
73+
const publisher2 = await client2.declarePublisher({
74+
stream: streamName,
75+
})
76+
expect(publisher2.getConnectionInfo().vhost).eql(vhost1)
77+
})
78+
})

test/support/fake_data.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ export async function createClient(
6565
frameMax?: number,
6666
bufferSizeSettings?: BufferSizeSettings,
6767
port?: number,
68-
connectionName?: string
68+
connectionName?: string,
69+
vhost?: string
6970
): Promise<Client> {
7071
const [firstNode] = getTestNodesFromEnv()
7172
return connect(
@@ -74,7 +75,7 @@ export async function createClient(
7475
port: port ?? firstNode.port,
7576
username,
7677
password,
77-
vhost: "/",
78+
vhost: vhost ?? "/",
7879
frameMax: frameMax ?? 0,
7980
heartbeat: 0,
8081
listeners: listeners,

0 commit comments

Comments
 (0)