Skip to content

Commit f389aaf

Browse files
author
magne
committed
feat: add stream support for consumers
1 parent fc79c3c commit f389aaf

File tree

7 files changed

+195
-34
lines changed

7 files changed

+195
-34
lines changed

src/connection.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export interface Connection {
1212
createPublisher(options?: DestinationOptions): Promise<Publisher>
1313
get publishers(): Map<string, Publisher>
1414
get consumers(): Map<string, Consumer>
15-
createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer>
15+
createConsumer(params: CreateConsumerParams): Promise<Consumer>
1616
}
1717

1818
export class AmqpConnection implements Connection {
@@ -59,8 +59,8 @@ export class AmqpConnection implements Connection {
5959
})
6060
}
6161

62-
async createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer> {
63-
const consumer = await AmqpConsumer.createFrom(this.connection, this._consumers, queueName, params)
62+
async createConsumer(params: CreateConsumerParams): Promise<Consumer> {
63+
const consumer = await AmqpConsumer.createFrom(this.connection, this._consumers, params)
6464
this._consumers.set(consumer.id, consumer)
6565
return consumer
6666
}

src/consumer.ts

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,38 @@ import {
77
SenderOptions,
88
EventContext,
99
Message,
10+
Dictionary,
1011
} from "rhea"
11-
import { openLink } from "./utils.js"
12-
import { createAddressFrom } from "./message.js"
12+
import {
13+
Offset,
14+
openLink,
15+
SourceFilter,
16+
STREAM_FILTER_MATCH_UNFILTERED,
17+
STREAM_FILTER_SPEC,
18+
STREAM_OFFSET_SPEC,
19+
} from "./utils.js"
20+
import { createConsumerAddressFrom } from "./message.js"
21+
import { QueueOptions } from "./message.js"
1322

1423
export type ConsumerMessageHandler = (message: Message) => void
1524

16-
export type CreateConsumerParams = {
25+
export type StreamOptions = {
26+
name: string
27+
offset?: Offset
28+
filterValues?: string[]
29+
matchUnfiltered?: boolean
30+
}
31+
32+
export type SourceOptions = { stream: StreamOptions } | { queue: QueueOptions }
33+
34+
export type CreateConsumerParams = SourceOptions & {
1735
messageHandler: ConsumerMessageHandler
1836
}
1937

2038
const getConsumerReceiverLinkConfigurationFrom = (
2139
address: string,
22-
consumerId: string
40+
consumerId: string,
41+
filter?: SourceFilter
2342
): SenderOptions | ReceiverOptions => ({
2443
snd_settle_mode: 0,
2544
rcv_settle_mode: 0,
@@ -31,6 +50,7 @@ const getConsumerReceiverLinkConfigurationFrom = (
3150
timeout: 0,
3251
dynamic: false,
3352
durable: 0,
53+
filter,
3454
},
3555
})
3656

@@ -41,27 +61,28 @@ export interface Consumer {
4161
}
4262

4363
export class AmqpConsumer implements Consumer {
44-
static async createFrom(
45-
connection: Connection,
46-
consumersList: Map<string, Consumer>,
47-
queueName: string,
48-
params: CreateConsumerParams
49-
) {
64+
static async createFrom(connection: Connection, consumersList: Map<string, Consumer>, params: CreateConsumerParams) {
5065
const id = generate_uuid()
51-
const address = createAddressFrom({ queue: { name: queueName } })
66+
const address = createConsumerAddressFrom(params)
67+
const filter = createConsumerFilterFrom(params)
5268
if (!address) throw new Error("Consumer must have an address")
5369

54-
const receiverLink = await AmqpConsumer.openReceiver(connection, address, id)
70+
const receiverLink = await AmqpConsumer.openReceiver(connection, address, id, filter)
5571
return new AmqpConsumer(id, connection, consumersList, receiverLink, params)
5672
}
5773

58-
private static async openReceiver(connection: Connection, address: string, consumerId: string): Promise<Receiver> {
74+
private static async openReceiver(
75+
connection: Connection,
76+
address: string,
77+
consumerId: string,
78+
filter?: SourceFilter
79+
): Promise<Receiver> {
5980
return openLink<Receiver>(
6081
connection,
6182
ReceiverEvents.receiverOpen,
6283
ReceiverEvents.receiverError,
6384
connection.open_receiver.bind(connection),
64-
getConsumerReceiverLinkConfigurationFrom(address, consumerId)
85+
getConsumerReceiverLinkConfigurationFrom(address, consumerId, filter)
6586
)
6687
}
6788

@@ -98,3 +119,25 @@ export class AmqpConsumer implements Consumer {
98119
if (this.consumersList.has(this._id)) this.consumersList.delete(this._id)
99120
}
100121
}
122+
123+
function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | undefined {
124+
if ("queue" in params) {
125+
return undefined
126+
}
127+
if (!params.stream.offset && !params.stream.filterValues) {
128+
throw new Error("At least one between offset and filterValues must be set when using filtering")
129+
}
130+
131+
const filters: Dictionary<string | bigint | boolean | string[]> = {}
132+
if (params.stream.offset) {
133+
filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue()
134+
}
135+
if (params.stream.filterValues) {
136+
filters[STREAM_FILTER_SPEC] = params.stream.filterValues
137+
}
138+
if (params.stream.matchUnfiltered) {
139+
filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered
140+
}
141+
142+
return filters
143+
}

src/message.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { generate_uuid, MessageAnnotations, Message as RheaMessage } from "rhea"
22
import { AmqpEndpoints } from "./link_message_builder.js"
33
import { inspect } from "util"
4+
import { CreateConsumerParams } from "./consumer.js"
45

56
export type ExchangeOptions = {
67
name: string
@@ -24,21 +25,16 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage {
2425
return {
2526
message_id: generate_uuid(),
2627
body: options.body,
27-
to: createAddressFrom(options.destination),
28+
to: createPublisherAddressFrom(options.destination),
2829
durable: true,
29-
message_annotations: options.annotations ?? {},
30+
message_annotations: options.annotations,
3031
}
3132
}
3233

33-
return {
34-
message_id: generate_uuid(),
35-
body: options.body,
36-
durable: true,
37-
message_annotations: options.annotations ?? {},
38-
}
34+
return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations }
3935
}
4036

41-
export function createAddressFrom(options?: DestinationOptions): string | undefined {
37+
export function createPublisherAddressFrom(options?: DestinationOptions): string | undefined {
4238
if (!options) return undefined
4339
if ("queue" in options) return `/${AmqpEndpoints.Queues}/${options.queue.name}`
4440
if ("exchange" in options) {
@@ -49,3 +45,10 @@ export function createAddressFrom(options?: DestinationOptions): string | undefi
4945

5046
throw new Error(`Unknown publisher options -- ${inspect(options)}`)
5147
}
48+
49+
export function createConsumerAddressFrom(params: CreateConsumerParams): string | undefined {
50+
if ("queue" in params) return `/${AmqpEndpoints.Queues}/${params.queue.name}`
51+
if ("stream" in params) return `/${AmqpEndpoints.Queues}/${params.stream.name}`
52+
53+
throw new Error(`Unknown publisher options -- ${inspect(params)}`)
54+
}

src/publisher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Connection, Delivery, EventContext, Message, ReceiverOptions, Sender, S
22
import { openLink, OutcomeState } from "./utils.js"
33
import { randomUUID } from "crypto"
44
import { inspect } from "util"
5-
import { createAddressFrom, DestinationOptions } from "./message.js"
5+
import { createPublisherAddressFrom, DestinationOptions } from "./message.js"
66

77
const getPublisherSenderLinkConfigurationFrom = (
88
publisherId: string,
@@ -47,7 +47,7 @@ export class AmqpPublisher implements Publisher {
4747
publishersList: Map<string, Publisher>,
4848
options?: DestinationOptions
4949
): Promise<Publisher> {
50-
const address = createAddressFrom(options)
50+
const address = createPublisherAddressFrom(options)
5151
const id = randomUUID()
5252
const senderLink = await AmqpPublisher.openSender(connection, id, address)
5353
return new AmqpPublisher(connection, senderLink, id, publishersList)

src/utils.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
Connection,
3+
Dictionary,
34
Message,
45
Receiver,
56
ReceiverEvents,
@@ -29,6 +30,12 @@ export const DURABLE = 1
2930
export const AUTO_DELETE = 1
3031
export const EXCLUSIVE = 1
3132

33+
export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
34+
export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
35+
export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
36+
37+
export type SourceFilter = Dictionary<string | bigint | boolean | string[]>
38+
3239
export type Result<T, K> = OkResult<T> | ErrorResult<K>
3340

3441
type OkResult<T> = {
@@ -85,3 +92,43 @@ export async function openLink<T extends Sender | Receiver>(
8592
openMethod(config)
8693
})
8794
}
95+
96+
export enum OffsetType {
97+
first = "first",
98+
last = "last",
99+
next = "next",
100+
numeric = "numeric",
101+
timestamp = "timestamp",
102+
}
103+
104+
export class Offset {
105+
private constructor(
106+
public readonly type: OffsetType,
107+
public readonly value?: bigint
108+
) {}
109+
110+
toValue() {
111+
if (this.value && (this.type === OffsetType.numeric || this.type === OffsetType.timestamp)) return this.value
112+
return this.type.toString()
113+
}
114+
115+
static first() {
116+
return new Offset(OffsetType.first)
117+
}
118+
119+
static last() {
120+
return new Offset(OffsetType.last)
121+
}
122+
123+
static next() {
124+
return new Offset(OffsetType.next)
125+
}
126+
127+
static offset(offset: bigint) {
128+
return new Offset(OffsetType.numeric, offset)
129+
}
130+
131+
static timestamp(date: Date) {
132+
return new Offset(OffsetType.timestamp, BigInt(date.getTime()))
133+
}
134+
}

test/e2e/connection.test.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ describe("Connection", () => {
108108
})
109109

110110
test("create a consumer linked to a queue", async () => {
111-
await connection.createConsumer(queueName, {
111+
await connection.createConsumer({
112+
queue: { name: queueName },
112113
messageHandler: async (msg) => {
113114
console.log(msg)
114115
},
@@ -118,7 +119,8 @@ describe("Connection", () => {
118119
})
119120

120121
test("close a consumer", async () => {
121-
const consumer = await connection.createConsumer(queueName, {
122+
const consumer = await connection.createConsumer({
123+
queue: { name: queueName },
122124
messageHandler: async (msg) => {
123125
console.log(msg)
124126
},
@@ -131,7 +133,8 @@ describe("Connection", () => {
131133

132134
test("closing the connection also closes the consumer", async () => {
133135
const newConnection = await environment.createConnection()
134-
await newConnection.createConsumer(queueName, {
136+
await newConnection.createConsumer({
137+
queue: { name: queueName },
135138
messageHandler: async (msg) => {
136139
console.log(msg)
137140
},

0 commit comments

Comments
 (0)