Skip to content

Commit be11c72

Browse files
l4mbyapietroni51magne
authored
[feat/IS-70] support filter expressions (#75)
* update docker version * added application properties to message * including sql filter * test * feat: add filters * feat: sql filters, properties filter, application properties filter --------- Co-authored-by: Andrea Pietroni <[email protected]> Co-authored-by: magne <[email protected]>
1 parent e6d4d4a commit be11c72

File tree

6 files changed

+194
-8
lines changed

6 files changed

+194
-8
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020

2121
services:
2222
rabbitmq:
23-
image: rabbitmq:4.1.1-management
23+
image: rabbitmq:4.2.0-beta.3-management
2424
options: --hostname test-node --name test-node
2525
env:
2626
RABBITMQ_HOSTNAME: "test-node"

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
rabbitmq-js-client:
3-
image: rabbitmq:4.1.1-management
3+
image: rabbitmq:4.2.0-beta.3-management
44
container_name: rabbitmq-js-client
55
restart: unless-stopped
66
hostname: "rabbitmq"

src/consumer.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@ import {
88
EventContext,
99
Message,
1010
Dictionary,
11+
types,
12+
Typed,
13+
MessageProperties,
1114
} from "rhea"
1215
import {
1316
Offset,
1417
SourceFilter,
1518
STREAM_FILTER_MATCH_UNFILTERED,
1619
STREAM_FILTER_SPEC,
1720
STREAM_OFFSET_SPEC,
21+
STREAM_FILTER_SQL,
22+
STREAM_FILTER_MESSAGE_PROPERTIES,
23+
STREAM_FILTER_APPLICATION_PROPERTIES,
1824
} from "./utils.js"
1925
import { openLink } from "./rhea_wrapper.js"
2026
import { createConsumerAddressFrom } from "./message.js"
@@ -27,6 +33,9 @@ export type StreamOptions = {
2733
name: string
2834
offset?: Offset
2935
filterValues?: string[]
36+
messagePropertiesFilter?: MessageProperties
37+
applicationPropertiesFilter?: Dictionary<string>
38+
sqlFilter?: string
3039
matchUnfiltered?: boolean
3140
}
3241

@@ -125,16 +134,27 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter |
125134
throw new Error("At least one between offset and filterValues must be set when using filtering")
126135
}
127136

128-
const filters: Dictionary<string | bigint | boolean | string[]> = {}
137+
const filters: Dictionary<string | bigint | boolean | string[] | Typed> = {}
129138
if (params.stream.offset) {
130139
filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue()
131140
}
132141
if (params.stream.filterValues) {
133142
filters[STREAM_FILTER_SPEC] = params.stream.filterValues
134143
}
144+
if (params.stream.sqlFilter) {
145+
filters[STREAM_FILTER_SQL] = types.wrap_described(params.stream.sqlFilter, 0x120)
146+
}
135147
if (params.stream.matchUnfiltered) {
136148
filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered
137149
}
150+
if (params.stream.messagePropertiesFilter) {
151+
const symbolicMap = types.wrap_symbolic_map(params.stream.messagePropertiesFilter)
152+
filters[STREAM_FILTER_MESSAGE_PROPERTIES] = types.wrap_described(symbolicMap, 0x173)
153+
}
154+
if (params.stream.applicationPropertiesFilter) {
155+
const map = types.wrap_map(params.stream.applicationPropertiesFilter)
156+
filters[STREAM_FILTER_APPLICATION_PROPERTIES] = types.wrap_described(map, 0x174)
157+
}
138158

139159
return filters
140160
}

src/message.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { generate_uuid, MessageAnnotations, Message as RheaMessage } from "rhea"
1+
import { Dictionary, generate_uuid, MessageAnnotations, MessageProperties, Message as RheaMessage } from "rhea"
22
import { AmqpEndpoints } from "./link_message_builder.js"
33
import { inspect } from "util"
44
import { CreateConsumerParams } from "./consumer.js"
@@ -18,6 +18,8 @@ type MessageOptions = {
1818
body: string
1919
destination?: DestinationOptions
2020
annotations?: MessageAnnotations
21+
message_properties?: MessageProperties
22+
application_properties?: Dictionary<string>
2123
}
2224

2325
export function createAmqpMessage(options: MessageOptions): RheaMessage {
@@ -28,10 +30,19 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage {
2830
to: createPublisherAddressFrom(options.destination),
2931
durable: true,
3032
message_annotations: options.annotations,
33+
application_properties: options.application_properties,
34+
...(options.message_properties ? options.message_properties : {}),
3135
}
3236
}
3337

34-
return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations }
38+
return {
39+
message_id: generate_uuid(),
40+
body: options.body,
41+
durable: true,
42+
message_annotations: options.annotations,
43+
application_properties: options.application_properties,
44+
...(options.message_properties ? options.message_properties : {}),
45+
}
3546
}
3647

3748
export function createPublisherAddressFrom(options?: DestinationOptions): string | undefined {

src/utils.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Dictionary, Message } from "rhea"
1+
import { Dictionary, Message, Typed } from "rhea"
22
import { QueueType } from "./queue.js"
33

