Skip to content

Commit f574f1f

Browse files
authored
Add configurable policies for consumer credit flow (#196)
* feature: configurable policies for consumer credit flow
1 parent 6bcd40b commit f574f1f

File tree

6 files changed

+281
-37
lines changed

6 files changed

+281
-37
lines changed

src/client.ts

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
1111
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
1212
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
1313
import { CreateSuperStreamRequest } from "./requests/create_super_stream_request"
14-
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
14+
import { CreditRequest } from "./requests/credit_request"
1515
import { DeclarePublisherRequest } from "./requests/declare_publisher_request"
1616
import { DeletePublisherRequest } from "./requests/delete_publisher_request"
1717
import { DeleteStreamRequest } from "./requests/delete_stream_request"
@@ -42,6 +42,7 @@ import { UnsubscribeResponse } from "./responses/unsubscribe_response"
4242
import { SuperStreamConsumer } from "./super_stream_consumer"
4343
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
4444
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, sample } from "./util"
45+
import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy"
4546

4647
export type ConnectionClosedListener = (hadError: boolean) => void
4748

@@ -54,6 +55,13 @@ type PublisherMappedValue = {
5455
params: DeclarePublisherParams
5556
filter: FilterFunc | undefined
5657
}
58+
59+
type DeliverData = {
60+
messages: Message[]
61+
messageFilteringSupported: boolean
62+
subscriptionId: number
63+
consumerId: string
64+
}
5765
export class Client {
5866
public readonly id: string = randomUUID()
5967
private consumers = new Map<string, ConsumerMappedValue>()
@@ -185,7 +193,14 @@ export class Client {
185193

186194
const consumer = new StreamConsumer(
187195
handle,
188-
{ connection, stream: params.stream, consumerId, consumerRef: params.consumerRef, offset: params.offset },
196+
{
197+
connection,
198+
stream: params.stream,
199+
consumerId,
200+
consumerRef: params.consumerRef,
201+
offset: params.offset,
202+
creditPolicy: params.creditPolicy,
203+
},
189204
params.filter
190205
)
191206
connection.on("metadata_update", async (metadata) => {
@@ -466,8 +481,15 @@ export class Client {
466481
properties["match-unfiltered"] = `${params.filter.matchUnfiltered}`
467482
}
468483

484+
const creditPolicy = params.creditPolicy || defaultCreditPolicy
485+
469486
const res = await connection.sendAndWait<SubscribeResponse>(
470-
new SubscribeRequest({ ...params, subscriptionId: consumerId, credit: 10, properties: properties })
487+
new SubscribeRequest({
488+
...params,
489+
subscriptionId: consumerId,
490+
credit: creditPolicy.onSubscription(),
491+
properties: properties,
492+
})
471493
)
472494

473495
if (!res.ok) {
@@ -476,50 +498,61 @@ export class Client {
476498
}
477499
}
478500

479-
private askForCredit(params: CreditRequestParams, connection: Connection): Promise<void> {
480-
return connection.send(new CreditRequest({ ...params }))
501+
private askForCredit(subscriptionId: number, connection: Connection): CreditRequestWrapper {
502+
return async (howMany: number) => {
503+
return connection.send(new CreditRequest({ subscriptionId: subscriptionId, credit: howMany }))
504+
}
481505
}
482506

483507
private getDeliverV1Callback(connectionId: string) {
484508
return async (response: DeliverResponse) => {
485-
const { consumer, connection } = this.consumers.get(
486-
computeExtendedConsumerId(response.subscriptionId, connectionId)
487-
) ?? {
488-
consumer: undefined,
489-
connection: undefined,
490-
}
491-
if (!consumer) {
492-
this.logger.error(`On deliverV1 no consumer found`)
493-
return
509+
const deliverData = {
510+
messages: response.messages,
511+
subscriptionId: response.subscriptionId,
512+
consumerId: computeExtendedConsumerId(response.subscriptionId, connectionId),
513+
messageFilteringSupported: false,
494514
}
495-
this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`)
496-
this.logger.debug(`response.messages.length: ${response.messages.length}`)
497-
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection)
498-
response.messages.map((x) => consumer.handle(x))
515+
await this.handleDelivery(deliverData)
499516
}
500517
}
501518

502519
private getDeliverV2Callback(connectionId: string) {
503520
return async (response: DeliverResponseV2) => {
504-
const { consumer, connection } = this.consumers.get(
505-
computeExtendedConsumerId(response.subscriptionId, connectionId)
506-
) ?? {
507-
consumer: undefined,
508-
connection: undefined,
521+
const deliverData = {
522+
messages: response.messages,
523+
subscriptionId: response.subscriptionId,
524+
consumerId: computeExtendedConsumerId(response.subscriptionId, connectionId),
525+
messageFilteringSupported: true,
509526
}
510-
if (!consumer) {
511-
this.logger.error(`On deliverV2 no consumer found`)
512-
return
513-
}
514-
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
515-
this.logger.debug(`response.messages.length: ${response.messages.length}`)
516-
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId }, connection)
517-
if (consumer.filter) {
518-
response.messages.filter((x) => consumer.filter?.postFilterFunc(x)).map((x) => consumer.handle(x))
519-
return
520-
}
521-
response.messages.map((x) => consumer.handle(x))
527+
await this.handleDelivery(deliverData)
528+
}
529+
}
530+
531+
private handleDelivery = async (deliverData: DeliverData) => {
532+
const { messages, subscriptionId, consumerId, messageFilteringSupported } = deliverData
533+
const { consumer, connection } = this.consumers.get(consumerId) ?? {
534+
consumer: undefined,
535+
connection: undefined,
522536
}
537+
if (!consumer) {
538+
this.logger.error(`On delivery, no consumer found`)
539+
return
540+
}
541+
this.logger.debug(`on delivery -> ${consumer.consumerRef}`)
542+
this.logger.debug(`response.messages.length: ${messages.length}`)
543+
544+
const creditRequestWrapper = this.askForCredit(subscriptionId, connection)
545+
await consumer.creditPolicy.onChunkReceived(creditRequestWrapper)
546+
const messageFilter =
547+
messageFilteringSupported && consumer.filter?.postFilterFunc
548+
? consumer.filter?.postFilterFunc
549+
: (_msg: Message) => true
550+
551+
messages.map((message) => {
552+
if (messageFilter(message)) consumer.handle(message)
553+
})
554+
555+
await consumer.creditPolicy.onChunkCompleted(creditRequestWrapper)
523556
}
524557

525558
private getConsumerUpdateCallback(connectionId: string) {
@@ -700,9 +733,11 @@ export interface DeclareSuperStreamPublisherParams {
700733
routingStrategy?: RoutingStrategy
701734
}
702735

736+
export type MessageFilter = (msg: Message) => boolean
737+
703738
export interface ConsumerFilter {
704739
values: string[]
705-
postFilterFunc: (msg: Message) => boolean
740+
postFilterFunc: MessageFilter
706741
matchUnfiltered: boolean
707742
}
708743

@@ -713,6 +748,7 @@ export interface DeclareConsumerParams {
713748
connectionClosedListener?: ConnectionClosedListener
714749
singleActive?: boolean
715750
filter?: ConsumerFilter
751+
creditPolicy?: ConsumerCreditPolicy
716752
}
717753

718754
export interface DeclareSuperStreamConsumerParams {

src/consumer.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ConsumerFilter } from "./client"
22
import { ConnectionInfo, Connection } from "./connection"
33
import { ConnectionPool } from "./connection_pool"
4+
import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy"
45
import { Message } from "./publisher"
56
import { Offset } from "./requests/subscribe_request"
67

@@ -26,6 +27,7 @@ export class StreamConsumer implements Consumer {
2627
public consumerRef?: string
2728
public offset: Offset
2829
private clientLocalOffset: Offset
30+
private creditsHandler: ConsumerCreditPolicy
2931
readonly handle: ConsumerFunc
3032

3133
constructor(
@@ -36,6 +38,7 @@ export class StreamConsumer implements Consumer {
3638
consumerId: number
3739
consumerRef?: string
3840
offset: Offset
41+
creditPolicy?: ConsumerCreditPolicy
3942
},
4043
readonly filter?: ConsumerFilter
4144
) {
@@ -46,6 +49,7 @@ export class StreamConsumer implements Consumer {
4649
this.offset = params.offset
4750
this.clientLocalOffset = this.offset.clone()
4851
this.connection.incrRefCount()
52+
this.creditsHandler = params.creditPolicy || defaultCreditPolicy
4953
this.handle = this.wrapHandle(handle, params.offset)
5054
}
5155

@@ -109,4 +113,8 @@ export class StreamConsumer implements Consumer {
109113
public get extendedId(): string {
110114
return computeExtendedConsumerId(this.consumerId, this.connection.connectionId)
111115
}
116+
117+
public get creditPolicy() {
118+
return this.creditsHandler
119+
}
112120
}

src/consumer_credit_policy.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
export type CreditRequestWrapper = (howMany: number) => Promise<void>
2+
3+
export abstract class ConsumerCreditPolicy {
4+
constructor(protected readonly startFrom: number) {}
5+
6+
public async onChunkReceived(_requestWrapper: CreditRequestWrapper) {
7+
return
8+
}
9+
10+
public async onChunkCompleted(_requestWrapper: CreditRequestWrapper) {
11+
return
12+
}
13+
14+
public async requestCredits(requestWrapper: CreditRequestWrapper, amount: number) {
15+
return requestWrapper(amount)
16+
}
17+
18+
public onSubscription() {
19+
return this.startFrom
20+
}
21+
}
22+
23+
class NewCreditsOnChunkReceived extends ConsumerCreditPolicy {
24+
constructor(startFrom: number = 1, private readonly step: number = 1) {
25+
super(startFrom)
26+
}
27+
28+
public async onChunkReceived(requestWrapper: CreditRequestWrapper) {
29+
await this.requestCredits(requestWrapper, this.step)
30+
}
31+
32+
public onSubscription(): number {
33+
return this.startFrom
34+
}
35+
}
36+
37+
class NewCreditsOnChunkCompleted extends ConsumerCreditPolicy {
38+
constructor(startFrom: number = 1, private readonly step: number = 1) {
39+
super(startFrom)
40+
}
41+
42+
public async onChunkCompleted(requestWrapper: CreditRequestWrapper) {
43+
await this.requestCredits(requestWrapper, this.step)
44+
}
45+
}
46+
47+
export const creditsOnChunkReceived = (startFrom: number, step: number) =>
48+
new NewCreditsOnChunkReceived(startFrom, step)
49+
export const creditsOnChunkCompleted = (startFrom: number, step: number) =>
50+
new NewCreditsOnChunkCompleted(startFrom, step)
51+
export const defaultCreditPolicy = creditsOnChunkReceived(2, 1)

0 commit comments

Comments
 (0)