Skip to content

Commit 9159dad

Browse files
Introduce support for suspending SQS Jobqueue
implementation GitOrigin-RevId: 5981363e7cac52a5f9208645d60f259d585d5dd2
1 parent b072efd commit 9159dad

30 files changed

+1750
-0
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ aws2Core = { module = "software.amazon.awssdk:aws-core", version.ref = "aws2" }
3737
aws2Dynamodb = { module = "software.amazon.awssdk:dynamodb", version.ref = "aws2" }
3838
aws2DynamodbEnhanced = { module = "software.amazon.awssdk:dynamodb-enhanced", version.ref = "aws2" }
3939
aws2Regions = { module = "software.amazon.awssdk:regions" }
40+
aws2Sqs = { module = "software.amazon.awssdk:sqs", version.ref = "aws2" }
4041
awsCore = { module = "com.amazonaws:aws-java-sdk-core", version.ref = "aws1" }
4142
awsDynamodb = { module = "com.amazonaws:aws-java-sdk-dynamodb", version.ref = "aws1" }
4243
awsS3 = { module = "com.amazonaws:aws-java-sdk-s3", version.ref = "aws1" }

misk-aws2-sqs/README.md

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Module: AWS SQS Module
2+
3+
**This module is still in experimental state. It's a work in progress towards:**
4+
* supporting suspending handlers
5+
* migration to AWS SDK v2
6+
7+
## Differences to the previous implementation
8+
9+
* uses AWS SDK v2
10+
* exposes suspending API
11+
* handlers return status and don't make calls to SQS. Acknowledging jobs is done by the framework code
12+
* no dependency on the lease module. There will be at least one handler per service instance
13+
* no dependency on the feature flags
14+
* metrics are updated to v2, names of the metrics have been changed
15+
16+
## Migration
17+
18+
TODO - this section will have detailed steps for migrating from the previous implementation
19+
20+
## Threading model
21+
22+
Receiving and processing messages is handled by separate views on the `Dispatchers.IO`:
23+
- querying SQS is done on a single thread
24+
- for each subscribed queue, a receiving coroutine is created
25+
- received jobs are sent to a dedicated per-queue channel with a default size of 0. This is configurable
26+
- by default, there is a single thread with a single coroutine running dedicated to processing
27+
of jobs from a given queue. This is configurable as well.
28+
29+
How the configuration impacts the processing:
30+
- increasing the channel size allows jobs to be pre-read from SQS. This may help reduce the
31+
latency, but if the handler takes more time to process than the visibility timeout, it may lead
32+
to more duplicate and out-of-order processing
33+
- increasing the concurrency spins up more coroutines per queue. This may increase the throughput
34+
of processing, if processing mostly uses non-blocking operations
35+
- increasing the parallelism will increase the thread pool size per-queue. Together with increased
36+
concurrency it may increase the throughput of processing, if processing involves heavy computations
37+
or blocking operations
38+
39+
It's advised to start with the default settings and adjust based on specific workloads.
40+
41+
![image](concurrency.jpg)
42+
43+
## Outstanding todo items
44+
45+
The module will not be considered beta/GA state until the below items are completed.
46+
47+
Outstanding work that needs to be done:
48+
* detailed tests
49+
* tracing
50+
* test fixtures
51+
* external queues
52+
* installing retry queue only on request
53+
* pass in configuration (make it compatible with previous implementation if needed)
54+
* detailed documentation
55+
* batch size configuration
56+
57+
Things that are supported in the old documentation but are questionable:
58+
* aws queue attribute importer
59+
60+
Outstanding things to document:
61+
* how batch size plays out with channel size and visibility timeout
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
public abstract interface class misk/aws2/sqs/jobqueue/DeadLetterQueueProvider {
2+
public abstract fun deadLetterQueueFor (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
3+
}
4+
5+
public final class misk/aws2/sqs/jobqueue/DeadLetterQueueProviderKt {
6+
public static final fun getParentQueue (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
7+
public static final fun getRetryQueue (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
8+
public static final fun isRetryQueue (Lmisk/jobqueue/QueueName;)Z
9+
}
10+
11+
public final class misk/aws2/sqs/jobqueue/DefaultDeadLetterQueueProvider : misk/aws2/sqs/jobqueue/DeadLetterQueueProvider {
12+
public fun <init> ()V
13+
public fun deadLetterQueueFor (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
14+
}
15+
16+
public final class misk/aws2/sqs/jobqueue/QueueResolver {
17+
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;)V
18+
public final fun getQueueUrl (Lmisk/jobqueue/QueueName;)Ljava/lang/String;
19+
}
20+
21+
public final class misk/aws2/sqs/jobqueue/SqsJob : misk/jobqueue/v2/Job {
22+
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJob$Companion;
23+
public static final field JOBQUEUE_METADATA_ATTR Ljava/lang/String;
24+
public static final field JOBQUEUE_METADATA_IDEMPOTENCE_KEY Ljava/lang/String;
25+
public static final field JOBQUEUE_METADATA_ORIGIN_QUEUE Ljava/lang/String;
26+
public fun <init> (Lmisk/jobqueue/QueueName;Lcom/squareup/moshi/Moshi;Lsoftware/amazon/awssdk/services/sqs/model/Message;Ljava/lang/String;J)V
27+
public fun getAttributes ()Ljava/util/Map;
28+
public fun getBody ()Ljava/lang/String;
29+
public fun getId ()Ljava/lang/String;
30+
public fun getIdempotenceKey ()Ljava/lang/String;
31+
public final fun getMessage ()Lsoftware/amazon/awssdk/services/sqs/model/Message;
32+
public final fun getPublishToChannelTimestamp ()J
33+
public fun getQueueName ()Lmisk/jobqueue/QueueName;
34+
public final fun getQueueUrl ()Ljava/lang/String;
35+
}
36+
37+
public final class misk/aws2/sqs/jobqueue/SqsJob$Companion {
38+
}
39+
40+
public final class misk/aws2/sqs/jobqueue/SqsJobConsumer : com/google/common/util/concurrent/AbstractService, misk/jobqueue/v2/JobConsumer {
41+
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJobConsumer$Companion;
42+
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lcom/squareup/moshi/Moshi;Lmisk/aws2/sqs/jobqueue/DeadLetterQueueProvider;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Ljava/time/Clock;)V
43+
public fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;)V
44+
public final fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;III)V
45+
public fun unsubscribe (Lmisk/jobqueue/QueueName;)V
46+
}
47+
48+
public final class misk/aws2/sqs/jobqueue/SqsJobConsumer$Companion {
49+
public final fun getLogger ()Lmu/KLogger;
50+
}
51+
52+
public final class misk/aws2/sqs/jobqueue/SqsJobEnqueuer : misk/jobqueue/v2/JobEnqueuer {
53+
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;)V
54+
public fun enqueue (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
55+
public fun enqueueAsync (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture;
56+
public fun enqueueBlocking (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)V
57+
}
58+
59+
public final class misk/aws2/sqs/jobqueue/SqsJobHandlerModule : misk/inject/KAbstractModule {
60+
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule$Companion;
61+
public synthetic fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
62+
}
63+
64+
public final class misk/aws2/sqs/jobqueue/SqsJobHandlerModule$Companion {
65+
public final fun create (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;III)Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule;
66+
public static synthetic fun create$default (Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule$Companion;Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;IIIILjava/lang/Object;)Lmisk/aws2/sqs/jobqueue/SqsJobHandlerModule;
67+
}
68+
69+
public class misk/aws2/sqs/jobqueue/SqsJobQueueModule : misk/inject/KAbstractModule {
70+
public fun <init> ()V
71+
public fun <init> (Lkotlin/jvm/functions/Function1;)V
72+
public synthetic fun <init> (Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
73+
protected fun configure ()V
74+
public final fun sqsAsyncClient (Lsoftware/amazon/awssdk/auth/credentials/AwsCredentialsProvider;Lmisk/cloud/aws/AwsRegion;)Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
75+
}
76+
77+
public final class misk/aws2/sqs/jobqueue/SqsMetrics {
78+
public final fun getChannelReceiveLag ()Lio/prometheus/client/Histogram;
79+
public final fun getHandlerDispatchTime ()Lio/prometheus/client/Histogram;
80+
public final fun getHandlerFailures ()Lio/prometheus/client/Counter;
81+
public final fun getJobEnqueueFailures ()Lio/prometheus/client/Counter;
82+
public final fun getJobsAcknowledged ()Lio/prometheus/client/Counter;
83+
public final fun getJobsDeadLettered ()Lio/prometheus/client/Counter;
84+
public final fun getJobsEnqueued ()Lio/prometheus/client/Counter;
85+
public final fun getJobsReceived ()Lio/prometheus/client/Counter;
86+
public final fun getQueueProcessingLag ()Lio/prometheus/client/Histogram;
87+
public final fun getSqsApproxNumberOfMessages ()Lio/prometheus/client/Gauge;
88+
public final fun getSqsApproxNumberOfMessagesNotVisible ()Lio/prometheus/client/Gauge;
89+
public final fun getSqsDeleteTime ()Lio/prometheus/client/Histogram;
90+
public final fun getSqsReceiveTime ()Lio/prometheus/client/Histogram;
91+
public final fun getSqsSendTime ()Lio/prometheus/client/Histogram;
92+
}
93+
94+
public final class misk/aws2/sqs/jobqueue/StaticDeadLetterQueueProvider : misk/aws2/sqs/jobqueue/DeadLetterQueueProvider {
95+
public fun <init> (Ljava/lang/String;)V
96+
public fun deadLetterQueueFor (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;
97+
}
98+
99+
public final class misk/aws2/sqs/jobqueue/Subscriber {
100+
public static final field Companion Lmisk/aws2/sqs/jobqueue/Subscriber$Companion;
101+
public fun <init> (Lmisk/jobqueue/QueueName;Ljava/lang/String;Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/jobqueue/v2/JobHandler;Lkotlinx/coroutines/channels/Channel;Ljava/lang/String;Lmisk/jobqueue/QueueName;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lcom/squareup/moshi/Moshi;Ljava/time/Clock;)V
102+
public final fun getChannel ()Lkotlinx/coroutines/channels/Channel;
103+
public final fun getClient ()Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
104+
public final fun getClock ()Ljava/time/Clock;
105+
public final fun getDeadLetterQueueName ()Lmisk/jobqueue/QueueName;
106+
public final fun getHandler ()Lmisk/jobqueue/v2/JobHandler;
107+
public final fun getMoshi ()Lcom/squareup/moshi/Moshi;
108+
public final fun getQueueName ()Lmisk/jobqueue/QueueName;
109+
public final fun getQueueResolver ()Lmisk/aws2/sqs/jobqueue/QueueResolver;
110+
public final fun getQueueUrl ()Ljava/lang/String;
111+
public final fun getRetryQueueUrl ()Ljava/lang/String;
112+
public final fun getSqsMetrics ()Lmisk/aws2/sqs/jobqueue/SqsMetrics;
113+
public final fun poll (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
114+
public final fun run (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
115+
}
116+
117+
public final class misk/aws2/sqs/jobqueue/Subscriber$Companion {
118+
}
119+
120+
public final class misk/aws2/sqs/jobqueue/Subscription {
121+
public fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;)V
122+
public fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;I)V
123+
public fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;II)V
124+
public fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;III)V
125+
public synthetic fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;IIIILkotlin/jvm/internal/DefaultConstructorMarker;)V
126+
public final fun component1 ()Lmisk/jobqueue/QueueName;
127+
public final fun component2 ()Lkotlin/reflect/KClass;
128+
public final fun component3 ()I
129+
public final fun component4 ()I
130+
public final fun component5 ()I
131+
public final fun copy (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;III)Lmisk/aws2/sqs/jobqueue/Subscription;
132+
public static synthetic fun copy$default (Lmisk/aws2/sqs/jobqueue/Subscription;Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;IIIILjava/lang/Object;)Lmisk/aws2/sqs/jobqueue/Subscription;
133+
public fun equals (Ljava/lang/Object;)Z
134+
public final fun getChannelCapacity ()I
135+
public final fun getConcurrency ()I
136+
public final fun getHandler ()Lkotlin/reflect/KClass;
137+
public final fun getParallelism ()I
138+
public final fun getQueueName ()Lmisk/jobqueue/QueueName;
139+
public fun hashCode ()I
140+
public fun toString ()Ljava/lang/String;
141+
}
142+
143+
public final class misk/aws2/sqs/jobqueue/SubscriptionService : com/google/common/util/concurrent/AbstractIdleService {
144+
public fun <init> (Lmisk/aws2/sqs/jobqueue/SqsJobConsumer;Ljava/util/Map;Ljava/util/Map;)V
145+
}
146+

