|
| 1 | +import { IBaseComponent, IMetricsComponent } from '@well-known-components/interfaces' |
| 2 | +import { validateMetricsDeclaration } from '@well-known-components/metrics' |
| 3 | +import { AsyncQueue } from '@well-known-components/pushable-channel' |
| 4 | +import { SQS } from 'aws-sdk' |
| 5 | +import { AppComponents } from '../types' |
| 6 | + |
| 7 | +export interface TaskQueueMessage { |
| 8 | + id: string |
| 9 | +} |
| 10 | + |
| 11 | +export interface ITaskQueue<T> { |
| 12 | + // publishes a job for the queue |
| 13 | + publish(job: T): Promise<TaskQueueMessage> |
| 14 | + // awaits for a job. then calls and waits for the taskRunner argument. |
| 15 | + // the result is then returned to the wrapper function. |
| 16 | + consumeAndProcessJob<R>(taskRunner: (job: T, message: TaskQueueMessage) => Promise<R>): Promise<{ result: R | undefined }> |
| 17 | +} |
| 18 | + |
| 19 | +export const queueMetrics = validateMetricsDeclaration({ |
| 20 | + job_queue_duration_seconds: { |
| 21 | + type: IMetricsComponent.HistogramType, |
| 22 | + help: 'Duration of each job in seconds', |
| 23 | + labelNames: ["queue_name"], |
| 24 | + buckets: [1, 10, 100, 200, 300, 400, 500, 600, 700, 1000, 1200, 1600, 1800, 3600] |
| 25 | + }, |
| 26 | + job_queue_enqueue_total: { |
| 27 | + type: IMetricsComponent.CounterType, |
| 28 | + help: "Total amount of enqueued jobs", |
| 29 | + labelNames: ["queue_name"], |
| 30 | + }, |
| 31 | + job_queue_failures_total: { |
| 32 | + type: IMetricsComponent.CounterType, |
| 33 | + help: "Total amount of failed tasks", |
| 34 | + labelNames: ["queue_name"], |
| 35 | + }, |
| 36 | +}) |
| 37 | + |
| 38 | +type SNSOverSQSMessage = { |
| 39 | + Message: string |
| 40 | +} |
| 41 | + |
| 42 | + |
| 43 | +export function createMemoryQueueAdapter<T>(components: Pick<AppComponents, "logs" | 'metrics'>, options: { queueName: string }): ITaskQueue<T> & IBaseComponent { |
| 44 | + type InternalElement = { message: TaskQueueMessage, job: T } |
| 45 | + const q = new AsyncQueue<InternalElement>((action) => void 0) |
| 46 | + let lastJobId = 0 |
| 47 | + |
| 48 | + const logger = components.logs.getLogger(options.queueName) |
| 49 | + |
| 50 | + return { |
| 51 | + async stop() { |
| 52 | + q.close() |
| 53 | + }, |
| 54 | + async publish(job) { |
| 55 | + const id = 'job-' + (++lastJobId).toString() |
| 56 | + const message: TaskQueueMessage = { id } |
| 57 | + q.enqueue({ job, message }) |
| 58 | + logger.info(`Publishing job`, { id }) |
| 59 | + components.metrics.increment('job_queue_enqueue_total', { queue_name: options.queueName }) |
| 60 | + return message |
| 61 | + }, |
| 62 | + async consumeAndProcessJob(taskRunner) { |
| 63 | + const it: InternalElement = (await q.next()).value |
| 64 | + if (it) { |
| 65 | + const { end } = components.metrics.startTimer('job_queue_duration_seconds', { queue_name: options.queueName }) |
| 66 | + try { |
| 67 | + logger.info(`Processing job`, { id: it.message.id }) |
| 68 | + const result = await taskRunner(it.job, it.message) |
| 69 | + logger.info(`Processed job`, { id: it.message.id }) |
| 70 | + return { result, message: it.message } |
| 71 | + } catch (err: any) { |
| 72 | + components.metrics.increment('job_queue_failures_total', { queue_name: options.queueName }) |
| 73 | + logger.error(err, { id: it.message.id }) |
| 74 | + // q.enqueue(it) |
| 75 | + } finally { |
| 76 | + end() |
| 77 | + } |
| 78 | + } |
| 79 | + return { result: undefined } |
| 80 | + }, |
| 81 | + } |
| 82 | +} |
| 83 | + |
| 84 | + |
| 85 | +export function createSqsAdapter<T>(components: Pick<AppComponents, "logs" | 'metrics'>, options: { queueUrl: string, queueRegion?: string }): ITaskQueue<T> { |
| 86 | + const logger = components.logs.getLogger(options.queueUrl) |
| 87 | + |
| 88 | + const sqs = new SQS({ apiVersion: 'latest', region: options.queueRegion }) |
| 89 | + |
| 90 | + return { |
| 91 | + async publish(job) { |
| 92 | + const snsOverSqs: SNSOverSQSMessage = { |
| 93 | + Message: JSON.stringify(job) |
| 94 | + } |
| 95 | + |
| 96 | + const published = await sqs.sendMessage( |
| 97 | + { |
| 98 | + QueueUrl: options.queueUrl, |
| 99 | + MessageBody: JSON.stringify(snsOverSqs), |
| 100 | + }).promise() |
| 101 | + |
| 102 | + const m: TaskQueueMessage = { id: published.MessageId! } |
| 103 | + |
| 104 | + logger.info(`Publishing job`, m as any) |
| 105 | + |
| 106 | + components.metrics.increment('job_queue_enqueue_total', { queue_name: options.queueUrl }) |
| 107 | + return m |
| 108 | + }, |
| 109 | + async consumeAndProcessJob(taskRunner) { |
| 110 | + while (true) { |
| 111 | + const params: AWS.SQS.ReceiveMessageRequest = { |
| 112 | + AttributeNames: ['SentTimestamp'], |
| 113 | + MaxNumberOfMessages: 1, |
| 114 | + MessageAttributeNames: ['All'], |
| 115 | + QueueUrl: options.queueUrl, |
| 116 | + WaitTimeSeconds: 15, |
| 117 | + VisibilityTimeout: 3 * 3600 // 3 hours |
| 118 | + } |
| 119 | + |
| 120 | + try { |
| 121 | + const response = await Promise.race([ |
| 122 | + sqs.receiveMessage(params).promise(), |
| 123 | + timeout(30 * 60 * 1000, 'Timed out sqs.receiveMessage') |
| 124 | + ]) |
| 125 | + |
| 126 | + if (response.Messages && response.Messages.length > 0) { |
| 127 | + for (const it of response.Messages) { |
| 128 | + const message: TaskQueueMessage = { id: it.MessageId! } |
| 129 | + const { end } = components.metrics.startTimer('job_queue_duration_seconds', { queue_name: options.queueUrl }) |
| 130 | + try { |
| 131 | + const snsOverSqs: SNSOverSQSMessage = JSON.parse(it.Body!) |
| 132 | + logger.info(`Processing job`, { id: message.id, message: snsOverSqs.Message }) |
| 133 | + const result = await taskRunner(JSON.parse(snsOverSqs.Message), message) |
| 134 | + logger.info(`Processed job`, { id: message.id }) |
| 135 | + return { result, message } |
| 136 | + } catch (err: any) { |
| 137 | + logger.error(err) |
| 138 | + |
| 139 | + components.metrics.increment('job_queue_failures_total', { queue_name: options.queueUrl }) |
| 140 | + |
| 141 | + return { result: undefined, message } |
| 142 | + } finally { |
| 143 | + await sqs.deleteMessage({ QueueUrl: options.queueUrl, ReceiptHandle: it.ReceiptHandle! }).promise() |
| 144 | + end() |
| 145 | + } |
| 146 | + } |
| 147 | + } |
| 148 | + logger.info(`No new messages in queue. Retrying for 15 seconds`) |
| 149 | + } catch (err: any) { |
| 150 | + logger.error(err) |
| 151 | + await sleep(1000) |
| 152 | + } |
| 153 | + } |
| 154 | + }, |
| 155 | + } |
| 156 | +} |
| 157 | + |
| 158 | +export async function sleep(ms: number) { |
| 159 | + return new Promise<void>((ok) => setTimeout(ok, ms)) |
| 160 | +} |
| 161 | + |
| 162 | +export async function timeout(ms: number, message: string) { |
| 163 | + return new Promise<never>((_, reject) => setTimeout(() => reject(new Error(message)), ms)) |
| 164 | +} |
0 commit comments