Skip to content

Commit bb48695

Browse files
committed
262# hide manuallyClose param in close methods
1 parent 2d7185f commit bb48695

File tree

7 files changed

+119
-34
lines changed

7 files changed

+119
-34
lines changed

src/client.ts

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,17 @@ import { inspect } from "util"
44
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
55
import { Connection, ConnectionInfo, ConnectionParams, errorMessageOf } from "./connection"
66
import { ConnectionPool, ConnectionPurpose } from "./connection_pool"
7-
import { Consumer, ConsumerFunc, ConsumerUpdateListener, StreamConsumer, computeExtendedConsumerId } from "./consumer"
7+
import {
8+
Consumer,
9+
ConsumerFunc,
10+
ConsumerUpdateListener,
11+
StreamConsumer,
12+
PrivateConsumer,
13+
computeExtendedConsumerId,
14+
} from "./consumer"
815
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
916
import { Logger, NullLogger } from "./logger"
10-
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
17+
import { FilterFunc, Message, Publisher, PrivatePublisher, StreamPublisher } from "./publisher"
1118
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
1219
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
1320
import { CreateSuperStreamRequest } from "./requests/create_super_stream_request"
@@ -48,10 +55,10 @@ export type ConnectionClosedListener = (hadError: boolean) => void
4855

4956
export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean }
5057

