Skip to content

Commit d80c342

Browse files
committed
add monitoring
1 parent 75d1259 commit d80c342

File tree

5 files changed

+194
-2
lines changed

5 files changed

+194
-2
lines changed

.env.default

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,8 @@ PROCESS_METHOD=
88
AWS_REGION=
99
TASK_QUEUE=
1010
BUCKET=
11-
AWS_ENDPOINT=
11+
AWS_ENDPOINT=
12+
13+
# Pipeline monitoring (optional)
14+
MONITORING_URL=
15+
MONITORING_SECRET=
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import type { ILoggerComponent, IConfigComponent, IBaseComponent } from '@well-known-components/interfaces'
2+
import type { IFetchComponent } from '@well-known-components/http-server'
3+
import crypto from 'crypto'
4+
5+
/**
 * Component that reports this consumer's heartbeats and job results to the
 * pipeline monitoring service. Extends IBaseComponent; the implementation
 * returned by createMonitoringReporter provides start() and stop().
 */
export interface IMonitoringReporter extends IBaseComponent {
6+
/**
 * Replaces the heartbeat payload. In createMonitoringReporter the new payload
 * is also sent immediately when the component has been started.
 */
reportHeartbeat(data: HeartbeatData): void
7+
/**
 * Reports the outcome of one job. In createMonitoringReporter this also
 * resets the stored heartbeat payload to `{ status: 'idle' }`.
 */
reportJobComplete(data: JobCompleteData): void
8+
/** Returns the identifier of this reporter instance (a random UUID in createMonitoringReporter). */
getConsumerId(): string
9+
}
10+
11+
/**
 * Payload describing what the consumer is doing right now. It is spread into
 * every heartbeat request next to `consumerId` and `processMethod`.
 */
export interface HeartbeatData {
12+
/** Whether the consumer is waiting for work or processing a job. */
status: 'idle' | 'processing'
13+
/** Scene currently being processed; the caller in src/service.ts passes the job's entity id. */
currentSceneId?: string
14+
/** Free-form label of the current step (the caller in src/service.ts sends 'Starting'). */
currentStep?: string
15+
/** Progress of the current job; presumably 0-100 - TODO confirm against the monitoring service. */
progressPercent?: number
16+
/** When the current job started; the caller in src/service.ts passes `new Date().toISOString()`. */
startedAt?: string
17+
}
18+
19+
/**
 * Payload describing a finished job. It is spread into the job-complete
 * request next to `consumerId` and `processMethod`.
 */
export interface JobCompleteData {
20+
/** Scene the job was processing; the caller in src/service.ts passes the job's entity id. */
sceneId: string
21+
/** Outcome of the job. */
status: 'success' | 'failed'
22+
/** Start time; the caller in src/service.ts passes an ISO string from `Date#toISOString()`. */
startedAt: string
23+
/** Completion time; the caller in src/service.ts passes an ISO string from `Date#toISOString()`. */
completedAt: string
24+
/** Elapsed time in milliseconds (the caller computes `Date.now() - startTime`). */
durationMs: number
25+
/** Error description; the caller in src/service.ts sets it only for failed jobs. */
errorMessage?: string
26+
}
27+
28+
/** Components that createMonitoringReporter depends on. */
interface MonitoringReporterComponents {
29+
/** Source of the 'monitoring-reporter' logger. */
logs: ILoggerComponent
30+
/** Read for MONITORING_URL and MONITORING_SECRET when the reporter starts. */
config: IConfigComponent
31+
/** Used to POST the monitoring reports. */
fetch: IFetchComponent
32+
}
33+
34+
/**
 * Creates the pipeline monitoring reporter.
 *
 * The reporter POSTs JSON to `${MONITORING_URL}/api/monitoring/heartbeat`
 * (once on start, every 10 seconds afterwards, and on every
 * `reportHeartbeat` call while running) and to
 * `${MONITORING_URL}/api/monitoring/job-complete` (on `reportJobComplete`).
 * Every request body carries `consumerId`, `processMethod`, the payload
 * fields and the shared `secret`.
 *
 * Monitoring is strictly best-effort: when MONITORING_URL or
 * MONITORING_SECRET is missing nothing is sent, and request failures are
 * logged at debug level and never propagate to the caller.
 *
 * @param components - logger, config and fetch components.
 * @param processMethod - processing method name included in every report.
 * @returns the reporter component; `start()` reads the configuration and
 *   begins the heartbeat, `stop()` ends it.
 */
