From e1243b37adb49c495429b5f8cab8c13857c08a65 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 11:54:25 +0100 Subject: [PATCH 01/19] removed boot param --- src/client.ts | 2 -- src/publisher.ts | 7 ++----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/client.ts b/src/client.ts index f0042d3c..32d5b497 100644 --- a/src/client.ts +++ b/src/client.ts @@ -151,7 +151,6 @@ export class Client { stream: params.stream, publisherId: publisherId, publisherRef: params.publisherRef, - boot: params.boot, maxFrameSize: this.maxFrameSize, maxChunkLength: params.maxChunkLength, logger: this.logger, @@ -740,7 +739,6 @@ export interface ClientParams { export interface DeclarePublisherParams { stream: string publisherRef?: string - boot?: boolean maxChunkLength?: number connectionClosedListener?: ConnectionClosedListener } diff --git a/src/publisher.ts b/src/publisher.ts index 3d9b3476..ae2b3389 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -100,7 +100,6 @@ export class StreamPublisher implements Publisher { private stream: string readonly publisherId: number protected publisherRef: string - private boot: boolean private publishingId: bigint private maxFrameSize: number private queue: PublishRequestMessage[] @@ -115,7 +114,6 @@ export class StreamPublisher implements Publisher { stream: string publisherId: number publisherRef?: string - boot?: boolean maxFrameSize: number maxChunkLength?: number logger: Logger @@ -126,8 +124,7 @@ export class StreamPublisher implements Publisher { this.stream = params.stream this.publisherId = params.publisherId this.publisherRef = params.publisherRef || "" - this.boot = params.boot || false - this.publishingId = params.boot ? -1n : 0n + this.publishingId = params.publisherRef ? -1n : 0n this.maxFrameSize = params.maxFrameSize || 1048576 this.queue = [] this.scheduled = null @@ -144,7 +141,7 @@ export class StreamPublisher implements Publisher { if (this._closed) { throw new Error(`Publisher has been closed`) } - if (this.boot && this.publishingId === -1n) { + if (this.publisherRef && this.publishingId === -1n) { this.publishingId = await this.getLastPublishingId() } this.publishingId = this.publishingId + 1n From 5c51fc010a96dde4811e6ae2103b190a88edc414 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 12:02:05 +0100 Subject: [PATCH 02/19] removed boot from tests --- test/unit/publisher.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 4d6c3dbb..2a55dd50 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -23,7 +23,7 @@ describe("Publisher", () => { afterEach(() => rabbit.deleteStream(testStreamName)) - it("increase publishing id from server when boot is true", async () => { + it("increase publishing id from server when publisherRef is defined (deduplication active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] @@ -32,7 +32,7 @@ describe("Publisher", () => { await oldClient.close() const newClient = await createClient(username, password) - const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: true }) + const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef }) await newPublisher.send(Buffer.from(`test${randomUUID()}`)) await newPublisher.flush() @@ -40,7 +40,7 @@ describe("Publisher", () => { await newClient.close() }).timeout(10000) - it("do not increase publishing id from server when boot is false", async () => { + it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] @@ -49,7 +49,7 @@ describe("Publisher", () => { await oldClient.close() const newClient = await createClient(username, password) - const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: false }) + const newPublisher = await newClient.declarePublisher({ stream: testStreamName }) await newPublisher.send(Buffer.from(`test${randomUUID()}`)) expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length)) From 8379808c2b7c686ee7caba48cce44affa0461691 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 14:12:20 +0100 Subject: [PATCH 03/19] conditionally activate deduplication in tests --- test/support/fake_data.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index a3eebf06..55658afd 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -34,10 +34,14 @@ export function createConsumerRef(): string { return `my-consumer-${randomUUID()}` } -export async function createPublisher(streamName: string, client: Client): Promise { +export async function createPublisher( + streamName: string, + client: Client, + deduplication: Boolean = false +): Promise { const publisher = await client.declarePublisher({ stream: streamName, - publisherRef: `my-publisher-${randomUUID()}`, + ...(deduplication && { publisherRef: `my-publisher-${randomUUID()}` }), }) return publisher } From 4983d86be488743a1455e602a58b8e959532cb64 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 14:13:05 +0100 Subject: [PATCH 04/19] activate deduplication for deduplication tests --- test/e2e/basic_publish.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/basic_publish.test.ts b/test/e2e/basic_publish.test.ts index 57228845..e89fa374 100644 --- a/test/e2e/basic_publish.test.ts +++ b/test/e2e/basic_publish.test.ts @@ -20,7 +20,7 @@ describe("publish a message", () => { client = await createClient(username, password, undefined, maxFrameSize, bufferSizeSettings) streamName = createStreamName() await rabbit.createStream(streamName) - publisher = await createPublisher(streamName, client) + publisher = await createPublisher(streamName, client, true) }) afterEach(async () => { From 778eba45054632d655a919a58ca564aceebc85bd Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 14:46:41 +0100 Subject: [PATCH 05/19] race condition --- test/unit/publisher.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 2a55dd50..f8852dce 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -103,7 +103,10 @@ describe("Publisher", () => { }) const msgs = [Buffer.from([1]), Buffer.from([2])] - const result = await Promise.all(msgs.map((msg) => publisher.send(msg, {}))) + const result = [] + for (const msg of msgs) { + result.push(await publisher.send(msg, {})) + } expect(result[0].sent).is.false expect(result[1].sent).is.true From ffb0ca8595b28ac50021ee8f54b6e7c3aea865ff Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 15:55:44 +0100 Subject: [PATCH 06/19] rollback race condition --- test/unit/publisher.test.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index f8852dce..2a55dd50 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -103,10 +103,7 @@ describe("Publisher", () => { }) const msgs = [Buffer.from([1]), Buffer.from([2])] - const result = [] - for (const msg of msgs) { - result.push(await publisher.send(msg, {})) - } + const result = await Promise.all(msgs.map((msg) => publisher.send(msg, {}))) expect(result[0].sent).is.false expect(result[1].sent).is.true From 7d07c29fba1c037af72b644af3511021f0c7cc32 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 16:29:59 +0100 Subject: [PATCH 07/19] fix test --- test/unit/publisher.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 2a55dd50..9a24617c 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -42,7 +42,7 @@ describe("Publisher", () => { it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => { const oldClient = await createClient(username, password) - const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) + const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName }) const oldMessages = [...Array(3).keys()] await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`)))) await oldPublisher.flush() @@ -51,8 +51,9 @@ describe("Publisher", () => { const newPublisher = await newClient.declarePublisher({ stream: testStreamName }) await newPublisher.send(Buffer.from(`test${randomUUID()}`)) + await newPublisher.flush() - expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length)) + expect(await newPublisher.getLastPublishingId()).eql(BigInt(0)) await newClient.close() }).timeout(10000) From 3b25ea0f79707dafef486fa3f7f36a18fb37327c Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 17:09:49 +0100 Subject: [PATCH 08/19] readme --- README.md | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8eaec3da..5af023e7 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,34 @@ await publisher.send(Buffer.from("my message content")) await client.close() ``` +### Deduplication + +If you want to make sure that a given message isn't sent more than once, you must use deduplication. +To activate the deduplication, you just need to pass `publisherRef` to the `declarePublisher` function. +This way, RabbitMQ will detect messages with the same id and filter them. +Note that these ids are incremental, so you have to be careful about how your application publishes the messages, as the order cannot be guaranteed in multi-threaded applications. + +```typescript +const client = await connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", +}) + +const deduplicationPublisher = await client.declarePublisher({ + stream: "stream-name", + publisherRef: "my-publisher", +}) + +await deduplicationPublisher.send(Buffer.from("my message content")) + +// ... + +await client.close() +``` + ### Sub Batch Entry Publishing ```typescript @@ -211,18 +239,18 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message ``` ### Custom Policy + By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing. ```typescript const consumerOptions = { stream: "stream-name", - creditPolicy: creditsOnChunkReceived(2, 1) + creditPolicy: creditsOnChunkReceived(2, 1), } await client.declareConsumer(consumerOptions, async (message: Message) => { - console.log(message.content) - } -) + console.log(message.content) +}) ``` ### Clustering From b6588bb8a8f5e75b9fbd778fbbd4abfed3ab4c91 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 17:35:07 +0100 Subject: [PATCH 09/19] fix send concurrency --- src/client.ts | 1 + src/publisher.ts | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index 32d5b497..a311562e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -156,6 +156,7 @@ export class Client { logger: this.logger, } const publisher = new StreamPublisher(streamPublisherParams, filter) + await publisher.init() connection.onPublisherClosed(publisher.extendedId, params.stream, async () => { await publisher.close(false) this.publishers.delete(publisher.extendedId) diff --git a/src/publisher.ts b/src/publisher.ts index ae2b3389..daa9391c 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -77,6 +77,7 @@ export const computeExtendedPublisherId = (publisherId: number, connectionId: st } export interface Publisher { + init(): Promise send(message: Buffer, opts?: MessageOptions): Promise basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise flush(): Promise @@ -137,12 +138,18 @@ export class StreamPublisher implements Publisher { return this._closed } + async init(): Promise { + if (this.publisherRef && this.publishingId === -1n) { + this.publishingId = await this.getLastPublishingId() + } + } + async send(message: Buffer, opts: MessageOptions = {}): Promise { if (this._closed) { throw new Error(`Publisher has been closed`) } if (this.publisherRef && this.publishingId === -1n) { - this.publishingId = await this.getLastPublishingId() + throw new Error(`Please initialize the publisher before sending messages`) } this.publishingId = this.publishingId + 1n From 5e21307ef2676359d72a1acee8f8aff68afb9f13 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Thu, 30 Jan 2025 18:00:32 +0100 Subject: [PATCH 10/19] added example --- example/src/deduplication_example.js | 57 ++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 example/src/deduplication_example.js diff --git a/example/src/deduplication_example.js b/example/src/deduplication_example.js new file mode 100644 index 00000000..d6661db6 --- /dev/null +++ b/example/src/deduplication_example.js @@ -0,0 +1,57 @@ +/* + Run this example only with rabbit management version >= 3.13.0. +*/ + +const rabbit = require("rabbitmq-stream-js-client") +const { randomUUID } = require("crypto") + +const rabbitUser = process.env.RABBITMQ_USER || "rabbit" +const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + +async function main() { + const streamName = `example-${randomUUID()}` + const publisherRef = `publisher-${randomUUID()}` + console.log(`Creating stream ${streamName}`) + + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: rabbitUser, + password: rabbitPassword, + vhost: "/", + heartbeat: 0, + }) + await client.createStream({ stream: streamName }) + await sleep(200) + + const firstDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef }) + await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) + await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) + await firstDeduplicationPublisher.send(Buffer.from("Test message 3")) + await firstDeduplicationPublisher.flush() + const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId() + await firstDeduplicationPublisher.close() + + console.log(`Publishing id is ${firstPublisherPublishingId}`) + + const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef }) + await secondDeduplicationPublisher.send(Buffer.from("Test message 1")) + await secondDeduplicationPublisher.send(Buffer.from("Test message 2")) + await secondDeduplicationPublisher.flush() + const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId() + await secondDeduplicationPublisher.close() + + console.log(`Publishing id is still ${secondPublisherPublishingId} (same as first ${firstPublisherPublishingId})`) + + await client.deleteStream({ stream: streamName }) + + await client.close() +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("ERROR ", res) + process.exit(-1) + }) +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)) From cfb1ce038d92df7d4ca4f40577efc63081509f62 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 12:14:05 +0100 Subject: [PATCH 11/19] new deduplication implementation --- src/client.ts | 10 ++++++++-- src/publisher.ts | 26 ++++++++++++++------------ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/client.ts b/src/client.ts index a311562e..2ac42865 100644 --- a/src/client.ts +++ b/src/client.ts @@ -155,8 +155,14 @@ export class Client { maxChunkLength: params.maxChunkLength, logger: this.logger, } - const publisher = new StreamPublisher(streamPublisherParams, filter) - await publisher.init() + let lastPublishingId = 0n + if (streamPublisherParams.publisherRef) { + lastPublishingId = await this.connection.queryPublisherSequence({ + stream: streamPublisherParams.stream, + publisherRef: streamPublisherParams.publisherRef, + }) + } + const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter) connection.onPublisherClosed(publisher.extendedId, params.stream, async () => { await publisher.close(false) this.publishers.delete(publisher.extendedId) diff --git a/src/publisher.ts b/src/publisher.ts index daa9391c..42370057 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -70,6 +70,7 @@ export interface MessageOptions { messageProperties?: MessageProperties applicationProperties?: Record messageAnnotations?: Record + publishingId?: bigint } export const computeExtendedPublisherId = (publisherId: number, connectionId: string) => { @@ -77,7 +78,6 @@ export const computeExtendedPublisherId = (publisherId: number, connectionId: st } export interface Publisher { - init(): Promise send(message: Buffer, opts?: MessageOptions): Promise basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise flush(): Promise @@ -119,13 +119,14 @@ export class StreamPublisher implements Publisher { maxChunkLength?: number logger: Logger }, + publishingId: bigint, private readonly filter?: FilterFunc ) { this.connection = params.connection this.stream = params.stream this.publisherId = params.publisherId this.publisherRef = params.publisherRef || "" - this.publishingId = params.publisherRef ? -1n : 0n + this.publishingId = publishingId this.maxFrameSize = params.maxFrameSize || 1048576 this.queue = [] this.scheduled = null @@ -138,22 +139,23 @@ export class StreamPublisher implements Publisher { return this._closed } - async init(): Promise { - if (this.publisherRef && this.publishingId === -1n) { - this.publishingId = await this.getLastPublishingId() - } - } - async send(message: Buffer, opts: MessageOptions = {}): Promise { if (this._closed) { throw new Error(`Publisher has been closed`) } - if (this.publisherRef && this.publishingId === -1n) { - throw new Error(`Please initialize the publisher before sending messages`) + + let localPublishingId = 0n + if (this.publisherRef && opts.publishingId) { + localPublishingId = opts.publishingId + if (opts.publishingId > this.publishingId) { + this.publishingId = opts.publishingId + } + } else { + this.publishingId = this.publishingId + 1n + localPublishingId = this.publishingId } - this.publishingId = this.publishingId + 1n - return await this.basicSend(this.publishingId, message, opts) + return await this.basicSend(localPublishingId, message, opts) } async basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}): Promise { From c4c8bc242b66442bcb0b46c22c13581eb1686f65 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 13:57:10 +0100 Subject: [PATCH 12/19] added tests --- test/unit/publisher.test.ts | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 9a24617c..b77a5620 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -23,7 +23,7 @@ describe("Publisher", () => { afterEach(() => rabbit.deleteStream(testStreamName)) - it("increase publishing id from server when publisherRef is defined (deduplication active)", async () => { + it("increase publishing id from server when publisherRef is defined and publishing id is not set (deduplication active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] @@ -40,6 +40,40 @@ describe("Publisher", () => { await newClient.close() }).timeout(10000) + it("increase publishing id from server when publisherRef is defined and publishing id is set (deduplication active)", async () => { + const oldClient = await createClient(username, password) + const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) + const oldMessages = [...Array(3).keys()] + await Promise.all( + oldMessages.map((_, index) => + oldPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(index + 1) }) + ) + ) + await oldPublisher.flush() + await oldClient.close() + const newClient = await createClient(username, password) + + const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef }) + await newPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(4) }) + await newPublisher.flush() + + expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n) + await newClient.close() + }).timeout(10000) + + it("should not increase publishing id when publishRef is defined and publishing two messages with same id (deduplication active)", async () => { + const client = await createClient(username, password) + const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) + + const publishingId = BigInt(1) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId }) + await publisher.flush() + + expect(await publisher.getLastPublishingId()).eql(publishingId) + await client.close() + }).timeout(10000) + it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName }) From 7f89341d584b578e7f2287cacf3379d2b09e587b Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 14:29:16 +0100 Subject: [PATCH 13/19] more tests --- test/unit/publisher.test.ts | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index b77a5620..085082c8 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -74,6 +74,32 @@ describe("Publisher", () => { await client.close() }).timeout(10000) + it("should autoincrement publishing id when publishing id is not passed from outside (deduplication active)", async () => { + const client = await createClient(username, password) + const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 1n }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 2n }) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.flush() + + expect(await publisher.getLastPublishingId()).eql(4n) + await client.close() + }) + + it("should set latest publishing id when passing it from outside (deduplication active)", async () => { + const client = await createClient(username, password) + const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 3n }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 4n }) + await publisher.flush() + + expect(await publisher.getLastPublishingId()).eql(4n) + await client.close() + }) + it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName }) From 90e5457151fb9fba4069cc35e7b0e0725fc0689d Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 15:04:16 +0100 Subject: [PATCH 14/19] example --- example/package-lock.json | 2 +- example/src/deduplication_example.js | 23 +++++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/example/package-lock.json b/example/package-lock.json index 45471ad0..3ff1abc6 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.5.0", + "version": "0.5.1", "license": "ISC", "dependencies": { "semver": "^7.5.4" diff --git a/example/src/deduplication_example.js b/example/src/deduplication_example.js index d6661db6..86f94a3e 100644 --- a/example/src/deduplication_example.js +++ b/example/src/deduplication_example.js @@ -22,26 +22,33 @@ async function main() { heartbeat: 0, }) await client.createStream({ stream: streamName }) - await sleep(200) + //to declare a publisher with deduplication enabled, you need to set a publisherRef const firstDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef }) - await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) - await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) - await firstDeduplicationPublisher.send(Buffer.from("Test message 3")) + + //with deduplication actived, you can send messages without a publishingId; in this case it will be incremental + await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) //publishingId = 1 + await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) //publishingId = 2 + //but you can also set a publishingId, note that it must be greater than the last one for the message to be sent + await firstDeduplicationPublisher.send(Buffer.from("Test message 3"), { publishingId: 3n }) //publishingId = 3 + //if you choose a publishingId that is less than the last one, the message will not be sent + await firstDeduplicationPublisher.send(Buffer.from("Test message 4"), { publishingId: 1n }) //this message won't be sent await firstDeduplicationPublisher.flush() const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId() await firstDeduplicationPublisher.close() - console.log(`Publishing id is ${firstPublisherPublishingId}`) + console.log(`Publishing id is ${firstPublisherPublishingId} (must be 3)`) //this must be the greatest publishingId sent, 3 in this case const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef }) - await secondDeduplicationPublisher.send(Buffer.from("Test message 1")) - await secondDeduplicationPublisher.send(Buffer.from("Test message 2")) + //with the second publisher if we try to send messages with lower publishingId than the last one, they will not be sent + await secondDeduplicationPublisher.send(Buffer.from("Test message 5"), { publishingId: 1n }) //won't be sent + await secondDeduplicationPublisher.send(Buffer.from("Test message 6"), { publishingId: 2n }) //won't be sent + await secondDeduplicationPublisher.send(Buffer.from("Test message 7"), { publishingId: 7n }) //this will be sent since 7 is greater than 3, the last highest publishingId await secondDeduplicationPublisher.flush() const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId() await secondDeduplicationPublisher.close() - console.log(`Publishing id is still ${secondPublisherPublishingId} (same as first ${firstPublisherPublishingId})`) + console.log(`Publishing id is ${secondPublisherPublishingId} (must be 7)`) //this must be the greatest publishingId sent, 7 in this case await client.deleteStream({ stream: streamName }) From 15dc77a383aa37cebbe424adc8cc81f47dc85d37 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 15:41:48 +0100 Subject: [PATCH 15/19] typo --- test/unit/publisher.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 085082c8..bc38a435 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -74,7 +74,7 @@ describe("Publisher", () => { await client.close() }).timeout(10000) - it("should autoincrement publishing id when publishing id is not passed from outside (deduplication active)", async () => { + it("should auto increment publishing id when publishing id is not passed from outside (deduplication active)", async () => { const client = await createClient(username, password) const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 1n }) From e809078ca25040fe402ed39f07131456cfa165db Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 16:24:14 +0100 Subject: [PATCH 16/19] readme --- README.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5af023e7..d835008a 100644 --- a/README.md +++ b/README.md @@ -126,9 +126,13 @@ await client.close() ### Deduplication If you want to make sure that a given message isn't sent more than once, you must use deduplication. -To activate the deduplication, you just need to pass `publisherRef` to the `declarePublisher` function. -This way, RabbitMQ will detect messages with the same id and filter them. +To create a publisher with deduplication, you just need to pass `publisherRef` to the `declarePublisher` function, this way RabbitMQ will detect messages with lower ids and discard them. Note that these ids are incremental, so you have to be careful about how your application publishes the messages, as the order cannot be guaranteed in multi-threaded applications. +It's also important to remember that the client doesn't control that the `publisherRef` is unique, it's the user's responsibility to guarantee that. + +You can publish messages either defining a `publishingId` or not: +In the first case you call the `send` function with `publishingId` defined inside the `MessageOptions`. It's the users responsability to guarantee a valid `publishingId`. +In the latter case you just call `send` and the publisher will use the next valid `publishingId`. ```typescript const client = await connect({ @@ -144,7 +148,8 @@ const deduplicationPublisher = await client.declarePublisher({ publisherRef: "my-publisher", }) -await deduplicationPublisher.send(Buffer.from("my message content")) +await deduplicationPublisher.send(Buffer.from("my message content"), { publishingId: 5n }) //here we are passing 5 as publishingId, the message will be sent only if the last publishingId was lower +await deduplicationPublisher.send(Buffer.from("my message content")) //here we are not passing any publishingId, the publisher will use the next valid publishingId // ... From 68abb5a14a4c65bca1747e276e121e8d04f3fbe4 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Fri, 31 Jan 2025 16:27:22 +0100 Subject: [PATCH 17/19] renamed variable --- src/publisher.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/publisher.ts b/src/publisher.ts index 42370057..4e8d7825 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -144,18 +144,18 @@ export class StreamPublisher implements Publisher { throw new Error(`Publisher has been closed`) } - let localPublishingId = 0n + let publishingIdToSend: bigint if (this.publisherRef && opts.publishingId) { - localPublishingId = opts.publishingId + publishingIdToSend = opts.publishingId if (opts.publishingId > this.publishingId) { this.publishingId = opts.publishingId } } else { this.publishingId = this.publishingId + 1n - localPublishingId = this.publishingId + publishingIdToSend = this.publishingId } - return await this.basicSend(localPublishingId, message, opts) + return await this.basicSend(publishingIdToSend, message, opts) } async basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}): Promise { From dda0766e3dd7544b59d09b8e845a7ba5a10f1f85 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Mon, 3 Feb 2025 12:12:17 +0100 Subject: [PATCH 18/19] replaced sent with stored --- example/src/deduplication_example.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/example/src/deduplication_example.js b/example/src/deduplication_example.js index 86f94a3e..4b6434ca 100644 --- a/example/src/deduplication_example.js +++ b/example/src/deduplication_example.js @@ -29,26 +29,26 @@ async function main() { //with deduplication actived, you can send messages without a publishingId; in this case it will be incremental await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) //publishingId = 1 await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) //publishingId = 2 - //but you can also set a publishingId, note that it must be greater than the last one for the message to be sent + //but you can also set a publishingId, note that it must be greater than the last one for the message to be stored await firstDeduplicationPublisher.send(Buffer.from("Test message 3"), { publishingId: 3n }) //publishingId = 3 - //if you choose a publishingId that is less than the last one, the message will not be sent - await firstDeduplicationPublisher.send(Buffer.from("Test message 4"), { publishingId: 1n }) //this message won't be sent + //if you choose a publishingId that is less than the last one, the message will not be stored + await firstDeduplicationPublisher.send(Buffer.from("Test message 4"), { publishingId: 1n }) //this message won't be stored await firstDeduplicationPublisher.flush() const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId() await firstDeduplicationPublisher.close() - console.log(`Publishing id is ${firstPublisherPublishingId} (must be 3)`) //this must be the greatest publishingId sent, 3 in this case + console.log(`Publishing id is ${firstPublisherPublishingId} (must be 3)`) //this must be the greatest publishingId, 3 in this case const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef }) - //with the second publisher if we try to send messages with lower publishingId than the last one, they will not be sent - await secondDeduplicationPublisher.send(Buffer.from("Test message 5"), { publishingId: 1n }) //won't be sent - await secondDeduplicationPublisher.send(Buffer.from("Test message 6"), { publishingId: 2n }) //won't be sent - await secondDeduplicationPublisher.send(Buffer.from("Test message 7"), { publishingId: 7n }) //this will be sent since 7 is greater than 3, the last highest publishingId + //with the second publisher if we try to send messages with lower publishingId than the last one, they will not be stored + await secondDeduplicationPublisher.send(Buffer.from("Test message 5"), { publishingId: 1n }) //won't be stored + await secondDeduplicationPublisher.send(Buffer.from("Test message 6"), { publishingId: 2n }) //won't be stored + await secondDeduplicationPublisher.send(Buffer.from("Test message 7"), { publishingId: 7n }) //this will be stored since 7 is greater than 3, the last highest publishingId await secondDeduplicationPublisher.flush() const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId() await secondDeduplicationPublisher.close() - console.log(`Publishing id is ${secondPublisherPublishingId} (must be 7)`) //this must be the greatest publishingId sent, 7 in this case + console.log(`Publishing id is ${secondPublisherPublishingId} (must be 7)`) //this must be the greatest publishingId, 7 in this case await client.deleteStream({ stream: streamName }) From 600f605097e40911986c6316604f23db1a98dffe Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Mon, 3 Feb 2025 14:17:16 +0100 Subject: [PATCH 19/19] updated readme --- README.md | 2 +- example/src/deduplication_example.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index d835008a..c6f77f07 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ await client.close() If you want to make sure that a given message isn't sent more than once, you must use deduplication. To create a publisher with deduplication, you just need to pass `publisherRef` to the `declarePublisher` function, this way RabbitMQ will detect messages with lower ids and discard them. Note that these ids are incremental, so you have to be careful about how your application publishes the messages, as the order cannot be guaranteed in multi-threaded applications. -It's also important to remember that the client doesn't control that the `publisherRef` is unique, it's the user's responsibility to guarantee that. +Note: The server does not control the publisherRef across the producers. It's the user's responsibility to guarantee that. You can publish messages either defining a `publishingId` or not: In the first case you call the `send` function with `publishingId` defined inside the `MessageOptions`. It's the users responsability to guarantee a valid `publishingId`. diff --git a/example/src/deduplication_example.js b/example/src/deduplication_example.js index 4b6434ca..197fb2d2 100644 --- a/example/src/deduplication_example.js +++ b/example/src/deduplication_example.js @@ -61,4 +61,3 @@ main() console.log("ERROR ", res) process.exit(-1) }) -const sleep = (ms) => new Promise((r) => setTimeout(r, ms))