Skip to content

Commit 0c7e6fd

Browse files
committed
in_opentelemetry: do not use temp buffer for protobuf handling (perf improvement)
Reuse the log event encoder's body/metadata packers for the protobuf log path instead of temporary msgpack_sbuffer instances. - Point mp_pck/mp_pck_meta at encoder->body.packer and encoder->metadata.packer. Remove mp_sbuf and mp_sbuf_meta. - Pack resource/scope group content into encoder->body via dynamic_field_reset, pack, dynamic_field_flush (drop set_body_from_raw_msgpack and the temp buffer copy). - Pack per-record metadata into encoder->metadata the same way (reset, otel_pack_v1_metadata, flush); drop set_metadata_from_raw_msgpack. - Pack per-record body into encoder->body (reset, then either otlp_pack_any_value for KVLIST or inline single-key map + value, then flush); drop per-record set_body_from_raw_msgpack and append_body_values(MSGPACK_RAW_VALUE). Removes per-record and per-group temporary packing, buffer clears, and raw msgpack copies, reducing CPU and allocator pressure on the protobuf log conversion path. Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent cc97ad3 commit 0c7e6fd

File tree

1 file changed

+103
-93
lines changed

1 file changed

+103
-93
lines changed

plugins/in_opentelemetry/opentelemetry_logs.c

Lines changed: 103 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -310,13 +310,8 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
310310
struct flb_mp_map_header mh_tmp;
311311
struct flb_time tm;
312312

313-
/* record buffer and packer */
314-
msgpack_sbuffer mp_sbuf;
315-
msgpack_packer mp_pck;
316-
317-
/* metadata buffer and packer */
318-
msgpack_sbuffer mp_sbuf_meta;
319-
msgpack_packer mp_pck_meta;
313+
msgpack_packer *mp_pck;
314+
msgpack_packer *mp_pck_meta;
320315

321316
/* OTel proto suff */
322317
Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
@@ -329,12 +324,8 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
329324
Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
330325
Opentelemetry__Proto__Resource__V1__Resource *resource;
331326

332-
/* initialize msgpack buffers */
333-
msgpack_sbuffer_init(&mp_sbuf);
334-
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
335-
336-
msgpack_sbuffer_init(&mp_sbuf_meta);
337-
msgpack_packer_init(&mp_pck_meta, &mp_sbuf_meta, msgpack_sbuffer_write);
327+
mp_pck = &encoder->body.packer;
328+
mp_pck_meta = &encoder->metadata.packer;
338329

339330
/* unpack logs from protobuf payload */
340331
input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
@@ -385,130 +376,133 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
385376
FLB_LOG_EVENT_INT64_VALUE(scope_log_index));
386377

387378

388-
flb_mp_map_header_init(&mh, &mp_pck);
379+
ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body);
380+
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
381+
flb_plg_error(ctx->ins, "failed to reset log event body: %s",
382+
flb_log_event_encoder_get_error_description(ret));
383+
goto binary_payload_to_msgpack_end;
384+
}
385+
386+
flb_mp_map_header_init(&mh, mp_pck);
389387

390388
/* Resource */
391389
flb_mp_map_header_append(&mh);
392-
msgpack_pack_str(&mp_pck, 8);
393-
msgpack_pack_str_body(&mp_pck, "resource", 8);
390+
msgpack_pack_str(mp_pck, 8);
391+
msgpack_pack_str_body(mp_pck, "resource", 8);
394392

