Skip to content

Commit c3d4095

Browse files
committed
encode_opentelemetry: Add encoding cut off for Prometheus Mimir
In Promethemus mimir, it requests to limit for 5 minutes in the same batch: https://github.com/grafana/mimir/blob/main/pkg/distributor/distributor.go#L1010-L1020 Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 4651386 commit c3d4095

File tree

3 files changed

+76
-2
lines changed

3 files changed

+76
-2
lines changed

include/cmetrics/cmt_encode_opentelemetry.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
#define CMT_ENCODE_OPENTELEMETRY_INVALID_ARGUMENT_ERROR 2
3131
#define CMT_ENCODE_OPENTELEMETRY_UNEXPECTED_METRIC_TYPE 3
3232
#define CMT_ENCODE_OPENTELEMETRY_DATA_POINT_INIT_ERROR 4
33+
#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR 5
34+
35+
#define CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD 300000000000L /* 5 minutes in nanoseconds */
36+
3337

3438
struct cmt_opentelemetry_context
3539
{

src/cmt_encode_opentelemetry.c

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2369,6 +2369,17 @@ int append_sample_to_metric(struct cmt_opentelemetry_context *context,
23692369
return result;
23702370
}
23712371

2372+
static int check_staled_timestamp(struct cmt_metric *metric, uint64_t now, uint64_t cutoff)
2373+
{
2374+
uint64_t ts;
2375+
uint64_t diff;
2376+
2377+
ts = cmt_metric_get_timestamp(metric);
2378+
diff = now - ts;
2379+
2380+
return diff > cutoff;
2381+
}
2382+
23722383
int pack_basic_type(struct cmt_opentelemetry_context *context,
23732384
struct cmt_map *map,
23742385
size_t *metric_index)
@@ -2382,8 +2393,10 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,
23822393
Opentelemetry__Proto__Metrics__V1__Metric *metric;
23832394
int result;
23842395
struct cfl_list *head;
2396+
uint64_t now;
23852397

23862398
sample_count = 0;
2399+
now = cfl_time_now();
23872400

23882401
if (map->metric_static_set) {
23892402
sample_count++;
@@ -2434,6 +2447,12 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,
24342447
&map->metric,
24352448
sample_index++);
24362449

2450+
if (check_staled_timestamp(&map->metric, now,
2451+
CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD)) {
2452+
/* Skip processing metrics which are staled over the threshold */
2453+
return CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR;
2454+
}
2455+
24372456
if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
24382457
destroy_metric(metric);
24392458

@@ -2444,6 +2463,12 @@ int pack_basic_type(struct cmt_opentelemetry_context *context,
24442463
cfl_list_foreach(head, &map->metrics) {
24452464
sample = cfl_list_entry(head, struct cmt_metric, _head);
24462465

2466+
if (check_staled_timestamp(&map->metric, now,
2467+
CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD)) {
2468+
/* Skip processing metrics which are staled over the threshold */
2469+
return CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR;
2470+
}
2471+
24472472
result = append_sample_to_metric(context,
24482473
metric,
24492474
map,
@@ -2527,6 +2552,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
25272552
counter = cfl_list_entry(head, struct cmt_counter, _head);
25282553
result = pack_basic_type(context, counter->map, &metric_index);
25292554

2555+
if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
2556+
continue;
2557+
}
2558+
25302559
if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
25312560
break;
25322561
}
@@ -2538,6 +2567,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
25382567
gauge = cfl_list_entry(head, struct cmt_gauge, _head);
25392568
result = pack_basic_type(context, gauge->map, &metric_index);
25402569

2570+
if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
2571+
continue;
2572+
}
2573+
25412574
if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
25422575
break;
25432576
}
@@ -2549,6 +2582,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
25492582
untyped = cfl_list_entry(head, struct cmt_untyped, _head);
25502583
result = pack_basic_type(context, untyped->map, &metric_index);
25512584

2585+
if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
2586+
continue;
2587+
}
2588+
25522589
if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
25532590
break;
25542591
}
@@ -2560,6 +2597,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
25602597
summary = cfl_list_entry(head, struct cmt_summary, _head);
25612598
result = pack_basic_type(context, summary->map, &metric_index);
25622599

2600+
if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
2601+
continue;
2602+
}
2603+
25632604
if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
25642605
break;
25652606
}
@@ -2571,6 +2612,10 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
25712612
histogram = cfl_list_entry(head, struct cmt_histogram, _head);
25722613
result = pack_basic_type(context, histogram->map, &metric_index);
25732614

2615+
if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
2616+
continue;
2617+
}
2618+
25742619
if (result != CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
25752620
break;
25762621
}
@@ -2580,6 +2625,9 @@ cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt)
25802625
if (result == CMT_ENCODE_OPENTELEMETRY_SUCCESS) {
25812626
buf = render_opentelemetry_context_to_sds(context);
25822627
}
2628+
else if (result == CMT_ENCODE_OPENTELEMETRY_CUTOFF_ERROR) {
2629+
buf = NULL;
2630+
}
25832631

25842632
destroy_opentelemetry_context(context);
25852633

tests/encoding.c

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,10 +583,12 @@ void test_opentelemetry()
583583
cfl_sds_t payload;
584584
struct cmt *cmt;
585585
FILE *sample_file;
586+
uint64_t ts;
586587

587588
cmt_initialize();
589+
ts = cfl_time_now();
588590

589-
cmt = generate_encoder_test_data();
591+
cmt = generate_encoder_test_data_with_timestamp(ts);
590592

591593
payload = cmt_encode_opentelemetry_create(cmt);
592594
TEST_CHECK(NULL != payload);
@@ -609,7 +611,26 @@ curl -v 'http://localhost:9090/v1/metrics' -H 'Content-Type: application/x-proto
609611

610612
fclose(sample_file);
611613

612-
cmt_encode_prometheus_remote_write_destroy(payload);
614+
cmt_encode_opentelemetry_destroy(payload);
615+
616+
cmt_destroy(cmt);
617+
}
618+
619+
void test_opentelemetry_outdated()
620+
{
621+
cfl_sds_t payload;
622+
struct cmt *cmt;
623+
uint64_t ts;
624+
625+
cmt_initialize();
626+
ts = cfl_time_now() - CMT_ENCODE_OPENTELEMETRY_CUTOFF_THRESHOLD * 1.5;
627+
628+
cmt = generate_encoder_test_data_with_timestamp(ts);
629+
630+
payload = cmt_encode_opentelemetry_create(cmt);
631+
TEST_CHECK(NULL == payload);
632+
633+
cmt_encode_opentelemetry_destroy(payload);
613634

614635
cmt_destroy(cmt);
615636
}
@@ -1173,6 +1194,7 @@ TEST_LIST = {
11731194
{"cmt_msgpack_labels", test_cmt_to_msgpack_labels},
11741195
{"cmt_msgpack", test_cmt_to_msgpack},
11751196
{"opentelemetry", test_opentelemetry},
1197+
{"opentelemetry_old_context", test_opentelemetry_outdated},
11761198
{"cloudwatch_emf", test_cloudwatch_emf},
11771199
{"prometheus", test_prometheus},
11781200
{"text", test_text},

0 commit comments

Comments
 (0)