Skip to content

Commit 07da189

Browse files
authored
230 change stream options when already created (#233)
* removed boot param * removed boot from tests * conditionally activate deduplication in tests * activate deduplication for deduplication tests * race condition * rollback race condition * fix test * readme * fix send concurrency * added example * new deduplication implementation * added tests * more tests * example * typo * readme * renamed variable * replaced sent with stored * updated readme
1 parent a8f21fc commit 07da189

File tree

8 files changed

+196
-24
lines changed

8 files changed

+196
-24
lines changed

README.md

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,39 @@ await publisher.send(Buffer.from("my message content"))
123123
await client.close()
124124
```
125125

126+
### Deduplication
127+
128+
If you want to make sure that a given message isn't sent more than once, you must use deduplication.
129+
To create a publisher with deduplication, you just need to pass `publisherRef` to the `declarePublisher` function, this way RabbitMQ will detect messages with lower ids and discard them.
130+
Note that these ids are incremental, so you have to be careful about how your application publishes the messages, as the order cannot be guaranteed in multi-threaded applications.
131+
Note: The server does not control the publisherRef across the producers. It's the user's responsibility to guarantee that.
132+
133+
You can publish messages either defining a `publishingId` or not:
134+
In the first case you call the `send` function with `publishingId` defined inside the `MessageOptions`. It's the users responsability to guarantee a valid `publishingId`.
135+
In the latter case you just call `send` and the publisher will use the next valid `publishingId`.
136+
137+
```typescript
138+
const client = await connect({
139+
hostname: "localhost",
140+
port: 5552,
141+
username: "rabbit",
142+
password: "rabbit",
143+
vhost: "/",
144+
})
145+
146+
const deduplicationPublisher = await client.declarePublisher({
147+
stream: "stream-name",
148+
publisherRef: "my-publisher",
149+
})
150+
151+
await deduplicationPublisher.send(Buffer.from("my message content"), { publishingId: 5n }) //here we are passing 5 as publishingId, the message will be sent only if the last publishingId was lower
152+
await deduplicationPublisher.send(Buffer.from("my message content")) //here we are not passing any publishingId, the publisher will use the next valid publishingId
153+
154+
// ...
155+
156+
await client.close()
157+
```
158+
126159
### Sub Batch Entry Publishing
127160

128161
```typescript
@@ -211,18 +244,18 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
211244
```
212245

213246
### Custom Policy
247+
214248
By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing.
215249

216250
```typescript
217251
const consumerOptions = {
218252
stream: "stream-name",
219-
creditPolicy: creditsOnChunkReceived(2, 1)
253+
creditPolicy: creditsOnChunkReceived(2, 1),
220254
}
221255

222256
await client.declareConsumer(consumerOptions, async (message: Message) => {
223-
console.log(message.content)
224-
}
225-
)
257+
console.log(message.content)
258+
})
226259
```
227260

228261
### Clustering

example/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Run this example only with rabbit management version >= 3.13.0.
3+
*/
4+
5+
const rabbit = require("rabbitmq-stream-js-client")
6+
const { randomUUID } = require("crypto")
7+
8+
const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
9+
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"
10+
11+
async function main() {
12+
const streamName = `example-${randomUUID()}`
13+
const publisherRef = `publisher-${randomUUID()}`
14+
console.log(`Creating stream ${streamName}`)
15+
16+
const client = await rabbit.connect({
17+
hostname: "localhost",
18+
port: 5552,
19+
username: rabbitUser,
20+
password: rabbitPassword,
21+
vhost: "/",
22+
heartbeat: 0,
23+
})
24+
await client.createStream({ stream: streamName })
25+
26+
//to declare a publisher with deduplication enabled, you need to set a publisherRef
27+
const firstDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef })
28+
29+
//with deduplication actived, you can send messages without a publishingId; in this case it will be incremental
30+
await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) //publishingId = 1
31+
await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) //publishingId = 2
32+
//but you can also set a publishingId, note that it must be greater than the last one for the message to be stored
33+
await firstDeduplicationPublisher.send(Buffer.from("Test message 3"), { publishingId: 3n }) //publishingId = 3
34+
//if you choose a publishingId that is less than the last one, the message will not be stored
35+
await firstDeduplicationPublisher.send(Buffer.from("Test message 4"), { publishingId: 1n }) //this message won't be stored
36+
await firstDeduplicationPublisher.flush()
37+
const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId()
38+
await firstDeduplicationPublisher.close()
39+
40+
console.log(`Publishing id is ${firstPublisherPublishingId} (must be 3)`) //this must be the greatest publishingId, 3 in this case
41+
42+
const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef })
43+
//with the second publisher if we try to send messages with lower publishingId than the last one, they will not be stored
44+
await secondDeduplicationPublisher.send(Buffer.from("Test message 5"), { publishingId: 1n }) //won't be stored
45+
await secondDeduplicationPublisher.send(Buffer.from("Test message 6"), { publishingId: 2n }) //won't be stored
46+
await secondDeduplicationPublisher.send(Buffer.from("Test message 7"), { publishingId: 7n }) //this will be stored since 7 is greater than 3, the last highest publishingId
47+
await secondDeduplicationPublisher.flush()
48+
const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId()
49+
await secondDeduplicationPublisher.close()
50+
51+
console.log(`Publishing id is ${secondPublisherPublishingId} (must be 7)`) //this must be the greatest publishingId, 7 in this case
52+
53+
await client.deleteStream({ stream: streamName })
54+
55+
await client.close()
56+
}
57+
58+
main()
59+
.then(() => console.log("done!"))
60+
.catch((res) => {
61+
console.log("ERROR ", res)
62+
process.exit(-1)
63+
})

src/client.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,18 @@ export class Client {
151151
stream: params.stream,
152152
publisherId: publisherId,
153153
publisherRef: params.publisherRef,
154-
boot: params.boot,
155154
maxFrameSize: this.maxFrameSize,
156155
maxChunkLength: params.maxChunkLength,
157156
logger: this.logger,
158157
}
159-
const publisher = new StreamPublisher(streamPublisherParams, filter)
158+
let lastPublishingId = 0n
159+
if (streamPublisherParams.publisherRef) {
160+
lastPublishingId = await this.connection.queryPublisherSequence({
161+
stream: streamPublisherParams.stream,
162+
publisherRef: streamPublisherParams.publisherRef,
163+
})
164+
}
165+
const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter)
160166
connection.onPublisherClosed(publisher.extendedId, params.stream, async () => {
161167
await publisher.close(false)
162168
this.publishers.delete(publisher.extendedId)
@@ -740,7 +746,6 @@ export interface ClientParams {
740746
export interface DeclarePublisherParams {
741747
stream: string
742748
publisherRef?: string
743-
boot?: boolean
744749
maxChunkLength?: number
745750
connectionClosedListener?: ConnectionClosedListener
746751
}

src/publisher.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export interface MessageOptions {
7070
messageProperties?: MessageProperties
7171
applicationProperties?: Record<string, string | number>
7272
messageAnnotations?: Record<string, MessageAnnotationsValue>
73+
publishingId?: bigint
7374
}
7475

7576
export const computeExtendedPublisherId = (publisherId: number, connectionId: string) => {
@@ -100,7 +101,6 @@ export class StreamPublisher implements Publisher {
100101
private stream: string
101102
readonly publisherId: number
102103
protected publisherRef: string
103-
private boot: boolean
104104
private publishingId: bigint
105105
private maxFrameSize: number
106106
private queue: PublishRequestMessage[]
@@ -115,19 +115,18 @@ export class StreamPublisher implements Publisher {
115115
stream: string
116116
publisherId: number
117117
publisherRef?: string
118-
boot?: boolean
119118
maxFrameSize: number
120119
maxChunkLength?: number
121120
logger: Logger
122121
},
122+
publishingId: bigint,
123123
private readonly filter?: FilterFunc
124124
) {
125125
this.connection = params.connection
126126
this.stream = params.stream
127127
this.publisherId = params.publisherId
128128
this.publisherRef = params.publisherRef || ""
129-
this.boot = params.boot || false
130-
this.publishingId = params.boot ? -1n : 0n
129+
this.publishingId = publishingId
131130
this.maxFrameSize = params.maxFrameSize || 1048576
132131
this.queue = []
133132
this.scheduled = null
@@ -144,12 +143,19 @@ export class StreamPublisher implements Publisher {
144143
if (this._closed) {
145144
throw new Error(`Publisher has been closed`)
146145
}
147-
if (this.boot && this.publishingId === -1n) {
148-
this.publishingId = await this.getLastPublishingId()
146+
147+
let publishingIdToSend: bigint
148+
if (this.publisherRef && opts.publishingId) {
149+
publishingIdToSend = opts.publishingId
150+
if (opts.publishingId > this.publishingId) {
151+
this.publishingId = opts.publishingId
152+
}
153+
} else {
154+
this.publishingId = this.publishingId + 1n
155+
publishingIdToSend = this.publishingId
149156
}
150-
this.publishingId = this.publishingId + 1n
151157

152-
return await this.basicSend(this.publishingId, message, opts)
158+
return await this.basicSend(publishingIdToSend, message, opts)
153159
}
154160

155161
async basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}): Promise<SendResult> {

test/e2e/basic_publish.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ describe("publish a message", () => {
2020
client = await createClient(username, password, undefined, maxFrameSize, bufferSizeSettings)
2121
streamName = createStreamName()
2222
await rabbit.createStream(streamName)
23-
publisher = await createPublisher(streamName, client)
23+
publisher = await createPublisher(streamName, client, true)
2424
})
2525

2626
afterEach(async () => {

test/support/fake_data.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,14 @@ export function createConsumerRef(): string {
3434
return `my-consumer-${randomUUID()}`
3535
}
3636

37-
export async function createPublisher(streamName: string, client: Client): Promise<Publisher> {
37+
export async function createPublisher(
38+
streamName: string,
39+
client: Client,
40+
deduplication: Boolean = false
41+
): Promise<Publisher> {
3842
const publisher = await client.declarePublisher({
3943
stream: streamName,
40-
publisherRef: `my-publisher-${randomUUID()}`,
44+
...(deduplication && { publisherRef: `my-publisher-${randomUUID()}` }),
4145
})
4246
return publisher
4347
}

test/unit/publisher.test.ts

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ describe("Publisher", () => {
2323

2424
afterEach(() => rabbit.deleteStream(testStreamName))
2525

26-
it("increase publishing id from server when boot is true", async () => {
26+
it("increase publishing id from server when publisherRef is defined and publishing id is not set (deduplication active)", async () => {
2727
const oldClient = await createClient(username, password)
2828
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef })
2929
const oldMessages = [...Array(3).keys()]
@@ -32,27 +32,88 @@ describe("Publisher", () => {
3232
await oldClient.close()
3333
const newClient = await createClient(username, password)
3434

35-
const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: true })
35+
const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef })
3636
await newPublisher.send(Buffer.from(`test${randomUUID()}`))
3737
await newPublisher.flush()
3838

3939
expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n)
4040
await newClient.close()
4141
}).timeout(10000)
4242

43-
it("do not increase publishing id from server when boot is false", async () => {
43+
it("increase publishing id from server when publisherRef is defined and publishing id is set (deduplication active)", async () => {
4444
const oldClient = await createClient(username, password)
4545
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef })
4646
const oldMessages = [...Array(3).keys()]
47+
await Promise.all(
48+
oldMessages.map((_, index) =>
49+
oldPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(index + 1) })
50+
)
51+
)
52+
await oldPublisher.flush()
53+
await oldClient.close()
54+
const newClient = await createClient(username, password)
55+
56+
const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef })
57+
await newPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(4) })
58+
await newPublisher.flush()
59+
60+
expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n)
61+
await newClient.close()
62+
}).timeout(10000)
63+
64+
it("should not increase publishing id when publishRef is defined and publishing two messages with same id (deduplication active)", async () => {
65+
const client = await createClient(username, password)
66+
const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef })
67+
68+
const publishingId = BigInt(1)
69+
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId })
70+
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId })
71+
await publisher.flush()
72+
73+
expect(await publisher.getLastPublishingId()).eql(publishingId)
74+
await client.close()
75+
}).timeout(10000)
76+
77+
it("should auto increment publishing id when publishing id is not passed from outside (deduplication active)", async () => {
78+
const client = await createClient(username, password)
79+
const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef })
80+
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 1n })
81+
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 2n })
82+
await publisher.send(Buffer.from(`test${randomUUID()}`))
83+
await publisher.send(Buffer.from(`test${randomUUID()}`))
84+
await publisher.flush()
85+
86+
expect(await publisher.getLastPublishingId()).eql(4n)
87+
await client.close()
88+
})
89+
90+
it("should set latest publishing id when passing it from outside (deduplication active)", async () => {
91+
const client = await createClient(username, password)
92+
const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef })
93+
await publisher.send(Buffer.from(`test${randomUUID()}`))
94+
await publisher.send(Buffer.from(`test${randomUUID()}`))
95+
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 3n })
96+
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 4n })
97+
await publisher.flush()
98+
99+
expect(await publisher.getLastPublishingId()).eql(4n)
100+
await client.close()
101+
})
102+
103+
it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => {
104+
const oldClient = await createClient(username, password)
105+
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName })
106+
const oldMessages = [...Array(3).keys()]
47107
await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`))))
48108
await oldPublisher.flush()
49109
await oldClient.close()
50110
const newClient = await createClient(username, password)
51111

52-
const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: false })
112+
const newPublisher = await newClient.declarePublisher({ stream: testStreamName })
53113
await newPublisher.send(Buffer.from(`test${randomUUID()}`))
114+
await newPublisher.flush()
54115

55-
expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length))
116+
expect(await newPublisher.getLastPublishingId()).eql(BigInt(0))
56117
await newClient.close()
57118
}).timeout(10000)
58119

0 commit comments

Comments
 (0)