41 changes: 37 additions & 4 deletions README.md
@@ -123,6 +123,39 @@ await publisher.send(Buffer.from("my message content"))
await client.close()
```

### Deduplication

If you want to make sure that a given message isn't sent more than once, you must use deduplication.
To create a publisher with deduplication, just pass a `publisherRef` to the `declarePublisher` function; this way RabbitMQ will detect messages with lower ids and discard them.
Note that these ids are incremental, so you have to be careful about how your application publishes messages, as ordering cannot be guaranteed in multi-threaded applications.
Note: the server does not enforce that a `publisherRef` is unique across producers. It's the user's responsibility to guarantee that.

You can publish messages either with or without a `publishingId`:
In the first case, you call the `send` function with `publishingId` defined inside the `MessageOptions`. It's the user's responsibility to guarantee a valid `publishingId`.
In the second case, you just call `send` and the publisher will use the next valid `publishingId`.

```typescript
const client = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
})

const deduplicationPublisher = await client.declarePublisher({
stream: "stream-name",
publisherRef: "my-publisher",
})

await deduplicationPublisher.send(Buffer.from("my message content"), { publishingId: 5n }) // here we pass 5 as the publishingId; the message is stored only if the last publishingId was lower
await deduplicationPublisher.send(Buffer.from("my message content")) // here no publishingId is passed; the publisher uses the next valid publishingId

// ...

await client.close()
```
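
Because the last `publishingId` for a given `publisherRef` is stored by the broker, a publisher that reconnects with the same reference resumes from that value. The following sketch (reusing the `client` and names from the example above, and relying on the `getLastPublishingId()` method shown in the bundled deduplication example) illustrates how to inspect it:

```typescript
// Re-declare a publisher with the same publisherRef after a restart:
// the client queries the broker for the last publishing id stored for this reference.
const resumedPublisher = await client.declarePublisher({
  stream: "stream-name",
  publisherRef: "my-publisher",
})

// Read the last publishing id known to the broker for this publisherRef.
const lastPublishingId = await resumedPublisher.getLastPublishingId()
console.log(`Resuming from publishing id ${lastPublishingId}`)

// Messages sent without an explicit publishingId continue from the stored value,
// so duplicates produced by a crash or retry are discarded by the broker.
await resumedPublisher.send(Buffer.from("my message content"))
```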

### Sub Batch Entry Publishing

```typescript
@@ -211,18 +244,18 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
```

### Custom Policy

By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy guarantees that messages are processed in order, as a new chunk is only requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` in the consumer options. Be aware that modifying this policy can lead to out-of-order message processing.

```typescript
const consumerOptions = {
stream: "stream-name",
creditPolicy: creditsOnChunkReceived(2, 1)
creditPolicy: creditsOnChunkReceived(2, 1),
}

await client.declareConsumer(consumerOptions, async (message: Message) => {
console.log(message.content)
}
)
console.log(message.content)
})
```
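
For comparison, here is the default behaviour written out explicitly, as a sketch assuming `creditsOnChunkCompleted` is exported by the client alongside `creditsOnChunkReceived`:

```typescript
const orderedConsumerOptions = {
  stream: "stream-name",
  // Equivalent to the default: request one new chunk only after the current chunk
  // has been fully processed, which preserves in-order processing.
  creditPolicy: creditsOnChunkCompleted(1, 1),
}

await client.declareConsumer(orderedConsumerOptions, async (message: Message) => {
  console.log(message.content)
})
```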

### Clustering
2 changes: 1 addition & 1 deletion example/package-lock.json

Some generated files are not rendered by default.

63 changes: 63 additions & 0 deletions example/src/deduplication_example.js
@@ -0,0 +1,63 @@
/*
Run this example only with RabbitMQ management version >= 3.13.0.
*/

const rabbit = require("rabbitmq-stream-js-client")
const { randomUUID } = require("crypto")

const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"

async function main() {
const streamName = `example-${randomUUID()}`
const publisherRef = `publisher-${randomUUID()}`
console.log(`Creating stream ${streamName}`)

const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: rabbitUser,
password: rabbitPassword,
vhost: "/",
heartbeat: 0,
})
await client.createStream({ stream: streamName })

// To declare a publisher with deduplication enabled, you need to set a publisherRef
const firstDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef })

// With deduplication activated, you can send messages without a publishingId; in that case it is incremented automatically
await firstDeduplicationPublisher.send(Buffer.from("Test message 1")) // publishingId = 1
await firstDeduplicationPublisher.send(Buffer.from("Test message 2")) // publishingId = 2
// You can also set a publishingId explicitly; note that it must be greater than the last one for the message to be stored
await firstDeduplicationPublisher.send(Buffer.from("Test message 3"), { publishingId: 3n }) // publishingId = 3
// If you choose a publishingId that is less than the last one, the message will not be stored
await firstDeduplicationPublisher.send(Buffer.from("Test message 4"), { publishingId: 1n }) // this message won't be stored
await firstDeduplicationPublisher.flush()
const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId()
await firstDeduplicationPublisher.close()

console.log(`Publishing id is ${firstPublisherPublishingId} (must be 3)`) // this must be the greatest publishingId, 3 in this case

const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef })
// With the second publisher, if we try to send messages with a lower publishingId than the last one, they will not be stored
await secondDeduplicationPublisher.send(Buffer.from("Test message 5"), { publishingId: 1n }) // won't be stored
await secondDeduplicationPublisher.send(Buffer.from("Test message 6"), { publishingId: 2n }) // won't be stored
await secondDeduplicationPublisher.send(Buffer.from("Test message 7"), { publishingId: 7n }) // this will be stored since 7 is greater than 3, the previous highest publishingId
await secondDeduplicationPublisher.flush()
const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId()
await secondDeduplicationPublisher.close()

console.log(`Publishing id is ${secondPublisherPublishingId} (must be 7)`) // this must be the greatest publishingId, 7 in this case

await client.deleteStream({ stream: streamName })

await client.close()
}

main()
.then(() => console.log("done!"))
.catch((res) => {
console.log("ERROR ", res)
process.exit(-1)
})
11 changes: 8 additions & 3 deletions src/client.ts
@@ -151,12 +151,18 @@ export class Client {
stream: params.stream,
publisherId: publisherId,
publisherRef: params.publisherRef,
boot: params.boot,
maxFrameSize: this.maxFrameSize,
maxChunkLength: params.maxChunkLength,
logger: this.logger,
}
const publisher = new StreamPublisher(streamPublisherParams, filter)
let lastPublishingId = 0n
if (streamPublisherParams.publisherRef) {
lastPublishingId = await this.connection.queryPublisherSequence({
stream: streamPublisherParams.stream,
publisherRef: streamPublisherParams.publisherRef,
})
}
const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter)
connection.onPublisherClosed(publisher.extendedId, params.stream, async () => {
await publisher.close(false)
this.publishers.delete(publisher.extendedId)
@@ -740,7 +746,6 @@ export interface ClientParams {
export interface DeclarePublisherParams {
stream: string
publisherRef?: string
boot?: boolean
maxChunkLength?: number
connectionClosedListener?: ConnectionClosedListener
}
22 changes: 14 additions & 8 deletions src/publisher.ts
@@ -70,6 +70,7 @@ export interface MessageOptions {
messageProperties?: MessageProperties
applicationProperties?: Record<string, string | number>
messageAnnotations?: Record<string, MessageAnnotationsValue>
publishingId?: bigint
}

export const computeExtendedPublisherId = (publisherId: number, connectionId: string) => {
@@ -100,7 +101,6 @@ export class StreamPublisher implements Publisher {
private stream: string
readonly publisherId: number
protected publisherRef: string
private boot: boolean
private publishingId: bigint
private maxFrameSize: number
private queue: PublishRequestMessage[]
@@ -115,19 +115,18 @@
stream: string
publisherId: number
publisherRef?: string
boot?: boolean
maxFrameSize: number
maxChunkLength?: number
logger: Logger
},
publishingId: bigint,
private readonly filter?: FilterFunc
) {
this.connection = params.connection
this.stream = params.stream
this.publisherId = params.publisherId
this.publisherRef = params.publisherRef || ""
this.boot = params.boot || false
this.publishingId = params.boot ? -1n : 0n
this.publishingId = publishingId
this.maxFrameSize = params.maxFrameSize || 1048576
this.queue = []
this.scheduled = null
@@ -144,12 +143,19 @@
if (this._closed) {
throw new Error(`Publisher has been closed`)
}
if (this.boot && this.publishingId === -1n) {
this.publishingId = await this.getLastPublishingId()

let publishingIdToSend: bigint
if (this.publisherRef && opts.publishingId) {
publishingIdToSend = opts.publishingId
if (opts.publishingId > this.publishingId) {
this.publishingId = opts.publishingId
}
} else {
this.publishingId = this.publishingId + 1n
publishingIdToSend = this.publishingId
}
this.publishingId = this.publishingId + 1n

return await this.basicSend(this.publishingId, message, opts)
return await this.basicSend(publishingIdToSend, message, opts)
}

async basicSend(publishingId: bigint, content: Buffer, opts: MessageOptions = {}): Promise<SendResult> {
2 changes: 1 addition & 1 deletion test/e2e/basic_publish.test.ts
@@ -20,7 +20,7 @@ describe("publish a message", () => {
client = await createClient(username, password, undefined, maxFrameSize, bufferSizeSettings)
streamName = createStreamName()
await rabbit.createStream(streamName)
publisher = await createPublisher(streamName, client)
publisher = await createPublisher(streamName, client, true)
})

afterEach(async () => {
8 changes: 6 additions & 2 deletions test/support/fake_data.ts
@@ -34,10 +34,14 @@ export function createConsumerRef(): string {
return `my-consumer-${randomUUID()}`
}

export async function createPublisher(streamName: string, client: Client): Promise<Publisher> {
export async function createPublisher(
streamName: string,
client: Client,
deduplication: boolean = false
): Promise<Publisher> {
const publisher = await client.declarePublisher({
stream: streamName,
publisherRef: `my-publisher-${randomUUID()}`,
...(deduplication && { publisherRef: `my-publisher-${randomUUID()}` }),
})
return publisher
}
71 changes: 66 additions & 5 deletions test/unit/publisher.test.ts
@@ -23,7 +23,7 @@ describe("Publisher", () => {

afterEach(() => rabbit.deleteStream(testStreamName))

it("increase publishing id from server when boot is true", async () => {
it("increase publishing id from server when publisherRef is defined and publishing id is not set (deduplication active)", async () => {
const oldClient = await createClient(username, password)
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef })
const oldMessages = [...Array(3).keys()]
@@ -32,27 +32,88 @@
await oldClient.close()
const newClient = await createClient(username, password)

const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: true })
const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef })
await newPublisher.send(Buffer.from(`test${randomUUID()}`))
await newPublisher.flush()

expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n)
await newClient.close()
}).timeout(10000)

it("do not increase publishing id from server when boot is false", async () => {
it("increase publishing id from server when publisherRef is defined and publishing id is set (deduplication active)", async () => {
const oldClient = await createClient(username, password)
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef })
const oldMessages = [...Array(3).keys()]
await Promise.all(
oldMessages.map((_, index) =>
oldPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(index + 1) })
)
)
await oldPublisher.flush()
await oldClient.close()
const newClient = await createClient(username, password)

const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef })
await newPublisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: BigInt(4) })
await newPublisher.flush()

expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n)
await newClient.close()
}).timeout(10000)

it("should not increase publishing id when publishRef is defined and publishing two messages with same id (deduplication active)", async () => {
const client = await createClient(username, password)
const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef })

const publishingId = BigInt(1)
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId })
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: publishingId })
await publisher.flush()

expect(await publisher.getLastPublishingId()).eql(publishingId)
await client.close()
}).timeout(10000)

it("should auto increment publishing id when publishing id is not passed from outside (deduplication active)", async () => {
const client = await createClient(username, password)
const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef })
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 1n })
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 2n })
await publisher.send(Buffer.from(`test${randomUUID()}`))
await publisher.send(Buffer.from(`test${randomUUID()}`))
await publisher.flush()

expect(await publisher.getLastPublishingId()).eql(4n)
await client.close()
})

it("should set latest publishing id when passing it from outside (deduplication active)", async () => {
const client = await createClient(username, password)
const publisher = await client.declarePublisher({ stream: testStreamName, publisherRef })
await publisher.send(Buffer.from(`test${randomUUID()}`))
await publisher.send(Buffer.from(`test${randomUUID()}`))
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 3n })
await publisher.send(Buffer.from(`test${randomUUID()}`), { publishingId: 4n })
await publisher.flush()

expect(await publisher.getLastPublishingId()).eql(4n)
await client.close()
})

it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => {
const oldClient = await createClient(username, password)
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName })
const oldMessages = [...Array(3).keys()]
await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`))))
await oldPublisher.flush()
await oldClient.close()
const newClient = await createClient(username, password)

const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: false })
const newPublisher = await newClient.declarePublisher({ stream: testStreamName })
await newPublisher.send(Buffer.from(`test${randomUUID()}`))
await newPublisher.flush()

expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length))
expect(await newPublisher.getLastPublishingId()).eql(BigInt(0))
await newClient.close()
}).timeout(10000)