misk-aws2-sqs/build.gradle.kts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import com.vanniktech.maven.publish.JavadocJar.Dokka
2+
import com.vanniktech.maven.publish.KotlinJvm
3+
4+
plugins {
5+
id("org.jetbrains.kotlin.jvm")
6+
id("com.vanniktech.maven.publish.base")
7+
id("java-test-fixtures")
8+
}
9+
10+
dependencies {
11+
api(libs.aws2Auth)
12+
implementation(libs.aws2Regions)
13+
api(libs.aws2Sqs)
14+
api(libs.guava)
15+
api(libs.guice)
16+
api(libs.kotlinxCoroutinesCore)
17+
api(libs.jakartaInject)
18+
api(libs.moshiCore)
19+
api(libs.prometheusClient)
20+
implementation(libs.aws2Core)
21+
implementation(project(":misk"))
22+
implementation(project(":misk-api"))
23+
api(project(":misk-aws"))
24+
api(project(":misk-inject"))
25+
api(project(":misk-jobqueue"))
26+
implementation(libs.loggingApi)
27+
runtimeOnly(libs.openTracingDatadog)
28+
implementation(project(":wisp:wisp-logging"))
29+
implementation(project(":misk-metrics"))
30+
implementation(project(":misk-service"))
31+
testImplementation(libs.assertj)
32+
testImplementation(libs.awaitility)
33+
testImplementation(libs.dockerApi)
34+
testImplementation(libs.junitApi)
35+
testImplementation(libs.junitParams)
36+
testImplementation(libs.kotlinTest)
37+
testImplementation(libs.kotlinxCoroutinesTest)
38+
testImplementation(libs.mockitoCore)
39+
testImplementation(libs.mockitoKotlin)
40+
testImplementation(project(":wisp:wisp-containers-testing"))
41+
testImplementation(project(":wisp:wisp-feature-testing"))
42+
testImplementation(project(":wisp:wisp-time-testing"))
43+
testImplementation(project(":wisp:wisp-logging-testing"))
44+
testImplementation(project(":misk-clustering"))
45+
testImplementation(project(":misk-testing"))
46+
}
47+
48+
mavenPublishing {
49+
configure(
50+
KotlinJvm(javadocJar = Dokka("dokkaGfm"))
51+
)
52+
}