export function createMonitoringReporter(
  components: MonitoringReporterComponents,
  processMethod: string
): IMonitoringReporter {
  const { logs, config, fetch } = components
  const logger = logs.getLogger('monitoring-reporter')

  // Upper bound for a single report request before it is aborted.
  const REPORT_TIMEOUT_MS = 5000
  // Period between two automatic heartbeats.
  const HEARTBEAT_INTERVAL_MS = 10000

  const consumerId = crypto.randomUUID()
  let monitoringUrl: string | undefined
  let monitoringSecret: string | undefined
  let heartbeatInterval: NodeJS.Timeout | undefined
  let currentHeartbeatData: HeartbeatData = { status: 'idle' }
  let isRunning = false

  /** True once both MONITORING_URL and MONITORING_SECRET have been read and are non-empty. */
  function isConfigured(): boolean {
    return Boolean(monitoringUrl && monitoringSecret)
  }

  /** Reads the monitoring settings from the config component. */
  async function initConfig(): Promise<void> {
    const rawUrl = await config.getString('MONITORING_URL')
    // Drop trailing slashes so `${monitoringUrl}${endpoint}` never contains `//`.
    monitoringUrl = rawUrl ? rawUrl.replace(/\/+$/, '') : undefined
    monitoringSecret = await config.getString('MONITORING_SECRET')

    if (!isConfigured()) {
      logger.info('Monitoring not configured (MONITORING_URL or MONITORING_SECRET missing)')
    } else {
      logger.info('Monitoring configured', { consumerId, monitoringUrl: monitoringUrl ?? '' })
    }
  }

  /**
   * POSTs `data` plus the shared secret to `endpoint`. Never rejects for
   * request failures: monitoring must not block or break the pipeline.
   */
  async function report(endpoint: string, data: object): Promise<void> {
    if (!monitoringUrl || !monitoringSecret) {
      return
    }

    const controller = new AbortController()
    const timeoutId = setTimeout(() => controller.abort(), REPORT_TIMEOUT_MS)

    try {
      const response = await fetch.fetch(`${monitoringUrl}${endpoint}`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ ...data, secret: monitoringSecret }),
        signal: controller.signal
      })

      // fetch resolves for HTTP error statuses too; leave a trace instead of
      // treating them as delivered.
      if (!response.ok) {
        logger.debug('Monitoring report rejected (non-blocking)', { endpoint, status: response.status })
      }
    } catch (error) {
      // Deliberately swallowed - monitoring should never block the pipeline.
      logger.debug('Monitoring report failed (non-blocking)', {
        error: error instanceof Error ? error.message : 'Unknown error'
      })
    } finally {
      // Always disarm the abort timer, including when the request failed;
      // otherwise it stays pending for the full timeout.
      clearTimeout(timeoutId)
    }
  }

  /** Sends the current heartbeat payload (fire-and-forget). */
  function sendHeartbeat(): void {
    void report('/api/monitoring/heartbeat', {
      consumerId,
      processMethod,
      ...currentHeartbeatData
    })
  }

  /** Sends one heartbeat now and schedules the periodic ones. */
  function startHeartbeat(): void {
    // Nothing to schedule when reports would be dropped anyway, and never
    // stack a second interval on top of a running one.
    if (heartbeatInterval || !isConfigured()) {
      return
    }

    sendHeartbeat()
    heartbeatInterval = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL_MS)
  }

  /** Cancels the periodic heartbeat, if any. */
  function stopHeartbeat(): void {
    if (heartbeatInterval) {
      clearInterval(heartbeatInterval)
      heartbeatInterval = undefined
    }
  }

  return {
    async start() {
      await initConfig()
      isRunning = true
      startHeartbeat()
    },

    async stop() {
      isRunning = false
      stopHeartbeat()
    },

    getConsumerId() {
      return consumerId
    },

    reportHeartbeat(data: HeartbeatData) {
      currentHeartbeatData = data
      // The interval would pick this up eventually; send right away so status
      // changes show up without waiting for the next tick.
      if (isRunning) {
        sendHeartbeat()
      }
    },

    reportJobComplete(data: JobCompleteData) {
      void report('/api/monitoring/job-complete', {
        consumerId,
        processMethod,
        ...data
      })

      // The job is over: subsequent heartbeats report the consumer as idle.
      currentHeartbeatData = { status: 'idle' }
    }
  }
}

