Skip to content

Commit 3f35c40

Browse files
l4mbymagne
andauthored
feat: add optional consumer identifier (coders51#243)
* feat: add optional consumer identifier * chore: add tests and update readme --------- Co-authored-by: magne <[email protected]>
1 parent f607189 commit 3f35c40

File tree

5 files changed

+44
-0
lines changed

5 files changed

+44
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ const consumerOptions = { stream: "stream-name", offset: Offset.next() }
211211
- Offset.offset(x) ---> Start reading from the specified offset. The parameter has to be a bigint.
212212
- Offset.timestamp(t) ---> Start reading from the messages stored after the timestamp t.
213213
214+
Optionally a consumer identifier can be set in the consumer option.
215+
It's an optional property called consumerTag.
214216
*/
215217

216218
const consumer = await client.declareConsumer(consumerOptions, (message: Message) => {

src/client.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ export class Client {
209209
stream: params.stream,
210210
consumerId,
211211
consumerRef: params.consumerRef,
212+
consumerTag: params.consumerTag,
212213
offset: params.offset,
213214
creditPolicy: params.creditPolicy,
214215
},
@@ -503,6 +504,9 @@ export class Client {
503504
}
504505
properties["match-unfiltered"] = `${params.filter.matchUnfiltered}`
505506
}
507+
if (params.consumerTag) {
508+
properties["identifier"] = params.consumerTag
509+
}
506510

507511
const creditPolicy = params.creditPolicy || defaultCreditPolicy
508512

@@ -774,6 +778,7 @@ export interface DeclareConsumerParams {
774778
singleActive?: boolean
775779
filter?: ConsumerFilter
776780
creditPolicy?: ConsumerCreditPolicy
781+
consumerTag?: string
777782
}
778783

779784
export interface DeclareSuperStreamConsumerParams {

src/consumer.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export class StreamConsumer implements Consumer {
2525
private stream: string
2626
public consumerId: number
2727
public consumerRef?: string
28+
public consumerTag?: string
2829
public offset: Offset
2930
private clientLocalOffset: Offset
3031
private creditsHandler: ConsumerCreditPolicy
@@ -38,6 +39,7 @@ export class StreamConsumer implements Consumer {
3839
stream: string
3940
consumerId: number
4041
consumerRef?: string
42+
consumerTag?: string
4143
offset: Offset
4244
creditPolicy?: ConsumerCreditPolicy
4345
},

test/e2e/declare_consumer.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,24 @@ describe("declare consumer", () => {
8484
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
8585
}).timeout(10000)
8686

87+
it("declaring a consumer on an existing stream with identifiers", async () => {
88+
const messages: Buffer[] = []
89+
await publisher.send(Buffer.from("hello"))
90+
91+
await client.declareConsumer(
92+
{ stream: streamName, offset: Offset.first(), consumerTag: "test-id" },
93+
(message: Message) => {
94+
messages.push(message.content)
95+
}
96+
)
97+
98+
await eventually(async () => {
99+
const ids = await rabbit.returnConsumersIdentifiers()
100+
expect(ids).length(1)
101+
expect(ids[0]).eql("test-id")
102+
})
103+
}).timeout(10000)
104+
87105
it("declaring an async consumer on an existing stream - the consumer should handle the message", async () => {
88106
const messages: Buffer[] = []
89107
await publisher.send(Buffer.from("hello"))

test/support/rabbit.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ interface RabbitChannelDetails {
5050
user: string
5151
}
5252

53+
interface RabbitConsumerProperties {
54+
identifier?: string
55+
}
56+
5357
interface RabbitConsumersResponse {
5458
queue: RabbitConsumersResponseQueue
5559
consumer_tag: string
60+
properties: RabbitConsumerProperties
5661
channel_details: RabbitChannelDetails
5762
}
5863

@@ -218,6 +223,18 @@ export class Rabbit {
218223
return resp.body.map((p) => p.consumer_tag)
219224
}
220225

226+
async returnConsumersIdentifiers(): Promise<string[]> {
227+
const resp = await got.get<RabbitConsumersResponse[]>(
228+
`http://${this.firstNode.host}:${this.port}/api/stream/consumers/%2F/`,
229+
{
230+
username: this.username,
231+
password: this.password,
232+
responseType: "json",
233+
}
234+
)
235+
return resp.body.map((p) => p.properties.identifier ?? "")
236+
}
237+
221238
async returnConsumersCredits(): Promise<RabbitConsumerCredits[]> {
222239
const allConsumerCredits: RabbitConsumerCredits[] = []
223240
const allConsumersResp = await got.get<RabbitConsumersResponse[]>(

0 commit comments

Comments
 (0)