Skip to content

Commit c21f135

Browse files
committed
262# hide manuallyClose param in close methods
1 parent 2d7185f commit c21f135

File tree

6 files changed

+24
-17
lines changed

6 files changed

+24
-17
lines changed

src/client.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class Client {
156156
}
157157
const publisher = new StreamPublisher(this.pool, streamPublisherParams, lastPublishingId, filter)
158158
connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => {
159-
await publisher.close(false)
159+
await publisher.automaticClose()
160160
this.publishers.delete(publisher.extendedId)
161161
})
162162
this.publishers.set(publisher.extendedId, { publisher, connection, params, filter })
@@ -176,7 +176,7 @@ export class Client {
176176
if (!res.ok) {
177177
throw new Error(`Delete Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
178178
}
179-
await publisher?.close(true)
179+
await publisher?.close()
180180
this.publishers.delete(extendedPublisherId)
181181
this.logger.info(`deleted publisher with publishing id ${publisherId}`)
182182
return res.ok
@@ -280,7 +280,7 @@ export class Client {
280280
}
281281

282282
private async closeAllPublishers() {
283-
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(true)))
283+
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close()))
284284
this.publishers = new Map<string, PublisherMappedValue>()
285285
}
286286

src/consumer.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@ export const computeExtendedConsumerId = (consumerId: number, connectionId: stri
1414
export interface Consumer {
1515
/**
1616
* Close the publisher
17-
*
18-
* @param {boolean} manuallyClose - Weather you want to close the publisher manually or not
1917
*/
20-
// TODO - clarify the parameter
21-
close(manuallyClose: boolean): Promise<void>
18+
close(): Promise<void>
2219

2320
/**
2421
* Store the stream offset on the server
@@ -99,7 +96,12 @@ export class StreamConsumer implements Consumer {
9996

10097
async close(): Promise<void> {
10198
this.closed = true
102-
await this.pool.releaseConnection(this.connection)
99+
await this.pool.releaseConnection(this.connection, true)
100+
}
101+
102+
async automaticClose(): Promise<void> {
103+
this.closed = true
104+
await this.pool.releaseConnection(this.connection, false)
103105
}
104106

105107
public storeOffset(offsetValue: bigint): Promise<void> {

src/publisher.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,8 @@ export interface Publisher {
144144

145145
/**
146146
* Close the publisher
147-
*
148-
* @param {boolean} manuallyClose - Weather you want to close the publisher manually or not
149147
*/
150-
// TODO - clarify the parameter
151-
close(manuallyClose: boolean): Promise<void>
148+
close(): Promise<void>
152149

153150
closed: boolean
154151
ref: string
@@ -278,10 +275,18 @@ export class StreamPublisher implements Publisher {
278275
return this.publisherRef
279276
}
280277

281-
public async close(manuallyClose: boolean): Promise<void> {
278+
public async close(): Promise<void> {
279+
if (!this.closed) {
280+
await this.flush()
281+
await this.pool.releaseConnection(this.connection, true)
282+
}
283+
this._closed = true
284+
}
285+
286+
public async automaticClose(): Promise<void> {
282287
if (!this.closed) {
283288
await this.flush()
284-
await this.pool.releaseConnection(this.connection, manuallyClose)
289+
await this.pool.releaseConnection(this.connection, false)
285290
}
286291
this._closed = true
287292
}

src/super_stream_consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ export class SuperStreamConsumer {
6868
}
6969

7070
async close(): Promise<void> {
71-
await Promise.all([...this.consumers.values()].map((c) => c.close(true)))
71+
await Promise.all([...this.consumers.values()].map((c) => c.close()))
7272
}
7373
}

src/super_stream_publisher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class SuperStreamPublisher {
4343
}
4444

4545
public async close(): Promise<void> {
46-
await Promise.all([...this.publishers.values()].map((p) => p.close(true)))
46+
await Promise.all([...this.publishers.values()].map((p) => p.close()))
4747
this.publishers = new Map()
4848
}
4949

test/e2e/sub_entry_consume.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ describe("consume a batch of messages", () => {
2929
await rabbit.deleteStream(streamName)
3030
await rabbit.closeAllConnections()
3131
await rabbit.deleteAllQueues({ match: /my-stream-/ })
32-
await consumer?.close(true)
32+
await consumer?.close()
3333
} catch (e) {}
3434
})
3535

0 commit comments

Comments
 (0)