|
1 | 1 | # Message bus queue connectors |
| 2 | + |
2 | 3 | This is the package for the message bus queue connectors component for LoopBack 4 applications. |
3 | | -It provides components to work with queues such as SQS, BullMQ |
| 4 | +It provides components to work with queues such as SQS, BullMQ and EventBridge |
4 | 5 |
|
5 | | -[[email protected])](http://loopback.io/) |
| 6 | +[[email protected]>)](http://loopback.io/) |
6 | 7 |
|
7 | 8 | ## Installation |
8 | 9 |
|
9 | | -Install MessageBusQueueConnectorsComponent using `npm`; |
| 10 | +Install EventStreamConnectorComponent using `npm`; |
10 | 11 |
|
11 | 12 | ```sh |
12 | | -$ [npm install | yarn add] message-bus-queue-connectors |
| 13 | +$ [npm install | yarn add] @sourceloop/message-bus-queue-connectors |
13 | 14 | ``` |
| 15 | +## Flow Diagram |
| 16 | + |
| 17 | +<img width="659" alt="Screenshot 2025-06-06 at 10 53 06 AM" src="https://github.com/user-attachments/assets/baf1bcaa-5f67-44bb-a01a-b8d1c41644bc" /> |
14 | 18 |
|
15 | 19 | ## Basic Use |
16 | 20 |
|
17 | | -Configure and load MessageBusQueueConnectorsComponent in the application constructor |
| 21 | +Configure and load EventStreamConnectorComponent in the application constructor |
18 | 22 | as shown below. |
19 | 23 |
|
20 | | -### SQS |
21 | 24 | ```ts |
22 | | -import {SqsProducerProvider, SQSBindings, SQSConsumerObserver, SQSConsumerProvider} from 'message-bus-queue-connectors/sqs'; |
| 25 | +import { |
| 26 | + EventStreamConnectorComponent |
| 27 | +} from '@sourceloop/message-bus-queue-connectors'; |
23 | 28 |
|
24 | 29 | // ... |
25 | | -export class MyApplication extends BootMixin(ServiceMixin(RepositoryMixin(RestApplication))) { |
| 30 | +export class MyApplication extends BootMixin( |
| 31 | + ServiceMixin(RepositoryMixin(RestApplication)), |
| 32 | +) { |
26 | 33 | constructor(options: ApplicationConfig = {}) { |
27 | 34 | super(); |
28 | | - this.bind(SqsClientBindings.Config).to( |
29 | | - options.sqsConfig |
30 | | - ); |
31 | | - |
32 | | - this.bind(SQSBindings.SQSProducerProvider).toProvider(SqsProducerProvider); |
| 35 | + this.component(EventStreamConnectorComponent); |
33 | 36 | // ... |
| 37 | + } |
| 38 | + // ... |
| 39 | +} |
| 40 | +``` |
34 | 41 |
|
35 | | - // Add lifecycle observer |
36 | | - this.lifeCycleObserver(SQSConsumerObserver); |
| 42 | +### SQS |
| 43 | + |
| 44 | +To use SQS as their message queue, bind its required config and connector component in your application. |
| 45 | + |
| 46 | +```ts |
| 47 | +import { |
| 48 | + SQSConnector, |
| 49 | + SQSBindings, |
| 50 | + EventStreamConnectorComponent |
| 51 | +} from '@sourceloop/message-bus-queue-connectors'; |
| 52 | + |
| 53 | +// ... |
| 54 | +export class MyApplication extends BootMixin( |
| 55 | + ServiceMixin(RepositoryMixin(RestApplication)), |
| 56 | +) { |
| 57 | + constructor(options: ApplicationConfig = {}) { |
| 58 | + super(); |
| 59 | + |
| 60 | + this.component(EventStreamConnectorComponent); |
| 61 | + // SQS Config and its connector |
| 62 | + this.bind(SQSBindings.Config).to({ |
| 63 | + queueConfig: { |
| 64 | + QueueUrl: 'http://127.0.0.1:4566/000000000000/my-test-queue', |
| 65 | + MessageRetentionPeriod: 60, // at least 60 seconds |
| 66 | + MaximumMessageSize: 262144, |
| 67 | + ReceiveMessageWaitTimeSeconds: 20, // typical polling time |
| 68 | + VisibilityTimeout: 30, // 30 seconds |
| 69 | + }, |
| 70 | + Credentials: { |
| 71 | + region: 'us-east-1', |
| 72 | + accessKeyId: 'test', |
| 73 | + secretAccessKey: 'test', |
| 74 | + }, |
| 75 | + ConsumerConfig: { |
| 76 | + MaxNumberOfMessages: 10, |
| 77 | + WaitTimeSeconds: 20, |
| 78 | + maxConsumers: 2, |
| 79 | + }, |
| 80 | + }); |
| 81 | + |
| 82 | + this.component(SQSConnector); |
37 | 83 |
|
38 | 84 | // ... |
39 | 85 | } |
40 | 86 | // ... |
41 | 87 | } |
42 | 88 | ``` |
43 | 89 |
|
44 | | -#### SQS Config |
| 90 | +to make the application as consumer, pass 'isConsumer' flag to be true in SQS config. like |
| 91 | + |
45 | 92 | ```ts |
46 | 93 | const config = { |
47 | | - queueConfig: { |
48 | | - QueueUrl: "sqs-queue-url",, |
49 | | - MessageRetentionPeriod: 1, |
50 | | - MaximumMessageSize: 262144 , |
51 | | - ReceiveMessageWaitTimeSeconds: 60, |
52 | | - VisibilityTimeout: 300, |
53 | | - }, |
54 | | - Credentials: { |
55 | | - region: "aws-region", |
56 | | - accessKeyId: "aws-access-key-id", |
57 | | - secretAccessKey: "aws-secret-access-key", |
58 | | - }, |
59 | | - ConsumerConfig: { |
60 | | - MaxNumberOfMessages: 10, |
61 | | - WaitTimeSeconds: 20, |
62 | | - maxConsumers: 2, |
63 | | - }, |
64 | | -} |
65 | | - |
| 94 | + // rest of ur config |
| 95 | + isConsumer: true, |
| 96 | +}; |
66 | 97 | ``` |
67 | | -#### Consumer setup |
68 | | -Below is consumer handler example. |
69 | | -```ts |
70 | | -this.bind(SQSBindings.SQSConsumerProvider).toProvider(SQSConsumerProvider); |
71 | | - // eslint-disable-next-line @typescript-eslint/no-explicit-any |
72 | | - this.bind(SQSBindings.SQSConsumerHandler).to(async (message: string) => { |
73 | | - console.log('Processing message SQS---------:', message); |
74 | | - |
75 | | - }); |
76 | 98 |
|
77 | | -``` |
78 | 99 | Please follow the [AWS SDK for JavaScript](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/sqs-examples-send-receive-messages.html) for more information on the configuration. |
79 | 100 |
|
80 | | - |
81 | 101 | ### BullMQ |
82 | 102 |
|
| 103 | +To use BullMq as their message queue, bind its required config and connector component in your application. |
| 104 | + |
83 | 105 | ```ts |
84 | | -import {BullMQProducerProvider, BullMQBindings, BullMQConsumerObserver} from 'message-bus-queue-connectors/bullmq'; |
| 106 | +import { |
| 107 | + BullMQConnector, |
| 108 | + BullMQBindings, |
| 109 | + EventStreamConnectorComponent, |
| 110 | +} from '@sourceloop/message-bus-queue-connectors'; |
85 | 111 |
|
86 | 112 | // ... |
87 | | -export class MyApplication extends BootMixin(ServiceMixin(RepositoryMixin(RestApplication))) { |
| 113 | +export class MyApplication extends BootMixin( |
| 114 | + ServiceMixin(RepositoryMixin(RestApplication)), |
| 115 | +) { |
88 | 116 | constructor(options: ApplicationConfig = {}) { |
89 | 117 | super(); |
90 | | - this.bind(BullMQBindings.Config).to( |
91 | | - options.config |
92 | | - ); |
93 | | - |
94 | | - this.bind(BullMQBindings.BullMQProducerProvider).toProvider(BullMQProducerProvider); |
95 | | - // ... |
96 | | - // Add lifecycle observer |
97 | | - this.lifeCycleObserver(BullMQConsumerObserver); |
| 118 | + |
| 119 | + this.component(EventStreamConnectorComponent); |
| 120 | + |
| 121 | + // Bull Mq config and connector |
| 122 | + this.bind(BullMQBindings.Config).to({ |
| 123 | + QueueName: process.env.QUEUE_NAME ?? 'default-queue', |
| 124 | + redisConfig: { |
| 125 | + host: process.env.REDIS_HOST ?? 'localhost', |
| 126 | + port: parseInt(process.env.REDIS_PORT ?? '6379'), |
| 127 | + password: process.env.REDIS_PASSWORD ?? undefined, |
| 128 | + }, |
| 129 | + producerConfig: { |
| 130 | + defaultJobOptions: { |
| 131 | + attempts: 3, |
| 132 | + backoff: 5000, |
| 133 | + }, |
| 134 | + }, |
| 135 | + consumerConfig: { |
| 136 | + MinConsumers: 1, |
| 137 | + MaxConsumers: 5, |
| 138 | + QueuePollInterval: 2000, |
| 139 | + }, |
| 140 | + }); |
| 141 | + this.component(BullMQConnector); |
98 | 142 | // ... |
99 | 143 | } |
100 | 144 | // ... |
101 | 145 | } |
102 | 146 | ``` |
103 | 147 |
|
104 | | -#### BullMQ config |
| 148 | +to make the application as consumer, pass 'isConsumer' flag to be true in Bull config. like |
| 149 | + |
105 | 150 | ```ts |
106 | 151 | const config = { |
107 | | - queueConfig:{ |
108 | | - QueueName: 'BullMQ1', |
109 | | - }, |
110 | | - QueueName: 'BullMQ1', |
111 | | - producerConfig: { |
112 | | - defaultJobOptions: {attempts: 3, |
113 | | - backoff: { |
114 | | - type: 'exponential', |
115 | | - delay: 5000, |
116 | | - }, |
117 | | - } |
118 | | - }, |
119 | | - consumerConfig: { |
120 | | - MaxConsumers: 1, |
121 | | - MinConsumers: 1, |
122 | | - }, |
123 | | - redisConfig: { |
124 | | - host: process.env.REDIS_HOST ?? 'localhost', |
125 | | - port: +(process.env.REDIS_PORT ?? 6379), |
126 | | - } |
127 | | - |
| 152 | + // rest of ur config |
| 153 | + isConsumer: true, |
| 154 | +}; |
| 155 | +``` |
| 156 | + |
| 157 | +## Integration |
| 158 | + |
| 159 | + @sourceloop/message-bus-queue-connectors provides a decorator '@producer()' that can be used to access the producer of each msg queue. It expects one arguement defining the type of queue, of which producer u want to use. like |
| 160 | + |
| 161 | + ```ts |
| 162 | + @injectable({scope: BindingScope.TRANSIENT}) |
| 163 | +export class EventConnector implements IEventConnector<PublishedEvents> { |
| 164 | + constructor( |
| 165 | + @producer(QueueType.EventBridge) |
| 166 | + private producer: Producer, |
| 167 | + @producer(QueueType.SQS) |
| 168 | + private sqsProducer: Producer, |
| 169 | + @producer(QueueType.BullMQ) |
| 170 | + private bullMqProducer: Producer, |
| 171 | + ) {} |
| 172 | + |
| 173 | + // rest of implementation |
| 174 | + |
128 | 175 | } |
| 176 | + ``` |
| 177 | + |
| 178 | + Producer provider two ways of sending events - single event at a time and multiple event at a time. |
| 179 | + |
| 180 | + ```ts |
| 181 | + export type Producer<Stream extends AnyObject = AnyObject> = { |
| 182 | + send: <Event extends keyof Stream>(data: Stream[Event], topic?: Event) => Promise<void>; |
| 183 | + sendMultiple: <Event extends keyof Stream>(data: Stream[Event][], topic?: Event) => Promise<void>; |
| 184 | +}; |
| 185 | + ``` |
129 | 186 |
|
| 187 | +It provides '@consumer' decorator to make a service as consumer. consumer needs to follow an interface. |
| 188 | + |
| 189 | +```ts |
| 190 | +export interface IConsumer<Stream extends AnyObject, Event extends keyof Stream> { |
| 191 | + event: Event; |
| 192 | + queue: QueueType; |
| 193 | + handle(data: Stream[Event]): Promise<void>; |
| 194 | +} |
130 | 195 | ``` |
131 | 196 |
|
132 | | -#### Consumer setup |
133 | | -Below is consumer handler example. |
| 197 | +and can be used as |
| 198 | + |
134 | 199 | ```ts |
135 | | -this.bind(BullMQBindings.BullMQConsumerProvider).toProvider(BullMQConsumerProvider); |
136 | | - |
137 | | - this.bind(BullMQBindings.BullMQConsumerHandler).to(async (message: string) => { |
138 | | - console.log('Processing message ---------:', message); |
139 | | - }); |
| 200 | +import { |
| 201 | + IConsumer, |
| 202 | + QueueType, |
| 203 | + consumer, |
| 204 | +} from '@sourceloop/message-bus-queue-connectors'; |
| 205 | +import { OrchestratorStream, EventTypes, ProvisioningInputs } from '../../types'; |
| 206 | + |
| 207 | +@consumer |
| 208 | +export class TenantProvisioningConsumerForEventSQS |
| 209 | + implements IConsumer<OrchestratorStream, EventTypes.TENANT_PROVISIONING> |
| 210 | +{ |
| 211 | + constructor( |
| 212 | + ) {} |
| 213 | + event: EventTypes.TENANT_PROVISIONING = EventTypes.TENANT_PROVISIONING; |
| 214 | + queue: QueueType = QueueType.SQS; |
| 215 | + async handle(data: ProvisioningInputs): Promise<void> { |
| 216 | + console.log(`SQS: ${this.event} Event Recieved ` + JSON.stringify(data)); |
| 217 | + return; |
| 218 | + } |
| 219 | +} |
140 | 220 | ``` |
0 commit comments