Skip to content

Commit 225a059

Browse files
authored
chore: Define an E2E ingestion processing time metric (#20054)
1 parent 3559d4b commit 225a059

File tree

5 files changed

+123
-24
lines changed

5 files changed

+123
-24
lines changed

pkg/dataobj/consumer/partition_processor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ type partitionProcessor struct {
7474
// The initial value is zero and must be reset to zero after each flush.
7575
lastModified time.Time
7676

77+
// earliestRecordTime tracks the earliest timestamp all the records Appended to the builder for each object.
78+
earliestRecordTime time.Time
79+
7780
// Metrics
7881
metrics *partitionOffsetMetrics
7982

@@ -216,8 +219,9 @@ func (p *partitionProcessor) initBuilder() error {
216219

217220
func (p *partitionProcessor) emitObjectWrittenEvent(ctx context.Context, objectPath string) error {
218221
event := &metastore.ObjectWrittenEvent{
219-
ObjectPath: objectPath,
220-
WriteTime: p.clock.Now().Format(time.RFC3339),
222+
ObjectPath: objectPath,
223+
WriteTime: p.clock.Now().Format(time.RFC3339),
224+
EarliestRecordTime: p.earliestRecordTime.Format(time.RFC3339),
221225
}
222226

223227
eventBytes, err := event.Marshal()
@@ -240,6 +244,10 @@ func (p *partitionProcessor) processRecord(ctx context.Context, record partition
240244
// Update offset metric at the end of processing
241245
defer p.metrics.updateOffset(record.Offset)
242246

247+
if record.Timestamp.Before(p.earliestRecordTime) || p.earliestRecordTime.IsZero() {
248+
p.earliestRecordTime = record.Timestamp
249+
}
250+
243251
// Observe processing delay
244252
p.metrics.observeProcessingDelay(record.Timestamp)
245253

@@ -325,6 +333,7 @@ func (p *partitionProcessor) flush(ctx context.Context) error {
325333

326334
p.lastModified = time.Time{}
327335
p.lastFlushed = p.clock.Now()
336+
p.earliestRecordTime = time.Time{}
328337

329338
return nil
330339
}

pkg/dataobj/index/indexer.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
325325

326326
// Update metrics
327327
buildTime := time.Since(start)
328-
si.updateMetrics(buildTime)
328+
si.updateMetrics(buildTime, getEarliestIndexedRecord(si.logger, events[:processed]))
329329

330330
if err != nil {
331331
level.Error(si.logger).Log("msg", "failed to build index",
@@ -350,6 +350,21 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
350350
}
351351
}
352352

353+
func getEarliestIndexedRecord(logger log.Logger, events []metastore.ObjectWrittenEvent) time.Time {
354+
var earliestIndexedRecordTime time.Time
355+
for _, ev := range events {
356+
ts, err := time.Parse(time.RFC3339, ev.EarliestRecordTime)
357+
if err != nil {
358+
level.Error(logger).Log("msg", "failed to parse earliest record time", "err", err)
359+
continue
360+
}
361+
if ts.Before(earliestIndexedRecordTime) || earliestIndexedRecordTime.IsZero() {
362+
earliestIndexedRecordTime = ts
363+
}
364+
}
365+
return earliestIndexedRecordTime
366+
}
367+
353368
// buildIndex is writing all metastore events to a single index object. It
354369
// returns the index path and the number of events processed or an error if the index object is not created.
355370
// The number of events processed can be less than the number of events if the builder becomes full
@@ -501,10 +516,13 @@ func (si *serialIndexer) flushIndex(ctx context.Context, partition int32) (strin
501516
}
502517

503518
// updateMetrics updates internal build metrics
504-
func (si *serialIndexer) updateMetrics(buildTime time.Duration) {
519+
func (si *serialIndexer) updateMetrics(buildTime time.Duration, earliestIndexedRecord time.Time) {
505520
si.indexerMetrics.incBuilds()
506521
si.indexerMetrics.setBuildTime(buildTime)
507522
si.indexerMetrics.setQueueDepth(len(si.buildRequestChan))
523+
if !earliestIndexedRecord.IsZero() {
524+
si.indexerMetrics.setEndToEndProcessingTime(time.Since(earliestIndexedRecord))
525+
}
508526
}
509527

510528
// ObjectKey generates the object key for storing an index object in object storage.

pkg/dataobj/index/metrics.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ type indexerMetrics struct {
9090

9191
// Queue metrics
9292
queueDepth prometheus.Gauge
93+
94+
// End-to-end processing time metric
95+
endToEndProcessingTime prometheus.Gauge
9396
}
9497

9598
func newIndexerMetrics() *indexerMetrics {
@@ -110,6 +113,10 @@ func newIndexerMetrics() *indexerMetrics {
110113
Name: "loki_index_builder_queue_depth",
111114
Help: "Current depth of the build request queue",
112115
}),
116+
endToEndProcessingTime: prometheus.NewGauge(prometheus.GaugeOpts{
117+
Name: "loki_ingest_end_to_end_processing_time_seconds",
118+
Help: "Time between a log line being written to kafka by the distributors and the index-builder making it available for querying in seconds",
119+
}),
113120
}
114121

115122
return m
@@ -121,6 +128,7 @@ func (m *indexerMetrics) register(reg prometheus.Registerer) error {
121128
m.totalBuilds,
122129
m.buildTimeSeconds,
123130
m.queueDepth,
131+
m.endToEndProcessingTime,
124132
}
125133

126134
for _, collector := range collectors {
@@ -139,6 +147,7 @@ func (m *indexerMetrics) unregister(reg prometheus.Registerer) {
139147
m.totalBuilds,
140148
m.buildTimeSeconds,
141149
m.queueDepth,
150+
m.endToEndProcessingTime,
142151
}
143152

144153
for _, collector := range collectors {
@@ -161,3 +170,7 @@ func (m *indexerMetrics) setBuildTime(duration time.Duration) {
161170
func (m *indexerMetrics) setQueueDepth(depth int) {
162171
m.queueDepth.Set(float64(depth))
163172
}
173+
174+
func (m *indexerMetrics) setEndToEndProcessingTime(duration time.Duration) {
175+
m.endToEndProcessingTime.Set(duration.Seconds())
176+
}

pkg/dataobj/metastore/metastore.pb.go

Lines changed: 77 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/dataobj/metastore/metastore.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ package dataobj.metastore;
55

66
option go_package = "github.com/grafana/loki/v3/pkg/dataobj/metastore";
77

8-
// ColumnInfo describes an individual column within a data set.
8+
// ObjectWrittenEvent describes a new logs object being written to object storage.
99
message ObjectWrittenEvent {
1010
string objectPath = 2;
1111
string writeTime = 3;
12+
string earliestRecordTime = 4;
1213
reserved 1; // tenant
1314
}

0 commit comments

Comments
 (0)