|
30 | 30 | #include <cfl/cfl.h> |
31 | 31 | #include <fluent-otel-proto/fluent-otel.h> |
32 | 32 |
|
33 | | -#include <cmetrics/cmetrics.h> |
34 | 33 | #include <fluent-bit/flb_gzip.h> |
35 | 34 | #include <fluent-bit/flb_zstd.h> |
| 35 | +#include <fluent-bit/flb_hash_table.h> |
| 36 | + |
| 37 | +#include <cmetrics/cmetrics.h> |
36 | 38 | #include <cmetrics/cmt_encode_opentelemetry.h> |
37 | 39 |
|
38 | 40 | #include <ctraces/ctraces.h> |
@@ -75,6 +77,115 @@ static int is_http_status_code_retrayable(int http_code) |
75 | 77 | return FLB_FALSE; |
76 | 78 | } |
77 | 79 |
|
| 80 | +static int opentelemetry_is_grpc_status_retryable(int status_code) |
| 81 | +{ |
| 82 | + if (status_code == 1 || /* CANCELLED */ |
| 83 | + status_code == 4 || /* DEADLINE_EXCEEDED */ |
| 84 | + status_code == 8 || /* RESOURCE_EXHAUSTED */ |
| 85 | + status_code == 10 || /* ABORTED */ |
| 86 | + status_code == 13 || /* INTERNAL */ |
| 87 | + status_code == 14) { /* UNAVAILABLE */ |
| 88 | + return FLB_TRUE; |
| 89 | + } |
| 90 | + |
| 91 | + return FLB_FALSE; |
| 92 | +} |
| 93 | + |
| 94 | +static int opentelemetry_lookup_header_value(struct flb_hash_table *table, |
| 95 | + const char *name, |
| 96 | + cfl_sds_t *out_value) |
| 97 | +{ |
| 98 | + void *value; |
| 99 | + size_t value_length; |
| 100 | + int result; |
| 101 | + |
| 102 | + if (table == NULL) { |
| 103 | + return FLB_FALSE; |
| 104 | + } |
| 105 | + |
| 106 | + result = flb_hash_table_get(table, |
| 107 | + name, |
| 108 | + strlen(name), |
| 109 | + &value, |
| 110 | + &value_length); |
| 111 | + |
| 112 | + if (result == -1) { |
| 113 | + return FLB_FALSE; |
| 114 | + } |
| 115 | + |
| 116 | + *out_value = cfl_sds_create_len((const char *) value, value_length); |
| 117 | + |
| 118 | + if (*out_value == NULL) { |
| 119 | + return FLB_FALSE; |
| 120 | + } |
| 121 | + |
| 122 | + return FLB_TRUE; |
| 123 | +} |
| 124 | + |
| 125 | +static int opentelemetry_check_grpc_status(struct opentelemetry_context *ctx, |
| 126 | + struct flb_http_response *response) |
| 127 | +{ |
| 128 | + cfl_sds_t grpc_message; |
| 129 | + cfl_sds_t grpc_status_text; |
| 130 | + int grpc_status; |
| 131 | + int result; |
| 132 | + |
| 133 | + grpc_message = NULL; |
| 134 | + grpc_status_text = NULL; |
| 135 | + grpc_status = 0; |
| 136 | + result = FLB_OK; |
| 137 | + |
| 138 | + /* ref: https://grpc.io/docs/guides/status-codes/ */ |
| 139 | + if (opentelemetry_lookup_header_value(response->trailer_headers, |
| 140 | + "grpc-status", |
| 141 | + &grpc_status_text) == FLB_FALSE && |
| 142 | + opentelemetry_lookup_header_value(response->headers, |
| 143 | + "grpc-status", |
| 144 | + &grpc_status_text) == FLB_FALSE) { |
| 145 | + |
| 146 | + return FLB_OK; |
| 147 | + } |
| 148 | + |
| 149 | + grpc_status = strtol(grpc_status_text, NULL, 10); |
| 150 | + |
| 151 | + if (opentelemetry_lookup_header_value(response->trailer_headers, |
| 152 | + "grpc-message", |
| 153 | + &grpc_message) == FLB_FALSE) { |
| 154 | + opentelemetry_lookup_header_value(response->headers, |
| 155 | + "grpc-message", |
| 156 | + &grpc_message); |
| 157 | + } |
| 158 | + |
| 159 | + if (grpc_status != 0) { |
| 160 | + if (grpc_message != NULL) { |
| 161 | + flb_plg_error(ctx->ins, |
| 162 | + "grpc-status=%d, grpc-message=%s", |
| 163 | + grpc_status, |
| 164 | + grpc_message); |
| 165 | + } |
| 166 | + else { |
| 167 | + flb_plg_error(ctx->ins, "grpc-status=%d", grpc_status); |
| 168 | + } |
| 169 | + |
| 170 | + if (opentelemetry_is_grpc_status_retryable(grpc_status)) { |
| 171 | + result = FLB_RETRY; |
| 172 | + } |
| 173 | + else { |
| 174 | + result = FLB_ERROR; |
| 175 | + } |
| 176 | + } |
| 177 | + |
| 178 | + if (grpc_message != NULL) { |
| 179 | + cfl_sds_destroy(grpc_message); |
| 180 | + } |
| 181 | + |
| 182 | + if (grpc_status_text != NULL) { |
| 183 | + cfl_sds_destroy(grpc_status_text); |
| 184 | + } |
| 185 | + |
| 186 | + return result; |
| 187 | +} |
| 188 | + |
78 | 189 | int opentelemetry_legacy_post(struct opentelemetry_context *ctx, |
79 | 190 | const void *body, size_t body_len, |
80 | 191 | const char *tag, int tag_len, |
@@ -342,6 +453,21 @@ int opentelemetry_post(struct opentelemetry_context *ctx, |
342 | 453 |
|
343 | 454 | if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 && |
344 | 455 | ctx->enable_grpc_flag) { |
| 456 | + /* nghttp2 does not automatically add the TE header because it is not |
| 457 | + * tied to the gRPC semantics, so we must set the required |
| 458 | + * "te: trailers" header explicitly for gRPC-over-HTTP/2. |
| 459 | + */ |
| 460 | + result = flb_http_request_set_header(request, |
| 461 | + "te", 2, |
| 462 | + "trailers", 8); |
| 463 | + |
| 464 | + if (result != 0) { |
| 465 | + flb_plg_error(ctx->ins, |
| 466 | + "failed to set gRPC TE header: %s", strerror(errno)); |
| 467 | + flb_http_client_request_destroy(request, FLB_TRUE); |
| 468 | + |
| 469 | + return FLB_RETRY; |
| 470 | + } |
345 | 471 |
|
346 | 472 | grpc_body = cfl_sds_create_size(body_len + 5); |
347 | 473 |
|
@@ -521,6 +647,13 @@ int opentelemetry_post(struct opentelemetry_context *ctx, |
521 | 647 | out_ret = FLB_OK; |
522 | 648 | } |
523 | 649 |
|
| 650 | + if (ctx->enable_grpc_flag && request->protocol_version == HTTP_PROTOCOL_VERSION_20 && out_ret == FLB_OK) { |
| 651 | + result = opentelemetry_check_grpc_status(ctx, response); |
| 652 | + if (result != FLB_OK) { |
| 653 | + out_ret = result; |
| 654 | + } |
| 655 | + } |
| 656 | + |
524 | 657 | flb_http_client_request_destroy(request, FLB_TRUE); |
525 | 658 |
|
526 | 659 | return out_ret; |
|
0 commit comments