misk-aws2-sqs/concurrency.jpg

50.5 KB
Loading
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package misk.aws2.sqs.jobqueue
2+
3+
import com.google.inject.ImplementedBy
4+
import jakarta.inject.Inject
5+
import jakarta.inject.Singleton
6+
import misk.jobqueue.QueueName
7+
8+
/**
 * Strategy for choosing the dead-letter queue that belongs to a given queue.
 *
 * Implementations map a [QueueName] to the [QueueName] of its dead-letter queue.
 * [DefaultDeadLetterQueueProvider] is the implementation used when nothing else is bound.
 */
@ImplementedBy(DefaultDeadLetterQueueProvider::class)
interface DeadLetterQueueProvider {
  /** Returns the dead-letter [QueueName] to use for [queue]. */
  fun deadLetterQueueFor(queue: QueueName): QueueName
}
17+
18+
/**
 * Default [DeadLetterQueueProvider].
 *
 * Derives the dead-letter queue name from the queue itself: the main queue name suffixed
 * with "_dlq".
 */
@Singleton
class DefaultDeadLetterQueueProvider @Inject constructor() : DeadLetterQueueProvider {
  override fun deadLetterQueueFor(queue: QueueName): QueueName {
    return queue.deadLetterQueue
  }
}
23+
24+
/**
 * [DeadLetterQueueProvider] that ignores the supplied queue and always answers with one fixed
 * dead-letter [QueueName].
 *
 * Intended for apps whose queues all share a single dead-letter queue.
 *
 * @param queue name of the shared dead-letter queue
 */