44
export enum AmqpResponseCodes {
@@ -20,11 +20,14 @@ export const DURABLE = 1
2020
export const AUTO_DELETE = 1
2121
export const EXCLUSIVE = 1
2222

23-
export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
2423
export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
24+
export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
2525
export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
26+
export const STREAM_FILTER_MESSAGE_PROPERTIES = "properties-filter"
27+
export const STREAM_FILTER_APPLICATION_PROPERTIES = "application-properties-filter"
28+
export const STREAM_FILTER_SQL = "sql-filter"
2629

27-
export type SourceFilter = Dictionary<string | bigint | boolean | string[]>
30+
export type SourceFilter = Dictionary<string | bigint | boolean | string[] | Typed>
2831

2932
export type Result<T, K> = OkResult<T> | ErrorResult<K>
3033

test/e2e/consumer.test.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,158 @@ describe("Consumer", () => {
202202
})
203203
})
204204

205+
test("consumer can handle message on stream with filter on message properties", async () => {
206+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
207+
const filteredMessage = createAmqpMessage({
208+
body: "my body",
209+
message_properties: {
210+
subject: "foo",
211+
},
212+
})
213+
const discardedMessage = createAmqpMessage({
214+
body: "discard me",
215+
message_properties: {
216+
subject: "bar",
217+
},
218+
})
219+
await publisher.publish(filteredMessage)
220+
await publisher.publish(discardedMessage)
221+
let received: number = 0
222+
223+
const consumer = await connection.createConsumer({
224+
stream: {
225+
name: streamName,
226+
offset: Offset.first(),
227+
matchUnfiltered: false,
228+
messagePropertiesFilter: { subject: "foo" },
229+
},
230+
messageHandler: (context) => {
231+
received++
232+
context.accept()
233+
},
234+
})
235+
consumer.start()
236+
237+
await eventually(() => {
238+
expect(received).to.be.eql(1)
239+
})
240+
})
241+
242+
test("consumer can handle message on stream with filter on application properties", async () => {
243+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
244+
const filteredMessage = createAmqpMessage({
245+
body: "my body",
246+
application_properties: {
247+
test: "foo",
248+
},
249+
})
250+
const discardedMessage = createAmqpMessage({
251+
body: "discard me",
252+
application_properties: {
253+
test: "bar",
254+
},
255+
})
256+
await publisher.publish(filteredMessage)
257+
await publisher.publish(discardedMessage)
258+
let received: number = 0
259+
260+
const consumer = await connection.createConsumer({
261+
stream: {
262+
name: streamName,
263+
offset: Offset.first(),
264+
matchUnfiltered: false,
265+
applicationPropertiesFilter: { test: "foo" },
266+
},
267+
messageHandler: (context) => {
268+
received++
269+
context.accept()
270+
},
271+
})
272+
consumer.start()
273+
274+
await eventually(() => {
275+
expect(received).to.be.eql(1)
276+
})
277+
})
278+
279+
test("consumer can handle message on stream with SQL filters on message properties", async () => {
280+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
281+
const filteredMessage = createAmqpMessage({
282+
body: "my body",
283+
message_properties: {
284+
subject: "foo",
285+
},
286+
})
287+
const discardedMessage = createAmqpMessage({
288+
body: "discard me",
289+
message_properties: {
290+
subject: "bar",
291+
},
292+
})
293+
await publisher.publish(filteredMessage)
294+
await publisher.publish(discardedMessage)
295+
let received: string = ""
296+
297+
const consumer = await connection.createConsumer({
298+
stream: {
299+
name: streamName,
300+
offset: Offset.first(),
301+
matchUnfiltered: false,
302+
sqlFilter: "properties.subject = 'foo'",
303+
},
304+
messageHandler: (context, message) => {
305+
if (message.subject && message.subject == "foo") {
306+
received = message.body
307+
}
308+
context.accept()
309+
},
310+
})
311+
consumer.start()
312+
313+
await eventually(() => {
314+
expect(received).to.be.eql("my body")
315+
})
316+
})
317+
318+
test("consumer can handle message on stream with SQL filters on message application properties", async () => {
319+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
320+
const filteredMessage = createAmqpMessage({
321+
body: "my body",
322+
application_properties: {
323+
test: "foo",
324+
},
325+
})
326+
const discardedMessage = createAmqpMessage({
327+
body: "discard me",
328+
application_properties: {
329+
test: "bar",
330+
},
331+
})
332+
await publisher.publish(filteredMessage)
333+
await publisher.publish(discardedMessage)
334+
let received: string = ""
335+
336+
const consumer = await connection.createConsumer({
337+
stream: {
338+
name: streamName,
339+
offset: Offset.first(),
340+
matchUnfiltered: false,
341+
sqlFilter: "application_properties.test = 'foo'",
342+
},
343+
messageHandler: (context, message) => {
344+
if (message.application_properties && message.application_properties.test == "foo") {
345+
received = message.body
346+
}
347+
context.accept()
348+
},
349+
})
350+
consumer.start()
351+
352+
await eventually(() => {
353+
expect(received).to.be.eql("my body")
354+
})
355+
})
356+
205357
test("consumer can discard a message published to a queue", async () => {
206358
const publisher = await connection.createPublisher({ queue: { name: discardQueueName } })
207359
const expectedBody = "ciao"

0 commit comments

Comments
 (0)