Skip to content

Commit 96f3443

Browse files
authored
214 question async consumerfunc (#224)
* ConsumerFunc async * credit policy * consumer func return type * package lock * timeout * policy * package lock * readme * removed async in 3 tests * revert some async tests * removed async * add async consumer test
1 parent 2b6845d commit 96f3443

14 files changed

+155
-72
lines changed

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,21 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
210210
// ...
211211
```
212212

213+
### Custom Policy
214+
By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing.
215+
216+
```typescript
217+
const consumerOptions = {
218+
stream: "stream-name",
219+
creditPolicy: creditsOnChunkReceived(2, 1)
220+
}
221+
222+
await client.declareConsumer(consumerOptions, async (message: Message) => {
223+
console.log(message.content)
224+
}
225+
)
226+
```
227+
213228
### Clustering
214229

215230
Every time we create a new producer or a new consumer, a new connection object is created. The underlying TCP connections can be shared among different producers and different consumers. Note however that:

example/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

performance_test/package-lock.json

Lines changed: 24 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/client.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,9 +564,11 @@ export class Client {
564564
? consumer.filter?.postFilterFunc
565565
: (_msg: Message) => true
566566

567-
messages.map((message) => {
568-
if (messageFilter(message)) consumer.handle(message)
569-
})
567+
for (const message of messages) {
568+
if (messageFilter(message)) {
569+
await consumer.handle(message)
570+
}
571+
}
570572

571573
await consumer.creditPolicy.onChunkCompleted(creditRequestWrapper)
572574
}

src/consumer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_pol
55
import { Message } from "./publisher"
66
import { Offset } from "./requests/subscribe_request"
77

8-
export type ConsumerFunc = (message: Message) => void
8+
export type ConsumerFunc = (message: Message) => Promise<void> | void
99
export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => {
1010
return `${consumerId}@${connectionId}`
1111
}
@@ -82,9 +82,9 @@ export class StreamConsumer implements Consumer {
8282
return this.clientLocalOffset.clone()
8383
}
8484

85-
public handle(message: Message) {
85+
public async handle(message: Message) {
8686
if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return
87-
this.consumerHandle(message)
87+
await this.consumerHandle(message)
8888
this.maybeUpdateLocalOffset(message)
8989
}
9090

src/consumer_credit_policy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,4 @@ export const creditsOnChunkReceived = (startFrom: number, step: number) =>
5454
new NewCreditsOnChunkReceived(startFrom, step)
5555
export const creditsOnChunkCompleted = (startFrom: number, step: number) =>
5656
new NewCreditsOnChunkCompleted(startFrom, step)
57-
export const defaultCreditPolicy = creditsOnChunkReceived(2, 1)
57+
export const defaultCreditPolicy = creditsOnChunkCompleted(1, 1)

test/e2e/address_resolver.test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,15 @@ describe("address resolver", () => {
4040
})
4141

4242
it("declaring a consumer - should not throw", async () => {
43-
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => null)
43+
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => {
44+
return
45+
})
4446
})
4547

4648
it("declaring a consumer - if multiple nodes are present the consumer should be connected to a replica", async () => {
47-
const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => null)
49+
const consumer = await client.declareConsumer({ stream: streamName, offset: Offset.first() }, () => {
50+
return
51+
})
4852

4953
const connectionInfo = consumer.getConnectionInfo()
5054
const queueInfo = await rabbit.getQueueInfo(streamName)

test/e2e/close_consumer.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ describe("close consumer", () => {
3838

3939
it("closing a consumer in an existing stream", async () => {
4040
await client.declarePublisher({ stream: testStreamName })
41-
const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, () => null)
41+
const consumer = await client.declareConsumer({ stream: testStreamName, offset: Offset.first() }, () => {
42+
return
43+
})
4244

4345
const response = await client.closeConsumer(consumer.extendedId)
4446

test/e2e/declare_consumer.test.ts

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
getTestNodesFromEnv,
3030
password,
3131
username,
32+
waitSleeping,
3233
} from "../support/util"
3334

3435
describe("declare consumer", () => {
@@ -72,9 +73,21 @@ describe("declare consumer", () => {
7273
const messages: Buffer[] = []
7374
await publisher.send(Buffer.from("hello"))
7475

75-
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) =>
76+
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) => {
7677
messages.push(message.content)
77-
)
78+
})
79+
80+
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
81+
}).timeout(10000)
82+
83+
it("declaring an async consumer on an existing stream - the consumer should handle the message", async () => {
84+
const messages: Buffer[] = []
85+
await publisher.send(Buffer.from("hello"))
86+
87+
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, async (message: Message) => {
88+
await waitSleeping(10)
89+
messages.push(message.content)
90+
})
7891

7992
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
8093
}).timeout(10000)
@@ -86,15 +99,21 @@ describe("declare consumer", () => {
8699
await publisher.send(Buffer.from("hello"))
87100
await client.declareConsumer(
88101
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
89-
(message: Message) => messages.push(message.content)
102+
(message: Message) => {
103+
messages.push(message.content)
104+
}
90105
)
91106
await client.declareConsumer(
92107
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
93-
(message: Message) => messages.push(message.content)
108+
(message: Message) => {
109+
messages.push(message.content)
110+
}
94111
)
95112
await client.declareConsumer(
96113
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
97-
(message: Message) => messages.push(message.content)
114+
(message: Message) => {
115+
messages.push(message.content)
116+
}
98117
)
99118

100119
await eventually(() => expect(messages).eql([Buffer.from("hello")]))
@@ -105,16 +124,20 @@ describe("declare consumer", () => {
105124
const consumerRef = createConsumerRef()
106125

107126
await publisher.send(Buffer.from("hello"))
108-
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) =>
127+
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) => {
109128
messages.push(message.content)
110-
)
129+
})
111130
await client.declareConsumer(
112131
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
113-
(message: Message) => messages.push(message.content)
132+
(message: Message) => {
133+
messages.push(message.content)
134+
}
114135
)
115136
await client.declareConsumer(
116137
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
117-
(message: Message) => messages.push(message.content)
138+
(message: Message) => {
139+
messages.push(message.content)
140+
}
118141
)
119142

120143
await eventually(() => expect(messages).eql([Buffer.from("hello"), Buffer.from("hello")]))
@@ -128,19 +151,27 @@ describe("declare consumer", () => {
128151
await publisher.send(Buffer.from("hello"))
129152
await client.declareConsumer(
130153
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
131-
(message: Message) => messages.push(message.content)
154+
(message: Message) => {
155+
messages.push(message.content)
156+
}
132157
)
133158
await client.declareConsumer(
134159
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef },
135-
(message: Message) => messages.push(message.content)
160+
(message: Message) => {
161+
messages.push(message.content)
162+
}
136163
)
137164
await client.declareConsumer(
138165
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef1 },
139-
(message: Message) => messages.push(message.content)
166+
(message: Message) => {
167+
messages.push(message.content)
168+
}
140169
)
141170
await client.declareConsumer(
142171
{ stream: streamName, offset: Offset.first(), singleActive: true, consumerRef: consumerRef1 },
143-
(message: Message) => messages.push(message.content)
172+
(message: Message) => {
173+
messages.push(message.content)
174+
}
144175
)
145176

146177
await eventually(() => expect(messages).eql([Buffer.from("hello"), Buffer.from("hello")]))
@@ -155,7 +186,9 @@ describe("declare consumer", () => {
155186
async () => {
156187
await client.declareConsumer(
157188
{ stream: streamName, offset: Offset.first(), singleActive: true },
158-
(message: Message) => messages.push(message.content)
189+
(message: Message) => {
190+
messages.push(message.content)
191+
}
159192
)
160193
},
161194
Error,
@@ -169,9 +202,9 @@ describe("declare consumer", () => {
169202
await publisher.send(Buffer.from("world"))
170203
await publisher.send(Buffer.from("world"))
171204

172-
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) =>
205+
await client.declareConsumer({ stream: streamName, offset: Offset.first() }, (message: Message) => {
173206
messages.push(message.content)
174-
)
207+
})
175208

176209
await eventually(() => expect(messages).eql([Buffer.from("hello"), Buffer.from("world"), Buffer.from("world")]))
177210
}).timeout(10000)
@@ -192,7 +225,10 @@ describe("declare consumer", () => {
192225

193226
it("declaring a consumer on a non-existing stream should raise an error", async () => {
194227
await expectToThrowAsync(
195-
() => client.declareConsumer({ stream: nonExistingStreamName, offset: Offset.first() }, () => null),
228+
() =>
229+
client.declareConsumer({ stream: nonExistingStreamName, offset: Offset.first() }, () => {
230+
return
231+
}),
196232
Error,
197233
"Stream was not found on any node"
198234
)

test/e2e/filtering.test.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ describe("filtering", () => {
7474
matchUnfiltered: true,
7575
},
7676
},
77-
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
77+
(msg) => {
78+
filteredMsg.push(msg.content.toString("utf-8"))
79+
}
7880
)
7981

8082
await eventually(async () => {
@@ -107,13 +109,15 @@ describe("filtering", () => {
107109
matchUnfiltered: false,
108110
},
109111
},
110-
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
112+
(msg) => {
113+
filteredMsg.push(msg.content.toString("utf-8"))
114+
}
111115
)
112116

113117
await eventually(async () => {
114118
expect(filteredMsg.length).eql(2000)
115119
}, 10000)
116-
}).timeout(10000)
120+
}).timeout(15000)
117121

118122
it("published messages are filtered on the server side keeping even the ones with filter value", async () => {
119123
const filteredMsg: string[] = []
@@ -139,7 +143,9 @@ describe("filtering", () => {
139143
matchUnfiltered: true,
140144
},
141145
},
142-
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
146+
(msg) => {
147+
filteredMsg.push(msg.content.toString("utf-8"))
148+
}
143149
)
144150

145151
await eventually(async () => {

0 commit comments

Comments
 (0)