forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.rs
More file actions
189 lines (163 loc) · 5.96 KB
/
config.rs
File metadata and controls
189 lines (163 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
use futures::FutureExt;
use http::{StatusCode, Uri};
use hyper::Body;
use snafu::Snafu;
use vector_lib::configurable::configurable_component;
use tokio::time::{interval, Duration};
use crate::{
gcp::{GcpAuthenticator, GcpError},
http::{HttpClient, HttpError},
sinks::{
gcs_common::service::GcsResponse,
util::retries::{RetryAction, RetryLogic},
Healthcheck, HealthcheckError,
},
};
/// Base URL of the Google Cloud Storage endpoint (`storage.googleapis.com`), including the
/// trailing slash.
pub const BASE_URL: &str = "https://storage.googleapis.com/";
/// GCS Predefined ACLs.
///
/// For more information, see [Predefined ACLs][predefined_acls].
///
/// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "kebab-case")]
// Variant names are (de)serialized in kebab-case, e.g. `AuthenticatedRead` <-> `authenticated-read`.
pub enum GcsPredefinedAcl {
/// Bucket/object can be read by authenticated users.
///
/// The bucket/object owner is granted the `OWNER` permission, and any authenticated Google
/// account holder is granted the `READER` permission.
AuthenticatedRead,
/// Object is semi-private.
///
/// Both the object owner and bucket owner are granted the `OWNER` permission.
///
/// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
/// specified for a bucket.
BucketOwnerFullControl,
/// Object is private, except to the bucket owner.
///
/// The object owner is granted the `OWNER` permission, and the bucket owner is granted the
/// `READER` permission.
///
/// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
/// specified for a bucket.
BucketOwnerRead,
/// Bucket/object are private.
///
/// The bucket/object owner is granted the `OWNER` permission, and no one else has
/// access.
Private,
/// Bucket/object are private within the project.
///
/// Project owners and project editors are granted the `OWNER` permission, and anyone who is
/// part of the project team is granted the `READER` permission.
///
/// This is the default.
#[derivative(Default)]
ProjectPrivate,
/// Bucket/object can be read publicly.
///
/// The bucket/object owner is granted the `OWNER` permission, and all other users, whether
/// authenticated or anonymous, are granted the `READER` permission.
PublicRead,
}
/// GCS storage classes.
///
/// For more information, see [Storage classes][storage_classes].
///
/// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[derivative(Default)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
// Variant names are (de)serialized in SCREAMING_SNAKE_CASE, e.g. `Standard` <-> `STANDARD`.
pub enum GcsStorageClass {
/// Standard storage.
///
/// This is the default.
#[derivative(Default)]
Standard,
/// Nearline storage.
Nearline,
/// Coldline storage.
Coldline,
/// Archive storage.
Archive,
}
/// Errors produced by the GCS helpers in this module.
#[derive(Debug, Snafu)]
pub enum GcsError {
/// The bucket could not be found.
///
/// `build_healthcheck` constructs this from its `bucket` argument, and `healthcheck_response`
/// returns it when the response status is `404 Not Found`.
#[snafu(display("Bucket {:?} not found", bucket))]
BucketNotFound { bucket: String },
}
/// Builds the healthcheck future for a GCS bucket.
///
/// The returned future sends an authenticated `HEAD` request to `base_url`, up to three times.
/// Attempts are paced by a 5-second Tokio interval, whose first tick completes immediately, so
/// the first request is not delayed.
///
/// * The first response with a success (2xx) status ends the check; that response is handed to
///   [`healthcheck_response`], which only maps `200 OK` to `Ok(())`.
/// * A non-success response is logged and retried. Once the third attempt has also failed, the
///   last response is mapped to an error by [`healthcheck_response`], with `bucket` used to
///   build the "not found" error.
///
/// # Errors
///
/// The future fails immediately, without retrying, when `base_url` is not a valid URI, when the
/// request cannot be built, or when `client.send` itself returns an error.
pub fn build_healthcheck(
    bucket: String,
    client: HttpClient,
    base_url: String,
    auth: GcpAuthenticator,
) -> crate::Result<Healthcheck> {
    let healthcheck = async move {
        let uri = base_url.parse::<Uri>()?;

        let max_attempts = 3;
        let mut attempts = 0;
        let mut failures = 0;
        // One tick per attempt, spaced 5 seconds apart.
        let mut ticker = interval(Duration::from_secs(5));
        // Built once up front; it is only consumed on one of the returning paths below.
        let not_found_error = GcsError::BucketNotFound { bucket }.into();

        loop {
            ticker.tick().await;

            let mut request = http::Request::head(uri.clone()).body(Body::empty())?;
            auth.apply(&mut request);
            let response = client.send(request).await?;
            attempts += 1;

            // The healthcheck is decided by the first successful response.
            if response.status().is_success() {
                return healthcheck_response(response, not_found_error);
            }

            // Dump the failed response to help diagnose the healthcheck.
            warn!("healthcheck response was not successful! {:#?}", response);
            failures += 1;

            // Out of attempts: report the counters and surface the last failure.
            if attempts >= max_attempts {
                info!("non-success healthcheck responses = {}", failures);
                info!("total healthcheck attempts = {}", attempts);
                return healthcheck_response(response, not_found_error);
            }
        }
    };

    Ok(healthcheck.boxed())
}
pub fn healthcheck_response(
response: http::Response<hyper::Body>,
not_found_error: crate::Error,
) -> crate::Result<()> {
match response.status() {
StatusCode::OK => Ok(()),
StatusCode::FORBIDDEN => Err(GcpError::HealthcheckForbidden.into()),
StatusCode::NOT_FOUND => Err(not_found_error),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
}
}
/// Retry policy for GCS requests: decides, per error or response, whether to try again.
#[derive(Clone)]
pub struct GcsRetryLogic;

// This is a clone of HttpRetryLogic for the Body type, should get merged
impl RetryLogic for GcsRetryLogic {
    type Error = HttpError;
    type Response = GcsResponse;

    /// Every `HttpError` is considered retriable.
    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
        true
    }

    /// Classifies a response by its HTTP status.
    ///
    /// Success (2xx) statuses are `Successful`, and `501 Not Implemented` is the only status
    /// that is not retried. Every other status is retried, including those that reach the
    /// final catch-all arm.
    fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
        let status = response.inner.status();

        // 2xx never overlaps with any of the arms below, so it can be settled first.
        if status.is_success() {
            return RetryAction::Successful;
        }

        match status {
            // Must stay ahead of the generic 5xx arm: 501 is itself a server error.
            StatusCode::NOT_IMPLEMENTED => {
                RetryAction::DontRetry("endpoint not implemented".into())
            }
            StatusCode::UNAUTHORIZED => RetryAction::Retry("unauthorized".into()),
            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
            _ if status.is_server_error() => RetryAction::Retry(status.to_string().into()),
            _ => RetryAction::Retry(
                format!("catchall retry with response status: {}", status).into(),
            ),
        }
    }
}