395-
flb_mp_map_header_init(&mh_tmp, &mp_pck);
393+
flb_mp_map_header_init(&mh_tmp, mp_pck);
396394
if (resource) {
397395
/* look for OTel resource attributes */
398396
if (resource->n_attributes > 0 && resource->attributes) {
399397
flb_mp_map_header_append(&mh_tmp);
400-
msgpack_pack_str(&mp_pck, 10);
401-
msgpack_pack_str_body(&mp_pck, "attributes", 10);
398+
msgpack_pack_str(mp_pck, 10);
399+
msgpack_pack_str_body(mp_pck, "attributes", 10);
402400

403-
ret = otel_pack_kvarray(&mp_pck,
401+
ret = otel_pack_kvarray(mp_pck,
404402
resource->attributes,
405403
resource->n_attributes);
406404
if (ret != 0) {
407-
return ret;
405+
goto binary_payload_to_msgpack_end;
408406
}
409407
}
410408

411409
if (resource->dropped_attributes_count > 0) {
412410
flb_mp_map_header_append(&mh_tmp);
413-
msgpack_pack_str(&mp_pck, 24);
414-
msgpack_pack_str_body(&mp_pck, "dropped_attributes_count", 24);
415-
msgpack_pack_uint64(&mp_pck, resource->dropped_attributes_count);
411+
msgpack_pack_str(mp_pck, 24);
412+
msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
413+
msgpack_pack_uint64(mp_pck, resource->dropped_attributes_count);
416414
}
417415

418416
if (resource_log->schema_url) {
419417
flb_mp_map_header_append(&mh_tmp);
420-
msgpack_pack_str(&mp_pck, 10);
421-
msgpack_pack_str_body(&mp_pck, "schema_url", 10);
418+
msgpack_pack_str(mp_pck, 10);
419+
msgpack_pack_str_body(mp_pck, "schema_url", 10);
422420

423421
len = strlen(resource_log->schema_url);
424-
msgpack_pack_str(&mp_pck, len);
425-
msgpack_pack_str_body(&mp_pck, resource_log->schema_url, len);
422+
msgpack_pack_str(mp_pck, len);
423+
msgpack_pack_str_body(mp_pck, resource_log->schema_url, len);
426424
}
427425
}
428426
flb_mp_map_header_end(&mh_tmp);
429427

430428
/* scope */
431429
flb_mp_map_header_append(&mh);
432-
msgpack_pack_str(&mp_pck, 5);
433-
msgpack_pack_str_body(&mp_pck, "scope", 5);
430+
msgpack_pack_str(mp_pck, 5);
431+
msgpack_pack_str_body(mp_pck, "scope", 5);
434432

435433
/* Scope */
436434
scope = scope_log->scope;
437435

438436
if (scope && (scope->name || scope->version || scope->n_attributes > 0)) {
439-
flb_mp_map_header_init(&mh_tmp, &mp_pck);
437+
flb_mp_map_header_init(&mh_tmp, mp_pck);
440438

441439
if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) {
442440
flb_mp_map_header_append(&mh_tmp);
443-
msgpack_pack_str(&mp_pck, 10);
444-
msgpack_pack_str_body(&mp_pck, "schema_url", 10);
441+
msgpack_pack_str(mp_pck, 10);
442+
msgpack_pack_str_body(mp_pck, "schema_url", 10);
445443

446444
len = strlen(scope_log->schema_url);
447-
msgpack_pack_str(&mp_pck, len);
448-
msgpack_pack_str_body(&mp_pck, scope_log->schema_url, len);
445+
msgpack_pack_str(mp_pck, len);
446+
msgpack_pack_str_body(mp_pck, scope_log->schema_url, len);
449447
}
450448

451449
if (scope->name && strlen(scope->name) > 0) {
452450
flb_mp_map_header_append(&mh_tmp);
453-
msgpack_pack_str(&mp_pck, 4);
454-
msgpack_pack_str_body(&mp_pck, "name", 4);
451+
msgpack_pack_str(mp_pck, 4);
452+
msgpack_pack_str_body(mp_pck, "name", 4);
455453

456454
len = strlen(scope->name);
457-
msgpack_pack_str(&mp_pck, len);
458-
msgpack_pack_str_body(&mp_pck, scope->name, len);
455+
msgpack_pack_str(mp_pck, len);
456+
msgpack_pack_str_body(mp_pck, scope->name, len);
459457
}
460458
if (scope->version && strlen(scope->version) > 0) {
461459
flb_mp_map_header_append(&mh_tmp);
462460

463-
msgpack_pack_str(&mp_pck, 7);
464-
msgpack_pack_str_body(&mp_pck, "version", 7);
461+
msgpack_pack_str(mp_pck, 7);
462+
msgpack_pack_str_body(mp_pck, "version", 7);
465463

466464
len = strlen(scope->version);
467-
msgpack_pack_str(&mp_pck, len);
468-
msgpack_pack_str_body(&mp_pck, scope->version, len);
465+
msgpack_pack_str(mp_pck, len);
466+
msgpack_pack_str_body(mp_pck, scope->version, len);
469467
}
470468

471469
if (scope->n_attributes > 0 && scope->attributes) {
472470
flb_mp_map_header_append(&mh_tmp);
473-
msgpack_pack_str(&mp_pck, 10);
474-
msgpack_pack_str_body(&mp_pck, "attributes", 10);
475-
ret = otel_pack_kvarray(&mp_pck,
471+
msgpack_pack_str(mp_pck, 10);
472+
msgpack_pack_str_body(mp_pck, "attributes", 10);
473+
ret = otel_pack_kvarray(mp_pck,
476474
scope->attributes,
477475
scope->n_attributes);
478476
if (ret != 0) {
479-
return ret;
477+
goto binary_payload_to_msgpack_end;
480478
}
481479
}
482480

483481
if (scope->dropped_attributes_count > 0) {
484482
flb_mp_map_header_append(&mh_tmp);
485-
msgpack_pack_str(&mp_pck, 24);
486-
msgpack_pack_str_body(&mp_pck, "dropped_attributes_count", 24);
487-
msgpack_pack_uint64(&mp_pck, scope->dropped_attributes_count);
483+
msgpack_pack_str(mp_pck, 24);
484+
msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
485+
msgpack_pack_uint64(mp_pck, scope->dropped_attributes_count);
488486
}
489487

490488
flb_mp_map_header_end(&mh_tmp);
491489
}
492490
else {
493491
/* set an empty scope */
494-
msgpack_pack_map(&mp_pck, 0);
492+
msgpack_pack_map(mp_pck, 0);
495493
}
496494

497495
flb_mp_map_header_end(&mh);
498496

499-
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
500-
encoder,
501-
mp_sbuf.data,
502-
mp_sbuf.size);
497+
ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
503498
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
504-
flb_plg_error(ctx->ins, "could not set group content metadata");
499+
flb_plg_error(ctx->ins, "could not set group content metadata: %s",
500+
flb_log_event_encoder_get_error_description(ret));
505501
goto binary_payload_to_msgpack_end;
506502
}
507503

