Commit e43bcde

Update misk-aws2-sqs docs
GitOrigin-RevId: 494d568349ec94a43673f2733ff01d7f00278903
1 parent 4976d8b commit e43bcde

1 file changed

+244
-52
lines changed

misk-aws2-sqs/README.md

Lines changed: 244 additions & 52 deletions
@@ -1,21 +1,244 @@
1 1
# Module: AWS SQS Module
2 2

3-
**This module is still in experimental state. It's a work in progress towards:**
4-
* supporting suspending handlers
5-
* migration to AWS SDK v2
3+
**This module is still in an experimental state.**
6 4

7-
## Differences to the previous implementation
5+
This module provides a modern, Kotlin-first interface for working with Amazon Simple Queue Service (SQS) in Misk applications. It leverages Kotlin's coroutines and structured concurrency to offer an efficient, type-safe, and easy-to-use message processing system.
8 6

9-
* uses AWS SDK v2
10-
* exposes suspending API
11-
* handlers return status and don't make calls to SQS. Acknowledging jobs is done by the framework code
12-
* no dependency on the lease module. There will be at least one handler per service instance
13-
* no dependency on the feature flags
14-
* metrics are updated to v2, names of the metrics have been changed
7+
### Key Features
15 8

16-
## Migration
9+
- **Kotlin Coroutines Integration**: Native support for suspending functions and structured concurrency
10+
- **Flexible Configuration**: Global and per-queue configuration options for fine-tuned performance
11+
- **Advanced Queue Management**:
12+
  - Automatic retry queue setup and management
13+
  - Dead-letter queue support for failed message handling
14+
  - Configurable visibility timeout and message retention
15+
- **Scalable Processing**:
16+
  - Concurrent message processing with configurable parallelism
17+
  - Efficient message batching and pre-fetching
18+
  - Automatic backoff for failed messages
19+
- **Robust Error Handling**:
20+
  - Automatic message recovery on failures
21+
  - Configurable retry policies
22+
  - Dead-letter queue routing for unprocessable messages
23+
- **Monitoring Ready**:
24+
  - Built-in metrics for queue operations
25+
  - Detailed logging for debugging and monitoring
26+
  - Performance tracking capabilities
27+
- **Type-safe Message Handling**:
28+
  - Support for both blocking and suspending handlers
29+
  - Strongly typed message attributes
30+
  - Automatic message acknowledgment
31+
32+
This module is designed to be a drop-in replacement for the previous AWS SQS implementation, offering improved performance, better resource utilization, and a more developer-friendly API.
33+
34+
## Installation
35+
36+
Follow these steps to integrate the AWS SQS module into your Misk application:
37+
38+
### 1. Add Dependencies
39+
40+
Add the following dependency to your `build.gradle.kts`:
41+
42+
```kotlin
43+
dependencies {
44+
  implementation(project(":misk-aws2-sqs"))
45+
}
46+
```
47+
48+
### 2. Install Required Modules
49+
50+
Install the SQS module in your application by adding it to your service module:
51+
52+
```kotlin
53+
import com.squareup.cash.orc.orcOverride
54+
import com.squareup.skim.aws.sqs.CommonCashQueues
55+
import misk.annotation.ExperimentalMiskApi
56+
import misk.aws2.sqs.jobqueue.DeadLetterQueueProvider
57+
import misk.aws2.sqs.jobqueue.SqsJobHandlerModule
58+
import misk.aws2.sqs.jobqueue.SqsJobQueueModule
59+
import misk.aws2.sqs.jobqueue.StaticDeadLetterQueueProvider
60+
import misk.aws2.sqs.jobqueue.config.SqsConfig
61+
import misk.inject.KAbstractModule
62+
import misk.jobqueue.QueueName
63+
import misk.web.WebActionModule
64+
65+
@OptIn(ExperimentalMiskApi::class)
66+
class SqsModule(private val config: SqsConfig) : KAbstractModule() {
67+
  override fun configure() {
68+
    // Override configuration in local development to support Orc connection
69+
    install(SqsJobQueueModule(config) { orcOverride("sqs") })
70+
71+
    install(SqsJobHandlerModule.create<ExampleSuspendingHandler>(EXEMPLAR_QUEUE))
72+
    bind<DeadLetterQueueProvider>().toInstance(StaticDeadLetterQueueProvider(CommonCashQueues.GLOBAL_DLQ))
73+
  }
74+
75+
  companion object {
76+
    val EXEMPLAR_QUEUE = QueueName("cash-misk-suspending-exemplar_queue")
77+
  }
78+
}
79+
```
80+
81+
### 3. Define Message Handlers
82+
83+
Create handlers for your queues. You can choose between two types of handlers:
84+
85+
#### Suspending Handler
17 86

18-
TODO - this section will have detailed steps for migrating from the previous implementation
87+
A suspending handler lets you call suspending functions and manages the coroutine scope for you:
88+
89+
```kotlin
90+
class ExampleSuspendingHandler @Inject constructor() : SuspendingJobHandler {
91+
  override suspend fun handleJob(job: Job): JobStatus {
92+
    // Process the message
93+
    val messageBody = job.body
94+
95+
    return when {
96+
      // Successfully processed (`succeeded` is a placeholder for your own check)
97+
      succeeded -> JobStatus.OK
98+
      // Message should be retried (`retryable` is a placeholder for your own check)
99+
      retryable -> JobStatus.RETRY_LATER
100+
      // Message should be sent to the dead-letter queue (`unprocessable` is a placeholder)
101+
      unprocessable -> JobStatus.DEAD_LETTER
102+
      // Otherwise, the message should be retried with exponential backoff
103+
      else -> JobStatus.RETRY_WITH_BACKOFF
104+
    }
105+
  }
106+
}
107+
```
108+
109+
#### Blocking Handler
110+
111+
If your handler does not need any suspending code, you can implement `BlockingJobHandler` instead:
112+
113+
```kotlin
114+
class ExampleBlockingHandler @Inject constructor() : BlockingJobHandler {
115+
  override fun handleJob(job: Job): JobStatus {
116+
    // Process the message
117+
    val messageBody = job.body
118+
119+
    return JobStatus.OK
120+
  }
121+
}
122+
```
123+
124+
### 4. Add Configuration
125+
126+
To use YAML configuration, follow the steps below. Alternatively, you can pass an instance of
127+
`misk.aws2.sqs.jobqueue.config.SqsConfig` to `SqsJobQueueModule` in the example Guice module definition above.
128+
In that case, it is up to you how you source the values (hardcoded, feature flags, etc.).
129+
130+
#### 4.1 Create Configuration Class
131+
Add the SQS configuration to your service configuration class:
132+
133+
```kotlin
134+
import misk.aws2.sqs.jobqueue.config.SqsConfig
135+
import misk.config.Config
136+
137+
data class ExampleServiceConfig(
138+
  val aws_sqs: SqsConfig,
139+
  // ... other config properties
140+
) : Config
141+
```
142+
143+
#### 4.2 Add YAML Configuration
144+
Add the SQS configuration to your service YAML file.
145+
146+
Example relying on the defaults:
147+
```yaml
148+
aws_sqs: {}
149+
```
150+
151+
Example with some values defined:
152+
```yaml
153+
aws_sqs:
154+
  all_queues:
155+
    # Default configuration for all queues
156+
    parallelism: 1
157+
    concurrency: 5
158+
    channel_capacity: 0
159+
    wait_timeout: 20
160+
    max_number_of_messages: 10
161+
    install_retry_queue: true
162+
    region: us-west-2 # Specify your AWS region
163+
164+
  per_queue_overrides:
165+
    # Optional: Override configuration for specific queues
166+
    example_queue:
167+
      parallelism: 2
168+
      concurrency: 10
169+
```
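The relationship between `all_queues` and `per_queue_overrides` can be sketched as a lookup with a fallback. This is a minimal, self-contained illustration and not the module's code: `QueueSettings`, `SqsSettings` and `settingsFor` are hypothetical names, and the sketch assumes that an override entry replaces the defaults for that queue as a whole. Whether unspecified fields in an override inherit from `all_queues` is not stated here, so verify it against `SqsConfig` before relying on it.

```kotlin
// Hypothetical stand-ins for illustration; not the module's real config classes.
data class QueueSettings(
  val parallelism: Int = 1,
  val concurrency: Int = 1,
  val channelCapacity: Int = 0,
)

data class SqsSettings(
  val allQueues: QueueSettings = QueueSettings(),
  val perQueueOverrides: Map<String, QueueSettings> = emptyMap(),
)

// Assumption: a per-queue entry wins as a whole; otherwise the shared defaults apply.
fun SqsSettings.settingsFor(queueName: String): QueueSettings =
  perQueueOverrides[queueName] ?: allQueues

// Mirrors the YAML example: shared defaults plus one overridden queue.
val settings = SqsSettings(
  allQueues = QueueSettings(parallelism = 1, concurrency = 5),
  perQueueOverrides = mapOf(
    "example_queue" to QueueSettings(parallelism = 2, concurrency = 10),
  ),
)
```

With this model, `settings.settingsFor("example_queue")` yields the overridden values, while any other queue name falls back to the `allQueues` defaults.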
170+
171+
### Testing
172+
173+
TODO
174+
175+
## Configuration
176+
177+
You can define a default configuration for all queues or choose to override it per queue.
178+
179+
The default values are a good starting point. You can adjust them to match your specific performance and behavior needs.
180+
181+
### Concurrency-related settings
182+
183+
Refer to the "Threading model" section below for an in-depth description.
184+
185+
* `parallelism`
186+
  * default value: 1
187+
  * defines how many threads are used to process a given queue
188+
  * increase this parameter to increase throughput if you have a lot of computationally intensive or blocking IO code in
189+
    your handlers. You will also need to increase `concurrency` to at least match this setting.
190+
* `concurrency`
191+
  * default value: 1
192+
  * defines how many coroutines are used to process a given queue. Coroutines are executed on
193+
    the number of threads defined by the `parallelism` parameter.
194+
  * increase this parameter to increase throughput if you use suspending code. If you also have some
195+
    computationally intensive and/or blocking IO code, you will also need to bump `parallelism`
196+
* `channel_capacity`
197+
  * default value: 0
198+
  * defines the buffer size between the polling loop and the handling loop. 0 means that the polling loop
199+
    will wait until all messages from the current batch are picked up by handlers before requesting
200+
    another batch of messages. A value of 1 or more sets how many jobs can wait in the buffer between the polling
201+
    and handling loops
202+
  * increase this parameter if you want to reduce the latency in job processing. This will be effective
203+
    only if your handling code is slower than the polling code
204+
  * note: if your `visibility_timeout` is low and your processing code is slow, a non-zero value may lead
205+
    to exceeding `visibility_timeout` even before the job is picked up by the handler. This may result
206+
    in increased duplicate processing
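The interaction between batch size, `channel_capacity`, `concurrency` and `visibility_timeout` can be estimated with simple arithmetic. The sketch below is a rough, pessimistic model and not the module's scheduling logic. It assumes that every job takes the same time, that all handlers have just started a job when a new batch arrives, and that the visibility clock starts when SQS returns the message. The function and parameter names are hypothetical.

```kotlin
// Rough worst-case wait (in seconds) before the last message of a freshly
// received batch is picked up by a handler. Assumes uniform processing time.
fun worstCasePickupDelaySeconds(
  batchSize: Int,
  channelCapacity: Int,
  concurrency: Int,
  processingSeconds: Double,
): Double {
  require(batchSize >= 1) { "batchSize must be at least 1" }
  require(channelCapacity >= 0) { "channelCapacity must not be negative" }
  require(concurrency >= 1) { "concurrency must be at least 1" }
  // Jobs queued ahead: a full buffer plus the earlier messages of the same batch.
  val jobsAhead = channelCapacity + batchSize - 1
  // Each "round" of processing frees up `concurrency` handlers.
  val rounds = jobsAhead / concurrency + 1
  return rounds * processingSeconds
}

// True when the message may become visible again before its handler finishes.
fun mayExceedVisibilityTimeout(
  batchSize: Int,
  channelCapacity: Int,
  concurrency: Int,
  processingSeconds: Double,
  visibilityTimeoutSeconds: Int,
): Boolean {
  val delay = worstCasePickupDelaySeconds(batchSize, channelCapacity, concurrency, processingSeconds)
  return delay + processingSeconds > visibilityTimeoutSeconds
}
```

For example, with a batch of 10, 5 coroutines and 4 seconds per job, raising the buffer from 0 to 30 moves the estimate from 8 to 32 seconds, which is already above a 30 second visibility timeout before processing even starts.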
207+
208+
### SQS-specific settings
209+
210+
* `max_number_of_messages`
211+
  * default value: 10, range 1-10
212+
  * defines the size of the batch requested from SQS
213+
  * this setting optimizes communication with SQS by reducing network round-trips. However, if a given
214+
    queue does not receive many messages, a value higher than 1 may result in long polling and
215+
    negatively impact latency
216+
* `wait_timeout`
217+
  * default value: null, range 0-20
218+
  * defines how long (in seconds) the polling loop should wait for `max_number_of_messages` to be available. If no
219+
    value is provided, queue settings will be used
220+
  * see [Amazon SQS short and long polling](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html)
221+
  * this setting optimizes communication with SQS by reducing network round-trips.
222+
* `visibility_timeout`
223+
  * default value: null, range 0-43200
224+
  * defines how long (in seconds) a given message stays invisible to other processes querying the queue.
225+
    If no value is provided, queue settings will be used
226+
  * note: if the value is smaller than the typical processing time, it may result in the job being processed
227+
    multiple times
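The ranges listed above can be checked before a configuration is deployed. The helper below is a hypothetical sketch, not part of the module: it only encodes the ranges stated in this section (1-10, 0-20 and 0-43200) and treats `null` as "use the queue settings".

```kotlin
// Hypothetical helper; returns a list of human-readable problems, empty if none.
fun validateSqsSettings(
  maxNumberOfMessages: Int = 10,
  waitTimeout: Int? = null,
  visibilityTimeout: Int? = null,
): List<String> {
  val errors = mutableListOf<String>()
  if (maxNumberOfMessages !in 1..10) {
    errors += "max_number_of_messages must be in 1..10, was $maxNumberOfMessages"
  }
  // null means "fall back to the queue settings", so only explicit values are checked.
  if (waitTimeout != null && waitTimeout !in 0..20) {
    errors += "wait_timeout must be in 0..20 seconds, was $waitTimeout"
  }
  if (visibilityTimeout != null && visibilityTimeout !in 0..43200) {
    errors += "visibility_timeout must be in 0..43200 seconds, was $visibilityTimeout"
  }
  return errors
}
```

For instance, `validateSqsSettings(maxNumberOfMessages = 20)` reports one problem, because 20 is outside the documented 1-10 range.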
228+
229+
### Connection settings
230+
231+
* `install_retry_queue`
232+
  * default value: true
233+
  * defines if a retry queue should be installed by default. The retry queue is populated by replaying
234+
    failed jobs from DeadLetterOffice
235+
* `region`
236+
  * default value: null
237+
  * defines the AWS region to consume the messages from. By default, uses the deployment region of the service
238+
* `account_id`
239+
  * default value: null
240+
  * defines the AWS account id that owns the queue. By default, uses the deployment account id of the service.
241+
    This setting is needed for queues defined outside of our AWS accounts (external queues)
19 242

20 243
## Threading model
21 244

@@ -40,46 +263,15 @@ It's advised to start with the default settings and adjust based on specific wor
40 263

41 264
![image](concurrency.jpg)
42 265

43-
## Configuration
44-
45-
You can define default configuration for all queue or choose to override it per queue.
46-
47-
Example YAML configuration:
266+
## Differences from the previous misk-aws implementation
48 267

49-
```yaml
50-
aws_sqs:
51-
all_queues:
52-
parallelism: 1
53-
concurrency: 5
54-
channel_capacity: 0
55-
wait_timeout: 20
56-
max_number_of_messages: 20
57-
install_retry_queue: true
58-
per_queue_overrides:
59-
ledger_validation_queue:
60-
parallelism: 5
61-
concurrency: 10
62-
channel_capacity: 5
63-
refund_delay_queue:
64-
concurrency: 5
65-
```
66-
67-
You also need to add this configuration to your service configuration class, for example:
68-
69-
```kotlin
70-
data class MiskSuspendingExemplarConfig(
71-
val skim: SkimServiceConfig,
72-
val aws_sqs: SqsConfig,
73-
) : Config
74-
```
75-
76-
## Outstanding todo items
77-
78-
The module will not be considered beta/GA state until the below items are completed.
268+
* uses AWS SDK v2
269+
* exposes suspending API
270+
* handlers return status and don't make calls to SQS. Acknowledging jobs is done by the framework code
271+
* no dependency on the lease module. There will be at least one handler per service instance
272+
* no dependency on the feature flags
273+
* metrics are updated to v2, names of the metrics have been changed
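The "handlers return status and don't make calls to SQS" contract can be pictured with a small model. This is a hypothetical sketch, not the framework's code: `Status` mirrors the status names used in the handler example, while `FollowUp`, `followUpFor` and `process` are invented for illustration, and the mapping from status to follow-up action is an assumption.

```kotlin
// Mirrors the status names from the handler example; declared locally so the sketch is self-contained.
enum class Status { OK, RETRY_LATER, DEAD_LETTER, RETRY_WITH_BACKOFF }

// Hypothetical follow-up actions taken on behalf of the handler (assumed, not confirmed).
enum class FollowUp { DELETE_MESSAGE, LEAVE_FOR_REDELIVERY, SEND_TO_DEAD_LETTER_QUEUE, DELAY_REDELIVERY }

// The framework side: translate the returned status into a queue operation.
fun followUpFor(status: Status): FollowUp = when (status) {
  Status.OK -> FollowUp.DELETE_MESSAGE
  Status.RETRY_LATER -> FollowUp.LEAVE_FOR_REDELIVERY
  Status.DEAD_LETTER -> FollowUp.SEND_TO_DEAD_LETTER_QUEUE
  Status.RETRY_WITH_BACKOFF -> FollowUp.DELAY_REDELIVERY
}

// In this model a handler is just a function from message body to status; it never touches the queue.
fun process(body: String, handler: (String) -> Status): FollowUp =
  followUpFor(handler(body))
```

The point of the design is that handler code stays free of queue calls, so it can be tested by asserting on the returned status alone.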
79 274

80-
Outstanding work that needs to be done:
81-
* detailed test
82-
* detailed documentation
275+
## Migration
83 276

84-
Outstanding things to document:
85-
* how batch size plays out with channel size and visibility timeout
277+
TODO - this section will have detailed steps for migrating from the previous implementation
