Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,34 @@ await publisher.send(Buffer.from("my message content"))
await client.close()
```

### Deduplication

If you want to make sure that a given message isn't sent more than once, you must use deduplication.
To activate the deduplication, you just need to pass `publisherRef` to the `declarePublisher` function.
This way, RabbitMQ will detect messages with the same id and filter them.
Note that these ids are incremental, so you have to be careful about how your application publishes the messages, as the order cannot be guaranteed in multi-threaded applications.

```typescript
const client = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
})

const deduplicationPublisher = await client.declarePublisher({
stream: "stream-name",
publisherRef: "my-publisher",
})

await deduplicationPublisher.send(Buffer.from("my message content"))

// ...

await client.close()
```

### Sub Batch Entry Publishing

```typescript
Expand Down Expand Up @@ -211,18 +239,18 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
```

### Custom Policy

By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing.

```typescript
const consumerOptions = {
stream: "stream-name",
creditPolicy: creditsOnChunkReceived(2, 1)
creditPolicy: creditsOnChunkReceived(2, 1),
}

await client.declareConsumer(consumerOptions, async (message: Message) => {
console.log(message.content)
}
)
console.log(message.content)
})
```

### Clustering
Expand Down
57 changes: 57 additions & 0 deletions example/src/deduplication_example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Run this example only with rabbit management version >= 3.13.0.
*/

const rabbit = require("rabbitmq-stream-js-client")
const { randomUUID } = require("crypto")

const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"

async function main() {
const streamName = `example-${randomUUID()}`
const publisherRef = `publisher-${randomUUID()}`
console.log(`Creating stream ${streamName}`)

const client = await rabbit.connect({
hostname: "localhost",
port: 5552,
username: rabbitUser,
password: rabbitPassword,
vhost: "/",
heartbeat: 0,
})
await client.createStream({ stream: streamName })
await sleep(200)

const firstDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef })
await firstDeduplicationPublisher.send(Buffer.from("Test message 1"))
await firstDeduplicationPublisher.send(Buffer.from("Test message 2"))
await firstDeduplicationPublisher.send(Buffer.from("Test message 3"))
await firstDeduplicationPublisher.flush()
const firstPublisherPublishingId = await firstDeduplicationPublisher.getLastPublishingId()
await firstDeduplicationPublisher.close()

console.log(`Publishing id is ${firstPublisherPublishingId}`)

const secondDeduplicationPublisher = await client.declarePublisher({ stream: streamName, publisherRef: publisherRef })
await secondDeduplicationPublisher.send(Buffer.from("Test message 1"))
await secondDeduplicationPublisher.send(Buffer.from("Test message 2"))
await secondDeduplicationPublisher.flush()
const secondPublisherPublishingId = await secondDeduplicationPublisher.getLastPublishingId()
await secondDeduplicationPublisher.close()

console.log(`Publishing id is still ${secondPublisherPublishingId} (same as first ${firstPublisherPublishingId})`)

await client.deleteStream({ stream: streamName })

await client.close()
}

main()
.then(() => console.log("done!"))
.catch((res) => {
console.log("ERROR ", res)
process.exit(-1)
})
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
3 changes: 1 addition & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ export class Client {
stream: params.stream,
publisherId: publisherId,
publisherRef: params.publisherRef,
boot: params.boot,
maxFrameSize: this.maxFrameSize,
maxChunkLength: params.maxChunkLength,
logger: this.logger,
}
const publisher = new StreamPublisher(streamPublisherParams, filter)
await publisher.init()
connection.onPublisherClosed(publisher.extendedId, params.stream, async () => {
await publisher.close(false)
this.publishers.delete(publisher.extendedId)
Expand Down Expand Up @@ -740,7 +740,6 @@ export interface ClientParams {
export interface DeclarePublisherParams {
stream: string
publisherRef?: string
boot?: boolean
maxChunkLength?: number
connectionClosedListener?: ConnectionClosedListener
}
Expand Down
16 changes: 10 additions & 6 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export const computeExtendedPublisherId = (publisherId: number, connectionId: st
}

export interface Publisher {
init(): Promise<void>
send(message: Buffer, opts?: MessageOptions): Promise<SendResult>
basicSend(publishingId: bigint, content: Buffer, opts?: MessageOptions): Promise<SendResult>
flush(): Promise<boolean>
Expand All @@ -100,7 +101,6 @@ export class StreamPublisher implements Publisher {
private stream: string
readonly publisherId: number
protected publisherRef: string
private boot: boolean
private publishingId: bigint
private maxFrameSize: number
private queue: PublishRequestMessage[]
Expand All @@ -115,7 +115,6 @@ export class StreamPublisher implements Publisher {
stream: string
publisherId: number
publisherRef?: string
boot?: boolean
maxFrameSize: number
maxChunkLength?: number
logger: Logger
Expand All @@ -126,8 +125,7 @@ export class StreamPublisher implements Publisher {
this.stream = params.stream
this.publisherId = params.publisherId
this.publisherRef = params.publisherRef || ""
this.boot = params.boot || false
this.publishingId = params.boot ? -1n : 0n
this.publishingId = params.publisherRef ? -1n : 0n
this.maxFrameSize = params.maxFrameSize || 1048576
this.queue = []
this.scheduled = null
Expand All @@ -140,12 +138,18 @@ export class StreamPublisher implements Publisher {
return this._closed
}

async init(): Promise<void> {
if (this.publisherRef && this.publishingId === -1n) {
this.publishingId = await this.getLastPublishingId()
}
}

async send(message: Buffer, opts: MessageOptions = {}): Promise<SendResult> {
if (this._closed) {
throw new Error(`Publisher has been closed`)
}
if (this.boot && this.publishingId === -1n) {
this.publishingId = await this.getLastPublishingId()
if (this.publisherRef && this.publishingId === -1n) {
throw new Error(`Please initialize the publisher before sending messages`)
}
this.publishingId = this.publishingId + 1n

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/basic_publish.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ describe("publish a message", () => {
client = await createClient(username, password, undefined, maxFrameSize, bufferSizeSettings)
streamName = createStreamName()
await rabbit.createStream(streamName)
publisher = await createPublisher(streamName, client)
publisher = await createPublisher(streamName, client, true)
})

afterEach(async () => {
Expand Down
8 changes: 6 additions & 2 deletions test/support/fake_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ export function createConsumerRef(): string {
return `my-consumer-${randomUUID()}`
}

export async function createPublisher(streamName: string, client: Client): Promise<Publisher> {
export async function createPublisher(
streamName: string,
client: Client,
deduplication: Boolean = false
): Promise<Publisher> {
const publisher = await client.declarePublisher({
stream: streamName,
publisherRef: `my-publisher-${randomUUID()}`,
...(deduplication && { publisherRef: `my-publisher-${randomUUID()}` }),
})
return publisher
}
Expand Down
13 changes: 7 additions & 6 deletions test/unit/publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe("Publisher", () => {

afterEach(() => rabbit.deleteStream(testStreamName))

it("increase publishing id from server when boot is true", async () => {
it("increase publishing id from server when publisherRef is defined (deduplication active)", async () => {
const oldClient = await createClient(username, password)
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef })
const oldMessages = [...Array(3).keys()]
Expand All @@ -32,27 +32,28 @@ describe("Publisher", () => {
await oldClient.close()
const newClient = await createClient(username, password)

const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: true })
const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef })
await newPublisher.send(Buffer.from(`test${randomUUID()}`))
await newPublisher.flush()

expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length) + 1n)
await newClient.close()
}).timeout(10000)

it("do not increase publishing id from server when boot is false", async () => {
it("do not increase publishing id from server when publisherRef is not defined (deduplication not active)", async () => {
const oldClient = await createClient(username, password)
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName, publisherRef })
const oldPublisher = await oldClient.declarePublisher({ stream: testStreamName })
const oldMessages = [...Array(3).keys()]
await Promise.all(oldMessages.map(() => oldPublisher.send(Buffer.from(`test${randomUUID()}`))))
await oldPublisher.flush()
await oldClient.close()
const newClient = await createClient(username, password)

const newPublisher = await newClient.declarePublisher({ stream: testStreamName, publisherRef, boot: false })
const newPublisher = await newClient.declarePublisher({ stream: testStreamName })
await newPublisher.send(Buffer.from(`test${randomUUID()}`))
await newPublisher.flush()

expect(await newPublisher.getLastPublishingId()).eql(BigInt(oldMessages.length))
expect(await newPublisher.getLastPublishingId()).eql(BigInt(0))
await newClient.close()
}).timeout(10000)

Expand Down