Skip to content

Commit a8f21fc

Browse files
dc1992icappello
andauthored
228 some tests are flaky (#229)
* increased timeout value * fix filter flaky test * update rabbitmq image * delete queues after tests * flush messages and increase timeout * another flaky test fixed * typo * increase timeout * rabbit version with minor * Update test/e2e/offset.test.ts Co-authored-by: icappello <[email protected]> * rollback * moved message sending later * added always * removed only * updated below threshold --------- Co-authored-by: icappello <[email protected]>
1 parent b73abe7 commit a8f21fc

File tree

9 files changed

+35
-15
lines changed

9 files changed

+35
-15
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020

2121
services:
2222
rabbitmq:
23-
image: rabbitmq:3.13-rc-management
23+
image: rabbitmq:4.0.5-management
2424
options: --hostname test-node --name test-node
2525
env:
2626
RABBITMQ_DEFAULT_USER: "test-user"

cluster/docker-compose.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ services:
99
networks:
1010
- back
1111
hostname: node0
12-
image: rabbitmq:3.13-rc-management
12+
image: rabbitmq:4.0.5-management
1313
ports:
1414
- "5560:5550"
1515
- "5561:5551"
@@ -27,7 +27,7 @@ services:
2727
networks:
2828
- back
2929
hostname: node1
30-
image: rabbitmq:3.13-rc-management
30+
image: rabbitmq:4.0.5-management
3131
ports:
3232
- "5570:5550"
3333
- "5571:5551"
@@ -45,7 +45,7 @@ services:
4545
networks:
4646
- back
4747
hostname: node2
48-
image: rabbitmq:3.13-rc-management
48+
image: rabbitmq:4.0.5-management
4949
ports:
5050
- "5580:5550"
5151
- "5581:5551"

docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
rabbitmq-stream:
3-
image: rabbitmq:3.13-rc-management
3+
image: rabbitmq:4.0.5-management
44
container_name: rabbitmq-stream
55
restart: unless-stopped
66
hostname: "rabbitmq"

example/docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ version: "2"
22

33
services:
44
rabbitmq-stream:
5-
image: rabbitmq:3-management
5+
image: rabbitmq:4.0.5-management
66
container_name: rabbitmq-stream
77
restart: unless-stopped
88
hostname: "rabbitmq"

test/e2e/declare_consumer.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,15 +277,15 @@ describe("declare consumer", () => {
277277
it("messageAnnotations with bytes are read correctly", async () => {
278278
const messageAnnotations: MessageAnnotations[] = []
279279
const annotations = { test: new AmqpByte(123) }
280-
await rabbit.createStream("testQ")
280+
await rabbit.createStream(streamName)
281281
await client.declareConsumer(
282-
{ stream: "testQ", offset: Offset.next(), consumerRef: "test" },
282+
{ stream: streamName, offset: Offset.next(), consumerRef: "test" },
283283
(message: Message) => {
284284
messageAnnotations.push(message.messageAnnotations ?? {})
285285
}
286286
)
287287

288-
const testP = await client.declarePublisher({ stream: "testQ" })
288+
const testP = await client.declarePublisher({ stream: streamName })
289289
await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations })
290290

291291
await eventually(async () => {

test/e2e/filtering.test.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ describe("filtering", () => {
8686
}).timeout(10000)
8787

8888
it("published messages are filtered on the server side keeping only the ones with filter value", async () => {
89-
const filteredMsg: string[] = []
89+
const expectedMessages: string[] = []
90+
const notCorrectlyFilteredMessages: string[] = []
9091
const publisher = await client.declarePublisher(
9192
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
9293
(msg) => (msg.applicationProperties ? msg.applicationProperties["test"].toString() : undefined)
@@ -110,12 +111,20 @@ describe("filtering", () => {
110111
},
111112
},
112113
(msg) => {
113-
filteredMsg.push(msg.content.toString("utf-8"))
114+
if (msg.applicationProperties?.test === "A" || msg.applicationProperties?.test === "B")
115+
expectedMessages.push(msg.content.toString("utf-8"))
116+
else notCorrectlyFilteredMessages.push(msg.content.toString("utf-8"))
114117
}
115118
)
116119

120+
//RabbitMQ uses a Bloom filter for server side filtering.
121+
//A Bloom filter is very efficient in terms of storage and speed, but it is probabilistic: it can return false positives.
122+
//Because of this, the broker can send messages it believes match the expected filter values whereas they do not. That's why some client-side filtering logic is necessary.
123+
//For this reason some messages may not be correctly filtered, but we expect the number of them to be very low.
124+
//For more information: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
117125
await eventually(async () => {
118-
expect(filteredMsg.length).eql(2000)
126+
expect(expectedMessages.length).eql(2000)
127+
expect(notCorrectlyFilteredMessages.length).below(150)
119128
}, 10000)
120129
}).timeout(15000)
121130

test/e2e/offset.test.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ describe("offset", () => {
150150
const receivedMessages: Message[] = []
151151
const publisher = await client.declarePublisher({ stream: testStreamName })
152152
const previousMessages = await sendANumberOfRandomMessages(publisher)
153-
await wait(10)
153+
await publisher.flush()
154+
await wait(100)
154155
const offset = new Date()
155-
const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length)
156156

157157
await client.declareConsumer(
158158
{ stream: testStreamName, consumerRef: "my consumer", offset: Offset.timestamp(offset) },
@@ -161,9 +161,15 @@ describe("offset", () => {
161161
}
162162
)
163163

164+
const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length)
165+
164166
await eventually(async () => {
165167
expect(receivedMessages).to.have.length(laterMessages.length)
166168
})
169+
170+
await always(async () => {
171+
expect(receivedMessages).to.have.length(laterMessages.length)
172+
})
167173
})
168174
})
169175

test/e2e/route_query.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ describe("RouteQuery command", () => {
3737
const streamName = randomUUID()
3838
await rabbit.createStream(streamName)
3939

40-
await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error)
40+
try {
41+
await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error)
42+
} finally {
43+
await rabbit.deleteStream(streamName)
44+
}
4145
})
4246
})

test/support/util.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ export const sendANumberOfRandomMessages = async (publisher: Publisher, offset =
216216
const noOfMessages = Math.floor(Math.random() * 10) + 1
217217
const messages = Array.from(Array(noOfMessages).keys()).map((_, i) => `Message number ${i + offset + 1}`)
218218
await Promise.all(messages.map((m) => publisher.send(Buffer.from(m))))
219+
await publisher.flush()
219220
return messages
220221
}
221222

0 commit comments

Comments
 (0)