Skip to content

Commit b8792e1

Browse files
[cuebot/proto] Fix Elasticsearch timestamp mapping and use proto composition
- Add createIndexTemplates() to ElasticsearchClient to ensure header.timestamp is mapped as date type with epoch_millis format (fixes Grafana "No data" issue) - Refactor monitoring.proto to use composition pattern - embed Job, Layer, Frame, Host messages instead of duplicating fields - Update MonitoringEventBuilder to work with embedded proto messages - Exclude -serial compiler warning in build.gradle for protobuf-generated code - Add unit tests for FRAME_STARTED and FRAME_DISPATCHED event building The timestamp mapping fix resolves time-based filtering in Grafana dashboards for Pickup Time Metrics (FRAME_STARTED/FRAME_DISPATCHED events).
1 parent 3c0686e commit b8792e1

File tree

6 files changed

+240
-248
lines changed

6 files changed

+240
-248
lines changed

cuebot/build.gradle

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ dependencies {
7878

7979
compileJava {
8080
dependsOn generateProto
81-
options.compilerArgs << "-Xlint:all" << "-Werror"
81+
// Exclude serial warning due to protobuf-generated code warnings
82+
options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
8283
}
8384

8485
compileTestJava {
8586
dependsOn generateProto
86-
options.compilerArgs << "-Xlint:all" << "-Werror"
87+
// Exclude serial warning due to protobuf-generated code warnings
88+
options.compilerArgs << "-Xlint:all,-serial" << "-Werror"
8789
}
8890

8991
protobuf {

cuebot/src/main/java/com/imageworks/spcue/monitoring/ElasticsearchClient.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public void initialize() {
9999

100100
initializeClient();
101101
initializeThreadPool();
102+
createIndexTemplates();
102103

103104
logger.info("Elasticsearch client initialized");
104105
}
@@ -118,6 +119,59 @@ private void initializeThreadPool() {
118119
TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));
119120
}
120121

122+
/**
123+
* Creates index templates with proper field mappings to ensure timestamp fields are mapped as
124+
* date type instead of text.
125+
*/
126+
private void createIndexTemplates() {
127+
String[] indexPrefixes = {INDEX_JOB_EVENTS, INDEX_LAYER_EVENTS, INDEX_FRAME_EVENTS,
128+
INDEX_HOST_EVENTS, INDEX_HOST_REPORTS, INDEX_PROC_EVENTS};
129+
130+
for (String prefix : indexPrefixes) {
131+
try {
132+
String templateName = prefix.replace("-", "_") + "_template";
133+
134+
// Check if template already exists
135+
boolean templateExists =
136+
esClient.indices().existsIndexTemplate(r -> r.name(templateName)).value();
137+
138+
if (!templateExists) {
139+
esClient.indices().putIndexTemplate(t -> t.name(templateName)
140+
.indexPatterns(List.of(prefix + "-*")).priority(100)
141+
.template(template -> template.mappings(m -> m
142+
// Map header.timestamp as date with epoch_millis format
143+
.properties("header",
144+
p -> p.object(o -> o
145+
.properties("timestamp",
146+
tp -> tp.date(
147+
d -> d.format("epoch_millis")))
148+
.properties("event_type",
149+
ep -> ep.keyword(k -> k))
150+
.properties("event_id",
151+
ep -> ep.keyword(k -> k))
152+
.properties("source_cuebot",
153+
ep -> ep.keyword(k -> k))
154+
.properties("correlation_id",
155+
ep -> ep.keyword(k -> k))))
156+
// Map common fields as keywords for proper aggregation
157+
.properties("job_id", p -> p.keyword(k -> k))
158+
.properties("job_name", p -> p.keyword(k -> k))
159+
.properties("layer_id", p -> p.keyword(k -> k))
160+
.properties("layer_name", p -> p.keyword(k -> k))
161+
.properties("show", p -> p.keyword(k -> k))
162+
.properties("host_name", p -> p.keyword(k -> k))
163+
.properties("previous_state", p -> p.keyword(k -> k)))));
164+
165+
logger.info("Created index template: {}", templateName);
166+
} else {
167+
logger.debug("Index template already exists: {}", templateName);
168+
}
169+
} catch (Exception e) {
170+
logger.warn("Failed to create index template for {}: {}", prefix, e.getMessage());
171+
}
172+
}
173+
}
174+
121175
@PreDestroy
122176
public void shutdown() {
123177
if (indexingPool != null) {

cuebot/src/main/java/com/imageworks/spcue/monitoring/KafkaEventPublisher.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public EventHeader.Builder createEventHeader(EventType eventType, String correla
171171
public void publishJobEvent(JobEvent event) {
172172
if (!enabled)
173173
return;
174-
publishEvent(TOPIC_JOB_EVENTS, event.getJobId(), event,
174+
publishEvent(TOPIC_JOB_EVENTS, event.getJob().getId(), event,
175175
event.getHeader().getEventType().name());
176176
}
177177

@@ -181,7 +181,7 @@ public void publishJobEvent(JobEvent event) {
181181
public void publishLayerEvent(LayerEvent event) {
182182
if (!enabled)
183183
return;
184-
publishEvent(TOPIC_LAYER_EVENTS, event.getLayerId(), event,
184+
publishEvent(TOPIC_LAYER_EVENTS, event.getLayer().getId(), event,
185185
event.getHeader().getEventType().name());
186186
}
187187

@@ -191,7 +191,7 @@ public void publishLayerEvent(LayerEvent event) {
191191
public void publishFrameEvent(FrameEvent event) {
192192
if (!enabled)
193193
return;
194-
publishEvent(TOPIC_FRAME_EVENTS, event.getFrameId(), event,
194+
publishEvent(TOPIC_FRAME_EVENTS, event.getFrame().getId(), event,
195195
event.getHeader().getEventType().name());
196196
}
197197

@@ -201,7 +201,7 @@ public void publishFrameEvent(FrameEvent event) {
201201
public void publishHostEvent(HostEvent event) {
202202
if (!enabled)
203203
return;
204-
publishEvent(TOPIC_HOST_EVENTS, event.getHostName(), event,
204+
publishEvent(TOPIC_HOST_EVENTS, event.getHost().getName(), event,
205205
event.getHeader().getEventType().name());
206206
}
207207

0 commit comments

Comments
 (0)