|
1 |
| -use std::sync::Arc; |
2 |
| - |
3 | 1 | use crate::metric::MetricsClient;
|
4 |
| -use http::{header::CONTENT_TYPE, Method}; |
5 |
| -use opentelemetry::otel_debug; |
6 | 2 | use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
|
7 | 3 | use opentelemetry_sdk::metrics::data::ResourceMetrics;
|
8 | 4 |
|
9 | 5 | use super::OtlpHttpClient;
|
10 | 6 |
|
11 |
| -#[cfg(feature = "http-retry")] |
12 |
| -use super::{classify_http_export_error, HttpExportError, HttpRetryData}; |
13 |
| -#[cfg(feature = "http-retry")] |
14 |
| -use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; |
15 |
| -#[cfg(feature = "http-retry")] |
16 |
| -use opentelemetry_sdk::runtime::Tokio; |
17 |
| - |
18 | 7 | impl MetricsClient for OtlpHttpClient {
|
19 |
| - #[cfg(feature = "http-retry")] |
20 | 8 | async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
|
21 |
| - let policy = RetryPolicy { |
22 |
| - max_retries: 3, |
23 |
| - initial_delay_ms: 100, |
24 |
| - max_delay_ms: 1600, |
25 |
| - jitter_ms: 100, |
| 9 | + // Wrapper function to handle Option return type from build_metrics_export_body |
| 10 | + let build_body_wrapper = |client: &OtlpHttpClient, metrics: &ResourceMetrics| { |
| 11 | + client |
| 12 | + .build_metrics_export_body(metrics) |
| 13 | + .ok_or_else(|| "Failed to serialize metrics".to_string()) |
26 | 14 | };
|
27 | 15 |
|
28 |
| - // Build request body once before retry loop |
29 |
| - let (body, content_type, content_encoding) = |
30 |
| - self.build_metrics_export_body(metrics).ok_or_else(|| { |
31 |
| - OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) |
32 |
| - })?; |
33 |
| - |
34 |
| - let retry_data = Arc::new(HttpRetryData { |
35 |
| - body, |
36 |
| - headers: self.headers.clone(), |
37 |
| - endpoint: self.collector_endpoint.to_string(), |
38 |
| - }); |
39 |
| - |
40 |
| - retry_with_backoff( |
41 |
| - Tokio, |
42 |
| - policy, |
43 |
| - classify_http_export_error, |
44 |
| - "HttpMetricsClient.Export", |
45 |
| - || async { |
46 |
| - // Get client |
47 |
| - let client = self |
48 |
| - .client |
49 |
| - .lock() |
50 |
| - .map_err(|e| HttpExportError::new(500, format!("Mutex lock failed: {e}")))? |
51 |
| - .as_ref() |
52 |
| - .ok_or_else(|| { |
53 |
| - HttpExportError::new(500, "Exporter already shutdown".to_string()) |
54 |
| - })? |
55 |
| - .clone(); |
56 |
| - |
57 |
| - // Build HTTP request |
58 |
| - let mut request_builder = http::Request::builder() |
59 |
| - .method(Method::POST) |
60 |
| - .uri(&retry_data.endpoint) |
61 |
| - .header(CONTENT_TYPE, content_type); |
62 |
| - |
63 |
| - if let Some(encoding) = content_encoding { |
64 |
| - request_builder = request_builder.header("Content-Encoding", encoding); |
65 |
| - } |
66 |
| - |
67 |
| - let mut request = request_builder |
68 |
| - .body(retry_data.body.clone().into()) |
69 |
| - .map_err(|e| { |
70 |
| - HttpExportError::new(400, format!("Failed to build HTTP request: {e}")) |
71 |
| - })?; |
72 |
| - |
73 |
| - for (k, v) in retry_data.headers.iter() { |
74 |
| - request.headers_mut().insert(k.clone(), v.clone()); |
75 |
| - } |
76 |
| - |
77 |
| - let request_uri = request.uri().to_string(); |
78 |
| - otel_debug!(name: "HttpMetricsClient.ExportStarted"); |
79 |
| - |
80 |
| - // Send request |
81 |
| - let response = client.send_bytes(request).await.map_err(|e| { |
82 |
| - HttpExportError::new(0, format!("Network error: {e:?}")) // Network error |
83 |
| - })?; |
84 |
| - |
85 |
| - let status_code = response.status().as_u16(); |
86 |
| - let retry_after = response |
87 |
| - .headers() |
88 |
| - .get("retry-after") |
89 |
| - .and_then(|v| v.to_str().ok()) |
90 |
| - .map(|s| s.to_string()); |
91 |
| - |
92 |
| - if !response.status().is_success() { |
93 |
| - let message = format!( |
94 |
| - "HTTP export failed. Url: {}, Status: {}, Response: {:?}", |
95 |
| - request_uri, |
96 |
| - status_code, |
97 |
| - response.body() |
98 |
| - ); |
99 |
| - return Err(match retry_after { |
100 |
| - Some(retry_after) => { |
101 |
| - HttpExportError::with_retry_after(status_code, retry_after, message) |
102 |
| - } |
103 |
| - None => HttpExportError::new(status_code, message), |
104 |
| - }); |
105 |
| - } |
106 |
| - |
107 |
| - otel_debug!(name: "HttpMetricsClient.ExportSucceeded"); |
108 |
| - Ok(()) |
109 |
| - }, |
110 |
| - ) |
111 |
| - .await |
112 |
| - .map_err(|e| OTelSdkError::InternalFailure(e.message)) |
113 |
| - } |
114 |
| - |
115 |
| - #[cfg(not(feature = "http-retry"))] |
116 |
| - async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { |
117 |
| - let client = self |
118 |
| - .client |
119 |
| - .lock() |
120 |
| - .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}"))) |
121 |
| - .and_then(|g| match &*g { |
122 |
| - Some(client) => Ok(Arc::clone(client)), |
123 |
| - _ => Err(OTelSdkError::AlreadyShutdown), |
124 |
| - })?; |
125 |
| - |
126 |
| - let (body, content_type, content_encoding) = |
127 |
| - self.build_metrics_export_body(metrics).ok_or_else(|| { |
128 |
| - OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) |
129 |
| - })?; |
130 |
| - |
131 |
| - let mut request_builder = http::Request::builder() |
132 |
| - .method(Method::POST) |
133 |
| - .uri(&self.collector_endpoint) |
134 |
| - .header(CONTENT_TYPE, content_type); |
135 |
| - |
136 |
| - if let Some(encoding) = content_encoding { |
137 |
| - request_builder = request_builder.header("Content-Encoding", encoding); |
138 |
| - } |
139 |
| - |
140 |
| - let mut request = request_builder |
141 |
| - .body(body.into()) |
142 |
| - .map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?; |
143 |
| - |
144 |
| - for (k, v) in self.headers.iter() { |
145 |
| - request.headers_mut().insert(k.clone(), v.clone()); |
146 |
| - } |
147 |
| - |
148 |
| - otel_debug!(name: "HttpMetricsClient.ExportStarted"); |
149 |
| - let result = client.send_bytes(request).await; |
150 |
| - |
151 |
| - match result { |
152 |
| - Ok(response) => { |
153 |
| - if response.status().is_success() { |
154 |
| - otel_debug!(name: "HttpMetricsClient.ExportSucceeded"); |
155 |
| - Ok(()) |
156 |
| - } else { |
157 |
| - let error = format!( |
158 |
| - "OpenTelemetry metrics export failed. Status Code: {}, Response: {:?}", |
159 |
| - response.status().as_u16(), |
160 |
| - response.body() |
161 |
| - ); |
162 |
| - otel_debug!(name: "HttpMetricsClient.ExportFailed", error = &error); |
163 |
| - Err(OTelSdkError::InternalFailure(error)) |
164 |
| - } |
165 |
| - } |
166 |
| - Err(e) => { |
167 |
| - let error = format!("{e:?}"); |
168 |
| - otel_debug!(name: "HttpMetricsClient.ExportFailed", error = &error); |
169 |
| - Err(OTelSdkError::InternalFailure(error)) |
170 |
| - } |
171 |
| - } |
| 16 | + self.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export") |
| 17 | + .await |
172 | 18 | }
|
173 | 19 |
|
174 | 20 | fn shutdown(&self) -> OTelSdkResult {
|
|
0 commit comments