Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface Connection {
createPublisher(options?: DestinationOptions): Promise<Publisher>
get publishers(): Map<string, Publisher>
get consumers(): Map<string, Consumer>
createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer>
createConsumer(params: CreateConsumerParams): Promise<Consumer>
}

export class AmqpConnection implements Connection {
Expand Down Expand Up @@ -59,8 +59,8 @@ export class AmqpConnection implements Connection {
})
}

async createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer> {
const consumer = await AmqpConsumer.createFrom(this.connection, this._consumers, queueName, params)
async createConsumer(params: CreateConsumerParams): Promise<Consumer> {
const consumer = await AmqpConsumer.createFrom(this.connection, this._consumers, params)
this._consumers.set(consumer.id, consumer)
return consumer
}
Expand Down
71 changes: 57 additions & 14 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,38 @@ import {
SenderOptions,
EventContext,
Message,
Dictionary,
} from "rhea"
import { openLink } from "./utils.js"
import { createAddressFrom } from "./message.js"
import {
Offset,
openLink,
SourceFilter,
STREAM_FILTER_MATCH_UNFILTERED,
STREAM_FILTER_SPEC,
STREAM_OFFSET_SPEC,
} from "./utils.js"
import { createConsumerAddressFrom } from "./message.js"
import { QueueOptions } from "./message.js"

export type ConsumerMessageHandler = (message: Message) => void

export type CreateConsumerParams = {
export type StreamOptions = {
name: string
offset?: Offset
filterValues?: string[]
matchUnfiltered?: boolean
}

export type SourceOptions = { stream: StreamOptions } | { queue: QueueOptions }

export type CreateConsumerParams = SourceOptions & {
messageHandler: ConsumerMessageHandler
}

const getConsumerReceiverLinkConfigurationFrom = (
address: string,
consumerId: string
consumerId: string,
filter?: SourceFilter
): SenderOptions | ReceiverOptions => ({
snd_settle_mode: 0,
rcv_settle_mode: 0,
Expand All @@ -31,6 +50,7 @@ const getConsumerReceiverLinkConfigurationFrom = (
timeout: 0,
dynamic: false,
durable: 0,
filter,
},
})

Expand All @@ -41,27 +61,28 @@ export interface Consumer {
}

export class AmqpConsumer implements Consumer {
static async createFrom(
connection: Connection,
consumersList: Map<string, Consumer>,
queueName: string,
params: CreateConsumerParams
) {
static async createFrom(connection: Connection, consumersList: Map<string, Consumer>, params: CreateConsumerParams) {
const id = generate_uuid()
const address = createAddressFrom({ queue: { name: queueName } })
const address = createConsumerAddressFrom(params)
const filter = createConsumerFilterFrom(params)
if (!address) throw new Error("Consumer must have an address")

const receiverLink = await AmqpConsumer.openReceiver(connection, address, id)
const receiverLink = await AmqpConsumer.openReceiver(connection, address, id, filter)
return new AmqpConsumer(id, connection, consumersList, receiverLink, params)
}

private static async openReceiver(connection: Connection, address: string, consumerId: string): Promise<Receiver> {
private static async openReceiver(
connection: Connection,
address: string,
consumerId: string,
filter?: SourceFilter
): Promise<Receiver> {
return openLink<Receiver>(
connection,
ReceiverEvents.receiverOpen,
ReceiverEvents.receiverError,
connection.open_receiver.bind(connection),
getConsumerReceiverLinkConfigurationFrom(address, consumerId)
getConsumerReceiverLinkConfigurationFrom(address, consumerId, filter)
)
}

Expand Down Expand Up @@ -98,3 +119,25 @@ export class AmqpConsumer implements Consumer {
if (this.consumersList.has(this._id)) this.consumersList.delete(this._id)
}
}

function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | undefined {
if ("queue" in params) {
return undefined
}
if (!params.stream.offset && !params.stream.filterValues) {
throw new Error("At least one between offset and filterValues must be set when using filtering")
}

const filters: Dictionary<string | bigint | boolean | string[]> = {}
if (params.stream.offset) {
filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue()
}
if (params.stream.filterValues) {
filters[STREAM_FILTER_SPEC] = params.stream.filterValues
}
if (params.stream.matchUnfiltered) {
filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered
}

return filters
}
21 changes: 12 additions & 9 deletions src/message.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { generate_uuid, MessageAnnotations, Message as RheaMessage } from "rhea"
import { AmqpEndpoints } from "./link_message_builder.js"
import { inspect } from "util"
import { CreateConsumerParams } from "./consumer.js"

export type ExchangeOptions = {
name: string
Expand All @@ -24,21 +25,16 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage {
return {
message_id: generate_uuid(),
body: options.body,
to: createAddressFrom(options.destination),
to: createPublisherAddressFrom(options.destination),
durable: true,
message_annotations: options.annotations ?? {},
message_annotations: options.annotations,
}
}

return {
message_id: generate_uuid(),
body: options.body,
durable: true,
message_annotations: options.annotations ?? {},
}
return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations }
}

export function createAddressFrom(options?: DestinationOptions): string | undefined {
export function createPublisherAddressFrom(options?: DestinationOptions): string | undefined {
if (!options) return undefined
if ("queue" in options) return `/${AmqpEndpoints.Queues}/${options.queue.name}`
if ("exchange" in options) {
Expand All @@ -49,3 +45,10 @@ export function createAddressFrom(options?: DestinationOptions): string | undefi

throw new Error(`Unknown publisher options -- ${inspect(options)}`)
}

export function createConsumerAddressFrom(params: CreateConsumerParams): string | undefined {
if ("queue" in params) return `/${AmqpEndpoints.Queues}/${params.queue.name}`
if ("stream" in params) return `/${AmqpEndpoints.Queues}/${params.stream.name}`

throw new Error(`Unknown publisher options -- ${inspect(params)}`)
}
4 changes: 2 additions & 2 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Connection, Delivery, EventContext, Message, ReceiverOptions, Sender, S
import { openLink, OutcomeState } from "./utils.js"
import { randomUUID } from "crypto"
import { inspect } from "util"
import { createAddressFrom, DestinationOptions } from "./message.js"
import { createPublisherAddressFrom, DestinationOptions } from "./message.js"

const getPublisherSenderLinkConfigurationFrom = (
publisherId: string,
Expand Down Expand Up @@ -47,7 +47,7 @@ export class AmqpPublisher implements Publisher {
publishersList: Map<string, Publisher>,
options?: DestinationOptions
): Promise<Publisher> {
const address = createAddressFrom(options)
const address = createPublisherAddressFrom(options)
const id = randomUUID()
const senderLink = await AmqpPublisher.openSender(connection, id, address)
return new AmqpPublisher(connection, senderLink, id, publishersList)
Expand Down
47 changes: 47 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Connection,
Dictionary,
Message,
Receiver,
ReceiverEvents,
Expand Down Expand Up @@ -29,6 +30,12 @@ export const DURABLE = 1
export const AUTO_DELETE = 1
export const EXCLUSIVE = 1

export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"

export type SourceFilter = Dictionary<string | bigint | boolean | string[]>

export type Result<T, K> = OkResult<T> | ErrorResult<K>

type OkResult<T> = {
Expand Down Expand Up @@ -85,3 +92,43 @@ export async function openLink<T extends Sender | Receiver>(
openMethod(config)
})
}

export enum OffsetType {
first = "first",
last = "last",
next = "next",
numeric = "numeric",
timestamp = "timestamp",
}

export class Offset {
private constructor(
public readonly type: OffsetType,
public readonly value?: bigint
) {}

toValue() {
if (this.value && (this.type === OffsetType.numeric || this.type === OffsetType.timestamp)) return this.value
return this.type.toString()
}

static first() {
return new Offset(OffsetType.first)
}

static last() {
return new Offset(OffsetType.last)
}

static next() {
return new Offset(OffsetType.next)
}

static offset(offset: bigint) {
return new Offset(OffsetType.numeric, offset)
}

static timestamp(date: Date) {
return new Offset(OffsetType.timestamp, BigInt(date.getTime()))
}
}
9 changes: 6 additions & 3 deletions test/e2e/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ describe("Connection", () => {
})

test("create a consumer linked to a queue", async () => {
await connection.createConsumer(queueName, {
await connection.createConsumer({
queue: { name: queueName },
messageHandler: async (msg) => {
console.log(msg)
},
Expand All @@ -118,7 +119,8 @@ describe("Connection", () => {
})

test("close a consumer", async () => {
const consumer = await connection.createConsumer(queueName, {
const consumer = await connection.createConsumer({
queue: { name: queueName },
messageHandler: async (msg) => {
console.log(msg)
},
Expand All @@ -131,7 +133,8 @@ describe("Connection", () => {

test("closing the connection also closes the consumer", async () => {
const newConnection = await environment.createConnection()
await newConnection.createConsumer(queueName, {
await newConnection.createConsumer({
queue: { name: queueName },
messageHandler: async (msg) => {
console.log(msg)
},
Expand Down
Loading