class StaticDeadLetterQueueProvider(queue: String) : DeadLetterQueueProvider {
  private val sharedDeadLetterQueue: QueueName = QueueName(queue)

  override fun deadLetterQueueFor(queue: QueueName): QueueName {
    return sharedDeadLetterQueue
  }
}
33+
34+
/** Suffix that marks a queue name as a dead-letter queue. */
internal const val deadLetterQueueSuffix = "_dlq"

/** True when this queue name ends with the dead-letter suffix. */
internal val QueueName.isDeadLetterQueue: Boolean
  get() = value.endsWith(deadLetterQueueSuffix)

/** This queue if it already is a dead-letter queue, otherwise the parent queue name plus the dead-letter suffix. */
internal val QueueName.deadLetterQueue: QueueName
  get() = when {
    isDeadLetterQueue -> this
    else -> QueueName(parentQueue.value + deadLetterQueueSuffix)
  }

/** Suffix that marks a queue name as a retry queue. */
internal const val retryQueueSuffix = "_retryq"

/** True when this queue name ends with the retry suffix. */
val QueueName.isRetryQueue: Boolean
  get() = value.endsWith(retryQueueSuffix)

/** This queue if it already is a retry queue, otherwise the parent queue name plus the retry suffix. */
val QueueName.retryQueue: QueueName
  get() = when {
    isRetryQueue -> this
    else -> QueueName(parentQueue.value + retryQueueSuffix)
  }