508504
flb_log_event_encoder_group_header_end(encoder);
509505

510-
msgpack_sbuffer_clear(&mp_sbuf);
511-
512506
for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) {
513507
ret = flb_log_event_encoder_begin_record(encoder);
514508

@@ -528,54 +522,72 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
528522
}
529523

530524
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
531-
msgpack_sbuffer_clear(&mp_sbuf_meta);
532-
ret = otel_pack_v1_metadata(ctx, &mp_pck_meta, log_records[log_record_index], resource, scope_log->scope);
533-
if (ret != 0) {
534-
flb_plg_error(ctx->ins, "failed to convert log record");
525+
ret = flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
526+
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
527+
flb_plg_error(ctx->ins, "failed to reset log event metadata: %s",
528+
flb_log_event_encoder_get_error_description(ret));
535529
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
536530
}
537531
else {
538-
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
539-
encoder,
540-
mp_sbuf_meta.data,
541-
mp_sbuf_meta.size);
532+
ret = otel_pack_v1_metadata(ctx,
533+
mp_pck_meta,
534+
log_records[log_record_index],
535+
resource,
536+
scope_log->scope);
542537
}
543538

544-
msgpack_sbuffer_clear(&mp_sbuf_meta);
539+
if (ret != 0) {
540+
flb_plg_error(ctx->ins, "failed to convert log record");
541+
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
542+
}
543+
else {
544+
ret = flb_log_event_encoder_dynamic_field_flush(&encoder->metadata);
545+
}
545546
}
546547

547548
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
548-
ret = otlp_pack_any_value(
549-
&mp_pck,
550-
log_records[log_record_index]->body);
551-
552-
if (ret != 0) {
553-
flb_plg_error(ctx->ins, "failed to convert log record body");
549+
ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body);
550+
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
551+
flb_plg_error(ctx->ins, "failed to reset log event body: %s",
552+
flb_log_event_encoder_get_error_description(ret));
554553
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
555554
}
555+
else if (ctx->logs_body_key == NULL &&
556+
log_records[log_record_index]->body != NULL &&
557+
log_records[log_record_index]->body->value_case ==
558+
OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
559+
ret = otlp_pack_any_value(
560+
mp_pck,
561+
log_records[log_record_index]->body);
562+
}
556563
else {
557-
if (ctx->logs_body_key == NULL &&
558-
log_records[log_record_index]->body != NULL &&
559-
log_records[log_record_index]->body->value_case ==
560-
OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
561-
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
562-
encoder,
563-
mp_sbuf.data,
564-
mp_sbuf.size);
564+
logs_body_key = ctx->logs_body_key;
565+
if (logs_body_key == NULL) {
566+
logs_body_key = "log";
567+
}
568+
ret = msgpack_pack_map(mp_pck, 1);
569+
if (ret == 0) {
570+
ret = msgpack_pack_str(mp_pck, strlen(logs_body_key));
571+
}
572+
if (ret == 0) {
573+
ret = msgpack_pack_str_body(mp_pck,
574+
logs_body_key,
575+
strlen(logs_body_key));
565576
}
566-
else {
567-
logs_body_key = ctx->logs_body_key;
568-
if (logs_body_key == NULL) {
569-
logs_body_key = "log";
570-
}
571-
ret = flb_log_event_encoder_append_body_values(
572-
encoder,
573-
FLB_LOG_EVENT_CSTRING_VALUE(logs_body_key),
574-
FLB_LOG_EVENT_MSGPACK_RAW_VALUE(mp_sbuf.data, mp_sbuf.size));
577+
if (ret == 0) {
578+
ret = otlp_pack_any_value(
579+
mp_pck,
580+
log_records[log_record_index]->body);
575581
}
576582
}
577583

578-
msgpack_sbuffer_clear(&mp_sbuf);
584+
if (ret != 0) {
585+
flb_plg_error(ctx->ins, "failed to convert log record body");
586+
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
587+
}
588+
else {
589+
ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
590+
}
579591
}
580592

581593
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
@@ -593,8 +605,6 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
593605
}
594606

595607
binary_payload_to_msgpack_end:
596-
msgpack_sbuffer_destroy(&mp_sbuf);
597-
msgpack_sbuffer_destroy(&mp_sbuf_meta);
598608
if (input_logs) {
599609
opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
600610
input_logs, NULL);

0 commit comments

Comments
 (0)