Skip to content

Commit 0a411b2

Browse files
Improve first processing lag metrics
GitOrigin-RevId: 672b4b5ad56b38b0d239aecd59f3adb300db0803
1 parent 976c894 commit 0a411b2

File tree

3 files changed

+16
-31
lines changed

3 files changed

+16
-31
lines changed

misk-aws2-sqs/api/misk-aws2-sqs.api

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ public final class misk/aws2/sqs/jobqueue/SqsMetrics {
8484
public final fun getJobsDeadLettered ()Lio/prometheus/client/Counter;
8585
public final fun getJobsEnqueued ()Lio/prometheus/client/Counter;
8686
public final fun getJobsReceived ()Lio/prometheus/client/Counter;
87+
public final fun getQueueFirstProcessingLag ()Lio/prometheus/client/Histogram;
8788
public final fun getQueueProcessingLag ()Lio/prometheus/client/Histogram;
88-
public final fun getSqsApproxNumberOfMessages ()Lio/prometheus/client/Gauge;
89-
public final fun getSqsApproxNumberOfMessagesNotVisible ()Lio/prometheus/client/Gauge;
9089
public final fun getSqsDeleteTime ()Lio/prometheus/client/Histogram;
9190
public final fun getSqsReceiveTime ()Lio/prometheus/client/Histogram;
9291
public final fun getSqsSendTime ()Lio/prometheus/client/Histogram;
@@ -99,7 +98,6 @@ public final class misk/aws2/sqs/jobqueue/StaticDeadLetterQueueProvider : misk/a
9998
}
10099

101100
public final class misk/aws2/sqs/jobqueue/Subscriber {
102-
public static final field Companion Lmisk/aws2/sqs/jobqueue/Subscriber$Companion;
103101
public fun <init> (Lmisk/jobqueue/QueueName;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lkotlinx/coroutines/channels/Channel;Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Ljava/time/Clock;Lio/opentracing/Tracer;Lmisk/aws2/sqs/jobqueue/VisibilityTimeoutCalculator;)V
104102
public final fun getChannel ()Lkotlinx/coroutines/channels/Channel;
105103
public final fun getClient ()Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
@@ -117,9 +115,6 @@ public final class misk/aws2/sqs/jobqueue/Subscriber {
117115
public final fun run (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
118116
}
119117

120-
public final class misk/aws2/sqs/jobqueue/Subscriber$Companion {
121-
}
122-
123118
public final class misk/aws2/sqs/jobqueue/SubscriptionService : com/google/common/util/concurrent/AbstractIdleService {
124119
public static final field Companion Lmisk/aws2/sqs/jobqueue/SubscriptionService$Companion;
125120
public fun <init> (Lmisk/aws2/sqs/jobqueue/SqsJobConsumer;Ljava/util/Map;Lmisk/aws2/sqs/jobqueue/config/SqsConfig;)V

misk-aws2-sqs/src/main/kotlin/misk/aws2/sqs/jobqueue/SqsMetrics.kt

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ class SqsMetrics @Inject internal constructor(metrics: Metrics) {
7878
listOf("QueueName")
7979
)
8080

81+
val queueFirstProcessingLag = metrics.histogram(
82+
"jobs_sqs_first_processing_lag_v2",
83+
"time it took to receive a job from when it was enqueued",
84+
listOf("QueueName")
85+
)
86+
8187
val queueProcessingLag = metrics.histogram(
8288
"jobs_sqs_processing_lag_v2",
8389
"time it took to receive a job from when it was enqueued",
@@ -89,20 +95,4 @@ class SqsMetrics @Inject internal constructor(metrics: Metrics) {
8995
"time a job spent it the channel between receiver and handler",
9096
listOf("QueueName")
9197
)
92-
93-
val sqsApproxNumberOfMessages = metrics.gauge(
94-
"ApproximateNumberOfMessagesVisible",
95-
"the approximate number of messages available for retrieval from SQS",
96-
// `namespace` and `stat` is to emulate the CloudWatch metrics.
97-
listOf("namespace", "stat", "QueueName")
98-
)
99-
100-
val sqsApproxNumberOfMessagesNotVisible = metrics.gauge(
101-
"ApproximateNumberOfMessagesNotVisible",
102-
"the approximate number of messages that are in flight. Messages are considered to " +
103-
"be in flight if they have been sent to a client but have not yet been deleted or have " +
104-
"not yet reached the end of their visibility window.",
105-
// `namespace` and `stat` is to emulate the CloudWatch metrics.
106-
listOf("namespace", "stat", "QueueName")
107-
)
10898
}

misk-aws2-sqs/src/main/kotlin/misk/aws2/sqs/jobqueue/Subscriber.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,14 @@ class Subscriber(
166166

167167
sqsMetrics.jobsReceived.labels(queueName.value).inc(response.messages().size.toDouble())
168168
response.messages().forEach { message ->
169-
message.messageAttributes()[SQS_ATTRIBUTE_SENT_TIMESTAMP]?.let {
170-
val sentTimestamp = it.stringValue().toLong()
171-
val processingLag = clock.instant().minusMillis(sentTimestamp).toEpochMilli()
172-
sqsMetrics.queueProcessingLag.labels(queueName.value).observe(processingLag.toDouble())
169+
message.attributes()[MessageSystemAttributeName.SENT_TIMESTAMP]?.let {
170+
val sentTimestamp = it.toLong()
171+
val processingLag = clock.instant().minusMillis(sentTimestamp).toEpochMilli().toDouble()
172+
val receiveCounter = message.attributes()[MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT]?.toInt()
173+
if (receiveCounter == 1) {
174+
sqsMetrics.queueFirstProcessingLag.labels(queueName.value).observe(processingLag)
175+
}
176+
sqsMetrics.queueProcessingLag.labels(queueName.value).observe(processingLag)
173177
}
174178
val publishToChannelTimestamp = clock.instant()
175179
emit(
@@ -188,16 +192,12 @@ class Subscriber(
188192
private fun fetchMessages(queueUrl: String): CompletableFuture<ReceiveMessageResponse> {
189193
val request = ReceiveMessageRequest.builder()
190194
.queueUrl(queueUrl)
191-
.messageAttributeNames("All")
195+
.messageAttributeNames(MessageSystemAttributeName.ALL.name)
192196
.messageSystemAttributeNames(MessageSystemAttributeName.ALL)
193197
.maxNumberOfMessages(queueConfig.max_number_of_messages)
194198
.waitTimeSeconds(queueConfig.wait_timeout)
195199
.visibilityTimeout(queueConfig.visibility_timeout)
196200
.build()
197201
return client.receiveMessage(request)
198202
}
199-
200-
companion object {
201-
private const val SQS_ATTRIBUTE_SENT_TIMESTAMP = "SentTimestamp"
202-
}
203203
}

0 commit comments

Comments
 (0)