diff --git a/README.md b/README.md index 8eaec3da..c6f77f07 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,39 @@ await publisher.send(Buffer.from("my message content")) await client.close() ``` +### Deduplication + +If you want to make sure that a given message isn't sent more than once, you must use deduplication. +To create a publisher with deduplication, you just need to pass `publisherRef` to the `declarePublisher` function; this way RabbitMQ will detect messages with lower ids and discard them. +Note that these ids are incremental, so you have to be careful about how your application publishes the messages, as the order cannot be guaranteed in multi-threaded applications. +Note: The server does not control the publisherRef across the producers. It's the user's responsibility to guarantee that. + +You can publish messages either defining a `publishingId` or not: +In the first case you call the `send` function with `publishingId` defined inside the `MessageOptions`. It's the user's responsibility to guarantee a valid `publishingId`. +In the latter case you just call `send` and the publisher will use the next valid `publishingId`. + +```typescript +const client = await connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", +}) + +const deduplicationPublisher = await client.declarePublisher({ + stream: "stream-name", + publisherRef: "my-publisher", +}) + +await deduplicationPublisher.send(Buffer.from("my message content"), { publishingId: 5n }) //here we are passing 5 as publishingId, the message will be sent only if the last publishingId was lower +await deduplicationPublisher.send(Buffer.from("my message content")) //here we are not passing any publishingId, the publisher will use the next valid publishingId + +// ... 
+ +await client.close() +``` + ### Sub Batch Entry Publishing ```typescript @@ -211,18 +244,18 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message ``` ### Custom Policy + By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing. ```typescript const consumerOptions = { stream: "stream-name", - creditPolicy: creditsOnChunkReceived(2, 1) + creditPolicy: creditsOnChunkReceived(2, 1), } await client.declareConsumer(consumerOptions, async (message: Message) => { - console.log(message.content) - } -) + console.log(message.content) +}) ``` ### Clustering diff --git a/example/package-lock.json b/example/package-lock.json index 45471ad0..3ff1abc6 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.5.0", + "version": "0.5.1", "license": "ISC", "dependencies": { "semver": "^7.5.4" diff --git a/example/src/deduplication_example.js b/example/src/deduplication_example.js new file mode 100644 index 00000000..197fb2d2 --- /dev/null +++ b/example/src/deduplication_example.js @@ -0,0 +1,63 @@ +/* + Run this example only with rabbit management version >= 3.13.0. 
+*/ + +const rabbit = require("rabbitmq-stream-js-client") +const { randomUUID } = require("crypto") + +const rabbitUser = process.env.RABBITMQ_USER || "rabbit" +const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit" + +async function main() { + const streamName = `example-${randomUUID()}` + const publisherRef = `publisher-${randomUUID()}` + console.log(`Creating stream ${streamName}`) + + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: rabbitUser, + password: rabbitPassword, + vhost: "/", + heartbeat: 0, + }) + await client.createStream({ stream: streamName }) + + //to declare a publisher with deduplication enabled, you need to set a publisherRef + const firstDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef }) + + //with deduplication activated, you can send messages without a publishingId; in this case it will be incremental + await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) //publishingId = 1 + await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) //publishingId = 2 + //but you can also set a publishingId, note that it must be greater than the last one for the message to be stored + await firstDeduplicationPublisher.send(Buffer.from("Test message 3"), { publishingId: 3n }) //publishingId = 3 + //if you choose a publishingId that is less than the last one, the message will not be stored + await firstDeduplicationPublisher.send(Buffer.from("Test message 4"), { publishingId: 1n }) //this message won't be stored + await firstDeduplicationPublisher.flush() + const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId() + await firstDeduplicationPublisher.close() + + console.log(`Publishing id is ${firstPublisherPublishingId} (must be 3)`) //this must be the greatest publishingId, 3 in this case + + const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: 
publisherRef }) + //with the second publisher if we try to send messages with lower publishingId than the last one, they will not be stored + await secondDeduplicationPublisher.send(Buffer.from("Test message 5"), { publishingId: 1n }) //won't be stored + await secondDeduplicationPublisher.send(Buffer.from("Test message 6"), { publishingId: 2n }) //won't be stored + await secondDeduplicationPublisher.send(Buffer.from("Test message 7"), { publishingId: 7n }) //this will be stored since 7 is greater than 3, the last highest publishingId + await secondDeduplicationPublisher.flush() + const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId() + await secondDeduplicationPublisher.close() + + console.log(`Publishing id is ${secondPublisherPublishingId} (must be 7)`) //this must be the greatest publishingId, 7 in this case + + await client.deleteStream({ stream: streamName }) + + await client.close() +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("ERROR ", res) + process.exit(-1) + }) diff --git a/src/client.ts b/src/client.ts index f0042d3c..2ac42865 100644 --- a/src/client.ts +++ b/src/client.ts @@ -151,12 +151,18 @@ export class Client { stream: params.stream, publisherId: publisherId, publisherRef: params.publisherRef, - boot: params.boot, maxFrameSize: this.maxFrameSize, maxChunkLength: params.maxChunkLength, logger: this.logger, } - const publisher = new StreamPublisher(streamPublisherParams, filter) + let lastPublishingId = 0n + if (streamPublisherParams.publisherRef) { + lastPublishingId = await this.connection.queryPublisherSequence({ + stream: streamPublisherParams.stream, + publisherRef: streamPublisherParams.publisherRef, + }) + } + const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter) connection.onPublisherClosed(publisher.extendedId, params.stream, async () => { await publisher.close(false) this.publishers.delete(publisher.extendedId) @@ -740,7 +746,6 @@ 
export interface ClientParams { export interface DeclarePublisherParams { stream: string publisherRef?: string - boot?: boolean maxChunkLength?: number connectionClosedListener?: ConnectionClosedListener } diff --git a/src/publisher.ts b/src/publisher.ts index 3d9b3476..4e8d7825 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -70,6 +70,7 @@ export interface MessageOptions { messageProperties?: MessageProperties applicationProperties?: Record messageAnnotations?: Record + publishingId?: bigint } export const computeExtendedPublisherId = (publisherId: number, connectionId: string) => { @@ -100,7 +101,6 @@ export class StreamPublisher implements Publisher { private stream: string readonly publisherId: number protected publisherRef: string - private boot: boolean private publishingId: bigint private maxFrameSize: number private queue: PublishRequestMessage[] @@ -115,19 +115,18 @@ export class StreamPublisher implements Publisher { stream: string publisherId: number publisherRef?: string - boot?: boolean maxFrameSize: number maxChunkLength?: number logger: Logger }, + publishingId: bigint, private readonly filter?: FilterFunc ) { this.connection = params.connection this.stream = params.stream this.publisherId = params.publisherId this.publisherRef = params.publisherRef || "" - this.boot = params.boot || false - this.publishingId = params.boot ? 
-1n : 0n + this.publishingId = publishingId this.maxFrameSize = params.maxFrameSize || 1048576 this.queue = [] this.scheduled = null @@ -144,12 +143,19 @@ export class StreamPublisher implements Publisher { if (this._closed) { throw new Error(`Publisher has been closed`) } - if (this.boot && this.publishingId === -1n) { - this.publishingId = await this.getLastPublishingId() + + let publishingIdToSend: bigint + if (this.publisherRef && opts.publishingId !== undefined) { /* explicit undefined check: 0n is falsy, but it is still a caller-supplied publishingId and must not be replaced by an auto-incremented one */ + publishingIdToSend = opts.publishingId + if (opts.publishingId > this.publishingId) { /* track the highest id seen so later auto-incremented ids continue after it */ + this.publishingId = opts.publishingId + } + } else { /* no publisherRef or no explicit id: use the next id in the local sequence */ + this.publishingId = this.publishingId + 1n + publishingIdToSend = this.publishingId } - this.publishingId = this.publishingId + 1n - return await this.basicSend(this.publishingId, message, opts) + return await this.basicSend(publishingIdToSend, message, opts) } async basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}): Promise { diff --git a/test/e2e/basic_publish.test.ts b/test/e2e/basic_publish.test.ts index 57228845..e89fa374 100644 --- a/test/e2e/basic_publish.test.ts +++ b/test/e2e/basic_publish.test.ts @@ -20,7 +20,7 @@ describe("publish a message", () => { client = await createClient(username, password, undefined, maxFrameSize, bufferSizeSettings) streamName = createStreamName() await rabbit.createStream(streamName) - publisher = await createPublisher(streamName, client) + publisher = await createPublisher(streamName, client, true) }) afterEach(async () => { diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index a3eebf06..55658afd 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -34,10 +34,14 @@ export function createConsumerRef(): string { return `my-consumer-${randomUUID()}` } -export async function createPublisher(streamName: string, client: Client): Promise { +export async function createPublisher( + streamName: string, + client: Client, + deduplication: boolean = false +): Promise { const 
publisher = await client.declarePublisher({ stream: streamName, - publisherRef: `my-publisher-${randomUUID()}`, + ...(deduplication && { publisherRef: `my-publisher-${randomUUID()}` }), }) return publisher } diff --git a/test/unit/publisher.test.ts b/test/unit/publisher.test.ts index 4d6c3dbb..bc38a435 100644 --- a/test/unit/publisher.test.ts +++ b/test/unit/publisher.test.ts @@ -23,7 +23,7 @@ describe("Publisher", () => { afterEach(() => rabbit.deleteStream(testStreamName)) - it("increase publishing id from server when boot is true", async () => { + it("increase publishing id from server when publisherRef is defined and publishing id is not set (deduplication active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] @@ -32,7 +32,7 @@ describe("Publisher", () => { await oldClient.close() const newClient = await createClient(username, password) - const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: true }) + const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef }) await newPublisher.send(Buffer.from(`test${randomUUID()}`)) await newPublisher.flush() @@ -40,19 +40,80 @@ describe("Publisher", () => { await newClient.close() }).timeout(10000) - it("do not increase publishing id from server when boot is false", async () => { + it("increase publishing id from server when publisherRef is defined and publishing id is set (deduplication active)", async () => { const oldClient = await createClient(username, password) const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef }) const oldMessages = [...Array(3).keys()] + await Promise.all( + oldMessages.map((_, index) => + oldPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(index + 1) }) + ) + ) + await oldPublisher.flush() + 
await oldClient.close() + const newClient = await createClient(username, password) + + const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef }) + await newPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(4) }) + await newPublisher.flush() + + expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n) + await newClient.close() + }).timeout(10000) + + it("should not increase publishing id when publishRef is defined and publishing two messages with same id (deduplication active)", async () => { + const client = await createClient(username, password) + const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) + + const publishingId = BigInt(1) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId }) + await publisher.flush() + + expect(await publisher.getLastPublishingId()).eql(publishingId) + await client.close() + }).timeout(10000) + + it("should auto increment publishing id when publishing id is not passed from outside (deduplication active)", async () => { + const client = await createClient(username, password) + const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 1n }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 2n }) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.flush() + + expect(await publisher.getLastPublishingId()).eql(4n) + await client.close() + }) + + it("should set latest publishing id when passing it from outside (deduplication active)", async () => { + const client = await createClient(username, password) + const publisher = await client.declarePublisher({ stream: testStreamName, 
publisherRef }) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.send(Buffer.from(`test${randomUUID()}`)) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 3n }) + await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 4n }) + await publisher.flush() + + expect(await publisher.getLastPublishingId()).eql(4n) + await client.close() + }) + + it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => { + const oldClient = await createClient(username, password) + const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName }) + const oldMessages = [...Array(3).keys()] await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`)))) await oldPublisher.flush() await oldClient.close() const newClient = await createClient(username, password) - const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: false }) + const newPublisher = await newClient.declarePublisher({ stream: testStreamName }) await newPublisher.send(Buffer.from(`test${randomUUID()}`)) + await newPublisher.flush() - expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length)) + expect(await newPublisher.getLastPublishingId()).eql(BigInt(0)) await newClient.close() }).timeout(10000)