@@ -13,36 +13,40 @@ export interface ITaskQueue<T> {
1313 publish ( job : T ) : Promise < TaskQueueMessage >
1414 // awaits for a job. then calls and waits for the taskRunner argument.
1515 // the result is then returned to the wrapper function.
16- consumeAndProcessJob < R > ( taskRunner : ( job : T , message : TaskQueueMessage ) => Promise < R > ) : Promise < { result : R | undefined } >
16+ consumeAndProcessJob < R > (
17+ taskRunner : ( job : T , message : TaskQueueMessage ) => Promise < R >
18+ ) : Promise < { result : R | undefined } >
1719}
1820
1921export const queueMetrics = validateMetricsDeclaration ( {
2022 job_queue_duration_seconds : {
2123 type : IMetricsComponent . HistogramType ,
2224 help : 'Duration of each job in seconds' ,
23- labelNames : [ " queue_name" ] ,
25+ labelNames : [ ' queue_name' ] ,
2426 buckets : [ 1 , 10 , 100 , 200 , 300 , 400 , 500 , 600 , 700 , 1000 , 1200 , 1600 , 1800 , 3600 ]
2527 } ,
2628 job_queue_enqueue_total : {
2729 type : IMetricsComponent . CounterType ,
28- help : " Total amount of enqueued jobs" ,
29- labelNames : [ " queue_name" ] ,
30+ help : ' Total amount of enqueued jobs' ,
31+ labelNames : [ ' queue_name' ]
3032 } ,
3133 job_queue_failures_total : {
3234 type : IMetricsComponent . CounterType ,
33- help : " Total amount of failed tasks" ,
34- labelNames : [ " queue_name" ] ,
35- } ,
35+ help : ' Total amount of failed tasks' ,
36+ labelNames : [ ' queue_name' ]
37+ }
3638} )
3739
3840type SNSOverSQSMessage = {
3941 Message : string
4042}
4143
42-
43- export function createMemoryQueueAdapter < T > ( components : Pick < AppComponents , "logs" | 'metrics' > , options : { queueName : string } ) : ITaskQueue < T > & IBaseComponent {
44- type InternalElement = { message : TaskQueueMessage , job : T }
45- const q = new AsyncQueue < InternalElement > ( ( action ) => void 0 )
44+ export function createMemoryQueueAdapter < T > (
45+ components : Pick < AppComponents , 'logs' | 'metrics' > ,
46+ options : { queueName : string }
47+ ) : ITaskQueue < T > & IBaseComponent {
48+ type InternalElement = { message : TaskQueueMessage ; job : T }
49+ const q = new AsyncQueue < InternalElement > ( ( _action ) => void 0 )
4650 let lastJobId = 0
4751
4852 const logger = components . logs . getLogger ( options . queueName )
@@ -77,12 +81,14 @@ export function createMemoryQueueAdapter<T>(components: Pick<AppComponents, "log
7781 }
7882 }
7983 return { result : undefined }
80- } ,
84+ }
8185 }
8286}
8387
84-
85- export function createSqsAdapter < T > ( components : Pick < AppComponents , "logs" | 'metrics' > , options : { queueUrl : string , queueRegion ?: string } ) : ITaskQueue < T > {
88+ export function createSqsAdapter < T > (
89+ components : Pick < AppComponents , 'logs' | 'metrics' > ,
90+ options : { queueUrl : string ; queueRegion ?: string }
91+ ) : ITaskQueue < T > {
8692 const logger = components . logs . getLogger ( options . queueUrl )
8793
8894 const sqs = new SQS ( { apiVersion : 'latest' , region : options . queueRegion } )
@@ -93,11 +99,12 @@ export function createSqsAdapter<T>(components: Pick<AppComponents, "logs" | 'me
9399 Message : JSON . stringify ( job )
94100 }
95101
96- const published = await sqs . sendMessage (
97- {
102+ const published = await sqs
103+ . sendMessage ( {
98104 QueueUrl : options . queueUrl ,
99- MessageBody : JSON . stringify ( snsOverSqs ) ,
100- } ) . promise ( )
105+ MessageBody : JSON . stringify ( snsOverSqs )
106+ } )
107+ . promise ( )
101108
102109 const m : TaskQueueMessage = { id : published . MessageId ! }
103110
@@ -126,7 +133,9 @@ export function createSqsAdapter<T>(components: Pick<AppComponents, "logs" | 'me
126133 if ( response . Messages && response . Messages . length > 0 ) {
127134 for ( const it of response . Messages ) {
128135 const message : TaskQueueMessage = { id : it . MessageId ! }
129- const { end } = components . metrics . startTimer ( 'job_queue_duration_seconds' , { queue_name : options . queueUrl } )
136+ const { end } = components . metrics . startTimer ( 'job_queue_duration_seconds' , {
137+ queue_name : options . queueUrl
138+ } )
130139 try {
131140 logger . info ( `Processing job` , { id : message . id } )
132141 const result = await taskRunner ( JSON . parse ( it . Body ! ) , message )
@@ -150,7 +159,7 @@ export function createSqsAdapter<T>(components: Pick<AppComponents, "logs" | 'me
150159 await sleep ( 1000 )
151160 }
152161 }
153- } ,
162+ }
154163 }
155164}
156165
0 commit comments