From f2eda44fccf888643e438a4c0cfb1720e22a2f84 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 6 Jan 2026 13:01:58 -0600 Subject: [PATCH] out_opentelemetry: on HTTP/2, read and process gRPC status code Signed-off-by: Eduardo Silva --- plugins/out_opentelemetry/opentelemetry.c | 134 +++++++++++++++++++++- 1 file changed, 133 insertions(+), 1 deletion(-) diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index aac8e48af59..47cd6d12623 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -30,9 +30,11 @@ #include #include -#include #include #include +#include + +#include #include #include @@ -75,6 +77,115 @@ static int is_http_status_code_retrayable(int http_code) return FLB_FALSE; } +static int opentelemetry_is_grpc_status_retryable(int status_code) +{ + if (status_code == 1 || /* CANCELLED */ + status_code == 4 || /* DEADLINE_EXCEEDED */ + status_code == 8 || /* RESOURCE_EXHAUSTED */ + status_code == 10 || /* ABORTED */ + status_code == 13 || /* INTERNAL */ + status_code == 14) { /* UNAVAILABLE */ + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int opentelemetry_lookup_header_value(struct flb_hash_table *table, + const char *name, + cfl_sds_t *out_value) +{ + void *value; + size_t value_length; + int result; + + if (table == NULL) { + return FLB_FALSE; + } + + result = flb_hash_table_get(table, + name, + strlen(name), + &value, + &value_length); + + if (result == -1) { + return FLB_FALSE; + } + + *out_value = cfl_sds_create_len((const char *) value, value_length); + + if (*out_value == NULL) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int opentelemetry_check_grpc_status(struct opentelemetry_context *ctx, + struct flb_http_response *response) +{ + cfl_sds_t grpc_message; + cfl_sds_t grpc_status_text; + int grpc_status; + int result; + + grpc_message = NULL; + grpc_status_text = NULL; + grpc_status = 0; + result = FLB_OK; + + /* ref: https://grpc.io/docs/guides/status-codes/ */ + if (opentelemetry_lookup_header_value(response->trailer_headers, + "grpc-status", + &grpc_status_text) == FLB_FALSE && + opentelemetry_lookup_header_value(response->headers, + "grpc-status", + &grpc_status_text) == FLB_FALSE) { + + return FLB_OK; + } + + grpc_status = strtol(grpc_status_text, NULL, 10); + + if (opentelemetry_lookup_header_value(response->trailer_headers, + "grpc-message", + &grpc_message) == FLB_FALSE) { + opentelemetry_lookup_header_value(response->headers, + "grpc-message", + &grpc_message); + } + + if (grpc_status != 0) { + if (grpc_message != NULL) { + flb_plg_error(ctx->ins, + "grpc-status=%d, grpc-message=%s", + grpc_status, + grpc_message); + } + else { + flb_plg_error(ctx->ins, "grpc-status=%d", grpc_status); + } + + if (opentelemetry_is_grpc_status_retryable(grpc_status)) { + result = FLB_RETRY; + } + else { + result = FLB_ERROR; + } + } + + if (grpc_message != NULL) { + cfl_sds_destroy(grpc_message); + } + + if (grpc_status_text != NULL) { + cfl_sds_destroy(grpc_status_text); + } + + return result; +} + int opentelemetry_legacy_post(struct opentelemetry_context *ctx, const void *body, size_t body_len, const char *tag, int tag_len, @@ -342,6 +453,20 @@ int opentelemetry_post(struct opentelemetry_context *ctx, if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 && ctx->enable_grpc_flag) { + /* nghttp2 does not automatically add the TE header because it is not + * tied to the gRPC semantics, so we must set the required + * "te: trailers" header explicitly for gRPC-over-HTTP/2. + */ + result = flb_http_request_set_header(request, + "te", 2, + "trailers", 8); + + if (result != 0) { + flb_plg_error(ctx->ins, "failed to set gRPC TE header"); + flb_http_client_request_destroy(request, FLB_TRUE); + + return FLB_RETRY; + } grpc_body = cfl_sds_create_size(body_len + 5); @@ -521,6 +646,13 @@ int opentelemetry_post(struct opentelemetry_context *ctx, out_ret = FLB_OK; } + if (ctx->enable_grpc_flag && request->protocol_version == HTTP_PROTOCOL_VERSION_20 && out_ret == FLB_OK) { + result = opentelemetry_check_grpc_status(ctx, response); + if (result != FLB_OK) { + out_ret = result; + } + } + flb_http_client_request_destroy(request, FLB_TRUE); return out_ret;