Skip to content

Commit 3a60cca

Browse files
author
magne
committed
feat: sql filters, properties filter, application properties filter
1 parent f7af4c5 commit 3a60cca

File tree

3 files changed

+139
-14
lines changed

3 files changed

+139
-14
lines changed

src/consumer.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@ import {
88
EventContext,
99
Message,
1010
Dictionary,
11-
filter,
11+
types,
12+
Typed,
13+
MessageProperties,
1214
} from "rhea"
1315
import {
1416
Offset,
1517
SourceFilter,
1618
STREAM_FILTER_MATCH_UNFILTERED,
1719
STREAM_FILTER_SPEC,
1820
STREAM_OFFSET_SPEC,
19-
STREAM_SQL_FILTER,
21+
STREAM_FILTER_SQL,
22+
STREAM_FILTER_MESSAGE_PROPERTIES,
23+
STREAM_FILTER_APPLICATION_PROPERTIES,
2024
} from "./utils.js"
2125
import { openLink } from "./rhea_wrapper.js"
2226
import { createConsumerAddressFrom } from "./message.js"
@@ -29,6 +33,8 @@ export type StreamOptions = {
2933
name: string
3034
offset?: Offset
3135
filterValues?: string[]
36+
messagePropertiesFilter?: MessageProperties
37+
applicationPropertiesFilter?: Dictionary<string>
3238
sqlFilter?: string
3339
matchUnfiltered?: boolean
3440
}
@@ -128,21 +134,27 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter |
128134
throw new Error("At least one between offset and filterValues must be set when using filtering")
129135
}
130136

131-
const filters: Dictionary<string | bigint | boolean | string[] | any> = {}
137+
const filters: Dictionary<string | bigint | boolean | string[] | Typed> = {}
132138
if (params.stream.offset) {
133139
filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue()
134140
}
135141
if (params.stream.filterValues) {
136142
filters[STREAM_FILTER_SPEC] = params.stream.filterValues
137143
}
138144
if (params.stream.sqlFilter) {
139-
console.log(filter.selector(`Described(${String(Symbol("amqp:sql-filter"))}, ${params.stream.sqlFilter})`))
140-
const { descriptor, value } = filter.selector(params.stream.sqlFilter)["jms-selector"]
141-
filters["sql-filter"] = { descriptor: descriptor.value, value }
145+
filters[STREAM_FILTER_SQL] = types.wrap_described(params.stream.sqlFilter, 0x120)
142146
}
143147
if (params.stream.matchUnfiltered) {
144148
filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered
145149
}
150+
if (params.stream.messagePropertiesFilter) {
151+
const symbolicMap = types.wrap_symbolic_map(params.stream.messagePropertiesFilter)
152+
filters[STREAM_FILTER_MESSAGE_PROPERTIES] = types.wrap_described(symbolicMap, 0x173)
153+
}
154+
if (params.stream.applicationPropertiesFilter) {
155+
const map = types.wrap_map(params.stream.applicationPropertiesFilter)
156+
filters[STREAM_FILTER_APPLICATION_PROPERTIES] = types.wrap_described(map, 0x174)
157+
}
146158

147159
return filters
148160
}

src/utils.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Dictionary, Message } from "rhea"
1+
import { Dictionary, Message, Typed } from "rhea"
22
import { QueueType } from "./queue.js"
33

44
export enum AmqpResponseCodes {
@@ -20,12 +20,14 @@ export const DURABLE = 1
2020
export const AUTO_DELETE = 1
2121
export const EXCLUSIVE = 1
2222

23-
export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
2423
export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
24+
export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter"
2525
export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
26-
export const STREAM_SQL_FILTER = "amqp:sql-filter"
26+
export const STREAM_FILTER_MESSAGE_PROPERTIES = "properties-filter"
27+
export const STREAM_FILTER_APPLICATION_PROPERTIES = "application-properties-filter"
28+
export const STREAM_FILTER_SQL = "sql-filter"
2729

28-
export type SourceFilter = Dictionary<string | bigint | boolean | string[]>
30+
export type SourceFilter = Dictionary<string | bigint | boolean | string[] | Typed>
2931

3032
export type Result<T, K> = OkResult<T> | ErrorResult<K>
3133

test/e2e/consumer.test.ts

Lines changed: 115 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,81 @@ describe("Consumer", () => {
202202
})
203203
})
204204

