|
1 | | -use std::sync::Arc; |
2 | | - |
3 | 1 | use crate::metric::MetricsClient; |
4 | | -use http::{header::CONTENT_TYPE, Method}; |
5 | | -use opentelemetry::otel_debug; |
6 | 2 | use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; |
7 | 3 | use opentelemetry_sdk::metrics::data::ResourceMetrics; |
8 | 4 |
|
9 | 5 | use super::OtlpHttpClient; |
10 | 6 |
|
11 | | -#[cfg(feature = "http-retry")] |
12 | | -use super::{classify_http_export_error, HttpExportError, HttpRetryData}; |
13 | | -#[cfg(feature = "http-retry")] |
14 | | -use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; |
15 | | -#[cfg(feature = "http-retry")] |
16 | | -use opentelemetry_sdk::runtime::Tokio; |
17 | | - |
18 | 7 | impl MetricsClient for OtlpHttpClient { |
19 | | - #[cfg(feature = "http-retry")] |
20 | 8 | async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { |
21 | | - let policy = RetryPolicy { |
22 | | - max_retries: 3, |
23 | | - initial_delay_ms: 100, |
24 | | - max_delay_ms: 1600, |
25 | | - jitter_ms: 100, |
| 9 | + let build_body_wrapper = |client: &OtlpHttpClient, metrics: &ResourceMetrics| { |
| 10 | + client |
| 11 | + .build_metrics_export_body(metrics) |
| 12 | + .ok_or_else(|| "Failed to serialize metrics".to_string()) |
26 | 13 | }; |
27 | 14 |
|
28 | | - // Build request body once before retry loop |
29 | | - let (body, content_type, content_encoding) = |
30 | | - self.build_metrics_export_body(metrics).ok_or_else(|| { |
31 | | - OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) |
32 | | - })?; |
33 | | - |
34 | | - let retry_data = Arc::new(HttpRetryData { |
35 | | - body, |
36 | | - headers: self.headers.clone(), |
37 | | - endpoint: self.collector_endpoint.to_string(), |
38 | | - }); |
39 | | - |
40 | | - retry_with_backoff( |
41 | | - Tokio, |
42 | | - policy, |
43 | | - classify_http_export_error, |
44 | | - "HttpMetricsClient.Export", |
45 | | - || async { |
46 | | - // Get client |
47 | | - let client = self |
48 | | - .client |
49 | | - .lock() |
50 | | - .map_err(|e| HttpExportError::new(500, format!("Mutex lock failed: {e}")))? |
51 | | - .as_ref() |
52 | | - .ok_or_else(|| { |
53 | | - HttpExportError::new(500, "Exporter already shutdown".to_string()) |
54 | | - })? |
55 | | - .clone(); |
56 | | - |
57 | | - // Build HTTP request |
58 | | - let mut request_builder = http::Request::builder() |
59 | | - .method(Method::POST) |
60 | | - .uri(&retry_data.endpoint) |
61 | | - .header(CONTENT_TYPE, content_type); |
62 | | - |
63 | | - if let Some(encoding) = content_encoding { |
64 | | - request_builder = request_builder.header("Content-Encoding", encoding); |
65 | | - } |
66 | | - |
67 | | - let mut request = request_builder |
68 | | - .body(retry_data.body.clone().into()) |
69 | | - .map_err(|e| { |
70 | | - HttpExportError::new(400, format!("Failed to build HTTP request: {e}")) |
71 | | - })?; |
72 | | - |
73 | | - for (k, v) in retry_data.headers.iter() { |
74 | | - request.headers_mut().insert(k.clone(), v.clone()); |
75 | | - } |
76 | | - |
77 | | - let request_uri = request.uri().to_string(); |
78 | | - otel_debug!(name: "HttpMetricsClient.ExportStarted"); |
79 | | - |
80 | | - // Send request |
81 | | - let response = client.send_bytes(request).await.map_err(|e| { |
82 | | - HttpExportError::new(0, format!("Network error: {e:?}")) // Network error |
83 | | - })?; |
84 | | - |
85 | | - let status_code = response.status().as_u16(); |
86 | | - let retry_after = response |
87 | | - .headers() |
88 | | - .get("retry-after") |
89 | | - .and_then(|v| v.to_str().ok()) |
90 | | - .map(|s| s.to_string()); |
91 | | - |
92 | | - if !response.status().is_success() { |
93 | | - let message = format!( |
94 | | - "HTTP export failed. Url: {}, Status: {}, Response: {:?}", |
95 | | - request_uri, |
96 | | - status_code, |
97 | | - response.body() |
98 | | - ); |
99 | | - return Err(match retry_after { |
100 | | - Some(retry_after) => { |
101 | | - HttpExportError::with_retry_after(status_code, retry_after, message) |
102 | | - } |
103 | | - None => HttpExportError::new(status_code, message), |
104 | | - }); |
105 | | - } |
106 | | - |
107 | | - otel_debug!(name: "HttpMetricsClient.ExportSucceeded"); |
108 | | - Ok(()) |
109 | | - }, |
110 | | - ) |
111 | | - .await |
112 | | - .map_err(|e| OTelSdkError::InternalFailure(e.message)) |
113 | | - } |
114 | | - |
115 | | - #[cfg(not(feature = "http-retry"))] |
116 | | - async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { |
117 | | - let client = self |
118 | | - .client |
119 | | - .lock() |
120 | | - .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}"))) |
121 | | - .and_then(|g| match &*g { |
122 | | - Some(client) => Ok(Arc::clone(client)), |
123 | | - _ => Err(OTelSdkError::AlreadyShutdown), |
124 | | - })?; |
125 | | - |
126 | | - let (body, content_type, content_encoding) = |
127 | | - self.build_metrics_export_body(metrics).ok_or_else(|| { |
128 | | - OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) |
129 | | - })?; |
130 | | - |
131 | | - let mut request_builder = http::Request::builder() |
132 | | - .method(Method::POST) |
133 | | - .uri(&self.collector_endpoint) |
134 | | - .header(CONTENT_TYPE, content_type); |
135 | | - |
136 | | - if let Some(encoding) = content_encoding { |
137 | | - request_builder = request_builder.header("Content-Encoding", encoding); |
138 | | - } |
139 | | - |
140 | | - let mut request = request_builder |
141 | | - .body(body.into()) |
142 | | - .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; |
143 | | - |
144 | | - for (k, v) in self.headers.iter() { |
145 | | - request.headers_mut().insert(k.clone(), v.clone()); |
146 | | - } |
147 | | - |
148 | | - otel_debug!(name: "HttpMetricsClient.ExportStarted"); |
149 | | - let result = client.send_bytes(request).await; |
150 | | - |
151 | | - match result { |
152 | | - Ok(response) => { |
153 | | - if response.status().is_success() { |
154 | | - otel_debug!(name: "HttpMetricsClient.ExportSucceeded"); |
155 | | - Ok(()) |
156 | | - } else { |
157 | | - let error = format!( |
158 | | - "OpenTelemetry metrics export failed. Status Code: {}, Response: {:?}", |
159 | | - response.status().as_u16(), |
160 | | - response.body() |
161 | | - ); |
162 | | - otel_debug!(name: "HttpMetricsClient.ExportFailed", error = &error); |
163 | | - Err(OTelSdkError::InternalFailure(error)) |
164 | | - } |
165 | | - } |
166 | | - Err(e) => { |
167 | | - let error = format!("{e:?}"); |
168 | | - otel_debug!(name: "HttpMetricsClient.ExportFailed", error = &error); |
169 | | - Err(OTelSdkError::InternalFailure(error)) |
170 | | - } |
171 | | - } |
| 15 | + self.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export") |
| 16 | + .await |
172 | 17 | } |
173 | 18 |
|
174 | 19 | fn shutdown(&self) -> OTelSdkResult { |
|
0 commit comments