51-
type ConsumerMappedValue = { connection: Connection; consumer: StreamConsumer; params: DeclareConsumerParams }
58+
type ConsumerMappedValue = { connection: Connection; consumer: PrivateConsumer; params: DeclareConsumerParams }
5259
type PublisherMappedValue = {
5360
connection: Connection
54-
publisher: StreamPublisher
61+
publisher: PrivatePublisher
5562
params: DeclarePublisherParams
5663
filter: FilterFunc | undefined
5764
}
@@ -154,7 +161,7 @@ export class Client {
154161
publisherRef: streamPublisherParams.publisherRef,
155162
})
156163
}
157-
const publisher = new StreamPublisher(this.pool, streamPublisherParams, lastPublishingId, filter)
164+
const publisher = new PrivatePublisher(this.pool, streamPublisherParams, lastPublishingId, filter)
158165
connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => {
159166
await publisher.close(false)
160167
this.publishers.delete(publisher.extendedId)
@@ -163,7 +170,7 @@ export class Client {
163170
this.logger.info(
164171
`New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}`
165172
)
166-
return publisher
173+
return new StreamPublisher(publisher)
167174
}
168175

169176
public async deletePublisher(extendedPublisherId: string) {
@@ -194,7 +201,7 @@ export class Client {
194201
throw new Error(`Broker does not support message filtering.`)
195202
}
196203

197-
const consumer = new StreamConsumer(
204+
const consumer = new PrivateConsumer(
198205
this.pool,
199206
handle,
200207
{
@@ -211,20 +218,24 @@ export class Client {
211218
params.filter
212219
)
213220
connection.registerForCloseConsumer(consumer.extendedId, params.stream, async () => {
214-
if (params.connectionClosedListener) {
215-
params.connectionClosedListener(false)
216-
}
217-
await this.closeConsumer(consumer.extendedId)
221+
const activeConsumer = await this.prepareCloseConsumer(consumer.extendedId)
222+
await this.closing(activeConsumer.consumer, consumer.extendedId, false)
218223
})
219224
this.consumers.set(consumer.extendedId, { connection, consumer, params })
220225
await this.declareConsumerOnConnection(params, consumerId, connection, superStreamConsumer?.superStream)
221226
this.logger.info(
222227
`New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}`
223228
)
224-
return consumer
229+
return new StreamConsumer(consumer)
225230
}
226231

227232
public async closeConsumer(extendedConsumerId: string) {
233+
const activeConsumer = await this.prepareCloseConsumer(extendedConsumerId)
234+
await this.closing(activeConsumer.consumer, extendedConsumerId, true)
235+
return true
236+
}
237+
238+
private async prepareCloseConsumer(extendedConsumerId: string) {
228239
const activeConsumer = this.consumers.get(extendedConsumerId)
229240
if (!activeConsumer) {
230241
this.logger.error("Consumer does not exist")
@@ -238,8 +249,7 @@ export class Client {
238249
if (streamInfos.length > 0 && streamExists(streamInfos[0])) {
239250
await this.unsubscribe(activeConsumer.connection, consumerId)
240251
}
241-
await this.closing(activeConsumer.consumer, extendedConsumerId)
242-
return true
252+
return activeConsumer
243253
}
244254

245255
public async declareSuperStreamConsumer(
@@ -275,7 +285,7 @@ export class Client {
275285
}
276286

277287
private async closeAllConsumers() {
278-
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close()))
288+
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(true)))
279289
this.consumers = new Map<string, ConsumerMappedValue>()
280290
}
281291

@@ -593,7 +603,7 @@ export class Client {
593603
}
594604
}
595605

596-
private async getConsumerOrServerSavedOffset(consumer: StreamConsumer) {
606+
private async getConsumerOrServerSavedOffset(consumer: PrivateConsumer) {
597607
if (consumer.isSingleActive && consumer.consumerRef && consumer.consumerUpdateListener) {
598608
try {
599609
const offset = await consumer.consumerUpdateListener(consumer.consumerRef, consumer.streamName)
@@ -721,8 +731,8 @@ export class Client {
721731
return res
722732
}
723733

724-
private async closing(consumer: StreamConsumer, extendedConsumerId: string) {
725-
await consumer.close()
734+
private async closing(consumer: PrivateConsumer, extendedConsumerId: string, manuallyClose: boolean) {
735+
await consumer.close(manuallyClose)
726736
this.consumers.delete(extendedConsumerId)
727737
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
728738
}

src/consumer.ts

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@ export const computeExtendedConsumerId = (consumerId: number, connectionId: stri
1414
export interface Consumer {
1515
/**
1616
* Close the publisher
17-
*
18-
* @param {boolean} manuallyClose - Weather you want to close the publisher manually or not
1917
*/
20-
// TODO - clarify the parameter
21-
close(manuallyClose: boolean): Promise<void>
18+
close(): Promise<void>
2219

2320
/**
2421
* Store the stream offset on the server
@@ -54,6 +51,34 @@ export interface Consumer {
5451
}
5552

5653
export class StreamConsumer implements Consumer {
54+
constructor(private readonly c: PrivateConsumer) {}
55+
close(): Promise<void> {
56+
return this.c.close(true)
57+
}
58+
storeOffset(offsetValue: bigint): Promise<void> {
59+
return this.c.storeOffset(offsetValue)
60+
}
61+
queryOffset(): Promise<bigint> {
62+
return this.c.queryOffset()
63+
}
64+
getConnectionInfo(): ConnectionInfo {
65+
return this.c.getConnectionInfo()
66+
}
67+
updateConsumerOffset(offset: Offset): void {
68+
this.c.updateConsumerOffset(offset)
69+
}
70+
public get consumerId(): number {
71+
return this.c.consumerId
72+
}
73+
public get consumerRef(): string | undefined {
74+
return this.c.consumerRef
75+
}
76+
public get extendedId(): string {
77+
return this.c.extendedId
78+
}
79+
}
80+
81+
export class PrivateConsumer {
5782
private connection: Connection
5883
private stream: string
5984
public consumerId: number
@@ -97,9 +122,9 @@ export class StreamConsumer implements Consumer {
97122
this.singleActive = params.singleActive ?? false
98123
}
99124

100-
async close(): Promise<void> {
125+
async close(manuallyClose: boolean): Promise<void> {
101126
this.closed = true
102-
await this.pool.releaseConnection(this.connection)
127+
await this.pool.releaseConnection(this.connection, manuallyClose)
103128
}
104129

105130
public storeOffset(offsetValue: bigint): Promise<void> {

src/publisher.ts

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,8 @@ export interface Publisher {
144144

145145
/**
146146
* Close the publisher
147-
*
148-
* @param {boolean} manuallyClose - Weather you want to close the publisher manually or not
149147
*/
150-
// TODO - clarify the parameter
151-
close(manuallyClose: boolean): Promise<void>
148+
close(): Promise<void>
152149

153150
closed: boolean
154151
ref: string
@@ -159,7 +156,60 @@ export interface Publisher {
159156
export type FilterFunc = (msg: Message) => string | undefined
160157
type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void
161158
export type SendResult = { sent: boolean; publishingId: bigint }
159+
162160
export class StreamPublisher implements Publisher {
161+
constructor(private readonly p: PrivatePublisher) {}
162+
163+
send(message: Buffer, opts?: MessageOptions): Promise<SendResult> {
164+
return this.p.send(message, opts)
165+
}
166+
basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise<SendResult> {
167+
return this.p.basicSend(publishingId, content, opts)
168+
}
169+
flush(): Promise<boolean> {
170+
return this.p.flush()
171+
}
172+
sendSubEntries(messages: Message[], compressionType?: CompressionType): Promise<void> {
173+
return this.p.sendSubEntries(messages, compressionType)
174+
}
175+
on(event: "metadata_update", listener: MetadataUpdateListener): void
176+
on(event: "publish_confirm", listener: PublishConfirmCallback): void
177+
on(event: "metadata_update" | "publish_confirm", listener: MetadataUpdateListener | PublishConfirmCallback): void {
178+
switch (event) {
179+
case "metadata_update":
180+
this.p.on(event, listener as MetadataUpdateListener)
181+
break
182+
case "publish_confirm":
183+
this.p.on(event, listener as PublishConfirmCallback)
184+
break
185+
default:
186+
break
187+
}
188+
}
189+
getLastPublishingId(): Promise<bigint> {
190+
return this.p.getLastPublishingId()
191+
}
192+
getConnectionInfo(): ConnectionInfo {
193+
return this.p.getConnectionInfo()
194+
}
195+
close(): Promise<void> {
196+
return this.p.close(true)
197+
}
198+
public get closed(): boolean {
199+
return this.p.closed
200+
}
201+
public get ref(): string {
202+
return this.p.ref
203+
}
204+
public get publisherId(): number {
205+
return this.p.publisherId
206+
}
207+
public get extendedId(): string {
208+
return this.p.extendedId
209+
}
210+
}
211+
212+
export class PrivatePublisher {
163213
private connection: Connection
164214
private stream: string
165215
readonly publisherId: number

src/super_stream_consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ export class SuperStreamConsumer {
6868
}
6969

7070
async close(): Promise<void> {
71-
await Promise.all([...this.consumers.values()].map((c) => c.close(true)))
71+
await Promise.all([...this.consumers.values()].map((c) => c.close()))
7272
}
7373
}

src/super_stream_publisher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class SuperStreamPublisher {
4343
}
4444

4545
public async close(): Promise<void> {
46-
await Promise.all([...this.publishers.values()].map((p) => p.close(true)))
46+
await Promise.all([...this.publishers.values()].map((p) => p.close()))
4747
this.publishers = new Map()
4848
}
4949

test/e2e/declare_consumer.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ describe("declare consumer", () => {
8585
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
8686
}).timeout(10000)
8787

88-
it.skip("closing a consumer on an existing stream - raises connectionClosedListener", async () => {
88+
it("closing a consumer on an existing stream - does not raise connectionClosedListener", async () => {
8989
const messages: Buffer[] = []
9090
await publisher.send(Buffer.from("hello"))
9191
let called = false
@@ -97,9 +97,9 @@ describe("declare consumer", () => {
9797
)
9898
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
9999

100-
await client.close({ closingCode: 0, closingReason: "", manuallyClose: false })
100+
await client.close({ closingCode: 0, closingReason: "" })
101101

102-
await eventually(() => expect(called).true)
102+
await eventually(() => expect(called).false)
103103
}).timeout(10000)
104104

105105
it("declaring a consumer on an existing stream with identifiers", async () => {

test/e2e/sub_entry_consume.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ describe("consume a batch of messages", () => {
2929
await rabbit.deleteStream(streamName)
3030
await rabbit.closeAllConnections()
3131
await rabbit.deleteAllQueues({ match: /my-stream-/ })
32-
await consumer?.close(true)
32+
await consumer?.close()
3333
} catch (e) {}
3434
})
3535

0 commit comments

Comments
 (0)