Skip to content

Commit 9d51cf7

Browse files
Merge branch 'main' into cancel-none-fix
2 parents 1c7ffd4 + d1f88ee commit 9d51cf7

File tree

24 files changed

+335
-57
lines changed

24 files changed

+335
-57
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/api-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ clp-rust-utils = { path = "../clp-rust-utils" }
2525
futures = "0.3.31"
2626
mongodb = "3.3.0"
2727
num_enum = "0.7.5"
28+
non-empty-string = { version = "0.2.6", features = ["serde"] }
2829
pin-project-lite = "0.2.16"
2930
rmp-serde = "1.3.0"
3031
secrecy = "0.10.3"

components/api-server/src/client.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::pin::Pin;
22

33
use async_stream::stream;
44
use clp_rust_utils::{
5+
aws::AWS_DEFAULT_REGION,
56
clp_config::{
67
AwsAuthentication,
78
package::{
@@ -161,6 +162,7 @@ impl Client {
161162
/// * Forwards [`Client::get_status`]'s return values on failure.
162163
/// * Forwards [`Client::get_job_config`]'s return values on failure.
163164
/// * Forwards [`Client::fetch_results_from_mongo`]'s return values on failure.
165+
/// * Forwards [`Client::fetch_results_from_s3`]'s return values on failure.
164166
pub async fn fetch_results(
165167
&self,
166168
search_job_id: u64,
@@ -197,7 +199,7 @@ impl Client {
197199
inner: self.fetch_results_from_file(search_job_id),
198200
},
199201
StreamOutputStorage::S3 { .. } => SearchResultStream::S3 {
200-
inner: self.fetch_results_from_s3(search_job_id).await,
202+
inner: self.fetch_results_from_s3(search_job_id).await?,
201203
},
202204
};
203205
return Ok(stream);
@@ -311,13 +313,20 @@ impl Client {
311313
/// * Forwards [`aws_smithy_types::byte_stream::ByteStream::collect`]'s return values on
312314
/// failure.
313315
///
316+
/// # Errors
317+
///
318+
/// Return an error if:
319+
///
320+
/// * [`ClientError::Aws`] if a region code is not provided when using the default AWS S3
321+
/// endpoint.
322+
///
314323
/// # Panics
315324
///
316325
/// Panics if the stream output storage is not S3.
317326
async fn fetch_results_from_s3(
318327
&self,
319328
search_job_id: u64,
320-
) -> impl Stream<Item = Result<String, ClientError>> + use<> {
329+
) -> Result<impl Stream<Item = Result<String, ClientError>> + use<>, ClientError> {
321330
tracing::info!("Streaming results from S3");
322331
let StreamOutputStorage::S3 { s3_config, .. } = &self.config.stream_output.storage else {
323332
unreachable!();
@@ -326,13 +335,22 @@ impl Client {
326335
let AwsAuthentication::Credentials { credentials } = &s3_config.aws_authentication;
327336

328337
let s3_config = s3_config.clone();
329-
let credentials = credentials.clone();
338+
if s3_config.region_code.is_none() && s3_config.endpoint_url.is_none() {
339+
return Err(ClientError::Aws {
340+
description: "a region code must be given when using the default AWS S3 endpoint"
341+
.to_owned(),
342+
});
343+
}
330344

345+
let credentials = credentials.clone();
331346
let s3_client = clp_rust_utils::s3::create_new_client(
332-
s3_config.region_code.as_str(),
333347
credentials.access_key_id.as_str(),
334348
credentials.secret_access_key.as_str(),
335-
None,
349+
s3_config
350+
.region_code
351+
.as_ref()
352+
.map_or(AWS_DEFAULT_REGION, non_empty_string::NonEmptyString::as_str),
353+
s3_config.endpoint_url.as_ref(),
336354
)
337355
.await;
338356

@@ -345,7 +363,7 @@ impl Client {
345363
.into_paginator()
346364
.send();
347365

348-
stream! {
366+
Ok(stream! {
349367
while let Some(object_page) = object_pages.next().await {
350368
tracing::debug!("Received S3 object page: {:?}", object_page);
351369
for object in object_page?.contents() {
@@ -372,7 +390,7 @@ impl Client {
372390
}
373391
}
374392
}
375-
}
393+
})
376394
}
377395

378396
/// Asynchronously fetches results of a completed search job from `MongoDB`.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub const AWS_DEFAULT_REGION: &str = "us-east-1";

components/clp-rust-utils/src/clp_config/s3_config.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize};
55
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
66
pub struct S3Config {
77
pub bucket: NonEmptyString,
8-
pub region_code: String,
8+
pub region_code: Option<NonEmptyString>,
99
pub key_prefix: NonEmptyString,
10+
pub endpoint_url: Option<NonEmptyString>,
1011
pub aws_authentication: AwsAuthentication,
1112
}
1213

components/clp-rust-utils/src/job_config/ingestion.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ pub mod s3 {
66
/// Base configuration for ingesting logs from S3.
77
#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
88
pub struct BaseConfig {
9-
/// AWS service region.
10-
pub region: String,
11-
129
/// The S3 bucket to ingest from.
1310
#[schema(value_type = String, min_length = 1)]
1411
pub bucket_name: NonEmptyString,
@@ -17,6 +14,17 @@ pub mod s3 {
1714
#[schema(value_type = String, min_length = 1)]
1815
pub key_prefix: NonEmptyString,
1916

17+
/// AWS service region. Must be provided if using the default AWS S3 endpoint.
18+
#[serde(default)]
19+
#[schema(value_type = String, min_length = 1)]
20+
pub region: Option<NonEmptyString>,
21+
22+
/// The endpoint URL for custom S3-compatible object stores (e.g., `MinIO`, `LocalStack`).
23+
/// Use the default AWS S3 endpoint if not provided.
24+
#[serde(default)]
25+
#[schema(value_type = String, min_length = 1)]
26+
pub endpoint_url: Option<NonEmptyString>,
27+
2028
/// The dataset to ingest into. Defaults to `None` (which uses the default dataset).
2129
#[serde(default)]
2230
#[schema(value_type = String, min_length = 1)]

components/clp-rust-utils/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod aws;
12
pub mod clp_config;
23
pub mod database;
34
mod error;

components/clp-rust-utils/src/s3/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use aws_sdk_s3::{
33
Client,
44
config::{Builder, Credentials, Region},
55
};
6+
use non_empty_string::NonEmptyString;
67

78
/// Creates a new S3 client.
89
///
@@ -16,10 +17,10 @@ use aws_sdk_s3::{
1617
/// A newly created S3 client.
1718
#[must_use]
1819
pub async fn create_new_client(
19-
region_id: &str,
2020
access_key_id: &str,
2121
secret_access_key: &str,
22-
endpoint: Option<&str>,
22+
region_id: &str,
23+
endpoint: Option<&NonEmptyString>,
2324
) -> Client {
2425
let credential = Credentials::new(
2526
access_key_id,
@@ -28,13 +29,12 @@ pub async fn create_new_client(
2829
None,
2930
"clp-credential-provider",
3031
);
31-
let region = Region::new(region_id.to_owned());
3232
let base_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
3333
let mut config_builder = Builder::from(&base_config)
34-
.region(region)
3534
.credentials_provider(credential)
35+
.region(Some(Region::new(region_id.to_string())))
3636
.force_path_style(true);
37-
config_builder.set_endpoint_url(endpoint.map(std::borrow::ToOwned::to_owned));
37+
config_builder.set_endpoint_url(endpoint.map(std::string::ToString::to_string));
3838
let config = config_builder.build();
3939
Client::from_conf(config)
4040
}

components/clp-rust-utils/src/sqs/client.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use aws_sdk_sqs::{
33
Client,
44
config::{Builder, Credentials, Region},
55
};
6+
use non_empty_string::NonEmptyString;
67

78
/// Creates a new SQS client.
89
/// The client is configured using the latest AWS SDK behavior version.
@@ -12,10 +13,10 @@ use aws_sdk_sqs::{
1213
/// A newly created SQS client.
1314
#[must_use]
1415
pub async fn create_new_client(
15-
region_id: &str,
1616
access_key_id: &str,
1717
secret_access_key: &str,
18-
endpoint: Option<&str>,
18+
region_id: &str,
19+
endpoint: Option<&NonEmptyString>,
1920
) -> Client {
2021
let credential = Credentials::new(
2122
access_key_id,
@@ -24,11 +25,10 @@ pub async fn create_new_client(
2425
None,
2526
"clp-credential-provider",
2627
);
27-
let region = Region::new(region_id.to_owned());
2828
let base_config = aws_config::defaults(BehaviorVersion::latest()).load().await;
2929
let mut config_builder = Builder::from(&base_config)
3030
.credentials_provider(credential)
31-
.region(region);
32-
config_builder.set_endpoint_url(endpoint.map(std::borrow::ToOwned::to_owned));
31+
.region(Some(Region::new(region_id.to_string())));
32+
config_builder.set_endpoint_url(endpoint.map(std::string::ToString::to_string));
3333
Client::from_conf(config_builder.build())
3434
}

components/clp-rust-utils/tests/clp_config_test.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use serde_json::Value;
1111
fn test_clp_io_config_serialization() {
1212
let s3_config = S3Config {
1313
bucket: NonEmptyString::from_static_str("yscope"),
14-
region_code: "us-east-2".into(),
14+
region_code: Some(NonEmptyString::from_static_str("us-east-2")),
1515
key_prefix: NonEmptyString::from_static_str("sample-logs/cockroachdb.clp.zst"),
16+
endpoint_url: None,
1617
aws_authentication: AwsAuthentication::Credentials {
1718
credentials: AwsCredentials {
1819
access_key_id: "ACCESS_KEY_ID".into(),
@@ -42,13 +43,12 @@ fn test_clp_io_config_serialization() {
4243

4344
let brotli_compressed_msgpack = BrotliMsgpack::serialize(&config)
4445
.expect("Brotli-compressed MessagePack serialized config.");
45-
let expected = "1ba00100e4ffdf9f43284b650e496850ba5f1eeefb53844a05d074faa66eb23ebef2dc45638\
46-
275e9c24cb3bccba29c9bfc9d95db42175d52eecc81793cb3bc3c4ed0bf604c56e5c9a24581d9e65080\
47-
1fd7263a8fb774fa362adf02eecc5b9d99532b8be8be173f6b659a9538c6c56a15571bc9856e20d0267\
48-
b1591599975a75cdeb2aea30b83c8b486f3a2b3a74b419d6f99db0742a1482603a9480912e1336f2780\
49-
dd9c3391503a9205a89a755bfe2c0d3a6be4c98ef0489c0b7e7f2d50b85f8f6e671a54d5dc6fa16d1ac\
50-
cbaaffc5c3f1fb140f21ba0dce6ff0e8bc5f2da3c58426a9947046ca3cf9a06c7c8219e25a6ad0c4c67\
51-
b6aceb8c88c782293b";
46+
let expected = "1bae0100e4ffdf9f43284b650e496850ba5f98ef7b53044b04d074faa66eb23ebef25c0d1a13ac\
47+
4b17669ac5cbe254cffc4e7423edc455d2f61c988e679697db09ba171ce575912c9a14986d0e05f87e4dd1babda\
48+
9d5b5d1c5267067deea8cacf9928cf673f1a75a627945e06c7cbe6c6b21d98945e16df94ba0d5fce7c2d61158ca\
49+
6541545e74dd5e3cddf21ea4832251d93cf6b3f69e3c05ab7eabc2dee3f305120988057210f19f59dc01ece49c0\
50+
8f8e2812440d5acf6d2cfd2ae334791ec8888c43ae0f7870485fb756b76b4485c29fc164a27fd592de69feb5724\
51+
16287f03545be2df6230185e931b8348a98a88804df492d3e01fd9c4b3c4b4b081e1cc970aaa2319b07cca0c";
5252
assert_eq!(expected, hex::encode(brotli_compressed_msgpack));
5353

5454
let json_serialized_result = serde_json::to_string_pretty(&config);
@@ -60,6 +60,7 @@ fn test_clp_io_config_serialization() {
6060
"bucket": "yscope",
6161
"region_code": "us-east-2",
6262
"key_prefix": "sample-logs/cockroachdb.clp.zst",
63+
"endpoint_url": null,
6364
"aws_authentication": {
6465
"type": "credentials",
6566
"credentials": {

0 commit comments

Comments
 (0)