/**
 * The queue name with a trailing dead-letter or retry suffix removed; names carrying neither
 * suffix are returned unchanged. The dead-letter suffix is checked first.
 */
val QueueName.parentQueue: QueueName
  get() {
    if (isDeadLetterQueue) {
      return QueueName(value.removeSuffix(deadLetterQueueSuffix))
    }
    if (isRetryQueue) {
      return QueueName(value.removeSuffix(retryQueueSuffix))
    }
    return this
  }
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package misk.aws2.sqs.jobqueue
2+
3+
import com.google.inject.Inject
4+
import com.google.inject.Singleton
5+
import misk.jobqueue.QueueName
6+
import software.amazon.awssdk.services.sqs.SqsAsyncClient
7+
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest
8+
import java.util.concurrent.ConcurrentHashMap
9+
10+
/**
 * Resolves SQS queue URLs for [QueueName]s and keeps the resolved URLs in an in-memory cache.
 */
@Singleton
class QueueResolver @Inject constructor(
  private val client: SqsAsyncClient,
) {
  private val queueUrlCache = ConcurrentHashMap<QueueName, String>()

  /**
   * Get the SQS queue URL for a given queue.
   *
   * Results are cached in memory. On a cache miss the URL is requested from SQS and the calling
   * thread blocks on the async client's future (`join()`) until the response arrives; any failure
   * of that future propagates to the caller and nothing is cached.
   *
   * @param queueName queue whose URL should be resolved
   * @return the queue URL reported by SQS
   */
  fun getQueueUrl(queueName: QueueName): String {
    return queueUrlCache.getOrPut(queueName) {
      val request = GetQueueUrlRequest.builder()
        .queueName(queueName.value)
        .build()
      // The URL must be the lambda's result expression. `getOrPut` is inline, so a bare `return`
      // here would exit getQueueUrl directly and skip the cache write, causing an SQS call on
      // every lookup.
      client.getQueueUrl(request).join().queueUrl()
    }
  }
}

0 commit comments

Comments
 (0)