@@ -9,6 +9,7 @@ import { type DequeuedMessage } from "@trigger.dev/core/v3";
99import {
1010 DockerResourceMonitor ,
1111 KubernetesResourceMonitor ,
12+ NoopResourceMonitor ,
1213 type ResourceMonitor ,
1314} from "./resourceMonitor.js" ;
1415import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js" ;
@@ -33,7 +34,7 @@ class ManagedSupervisor {
3334 private readonly metricsServer ?: HttpServer ;
3435 private readonly workloadServer : WorkloadServer ;
3536 private readonly workloadManager : WorkloadManager ;
36- private readonly logger = new SimpleStructuredLogger ( "managed-worker " ) ;
37+ private readonly logger = new SimpleStructuredLogger ( "managed-supervisor " ) ;
3738 private readonly resourceMonitor : ResourceMonitor ;
3839 private readonly checkpointClient ?: CheckpointClient ;
3940
@@ -47,11 +48,11 @@ class ManagedSupervisor {
4748 const { TRIGGER_WORKER_TOKEN , MANAGED_WORKER_SECRET , ...envWithoutSecrets } = env ;
4849
4950 if ( env . DEBUG ) {
50- console . debug ( "[ManagedSupervisor] Starting up" , { envWithoutSecrets } ) ;
51+ this . logger . debug ( "Starting up" , { envWithoutSecrets } ) ;
5152 }
5253
5354 if ( this . warmStartUrl ) {
54- this . logger . log ( "[ManagedWorker] 🔥 Warm starts enabled" , {
55+ this . logger . log ( "🔥 Warm starts enabled" , {
5556 warmStartUrl : this . warmStartUrl ,
5657 } ) ;
5758 }
@@ -69,9 +70,19 @@ class ManagedSupervisor {
6970 dockerAutoremove : env . RUNNER_DOCKER_AUTOREMOVE ,
7071 } satisfies WorkloadManagerOptions ;
7172
73+ this . resourceMonitor = env . RESOURCE_MONITOR_ENABLED
74+ ? this . isKubernetes
75+ ? new KubernetesResourceMonitor ( createK8sApi ( ) , env . TRIGGER_WORKER_INSTANCE_NAME )
76+ : new DockerResourceMonitor ( new Docker ( ) )
77+ : new NoopResourceMonitor ( ) ;
78+
79+ this . workloadManager = this . isKubernetes
80+ ? new KubernetesWorkloadManager ( workloadManagerOptions )
81+ : new DockerWorkloadManager ( workloadManagerOptions ) ;
82+
7283 if ( this . isKubernetes ) {
7384 if ( env . POD_CLEANER_ENABLED ) {
74- this . logger . log ( "[ManagedWorker] 🧹 Pod cleaner enabled" , {
85+ this . logger . log ( "🧹 Pod cleaner enabled" , {
7586 namespace : env . KUBERNETES_NAMESPACE ,
7687 batchSize : env . POD_CLEANER_BATCH_SIZE ,
7788 intervalMs : env . POD_CLEANER_INTERVAL_MS ,
@@ -83,11 +94,11 @@ class ManagedSupervisor {
8394 intervalMs : env . POD_CLEANER_INTERVAL_MS ,
8495 } ) ;
8596 } else {
86- this . logger . warn ( "[ManagedWorker] Pod cleaner disabled" ) ;
97+ this . logger . warn ( "Pod cleaner disabled" ) ;
8798 }
8899
89100 if ( env . FAILED_POD_HANDLER_ENABLED ) {
90- this . logger . log ( "[ManagedWorker] 🔁 Failed pod handler enabled" , {
101+ this . logger . log ( "🔁 Failed pod handler enabled" , {
91102 namespace : env . KUBERNETES_NAMESPACE ,
92103 reconnectIntervalMs : env . FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS ,
93104 } ) ;
@@ -97,17 +108,14 @@ class ManagedSupervisor {
97108 reconnectIntervalMs : env . FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS ,
98109 } ) ;
99110 } else {
100- this . logger . warn ( "[ManagedWorker] Failed pod handler disabled" ) ;
111+ this . logger . warn ( "Failed pod handler disabled" ) ;
101112 }
113+ }
102114
103- this . resourceMonitor = new KubernetesResourceMonitor (
104- createK8sApi ( ) ,
105- env . TRIGGER_WORKER_INSTANCE_NAME
115+ if ( env . TRIGGER_DEQUEUE_INTERVAL_MS > env . TRIGGER_DEQUEUE_IDLE_INTERVAL_MS ) {
116+ this . logger . warn (
117+ `⚠️ TRIGGER_DEQUEUE_INTERVAL_MS ( ${ env . TRIGGER_DEQUEUE_INTERVAL_MS } ) is greater than TRIGGER_DEQUEUE_IDLE_INTERVAL_MS ( ${ env . TRIGGER_DEQUEUE_IDLE_INTERVAL_MS } ) - did you mix them up?`
106118 ) ;
107- this . workloadManager = new KubernetesWorkloadManager ( workloadManagerOptions ) ;
108- } else {
109- this . resourceMonitor = new DockerResourceMonitor ( new Docker ( ) ) ;
110- this . workloadManager = new DockerWorkloadManager ( workloadManagerOptions ) ;
111119 }
112120
113121 this . workerSession = new SupervisorSession ( {
@@ -123,12 +131,17 @@ class ManagedSupervisor {
123131 runNotificationsEnabled : env . TRIGGER_WORKLOAD_API_ENABLED ,
124132 heartbeatIntervalSeconds : env . TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS ,
125133 preDequeue : async ( ) => {
134+ if ( ! env . RESOURCE_MONITOR_ENABLED ) {
135+ return { } ;
136+ }
137+
126138 if ( this . isKubernetes ) {
127139 // Not used in k8s for now
128140 return { } ;
129141 }
130142
131143 const resources = await this . resourceMonitor . getNodeResources ( ) ;
144+
132145 return {
133146 maxResources : {
134147 cpu : resources . cpuAvailable ,
@@ -144,7 +157,7 @@ class ManagedSupervisor {
144157 } ) ;
145158
146159 if ( env . TRIGGER_CHECKPOINT_URL ) {
147- this . logger . log ( "[ManagedWorker] 🥶 Checkpoints enabled" , {
160+ this . logger . log ( "🥶 Checkpoints enabled" , {
148161 checkpointUrl : env . TRIGGER_CHECKPOINT_URL ,
149162 } ) ;
150163
@@ -155,43 +168,34 @@ class ManagedSupervisor {
155168 } ) ;
156169 }
157170
158- // setInterval(async () => {
159- // const resources = await this.resourceMonitor.getNodeResources(true);
160- // this.logger.debug("[ManagedWorker] Current resources", { resources });
161- // }, 1000);
162-
163171 this . workerSession . on ( "runNotification" , async ( { time, run } ) => {
164- this . logger . log ( "[ManagedWorker] runNotification" , { time, run } ) ;
172+ this . logger . log ( "runNotification" , { time, run } ) ;
165173
166174 this . workloadServer . notifyRun ( { run } ) ;
167175 } ) ;
168176
169177 this . workerSession . on ( "runQueueMessage" , async ( { time, message } ) => {
170- this . logger . log (
171- `[ManagedWorker] Received message with timestamp ${ time . toLocaleString ( ) } ` ,
172- message
173- ) ;
178+ this . logger . log ( `Received message with timestamp ${ time . toLocaleString ( ) } ` , message ) ;
174179
175180 if ( message . completedWaitpoints . length > 0 ) {
176- this . logger . debug ( "[ManagedWorker] Run has completed waitpoints" , {
181+ this . logger . debug ( "Run has completed waitpoints" , {
177182 runId : message . run . id ,
178183 completedWaitpoints : message . completedWaitpoints . length ,
179184 } ) ;
180- // TODO: Do something with them or if we don't need the data here, maybe we shouldn't even send it
181185 }
182186
183187 if ( ! message . image ) {
184- this . logger . error ( "[ManagedWorker] Run has no image" , { runId : message . run . id } ) ;
188+ this . logger . error ( "Run has no image" , { runId : message . run . id } ) ;
185189 return ;
186190 }
187191
188192 const { checkpoint, ...rest } = message ;
189193
190194 if ( checkpoint ) {
191- this . logger . log ( "[ManagedWorker] Restoring run" , { runId : message . run . id } ) ;
195+ this . logger . log ( "Restoring run" , { runId : message . run . id } ) ;
192196
193197 if ( ! this . checkpointClient ) {
194- this . logger . error ( "[ManagedWorker] No checkpoint client" , { runId : message . run . id } ) ;
198+ this . logger . error ( "No checkpoint client" , { runId : message . run . id } ) ;
195199 return ;
196200 }
197201
@@ -206,23 +210,23 @@ class ManagedSupervisor {
206210 } ) ;
207211
208212 if ( didRestore ) {
209- this . logger . log ( "[ManagedWorker] Restore successful" , { runId : message . run . id } ) ;
213+ this . logger . log ( "Restore successful" , { runId : message . run . id } ) ;
210214 } else {
211- this . logger . error ( "[ManagedWorker] Restore failed" , { runId : message . run . id } ) ;
215+ this . logger . error ( "Restore failed" , { runId : message . run . id } ) ;
212216 }
213217 } catch ( error ) {
214- this . logger . error ( "[ManagedWorker] Failed to restore run" , { error } ) ;
218+ this . logger . error ( "Failed to restore run" , { error } ) ;
215219 }
216220
217221 return ;
218222 }
219223
220- this . logger . log ( "[ManagedWorker] Scheduling run" , { runId : message . run . id } ) ;
224+ this . logger . log ( "Scheduling run" , { runId : message . run . id } ) ;
221225
222226 const didWarmStart = await this . tryWarmStart ( message ) ;
223227
224228 if ( didWarmStart ) {
225- this . logger . log ( "[ManagedWorker] Warm start successful" , { runId : message . run . id } ) ;
229+ this . logger . log ( "Warm start successful" , { runId : message . run . id } ) ;
226230 return ;
227231 }
228232
@@ -249,7 +253,7 @@ class ManagedSupervisor {
249253 // memory: message.run.machine.memory,
250254 // });
251255 } catch ( error ) {
252- this . logger . error ( "[ManagedWorker] Failed to create workload" , { error } ) ;
256+ this . logger . error ( "Failed to create workload" , { error } ) ;
253257 }
254258 } ) ;
255259
@@ -277,12 +281,12 @@ class ManagedSupervisor {
277281 }
278282
279283 async onRunConnected ( { run } : { run : { friendlyId : string } } ) {
280- this . logger . debug ( "[ManagedWorker] Run connected" , { run } ) ;
284+ this . logger . debug ( "Run connected" , { run } ) ;
281285 this . workerSession . subscribeToRunNotifications ( [ run . friendlyId ] ) ;
282286 }
283287
284288 async onRunDisconnected ( { run } : { run : { friendlyId : string } } ) {
285- this . logger . debug ( "[ManagedWorker] Run disconnected" , { run } ) ;
289+ this . logger . debug ( "Run disconnected" , { run } ) ;
286290 this . workerSession . unsubscribeFromRunNotifications ( [ run . friendlyId ] ) ;
287291 }
288292
@@ -303,7 +307,7 @@ class ManagedSupervisor {
303307 } ) ;
304308
305309 if ( ! res . ok ) {
306- this . logger . error ( "[ManagedWorker] Warm start failed" , {
310+ this . logger . error ( "Warm start failed" , {
307311 runId : dequeuedMessage . run . id ,
308312 } ) ;
309313 return false ;
@@ -313,7 +317,7 @@ class ManagedSupervisor {
313317 const parsedData = z . object ( { didWarmStart : z . boolean ( ) } ) . safeParse ( data ) ;
314318
315319 if ( ! parsedData . success ) {
316- this . logger . error ( "[ManagedWorker] Warm start response invalid" , {
320+ this . logger . error ( "Warm start response invalid" , {
317321 runId : dequeuedMessage . run . id ,
318322 data,
319323 } ) ;
@@ -322,7 +326,7 @@ class ManagedSupervisor {
322326
323327 return parsedData . data . didWarmStart ;
324328 } catch ( error ) {
325- this . logger . error ( "[ManagedWorker] Warm start error" , {
329+ this . logger . error ( "Warm start error" , {
326330 runId : dequeuedMessage . run . id ,
327331 error,
328332 } ) ;
@@ -331,29 +335,29 @@ class ManagedSupervisor {
331335 }
332336
333337 async start ( ) {
334- this . logger . log ( "[ManagedWorker] Starting up" ) ;
338+ this . logger . log ( "Starting up" ) ;
335339
336340 // Optional services
337341 await this . podCleaner ?. start ( ) ;
338342 await this . failedPodHandler ?. start ( ) ;
339343 await this . metricsServer ?. start ( ) ;
340344
341345 if ( env . TRIGGER_WORKLOAD_API_ENABLED ) {
342- this . logger . log ( "[ManagedWorker] Workload API enabled" , {
346+ this . logger . log ( "Workload API enabled" , {
343347 protocol : env . TRIGGER_WORKLOAD_API_PROTOCOL ,
344348 domain : env . TRIGGER_WORKLOAD_API_DOMAIN ,
345349 port : env . TRIGGER_WORKLOAD_API_PORT_INTERNAL ,
346350 } ) ;
347351 await this . workloadServer . start ( ) ;
348352 } else {
349- this . logger . warn ( "[ManagedWorker] Workload API disabled" ) ;
353+ this . logger . warn ( "Workload API disabled" ) ;
350354 }
351355
352356 await this . workerSession . start ( ) ;
353357 }
354358
355359 async stop ( ) {
356- this . logger . log ( "[ManagedWorker] Shutting down" ) ;
360+ this . logger . log ( "Shutting down" ) ;
357361 await this . workerSession . stop ( ) ;
358362
359363 // Optional services
0 commit comments