Skip to content

Commit 3a53ee5

Browse files
author
magne
committed
chore: add javascript client library examples
1 parent d1e64fc commit 3a53ee5

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

docs/stream-filtering.md

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,16 @@ Msg = amqp10_msg:set_message_annotations(
188188
amqp10_client:send_msg(Sender, Msg),
189189
```
190190
</TabItem>
191+
192+
<TabItem value="javascript" label="JavaScript">
193+
```javascript
194+
const message = createAmqpMessage({
195+
body: "Hello World!",
196+
annotations: { "x-stream-filter-value": "invoices" }, // set Bloom filter value
197+
})
198+
await publisher.publish(message)
199+
```
200+
</TabItem>
191201
</Tabs>
192202

193203
A receiver must use a filter with descriptor `rabbitmq:stream-filter`.
@@ -317,6 +327,30 @@ after 5000 -> exit(missing_msg)
317327
end,
318328
```
319329
</TabItem>
330+
331+
<TabItem value="javascript" label="JavaScript">
332+
```javascript
333+
const consumer = await connection.createConsumer({
334+
stream: {
335+
name: "some-stream",
336+
offset: Offset.first(),
337+
matchUnfiltered: true,
338+
filterValues: ["invoices", "orders"], // This Bloom filter will be evaluated server-side per chunk (Stage 1).
339+
},
340+
messageHandler: (context, message) => {
341+
// This filter will be evaluated client-side per message (Stage 3).
342+
if (
343+
message.message_annotations &&
344+
["invoices", "orders"].includes(message.message_annotations["x-stream-filter-value"])
345+
) {
346+
// message processing
347+
}
348+
context.accept()
349+
},
350+
})
351+
consumer.start()
352+
```
353+
</TabItem>
320354
</Tabs>
321355

322356

@@ -542,6 +576,28 @@ Filter = #{<<"filter-name-1">> =>
542576
```
543577
</TabItem>
544578

579+
<TabItem value="javascript" label="JavaScript">
580+
```javascript
581+
const consumer = await connection.createConsumer({
582+
stream: {
583+
name: "my-queue",
584+
offset: Offset.first(),
585+
messagePropertiesFilter: {
586+
subject: "&p:Order",
587+
user_id: "John"
588+
},
589+
applicationPropertiesFilter: {
590+
region: "emea",
591+
},
592+
},
593+
messageHandler: (context, message) => {
594+
// process the messages
595+
},
596+
})
597+
consumer.start()
598+
```
599+
</TabItem>
600+
545601
</Tabs>
546602

547603
### SQL Filter Expressions
@@ -838,6 +894,23 @@ Filter = #{<<"sql-filter">> => #filter{descriptor = <<"amqp:sql-filter">>,
838894
```
839895
</TabItem>
840896

897+
<TabItem value="javascript" label="JavaScript">
898+
```javascript
899+
const consumer = await connection.createConsumer({
900+
stream: {
901+
name: "my-queue",
902+
offset: Offset.first(),
903+
sqlFilter: "properties.user_id = 'John' AND"
904+
+ "properties.subject LIKE 'Order%' AND region = 'emea'"
905+
},
906+
messageHandler: (context, message) => {
907+
// process the messages
908+
},
909+
})
910+
consumer.start()
911+
```
912+
</TabItem>
913+
841914
</Tabs>
842915

843916
### Error Handling
@@ -1013,6 +1086,27 @@ Filter = #{%% This Bloom filter will be evaluated server-side per chunk at stage
10131086
```
10141087
</TabItem>
10151088

1089+
<TabItem value="javascript" label="JavaScript">
1090+
```javascript
1091+
const consumer = await connection.createConsumer({
1092+
stream: {
1093+
name: "my-queue",
1094+
offset: Offset.first(),
1095+
filterValues: ["order.created"], // This Bloom filter will be evaluated server-side per chunk (Stage 1).
1096+
sqlFilter: "p.subject = 'order.created' AND " +
1097+
"p.creation_time > UTC() - 3600000 AND " +
1098+
"region IN ('AMER', 'EMEA', 'APJ') AND " +
1099+
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", // This complex SQL filter expression will be evaluted server-side
1100+
// per message at stage 2.
1101+
},
1102+
messageHandler: (context, message) => {
1103+
// message processing
1104+
},
1105+
})
1106+
consumer.start()
1107+
```
1108+
</TabItem>
1109+
10161110
</Tabs>
10171111

10181112
If `order.created` events represent only a small percentage of all events, RabbitMQ can filter the stream efficiently because only a small fraction of messages need to be parsed and evaluated in memory.

0 commit comments

Comments
 (0)