Skip to content

Commit 0e90ccd

Browse files
l4mbymagne
andauthored
242 metadataupdate ends up with unhandledrejection (#245)
* fix: handle consumer unsubscribe * fix: missing conditional * wip: refactor and close doubts * fix: enable skipped test * chore: remove comment --------- Co-authored-by: magne <[email protected]>
1 parent 6843499 commit 0e90ccd

File tree

4 files changed

+74
-24
lines changed

4 files changed

+74
-24
lines changed

src/client.ts

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import { SubscribeResponse } from "./responses/subscribe_response"
4141
import { UnsubscribeResponse } from "./responses/unsubscribe_response"
4242
import { SuperStreamConsumer } from "./super_stream_consumer"
4343
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
44-
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util"
44+
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util"
4545
import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy"
4646

4747
export type ConnectionClosedListener = (hadError: boolean) => void
@@ -163,7 +163,7 @@ export class Client {
163163
})
164164
}
165165
const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter)
166-
connection.onPublisherClosed(publisher.extendedId, params.stream, async () => {
166+
connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => {
167167
await publisher.close(false)
168168
this.publishers.delete(publisher.extendedId)
169169
})
@@ -215,7 +215,7 @@ export class Client {
215215
},
216216
params.filter
217217
)
218-
connection.onConsumerClosed(consumer.extendedId, params.stream, async () => {
218+
connection.registerForCloseConsumer(consumer.extendedId, params.stream, async () => {
219219
if (params.connectionClosedListener) {
220220
params.connectionClosedListener(false)
221221
}
@@ -230,24 +230,21 @@ export class Client {
230230
}
231231

232232
public async closeConsumer(extendedConsumerId: string) {
233-
const { consumer, connection } = this.consumers.get(extendedConsumerId) ?? {
234-
consumer: undefined,
235-
connection: undefined,
236-
}
237-
const consumerId = extractConsumerId(extendedConsumerId)
238-
239-
if (!consumer) {
233+
const activeConsumer = this.consumers.get(extendedConsumerId)
234+
if (!activeConsumer) {
240235
this.logger.error("Consumer does not exist")
241236
throw new Error(`Consumer with id: ${extendedConsumerId} does not exist`)
242237
}
243-
const res = await connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
244-
await consumer.close(true)
245-
this.consumers.delete(extendedConsumerId)
246-
if (!res.ok) {
247-
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
238+
239+
const consumerId = extractConsumerId(extendedConsumerId)
240+
const { streamInfos } = await this.connection.sendAndWait<MetadataResponse>(
241+
new MetadataRequest({ streams: [activeConsumer.consumer.streamName] })
242+
)
243+
if (streamInfos.length > 0 && streamExists(streamInfos[0])) {
244+
await this.unsubscribe(activeConsumer.connection, consumerId)
248245
}
249-
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
250-
return res.ok
246+
await this.closing(activeConsumer.consumer, extendedConsumerId)
247+
return true
251248
}
252249

253250
public async declareSuperStreamConsumer(
@@ -406,11 +403,7 @@ export class Client {
406403
const uniqueConnectionIds = new Set<string>()
407404
uniqueConnectionIds.add(this.connection.connectionId)
408405

409-
await new Promise(async (res) => {
410-
setTimeout(() => {
411-
res(true)
412-
}, 5000)
413-
})
406+
await wait(5000)
414407
await this.connection.restart()
415408

416409
for (const { consumer, connection, params } of this.consumers.values()) {
@@ -703,6 +696,20 @@ export class Client {
703696
return Connection.connect({ ...connectionParams, hostname: chosenNode.host, port: chosenNode.port }, this.logger)
704697
}
705698

699+
private async unsubscribe(connection: Connection, consumerId: number) {
700+
const res = await connection.sendAndWait<UnsubscribeResponse>(new UnsubscribeRequest(consumerId))
701+
if (!res.ok) {
702+
throw new Error(`Unsubscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
703+
}
704+
return res
705+
}
706+
707+
private async closing(consumer: StreamConsumer, extendedConsumerId: string) {
708+
await consumer.close(true)
709+
this.consumers.delete(extendedConsumerId)
710+
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
711+
}
712+
706713
static async connect(params: ClientParams, logger?: Logger): Promise<Client> {
707714
return new Client(logger ?? new NullLogger(), {
708715
...params,
@@ -842,3 +849,10 @@ const extractPublisherId = (extendedPublisherId: string) => {
842849
}
843850

844851
const getVhostOrDefault = (vhost: string) => vhost ?? "/"
852+
853+
const streamExists = (streamInfo: StreamMetadata): boolean => {
854+
return (
855+
streamInfo.responseCode !== ResponseCode.StreamDoesNotExist &&
856+
streamInfo.responseCode !== ResponseCode.SubscriptionIdDoesNotExist
857+
)
858+
}

src/connection.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,12 +265,20 @@ export class Connection {
265265
)
266266
}
267267

268-
public onPublisherClosed(publisherExtendedId: string, streamName: string, callback: () => void | Promise<void>) {
268+
public registerForClosePublisher(
269+
publisherExtendedId: string,
270+
streamName: string,
271+
callback: () => void | Promise<void>
272+
) {
269273
this.publisherListeners.push({ extendedId: publisherExtendedId, stream: streamName })
270274
this.closeEventsEmitter.once(`close_publisher_${publisherExtendedId}`, callback)
271275
}
272276

273-
public onConsumerClosed(consumerExtendedId: string, streamName: string, callback: () => void | Promise<void>) {
277+
public registerForCloseConsumer(
278+
consumerExtendedId: string,
279+
streamName: string,
280+
callback: () => void | Promise<void>
281+
) {
274282
this.consumerListeners.push({ extendedId: consumerExtendedId, stream: streamName })
275283
this.closeEventsEmitter.once(`close_consumer_${consumerExtendedId}`, callback)
276284
}

src/util.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,14 @@ export const bigIntMax = (n: bigint[]): bigint | undefined => {
3939
if (!n.length) return undefined
4040
return n.reduce((acc, i) => (i > acc ? i : acc), n[0])
4141
}
42+
43+
export const wait = async (ms: number) => {
44+
return new Promise((res) => {
45+
setTimeout(() => res(true), ms)
46+
})
47+
}
48+
49+
export const ResponseCode = {
50+
StreamDoesNotExist: 2,
51+
SubscriptionIdDoesNotExist: 4,
52+
} as const

test/e2e/declare_consumer.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,23 @@ describe("declare consumer", () => {
8484
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
8585
}).timeout(10000)
8686

87+
it("closing a consumer on an existing stream - raises connectionClosedListener", async () => {
88+
const messages: Buffer[] = []
89+
await publisher.send(Buffer.from("hello"))
90+
let called = false
91+
await client.declareConsumer(
92+
{ stream: streamName, offset: Offset.first(), connectionClosedListener: () => (called = true) },
93+
(message: Message) => {
94+
messages.push(message.content)
95+
}
96+
)
97+
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
98+
99+
await client.close({ closingCode: 0, closingReason: "", manuallyClose: false })
100+
101+
await eventually(() => expect(called).true)
102+
}).timeout(10000)
103+
87104
it("declaring a consumer on an existing stream with identifiers", async () => {
88105
const messages: Buffer[] = []
89106
await publisher.send(Buffer.from("hello"))

0 commit comments

Comments
 (0)