Skip to content

Commit 535f7f0

Browse files
committed
priority report
1 parent d80c342 commit 535f7f0

File tree

3 files changed

+19
-7
lines changed

3 files changed

+19
-7
lines changed

src/adapters/monitoring-reporter.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export interface HeartbeatData {
1414
currentStep?: string
1515
progressPercent?: number
1616
startedAt?: string
17+
isPriority?: boolean
1718
}
1819

1920
export interface JobCompleteData {
@@ -23,6 +24,7 @@ export interface JobCompleteData {
2324
completedAt: string
2425
durationMs: number
2526
errorMessage?: string
27+
isPriority?: boolean
2628
}
2729

2830
interface MonitoringReporterComponents {

src/adapters/sqs.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { AppComponents } from '../types'
66

77
export interface TaskQueueMessage {
88
id: string
9+
isPriority: boolean
910
}
1011

1112
export interface ITaskQueue<T> {
@@ -52,9 +53,9 @@ export function createMemoryQueueAdapter<T>(
5253
async stop() {
5354
q.close()
5455
},
55-
async publish(job) {
56+
async publish(job, prioritize?: boolean) {
5657
const id = 'job-' + (++lastJobId).toString()
57-
const message: TaskQueueMessage = { id }
58+
const message: TaskQueueMessage = { id, isPriority: !!prioritize }
5859
q.enqueue({ job, message })
5960
logger.info(`Publishing job`, { id })
6061
components.metrics.increment('job_queue_enqueue_total', { queue_name: options.queueName })
@@ -139,7 +140,7 @@ export function createSqsAdapter<T>(
139140

140141
const published = await sqs.send(command)
141142

142-
const m: TaskQueueMessage = { id: published.MessageId! }
143+
const m: TaskQueueMessage = { id: published.MessageId!, isPriority: !!prioritize }
143144

144145
logger.info(`Publishing job ${JSON.stringify(m)}`)
145146

@@ -154,7 +155,8 @@ export function createSqsAdapter<T>(
154155

155156
if (response && response.Messages && response.Messages.length > 0) {
156157
for (const it of response.Messages) {
157-
const message: TaskQueueMessage = { id: it.MessageId! }
158+
const isPriority = options.priorityQueueUrl ? queueUsed === options.priorityQueueUrl : false
159+
const message: TaskQueueMessage = { id: it.MessageId!, isPriority }
158160
const { end } = components.metrics.startTimer('job_queue_duration_seconds', {
159161
queue_name: options.queueUrl
160162
})

src/service.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,20 @@ export async function main(program: Lifecycle.EntryPointParameters<AppComponents
5252
const sceneId = job.entity.entityId
5353
const startedAt = new Date().toISOString()
5454
const startTime = Date.now()
55+
const isPriority = message.isPriority
56+
57+
if (isPriority) {
58+
logger.info('Processing PRIORITY job', { sceneId })
59+
}
5560

5661
// Report job start
5762
components.monitoringReporter.reportHeartbeat({
5863
status: 'processing',
5964
currentSceneId: sceneId,
6065
currentStep: 'Starting',
6166
progressPercent: 0,
62-
startedAt
67+
startedAt,
68+
isPriority
6369
})
6470

6571
try {
@@ -87,7 +93,8 @@ export async function main(program: Lifecycle.EntryPointParameters<AppComponents
8793
status: 'success',
8894
startedAt,
8995
completedAt: new Date().toISOString(),
90-
durationMs: Date.now() - startTime
96+
durationMs: Date.now() - startTime,
97+
isPriority
9198
})
9299
} catch (error) {
93100
logger.error(`Error processing job ${job.entity.entityId}`)
@@ -100,7 +107,8 @@ export async function main(program: Lifecycle.EntryPointParameters<AppComponents
100107
startedAt,
101108
completedAt: new Date().toISOString(),
102109
durationMs: Date.now() - startTime,
103-
errorMessage: error instanceof Error ? error.message : 'Unknown error'
110+
errorMessage: error instanceof Error ? error.message : 'Unknown error',
111+
isPriority
104112
})
105113
}
106114
})

0 commit comments

Comments
 (0)