205-
test("consumer can handle message on stream with sql filters", async () => {
205+
test("consumer can handle message on stream with filter on message properties", async () => {
206+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
207+
const filteredMessage = createAmqpMessage({
208+
body: "my body",
209+
message_properties: {
210+
subject: "foo",
211+
},
212+
})
213+
const discardedMessage = createAmqpMessage({
214+
body: "discard me",
215+
message_properties: {
216+
subject: "bar",
217+
},
218+
})
219+
await publisher.publish(filteredMessage)
220+
await publisher.publish(discardedMessage)
221+
let received: number = 0
222+
223+
const consumer = await connection.createConsumer({
224+
stream: {
225+
name: streamName,
226+
offset: Offset.first(),
227+
matchUnfiltered: false,
228+
messagePropertiesFilter: { subject: "foo" },
229+
},
230+
messageHandler: (context) => {
231+
received++
232+
context.accept()
233+
},
234+
})
235+
consumer.start()
236+
237+
await eventually(() => {
238+
expect(received).to.be.eql(1)
239+
})
240+
})
241+
242+
test("consumer can handle message on stream with filter on application properties", async () => {
243+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
244+
const filteredMessage = createAmqpMessage({
245+
body: "my body",
246+
application_properties: {
247+
test: "foo",
248+
},
249+
})
250+
const discardedMessage = createAmqpMessage({
251+
body: "discard me",
252+
application_properties: {
253+
test: "bar",
254+
},
255+
})
256+
await publisher.publish(filteredMessage)
257+
await publisher.publish(discardedMessage)
258+
let received: number = 0
259+
260+
const consumer = await connection.createConsumer({
261+
stream: {
262+
name: streamName,
263+
offset: Offset.first(),
264+
matchUnfiltered: false,
265+
applicationPropertiesFilter: { test: "foo" },
266+
},
267+
messageHandler: (context) => {
268+
received++
269+
context.accept()
270+
},
271+
})
272+
consumer.start()
273+
274+
await eventually(() => {
275+
expect(received).to.be.eql(1)
276+
})
277+
})
278+
279+
test("consumer can handle message on stream with SQL filters on message properties", async () => {
206280
const publisher = await connection.createPublisher({ queue: { name: streamName } })
207281
const filteredMessage = createAmqpMessage({
208282
body: "my body",
@@ -225,12 +299,49 @@ describe("Consumer", () => {
225299
name: streamName,
226300
offset: Offset.first(),
227301
matchUnfiltered: false,
228-
sqlFilter: "properties.subject = '123'",
302+
sqlFilter: "properties.subject = 'foo'",
229303
},
230304
messageHandler: (context, message) => {
231-
console.log("message", message.subject)
232305
if (message.subject && message.subject == "foo") {
233-
console.log("hello ")
306+
received = message.body
307+
}
308+
context.accept()
309+
},
310+
})
311+
consumer.start()
312+
313+
await eventually(() => {
314+
expect(received).to.be.eql("my body")
315+
})
316+
})
317+
318+
test("consumer can handle message on stream with SQL filters on message application properties", async () => {
319+
const publisher = await connection.createPublisher({ queue: { name: streamName } })
320+
const filteredMessage = createAmqpMessage({
321+
body: "my body",
322+
application_properties: {
323+
test: "foo",
324+
},
325+
})
326+
const discardedMessage = createAmqpMessage({
327+
body: "discard me",
328+
application_properties: {
329+
test: "bar",
330+
},
331+
})
332+
await publisher.publish(filteredMessage)
333+
await publisher.publish(discardedMessage)
334+
let received: string = ""
335+
336+
const consumer = await connection.createConsumer({
337+
stream: {
338+
name: streamName,
339+
offset: Offset.first(),
340+
matchUnfiltered: false,
341+
sqlFilter: "application_properties.test = 'foo'",
342+
},
343+
messageHandler: (context, message) => {
344+
if (message.application_properties && message.application_properties.test == "foo") {
234345
received = message.body
235346
}
236347
context.accept()

0 commit comments

Comments
 (0)