11use axum:: {
22 Router ,
33 extract:: { Request , State } ,
4- http:: StatusCode ,
4+ http:: { header , StatusCode } ,
55 response:: { IntoResponse , Response } ,
66 routing:: post,
77} ;
88use libdd_trace_utils:: trace_utils:: TracerHeaderTags as DatadogTracerHeaderTags ;
9- use serde_json:: json;
9+ use opentelemetry_proto:: tonic:: collector:: trace:: v1:: ExportTraceServiceResponse ;
10+ use prost:: Message ;
1011use std:: net:: SocketAddr ;
1112use std:: sync:: Arc ;
1213use tokio:: { net:: TcpListener , sync:: mpsc:: Sender } ;
1314use tokio_util:: sync:: CancellationToken ;
15+ use tonic:: Code ;
16+ use tonic_types:: Status ;
1417use tracing:: { debug, error} ;
1518
1619use crate :: {
@@ -141,35 +144,30 @@ impl Agent {
141144 let ( parts, body) = match extract_request_body ( request) . await {
142145 Ok ( r) => r,
143146 Err ( e) => {
144- return (
147+ return create_otlp_error_response (
145148 StatusCode :: INTERNAL_SERVER_ERROR ,
146- format ! ( "OTLP | Failed to extract request body: {e}" ) ,
147- )
148- . into_response ( ) ;
149+ Code :: Internal ,
150+ & format ! ( "Failed to extract request body: {e}" ) ,
151+ ) ;
149152 }
150153 } ;
151154
152155 let traces = match processor. process ( & body) {
153156 Ok ( traces) => traces,
154157 Err ( e) => {
155- error ! ( "OTLP | Failed to process request: {:?}" , e) ;
156- return (
157- StatusCode :: INTERNAL_SERVER_ERROR ,
158- format ! ( "Failed to process request: {e}" ) ,
159- )
160- . into_response ( ) ;
158+ return create_otlp_error_response (
159+ StatusCode :: BAD_REQUEST ,
160+ Code :: InvalidArgument ,
161+ & format ! ( "Failed to decode request: {e}" ) ,
162+ ) ;
161163 }
162164 } ;
163165
164166 let tracer_header_tags: DatadogTracerHeaderTags = ( & parts. headers ) . into ( ) ;
165167 let body_size = size_of_val ( & traces) ;
166168 if body_size == 0 {
167- return (
168- StatusCode :: INTERNAL_SERVER_ERROR ,
169- json ! ( { "message" : "Not sending traces, processor returned empty data" } )
170- . to_string ( ) ,
171- )
172- . into_response ( ) ;
169+ debug ! ( "OTLP | Not sending traces, processor returned empty data" ) ;
170+ return create_otlp_success_response ( ) ;
173171 }
174172
175173 let compute_trace_stats_on_extension = config. compute_trace_stats_on_extension ;
@@ -187,11 +185,11 @@ impl Agent {
187185 debug ! ( "OTLP | Successfully buffered traces to be aggregated." ) ;
188186 }
189187 Err ( err) => {
190- error ! ( "OTLP | Error sending traces to the trace aggregator: {err}" ) ;
191- return (
188+ return create_otlp_error_response (
192189 StatusCode :: INTERNAL_SERVER_ERROR ,
193- json ! ( { "message" : format!( "Error sending traces to the trace aggregator: {err}" ) } ) . to_string ( )
194- ) . into_response ( ) ;
190+ Code :: Internal ,
191+ & format ! ( "Trace aggregator unavailable: {err}" ) ,
192+ ) ;
195193 }
196194 } ;
197195
@@ -205,11 +203,57 @@ impl Agent {
205203 }
206204 }
207205
208- (
206+ create_otlp_success_response ( )
207+ }
208+ }
209+
210+ /// Creates an OTLP-compliant success response with protobuf encoding.
211+ /// Returns 200 OK with an empty ExportTraceServiceResponse.
212+ /// https://opentelemetry.io/docs/specs/otlp/
213+ fn create_otlp_success_response ( ) -> Response {
214+ let response = ExportTraceServiceResponse {
215+ partial_success : None ,
216+ } ;
217+ let mut buf = Vec :: new ( ) ;
218+ match response. encode ( & mut buf) {
219+ Ok ( ( ) ) => (
209220 StatusCode :: OK ,
210- json ! ( { "rate_by_service" : { "service:,env:" : 1 } } ) . to_string ( ) ,
221+ [ ( header:: CONTENT_TYPE , "application/x-protobuf" ) ] ,
222+ buf,
223+ )
224+ . into_response ( ) ,
225+ Err ( e) => {
226+ error ! ( "OTLP | Failed to encode success response: {}" , e) ;
227+ ( StatusCode :: OK , Vec :: new ( ) ) . into_response ( )
228+ }
229+ }
230+ }
231+
232+ /// Creates an OTLP-compliant error response with google.rpc.Status protobuf encoding.
233+ /// Returns the specified HTTP status code with a google.rpc.Status message.
234+ /// https://opentelemetry.io/docs/specs/otlp/
235+ fn create_otlp_error_response ( http_status : StatusCode , grpc_code : Code , message : & str ) -> Response {
236+ error ! (
237+ "OTLP | Error response: {} (gRPC code {:?}) - {}" ,
238+ http_status, grpc_code, message
239+ ) ;
240+ let status = Status {
241+ code : grpc_code as i32 ,
242+ message : message. to_string ( ) ,
243+ details : Vec :: new ( ) ,
244+ } ;
245+ let mut buf = Vec :: new ( ) ;
246+ match status. encode ( & mut buf) {
247+ Ok ( ( ) ) => (
248+ http_status,
249+ [ ( header:: CONTENT_TYPE , "application/x-protobuf" ) ] ,
250+ buf,
211251 )
212- . into_response ( )
252+ . into_response ( ) ,
253+ Err ( e) => {
254+ error ! ( "OTLP | Failed to encode error response: {}" , e) ;
255+ ( http_status, message. to_string ( ) ) . into_response ( )
256+ }
213257 }
214258}
215259
0 commit comments