Skip to content

Commit a6de1cf

Browse files
authored
fix cannot close client after consumer (#272)
* fix cannot close client after consumer * lint * added log
1 parent 5f2d499 commit a6de1cf

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

src/client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ export class Client {
6868
private publishers = new Map<string, PublisherMappedValue>()
6969
private compressions = new Map<CompressionType, Compression>()
7070
private locatorConnection: Connection
71-
private pool: ConnectionPool = new ConnectionPool()
71+
private pool: ConnectionPool
7272

7373
private constructor(
7474
private readonly logger: Logger,
@@ -77,6 +77,7 @@ export class Client {
7777
this.compressions.set(CompressionType.None, NoneCompression.create())
7878
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
7979
this.locatorConnection = this.getLocatorConnection()
80+
this.pool = new ConnectionPool(logger)
8081
}
8182

8283
getCompression(compressionType: CompressionType) {

src/connection_pool.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import { inspect } from "util"
12
import { Connection } from "./connection"
3+
import { Logger } from "./logger"
24
import { getMaxSharedConnectionInstances } from "./util"
35

46
type InstanceKey = string
@@ -7,6 +9,8 @@ export type ConnectionPurpose = "consumer" | "publisher"
79
export class ConnectionPool {
810
private connectionsMap: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
911

12+
constructor(private readonly log: Logger) {}
13+
1014
public async getConnection(
1115
entityType: ConnectionPurpose,
1216
streamName: string,
@@ -32,8 +36,14 @@ export class ConnectionPool {
3236

3337
public async releaseConnection(connection: Connection, manuallyClose = true): Promise<void> {
3438
connection.decrRefCount()
35-
if (connection.refCount <= 0) {
36-
await connection.close({ closingCode: 0, closingReason: "", manuallyClose })
39+
if (connection.refCount <= 0 && connection.ready) {
40+
try {
41+
await connection.close({ closingCode: 0, closingReason: "", manuallyClose })
42+
} catch (e) {
43+
// in case the client is closed immediately after a consumer, its connection has still not
44+
// reset the ready flag, so we get an "Error: write after end"
45+
this.log.warn(`Could not close connection: ${inspect(e)}`)
46+
}
3747
this.removeCachedConnection(connection)
3848
}
3949
}

test/e2e/client_close.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { Client, Offset } from "../../src"
2+
import { createClient, createStreamName } from "../support/fake_data"
3+
import { Rabbit } from "../support/rabbit"
4+
import { username, password } from "../support/util"
5+
6+
describe("close client", () => {
7+
const rabbit = new Rabbit(username, password)
8+
let streamName: string
9+
let client: Client
10+
11+
beforeEach(async () => {
12+
client = await createClient(username, password)
13+
streamName = createStreamName()
14+
await rabbit.createStream(streamName)
15+
})
16+
17+
afterEach(async () => {
18+
try {
19+
await rabbit.deleteStream(streamName)
20+
await rabbit.closeAllConnections()
21+
await rabbit.deleteAllQueues({ match: /my-stream-/ })
22+
} catch (e) {}
23+
})
24+
25+
it("can close client after closing publisher", async () => {
26+
const publisher = await client.declarePublisher({ stream: streamName })
27+
28+
await publisher.close(true)
29+
await client.close()
30+
})
31+
32+
it("can close client after closing consumer", async () => {
33+
const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (_msg) => {
34+
/* nothing */
35+
})
36+
37+
await consumer.close(true)
38+
await client.close()
39+
})
40+
})

0 commit comments

Comments
 (0)