src/components.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import { AppComponents, GlobalContext } from './types'
1313
import path from 'path'
1414
import { createMockSnsAdapterComponent, createSnsAdapterComponent } from './adapters/sns'
1515
import { AwsCredentialIdentity } from '@smithy/types'
16+
import { createMonitoringReporter } from './adapters/monitoring-reporter'
17+
import { getProcessMethod } from './service'
1618

1719
// Helper function to handle entityId logic
1820
async function handleEntityId(
@@ -152,6 +154,10 @@ export async function initComponents(): Promise<AppComponents> {
152154
? createSnsAdapterComponent({ logs }, { snsArn, snsEndpoint: snsEndpoint })
153155
: createMockSnsAdapterComponent({ logs })
154156

157+
// Create monitoring reporter
158+
const processMethod = await getProcessMethod(config)
159+
const monitoringReporter = createMonitoringReporter({ logs, config, fetch }, processMethod)
160+
155161
const entityIdIndex = process.argv.findIndex((p) => p === '--entityId')
156162
if (entityIdIndex !== -1) {
157163
const entityId = process.argv[entityIdIndex + 1]
@@ -173,6 +179,7 @@ export async function initComponents(): Promise<AppComponents> {
173179
runner,
174180
deploymentsByPointer: mitt(),
175181
storage,
176-
snsAdapter
182+
snsAdapter,
183+
monitoringReporter
177184
}
178185
}

src/service.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,19 @@ export async function main(program: Lifecycle.EntryPointParameters<AppComponents
4949
components.runner.runTask(async (opt) => {
5050
while (opt.isRunning) {
5151
await components.taskQueue.consumeAndProcessJob(async (job, message) => {
52+
const sceneId = job.entity.entityId
53+
const startedAt = new Date().toISOString()
54+
const startTime = Date.now()
55+
56+
// Report job start
57+
components.monitoringReporter.reportHeartbeat({
58+
status: 'processing',
59+
currentSceneId: sceneId,
60+
currentStep: 'Starting',
61+
progressPercent: 0,
62+
startedAt
63+
})
64+
5265
try {
5366
switch (processMethod) {
5467
case 'godot_optimizer':
@@ -67,9 +80,28 @@ export async function main(program: Lifecycle.EntryPointParameters<AppComponents
6780
logger.info('Consume and Process: ', { job: JSON.stringify(job), message: JSON.stringify(message) })
6881
break
6982
}
83+
84+
// Report job success
85+
components.monitoringReporter.reportJobComplete({
86+
sceneId,
87+
status: 'success',
88+
startedAt,
89+
completedAt: new Date().toISOString(),
90+
durationMs: Date.now() - startTime
91+
})
7092
} catch (error) {
7193
logger.error(`Error processing job ${job.entity.entityId}`)
7294
logger.error(error as any)
95+
96+
// Report job failure
97+
components.monitoringReporter.reportJobComplete({
98+
sceneId,
99+
status: 'failed',
100+
startedAt,
101+
completedAt: new Date().toISOString(),
102+
durationMs: Date.now() - startTime,
103+
errorMessage: error instanceof Error ? error.message : 'Unknown error'
104+
})
73105
}
74106
})
75107
}

src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { IRunnerComponent } from './adapters/runner'
1313
import { Emitter } from 'mitt'
1414
import { IStorageComponent } from './adapters/storage'
1515
import { ISNSAdapterComponent } from './adapters/sns'
16+
import { IMonitoringReporter } from './adapters/monitoring-reporter'
1617

1718
export type GlobalContext = {
1819
components: BaseComponents
@@ -30,6 +31,7 @@ export type BaseComponents = {
3031
deploymentsByPointer: Emitter<Record<string /* pointer */, DeploymentToSqs>>
3132
storage: IStorageComponent
3233
snsAdapter: ISNSAdapterComponent
34+
monitoringReporter: IMonitoringReporter
3335
}
3436

3537
// components used in runtime

0 commit comments

Comments
 (0)