
Commit 5b8b02d

[cuebot/pycue/proto/sandbox/docs] Add full event-driven monitoring stack, enhance metrics, dashboards, and documentation (#2086)
**Link the Issue(s) this Pull Request is related to.**
- #2085

**Summarize your change.**

[cuebot/pycue/proto/rust/sandbox/docs] Add event-driven monitoring stack for OpenCue

Implement event-driven monitoring infrastructure enabling real-time and historical analysis of render farm activity. Adds a Kafka + Elasticsearch pipeline for collecting job, layer, frame, host, and proc lifecycle events, with Prometheus and Grafana integration for live dashboards and operational visibility.

Proto & Event Model:
- Define monitoring.proto with job/layer/frame/host/proc lifecycle events
- Use proto composition pattern - embed Job, Layer, Frame, Host messages
- Exclude HostReportEvent from pipeline (too high frequency for Kafka/ES)

Cuebot Event Publishing:
- Add KafkaEventPublisher for async event publishing to Kafka topics
- Add KafkaAdminClient for topic creation with configurable partitions/retention
- Add MonitoringEventBuilder as Spring-managed bean for event construction
- Hook publishing into FrameCompleteHandler, HostReportHandler, DispatchSupportService, JobManagerSupport, DependManagerService
- Publish pickup time tracking events (FRAME_STARTED, FRAME_DISPATCHED)
- Add isFrameDispatchable() to DependDao for dependency checking

Prometheus Metrics:
- cue_frames_completed_total (with show, shot, state labels)
- cue_jobs_completed_total (with show, shot, state labels)
- cue_job_core_seconds histogram
- cue_layer_max_runtime_seconds histogram
- cue_layer_max_memory_bytes histogram

Rust monitoring-indexer Service:
- Add rust/crates/monitoring-indexer: standalone Kafka-to-Elasticsearch indexer
- Async Kafka consumer with configurable batch processing
- Elasticsearch bulk indexing with date-based indices and field mappings
- Parallel event processing using rayon for CPU-bound operations
- Index templates for all event types (job, layer, frame, host, proc)
- Graceful handling of UnknownTopicOrPartition during startup

gRPC & PyCue:
- Add MonitoringInterface gRPC service
- Implement pycue monitoring wrapper with historical data API methods

Infrastructure (docker-compose.monitoring-full.yml):
- Zookeeper, Kafka, Kafka UI
- Elasticsearch, Kibana
- Prometheus (with cuebot scrape config)
- Grafana (with provisioned dashboard)
- monitoring-indexer service

Grafana Dashboard:
- Frame completion rates by state (DEAD/red, SUCCEEDED/green, WAITING/yellow)
- Job completion by show
- Frame runtime and memory distribution
- Job core seconds distribution
- Pickup time metrics (FRAME_STARTED/FRAME_DISPATCHED)
- Layer max runtime/memory panels

Documentation:
- Architecture, concepts, and pipeline explanation
- Deployment and Quick Start guides
- User and Developer guides
- API Reference and tutorials
- Elasticsearch query reference guide

Utilities:
- sandbox/monitor_events.py: Example Kafka consumer
- sandbox/load_test_jobs.py: Test data generator with CLI args

Configuration (opt-in, disabled by default):
- monitoring.kafka.enabled, monitoring.kafka.bootstrap.servers
- monitoring.kafka.topic.partitions, .replication.factor, .retention.ms
- monitoring.elasticsearch.enabled, monitoring.elasticsearch.host

---------

Signed-off-by: Ramon Figueiredo <[email protected]>
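
The configuration keys listed in the commit message are opt-in. The following is a minimal sketch of how such flags could be read with "disabled unless explicitly enabled" defaults, using only `java.util.Properties`. The class name `MonitoringSettings` and the fallback address `localhost:9092` are illustrative assumptions, not Cuebot's actual code or defaults.

```java
import java.util.Properties;

/** Hypothetical holder for the opt-in monitoring flags named in the commit message. */
final class MonitoringSettings {
    final boolean kafkaEnabled;
    final String bootstrapServers;
    final boolean elasticsearchEnabled;

    MonitoringSettings(Properties props) {
        // Both pipelines stay off unless the property is explicitly set to "true".
        this.kafkaEnabled =
                Boolean.parseBoolean(props.getProperty("monitoring.kafka.enabled", "false"));
        // Assumed fallback address, for illustration only.
        this.bootstrapServers =
                props.getProperty("monitoring.kafka.bootstrap.servers", "localhost:9092");
        this.elasticsearchEnabled =
                Boolean.parseBoolean(props.getProperty("monitoring.elasticsearch.enabled", "false"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println("default: kafka=" + new MonitoringSettings(props).kafkaEnabled);

        // Opting in requires an explicit property.
        props.setProperty("monitoring.kafka.enabled", "true");
        System.out.println("opt-in:  kafka=" + new MonitoringSettings(props).kafkaEnabled);
    }
}
```

Defaulting every flag to `false` means an existing deployment that never sets these properties keeps its current behaviour.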
1 parent 74683bd commit 5b8b02d

148 files changed: 10345 additions, 244 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -32,4 +32,5 @@ docs/bin/
 sandbox/kafka-data
 sandbox/zookeeper-data
 sandbox/zookeeper-logs
+sandbox/rqd/shots/
 docs/_data/version.yml

VERSION.in

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-1.14
+1.15

cuebot/build.gradle

Lines changed: 15 additions & 2 deletions
@@ -26,6 +26,8 @@ repositories {
 def grpcVersion = '1.47.0'
 def protobufVersion = '3.21.2'
 def activemqVersion = '5.12.0'
+def kafkaVersion = '3.4.0'
+def elasticsearchVersion = '8.8.0'

 // Spring dependency versions are managed by the io.spring.dependency-management plugin.
 // Appropriate versions will be pulled based on the spring boot version specified in the
@@ -52,6 +54,15 @@ dependencies {
     implementation group: 'io.prometheus', name: 'simpleclient', version: '0.16.0'
     implementation group: 'io.prometheus', name: 'simpleclient_servlet', version: '0.16.0'

+    // Kafka for event publishing
+    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
+    implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.9.0'
+
+    // Elasticsearch for historical data storage
+    implementation group: 'co.elastic.clients', name: 'elasticsearch-java', version: "${elasticsearchVersion}"
+    implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: "${elasticsearchVersion}"
+    implementation group: 'jakarta.json', name: 'jakarta.json-api', version: '2.1.1'
+
     protobuf files("../proto/src/")

     testImplementation group: 'junit', name: 'junit', version: '4.12'
@@ -67,12 +78,14 @@ dependencies {

 compileJava {
     dependsOn generateProto
-    options.compilerArgs << "-Xlint:all" << "-Werror"
+    // Exclude serial warning due to protobuf-generated code warnings
+    options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
 }

 compileTestJava {
     dependsOn generateProto
-    options.compilerArgs << "-Xlint:all" << "-Werror"
+    // Exclude serial warning due to protobuf-generated code warnings
+    options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
 }

 protobuf {

cuebot/src/main/java/com/imageworks/spcue/CuebotApplication.java

Lines changed: 2 additions & 1 deletion
@@ -24,8 +24,9 @@
 import org.apache.logging.log4j.LogManager;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

-@SpringBootApplication
+@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
 public class CuebotApplication extends SpringApplication {
     private static String[] checkArgs(String[] args) {
         Optional<String> deprecatedFlag = Arrays.stream(args)

cuebot/src/main/java/com/imageworks/spcue/ExecutionSummary.java

Lines changed: 9 additions & 0 deletions
@@ -28,6 +28,15 @@ public class ExecutionSummary {
     public long gpuTimeSuccess;
     public long gpuTimeFail;
     public long highMemoryKb;
+    public int highFrameSec;
+
+    public int getHighFrameSec() {
+        return highFrameSec;
+    }
+
+    public void setHighFrameSec(int highFrameSec) {
+        this.highFrameSec = highFrameSec;
+    }

     public long getHighMemoryKb() {
         return highMemoryKb;

cuebot/src/main/java/com/imageworks/spcue/PrometheusMetricsCollector.java

Lines changed: 108 additions & 0 deletions
@@ -119,6 +119,38 @@ public class PrometheusMetricsCollector {
             .labelNames("env", "cuebot_host", "render_node", "job_name", "frame_name", "frame_id")
             .register();

+    private static final Counter frameCompletedCounter = Counter.build()
+            .name("cue_frames_completed_total").help("Total number of frames completed")
+            .labelNames("env", "cuebot_host", "state", "show", "shot").register();
+
+    private static final Counter jobCompletedCounter =
+            Counter.build().name("cue_jobs_completed_total").help("Total number of jobs completed")
+                    .labelNames("env", "cuebot_host", "state", "show", "shot").register();
+
+    private static final Histogram jobCoreSecondsHistogram = Histogram.build()
+            .name("cue_job_core_seconds").help("Histogram of total core seconds per job")
+            .labelNames("env", "cuebot_host", "show", "shot")
+            .buckets(3600, 36000, 360000, 3600000, 36000000).register();
+
+    private static final Histogram layerMaxRuntimeHistogram =
+            Histogram.build().name("cue_layer_max_runtime_seconds")
+                    .help("Histogram of max frame runtime per layer in seconds")
+                    .labelNames("env", "cuebot_host", "show", "shot", "layer_type")
+                    .buckets(60, 300, 600, 1800, 3600, 7200, 14400, 28800, 86400).register();
+
+    private static final Histogram layerMaxMemoryHistogram =
+            Histogram.build().name("cue_layer_max_memory_bytes")
+                    .help("Histogram of max frame memory usage per layer in bytes")
+                    .labelNames("env", "cuebot_host", "show", "shot", "layer_type")
+                    .buckets(256L * 1024 * 1024, 512L * 1024 * 1024, 1024L * 1024 * 1024,
+                            2048L * 1024 * 1024, 4096L * 1024 * 1024, 8192L * 1024 * 1024,
+                            16384L * 1024 * 1024, 32768L * 1024 * 1024)
+                    .register();
+
+    private static final Counter hostReportsReceivedCounter = Counter.build()
+            .name("cue_host_reports_received_total").help("Total number of host reports received")
+            .labelNames("env", "cuebot_host", "facility").register();
+
     private String deployment_environment;
     private String cuebot_host;
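
The `L` suffix on the memory bucket bounds above is load-bearing. Java evaluates `4096 * 1024 * 1024` in 32-bit `int` arithmetic, so the value wraps around before it is widened to the `double` that a bucket list expects. A small stand-alone sketch of the difference follows; the class `BucketBounds` and its helper are hypothetical and not part of the commit.

```java
/** Illustrates why the bucket bounds are written with long literals. */
final class BucketBounds {
    static final long BYTES_PER_MIB = 1024L * 1024;

    /** Converts a bound expressed in MiB to bytes using 64-bit arithmetic. */
    static long mibToBytes(long mib) {
        return mib * BYTES_PER_MIB;
    }

    public static void main(String[] args) {
        int wrapped = 4096 * 1024 * 1024;   // int arithmetic: 2^32 wraps to 0
        long exact = 4096L * 1024 * 1024;   // long arithmetic: 4294967296

        System.out.println("int:  " + wrapped);
        System.out.println("long: " + exact);
        System.out.println("via helper: " + mibToBytes(4096));
    }
}
```

Without the suffix, every bound from 2048 MiB upward would be corrupted, because 2^31 already exceeds `Integer.MAX_VALUE`.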

@@ -269,6 +301,82 @@ public void incrementFrameKillFailureCounter(String hostname, String jobName, St
                 jobName, frameName, frameId).inc();
     }

+    /**
+     * Record a frame completion
+     *
+     * @param state final state of the frame
+     * @param show show name
+     * @param shot shot name
+     */
+    public void recordFrameCompleted(String state, String show, String shot) {
+        frameCompletedCounter
+                .labels(this.deployment_environment, this.cuebot_host, state, show, shot).inc();
+    }
+
+    /**
+     * Record a job completion
+     *
+     * @param state final state of the job
+     * @param show show name
+     * @param shot shot name
+     */
+    public void recordJobCompleted(String state, String show, String shot) {
+        jobCompletedCounter.labels(this.deployment_environment, this.cuebot_host, state, show, shot)
+                .inc();
+    }
+
+    /**
+     * Record job total core seconds for histogramming
+     *
+     * @param coreSeconds total core seconds consumed by the job
+     * @param show show name
+     * @param shot shot name
+     */
+    public void recordJobCoreSeconds(double coreSeconds, String show, String shot) {
+        jobCoreSecondsHistogram.labels(this.deployment_environment, this.cuebot_host, show, shot)
+                .observe(coreSeconds);
+    }
+
+    /**
+     * Record layer max runtime for histogramming
+     *
+     * @param runtimeSeconds max runtime in seconds for the layer
+     * @param show show name
+     * @param shot shot name
+     * @param layerType layer type
+     */
+    public void recordLayerMaxRuntime(double runtimeSeconds, String show, String shot,
+            String layerType) {
+        layerMaxRuntimeHistogram
+                .labels(this.deployment_environment, this.cuebot_host, show, shot, layerType)
+                .observe(runtimeSeconds);
+    }
+
+    /**
+     * Record layer max memory usage for histogramming
+     *
+     * @param memoryBytes max memory in bytes for the layer
+     * @param show show name
+     * @param shot shot name
+     * @param layerType layer type
+     */
+    public void recordLayerMaxMemory(double memoryBytes, String show, String shot,
+            String layerType) {
+        layerMaxMemoryHistogram
+                .labels(this.deployment_environment, this.cuebot_host, show, shot, layerType)
+                .observe(memoryBytes);
+    }
+
+    /**
+     * Record a host report received
+     *
+     * @param facility facility name
+     */
+    public void recordHostReport(String facility) {
+        hostReportsReceivedCounter.labels(this.deployment_environment, this.cuebot_host, facility)
+                .inc();
+    }
+
     // Setters used for dependency injection
     public void setBookingQueue(BookingQueue bookingQueue) {
         this.bookingQueue = bookingQueue;
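
Prometheus histogram buckets are cumulative: each bucket counts every observation less than or equal to its upper bound, and an implicit `+Inf` bucket counts them all. The sketch below re-implements only that counting rule in plain Java, using the `cue_job_core_seconds` bounds from the diff, to show what a call such as `recordJobCoreSeconds` implies for each bucket. The class `CumulativeBuckets` is a hypothetical illustration and does not use or mirror the Prometheus client's internals.

```java
import java.util.Arrays;

/** Plain-Java illustration of cumulative ("le") bucket counting; not the Prometheus client. */
final class CumulativeBuckets {
    private final double[] upperBounds; // ascending, with +Inf appended
    private final long[] counts;

    CumulativeBuckets(double... bounds) {
        upperBounds = Arrays.copyOf(bounds, bounds.length + 1);
        upperBounds[bounds.length] = Double.POSITIVE_INFINITY;
        counts = new long[upperBounds.length];
    }

    /** Increments every bucket whose upper bound is at or above the value. */
    void observe(double value) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (value <= upperBounds[i]) {
                counts[i]++;
            }
        }
    }

    /** Returns the cumulative count for the bucket with exactly this upper bound. */
    long countAtOrBelow(double bound) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (Double.compare(upperBounds[i], bound) == 0) {
                return counts[i];
            }
        }
        throw new IllegalArgumentException("no such bucket: " + bound);
    }

    public static void main(String[] args) {
        // Same bounds as cue_job_core_seconds: 1, 10, 100, 1000 and 10000 core hours.
        CumulativeBuckets h = new CumulativeBuckets(3600, 36000, 360000, 3600000, 36000000);
        h.observe(1800);        // half a core hour
        h.observe(7200);        // two core hours
        h.observe(50_000_000);  // beyond the largest finite bound

        System.out.println("le=3600:  " + h.countAtOrBelow(3600));   // 1
        System.out.println("le=36000: " + h.countAtOrBelow(36000));  // 2
        System.out.println("le=+Inf:  " + h.countAtOrBelow(Double.POSITIVE_INFINITY)); // 3
    }
}
```

Because jobs above 10000 core hours land only in `+Inf`, the finite bounds chosen in the diff determine the resolution available to any quantile estimate built on top of them.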

cuebot/src/main/java/com/imageworks/spcue/config/AppConfig.java

Lines changed: 2 additions & 1 deletion
@@ -38,7 +38,8 @@
         "classpath:conf/spring/applicationContext-grpcServer.xml",
         "classpath:conf/spring/applicationContext-service.xml",
         "classpath:conf/spring/applicationContext-jms.xml",
-        "classpath:conf/spring/applicationContext-criteria.xml"})
+        "classpath:conf/spring/applicationContext-criteria.xml",
+        "classpath:conf/spring/applicationContext-monitoring.xml"})
 @EnableConfigurationProperties
 @PropertySource({"classpath:opencue.properties"})
 public class AppConfig {

cuebot/src/main/java/com/imageworks/spcue/dao/DependDao.java

Lines changed: 8 additions & 0 deletions
@@ -194,6 +194,14 @@ public interface DependDao {
      */
     boolean decrementDependCount(FrameInterface f);

+    /**
+     * Check if a frame is dispatchable (has depend_count = 0).
+     *
+     * @param f the frame to check
+     * @return true if the frame's depend_count is 0
+     */
+    boolean isFrameDispatchable(FrameInterface f);
+
     /**
      * Returns true if this is the thread that set the depend to inactive.
      *

cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DependDaoJdbc.java

Lines changed: 9 additions & 0 deletions
@@ -215,6 +215,15 @@ public boolean decrementDependCount(FrameInterface f) {
         return getJdbcTemplate().update(DECREMENT_DEPEND_COUNT, f.getFrameId()) == 1;
     }

+    private static final String IS_FRAME_DISPATCHABLE =
+            "SELECT int_depend_count = 0 FROM frame WHERE pk_frame = ?";
+
+    @Override
+    public boolean isFrameDispatchable(FrameInterface f) {
+        return Boolean.TRUE.equals(getJdbcTemplate().queryForObject(IS_FRAME_DISPATCHABLE,
+                Boolean.class, f.getFrameId()));
+    }
+
     private static final String[] DELETE_DEPEND =
             {"DELETE FROM depend WHERE pk_parent=?", "DELETE FROM depend WHERE pk_depend=?"};
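
Wrapping the query result in `Boolean.TRUE.equals(...)` makes `isFrameDispatchable` null-safe: if the query yields a `null` `Boolean` (for example, should the SQL expression evaluate to NULL), the method returns `false` rather than failing on auto-unboxing. The sketch below isolates that idiom, with the JDBC call replaced by a plain parameter so it runs stand-alone. The class and method names are hypothetical.

```java
/** Isolates the null-safe unboxing idiom used by isFrameDispatchable. */
final class NullSafeFlag {
    /** Treats a missing (null) result as "not dispatchable". */
    static boolean isDispatchable(Boolean queryResult) {
        return Boolean.TRUE.equals(queryResult);
    }

    /** Naive alternative: auto-unboxing throws NullPointerException on null. */
    static boolean unsafeUnbox(Boolean queryResult) {
        return queryResult;
    }

    public static void main(String[] args) {
        System.out.println(isDispatchable(Boolean.TRUE));   // true
        System.out.println(isDispatchable(Boolean.FALSE));  // false
        System.out.println(isDispatchable(null));           // false

        try {
            unsafeUnbox(null);
        } catch (NullPointerException e) {
            System.out.println("unboxing null throws NullPointerException");
        }
    }
}
```

Note that this idiom only covers a `null` value; a query that matches no row at all is a separate case that the sketch does not model.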

cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java

Lines changed: 8 additions & 6 deletions
@@ -409,12 +409,13 @@ public FrameStateTotals mapRow(ResultSet rs, int rowNum) throws SQLException {
         }, layer.getLayerId());
     }

-    private static final String GET_EXECUTION_SUMMARY = "SELECT "
-            + "layer_usage.int_core_time_success," + "layer_usage.int_core_time_fail,"
-            + "layer_usage.int_gpu_time_success," + "layer_usage.int_gpu_time_fail,"
-            + "layer_usage.int_clock_time_success," + "layer_mem.int_max_rss " + "FROM " + "layer,"
-            + "layer_usage, " + "layer_mem " + "WHERE " + "layer.pk_layer = layer_usage.pk_layer "
-            + "AND " + "layer.pk_layer = layer_mem.pk_layer " + "AND " + "layer.pk_layer = ?";
+    private static final String GET_EXECUTION_SUMMARY =
+            "SELECT " + "layer_usage.int_core_time_success," + "layer_usage.int_core_time_fail,"
+                    + "layer_usage.int_gpu_time_success," + "layer_usage.int_gpu_time_fail,"
+                    + "layer_usage.int_clock_time_success," + "layer_usage.int_clock_time_high,"
+                    + "layer_mem.int_max_rss " + "FROM " + "layer," + "layer_usage, " + "layer_mem "
+                    + "WHERE " + "layer.pk_layer = layer_usage.pk_layer " + "AND "
+                    + "layer.pk_layer = layer_mem.pk_layer " + "AND " + "layer.pk_layer = ?";

     @Override
     public ExecutionSummary getExecutionSummary(LayerInterface layer) {
@@ -429,6 +430,7 @@ public ExecutionSummary mapRow(ResultSet rs, int rowNum) throws SQLException {
                 e.gpuTimeFail = rs.getLong("int_gpu_time_fail");
                 e.gpuTime = e.gpuTimeSuccess + e.gpuTimeFail;
                 e.highMemoryKb = rs.getLong("int_max_rss");
+                e.highFrameSec = rs.getInt("int_clock_time_high");
                 return e;
             }
         }, layer.getLayerId());
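
The summary now carries both `highMemoryKb` (kilobytes) and `highFrameSec` (seconds), while the layer histograms added in this commit are named in bytes and seconds. The wiring between the two is not visible in the part of the diff shown here, so the following is only a sketch of the unit conversion such wiring would need. It assumes that the summary fields feed those histograms and that 1 KB means 1024 bytes; the class `LayerSummaryUnits` is hypothetical.

```java
/** Hypothetical unit conversions between ExecutionSummary fields and histogram units. */
final class LayerSummaryUnits {
    /** Assumes 1 KB = 1024 bytes; a double result is exact for realistic memory sizes. */
    static double maxMemoryBytes(long highMemoryKb) {
        return highMemoryKb * 1024.0;
    }

    /** Seconds need no scaling, only widening to the double a histogram observes. */
    static double maxRuntimeSeconds(int highFrameSec) {
        return highFrameSec;
    }

    public static void main(String[] args) {
        long fourGibInKb = 4L * 1024 * 1024; // 4 GiB expressed in KB
        System.out.println(maxMemoryBytes(fourGibInKb));
        System.out.println(maxRuntimeSeconds(5400));
    }
}
```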
