Skip to content

Commit 7b6fd22

Browse files
kfirg-cetuclaude
andcommitted
feat(aws_s3 source): add custom SQS authentication option
Add support for configuring separate authentication credentials for SQS in the AWS S3 source. This enables cross-account scenarios where S3 buckets and SQS queues are in different AWS accounts or require different permission models. Changes: - Add optional `sqs.auth` field to SQS configuration - Add optional `sqs.deferred.auth` field for deferred queue - Modify client creation logic to use SQS-specific auth when provided - Fallback to main `auth` when `sqs.auth` is not specified - Full backwards compatibility maintained Configuration example: ```yaml [sources.my_s3_source] type = "aws_s3" auth.access_key_id = "S3_KEY" auth.secret_access_key = "S3_SECRET" sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/123/queue" sqs.auth.access_key_id = "SQS_KEY" sqs.auth.secret_access_key = "SQS_SECRET" ``` Testing: - Unit tests for config parsing (all auth variants) - Integration tests with LocalStack (multi-auth scenarios) - Backwards compatibility tests (existing configs work unchanged) Files modified: - src/sources/aws_s3/mod.rs (client creation logic) - src/sources/aws_s3/sqs.rs (config structures and tests) - changelog.d/aws_s3_custom_sqs_auth.enhancement.md (changelog) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 2a8183a commit 7b6fd22

File tree

3 files changed

+309
-5
lines changed

3 files changed

+309
-5
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
The `aws_s3` source now supports configuring separate authentication credentials for SQS via the new `sqs.auth` configuration option. This enables cross-account scenarios where S3 buckets and SQS queues are in different AWS accounts or require different permission models.
2+
3+
When `sqs.auth` is not specified, the source falls back to using the main `auth` configuration, maintaining full backwards compatibility with existing deployments.
4+
5+
The `sqs.deferred.auth` option is also available for configuring separate authentication for the deferred message queue.
6+
7+
authors: kfir

src/sources/aws_s3/mod.rs

Lines changed: 153 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,20 +263,45 @@ impl AwsS3Config {
263263

264264
match self.sqs {
265265
Some(ref sqs) => {
266+
// Use SQS-specific auth if provided, otherwise fall back to main auth
267+
let sqs_auth = sqs.auth.as_ref().unwrap_or(&self.auth);
268+
266269
let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
267270
&SqsClientBuilder {},
268-
&self.auth,
271+
sqs_auth,
269272
region.clone(),
270-
endpoint,
273+
endpoint.clone(),
271274
proxy,
272275
sqs.tls_options.as_ref(),
273276
sqs.timeout.as_ref(),
274277
)
275278
.await?;
276279

280+
// Create deferred SQS client if deferred queue has separate auth
281+
let deferred_sqs_client = if let Some(ref deferred) = sqs.deferred {
282+
if let Some(ref deferred_auth) = deferred.auth {
283+
let (client, _) = create_client_and_region::<SqsClientBuilder>(
284+
&SqsClientBuilder {},
285+
deferred_auth,
286+
Some(region.clone()),
287+
endpoint.clone(),
288+
proxy,
289+
sqs.tls_options.as_ref(),
290+
sqs.timeout.as_ref(),
291+
)
292+
.await?;
293+
Some(client)
294+
} else {
295+
None
296+
}
297+
} else {
298+
None
299+
};
300+
277301
let ingestor = sqs::Ingestor::new(
278302
region,
279303
sqs_client,
304+
deferred_sqs_client,
280305
s3_client,
281306
sqs.clone(),
282307
self.compression,
@@ -1068,4 +1093,130 @@ mod integration_tests {
10681093
.await
10691094
.unwrap()
10701095
}
1096+
1097+
#[tokio::test]
1098+
async fn s3_process_message_with_different_sqs_auth() {
1099+
trace_init();
1100+
1101+
let logs: Vec<String> = random_lines(100).take(10).collect();
1102+
1103+
// Create config with separate SQS auth
1104+
let s3 = s3_client().await;
1105+
let sqs = sqs_client().await;
1106+
let queue = create_queue(&sqs).await;
1107+
let bucket = create_bucket(&s3).await;
1108+
1109+
tokio::time::sleep(Duration::from_secs(1)).await;
1110+
1111+
let mut config = config(&queue, None, false, DeserializerConfig::Bytes);
1112+
1113+
// Add custom SQS auth (LocalStack accepts any credentials)
1114+
config.sqs.as_mut().unwrap().auth = Some(AwsAuthentication::AccessKey {
1115+
access_key_id: "DIFFERENT_KEY_ID".to_string().into(),
1116+
secret_access_key: "DIFFERENT_SECRET_KEY".to_string().into(),
1117+
session_token: None,
1118+
assume_role: None,
1119+
external_id: None,
1120+
region: None,
1121+
session_name: None,
1122+
});
1123+
1124+
// Verify the source builds and runs with different SQS auth
1125+
let key = uuid::Uuid::new_v4().to_string();
1126+
let payload = logs.join("\n").into_bytes();
1127+
1128+
s3.put_object()
1129+
.bucket(bucket.clone())
1130+
.key(key.clone())
1131+
.body(ByteStream::from(payload))
1132+
.send()
1133+
.await
1134+
.expect("Could not put object");
1135+
1136+
let sqs_client = sqs_client().await;
1137+
let mut s3_event: S3Event = serde_json::from_str(
1138+
r#"{"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2022-03-24T19:43:00.548Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:ARNOTAREALIDD4:user.name"},"requestParameters":{"sourceIPAddress":"136.56.73.213"},"responseElements":{"x-amz-request-id":"ZX6X98Q6NM9NQTP3","x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"},"s3":{"s3SchemaVersion":"1.0","configurationId":"asdfasdf","bucket":{"name":"bucket-name","ownerIdentity":{"principalId":"A3PEG170DF9VNQ"},"arn":"arn:aws:s3:::nfox-testing-vector"},"object":{"key":"test-log.txt","size":33,"eTag":"c981ce6672c4251048b0b834e334007f","sequencer":"00623CC9C47AB5634C"}}}]}"#,
1139+
)
1140+
.unwrap();
1141+
1142+
s3_event.records[0].s3.bucket.name.clone_from(&bucket);
1143+
s3_event.records[0].s3.object.key.clone_from(&key);
1144+
1145+
sqs_client
1146+
.send_message()
1147+
.queue_url(queue.clone())
1148+
.message_body(serde_json::to_string(&s3_event).unwrap())
1149+
.send()
1150+
.await
1151+
.unwrap();
1152+
1153+
let (tx, rx) = SourceSender::new_test_finalize(Delivered);
1154+
let cx = SourceContext::new_test(tx, None);
1155+
let source = config.build(cx).await.unwrap();
1156+
tokio::spawn(async move { source.await.unwrap() });
1157+
1158+
let events = collect_n(rx, logs.len()).await;
1159+
assert_eq!(logs.len(), events.len());
1160+
1161+
tokio::time::sleep(Duration::from_secs(10)).await;
1162+
assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
1163+
}
1164+
1165+
#[tokio::test]
1166+
async fn s3_process_message_without_sqs_auth_uses_default() {
1167+
trace_init();
1168+
1169+
let logs: Vec<String> = random_lines(100).take(10).collect();
1170+
1171+
// Test backwards compatibility - config without sqs.auth should work
1172+
test_event(
1173+
None,
1174+
None,
1175+
None,
1176+
None,
1177+
logs.join("\n").into_bytes(),
1178+
logs,
1179+
Delivered,
1180+
false,
1181+
DeserializerConfig::Bytes,
1182+
None,
1183+
)
1184+
.await;
1185+
}
1186+
1187+
#[tokio::test]
1188+
async fn s3_process_message_with_deferred_auth() {
1189+
trace_init();
1190+
1191+
let s3 = s3_client().await;
1192+
let sqs = sqs_client().await;
1193+
let queue = create_queue(&sqs).await;
1194+
let deferred_queue = create_queue(&sqs).await;
1195+
let _bucket = create_bucket(&s3).await;
1196+
1197+
tokio::time::sleep(Duration::from_secs(1)).await;
1198+
1199+
let mut config = config(&queue, None, false, DeserializerConfig::Bytes);
1200+
1201+
// Add deferred queue with custom auth
1202+
config.sqs.as_mut().unwrap().deferred = Some(sqs::DeferredConfig {
1203+
queue_url: deferred_queue.clone(),
1204+
max_age_secs: 0, // Set to 0 so all messages are deferred
1205+
auth: Some(AwsAuthentication::AccessKey {
1206+
access_key_id: "DEFERRED_KEY_ID".to_string().into(),
1207+
secret_access_key: "DEFERRED_SECRET_KEY".to_string().into(),
1208+
session_token: None,
1209+
assume_role: None,
1210+
external_id: None,
1211+
region: None,
1212+
session_name: None,
1213+
}),
1214+
});
1215+
1216+
// Verify the source builds correctly with deferred auth
1217+
let (tx, _rx) = SourceSender::new_test();
1218+
let cx = SourceContext::new_test(tx, None);
1219+
let source = config.build(cx).await;
1220+
assert!(source.is_ok(), "Source should build successfully with deferred auth");
1221+
}
10711222
}

src/sources/aws_s3/sqs.rs

Lines changed: 149 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use vector_lib::{
4343

4444
use crate::{
4545
SourceSender,
46-
aws::AwsTimeout,
46+
aws::{AwsAuthentication, AwsTimeout},
4747
codecs::Decoder,
4848
common::backoff::ExponentialBackoff,
4949
config::{SourceAcknowledgementsConfig, SourceContext},
@@ -83,6 +83,15 @@ pub(super) struct DeferredConfig {
8383
#[configurable(metadata(docs::type_unit = "seconds"))]
8484
#[configurable(metadata(docs::examples = 3600))]
8585
pub(super) max_age_secs: u64,
86+
87+
/// Authentication configuration for the deferred queue.
88+
///
89+
/// If not specified, the main `auth` configuration will be used.
90+
///
91+
/// This allows the deferred queue to be in a different AWS account or use different credentials.
92+
#[configurable(derived)]
93+
#[serde(default)]
94+
pub(super) auth: Option<AwsAuthentication>,
8695
}
8796

8897
/// SQS configuration options.
@@ -180,6 +189,16 @@ pub(super) struct Config {
180189
#[serde(flatten)]
181190
pub(super) timeout: Option<AwsTimeout>,
182191

192+
/// Authentication configuration for SQS.
193+
///
194+
/// If not specified, the main `auth` configuration will be used.
195+
///
196+
/// This allows SQS to use different credentials than S3, enabling cross-account scenarios
197+
/// where the S3 bucket and SQS queue are in different AWS accounts.
198+
#[configurable(derived)]
199+
#[serde(default)]
200+
pub(super) auth: Option<AwsAuthentication>,
201+
183202
/// Configuration for deferring events to another queue based on their age.
184203
#[configurable(derived)]
185204
pub(super) deferred: Option<DeferredConfig>,
@@ -279,6 +298,7 @@ pub struct State {
279298

280299
s3_client: S3Client,
281300
sqs_client: SqsClient,
301+
deferred_sqs_client: Option<SqsClient>,
282302

283303
multiline: Option<line_agg::Config>,
284304
compression: super::Compression,
@@ -303,6 +323,7 @@ impl Ingestor {
303323
pub(super) async fn new(
304324
region: Region,
305325
sqs_client: SqsClient,
326+
deferred_sqs_client: Option<SqsClient>,
306327
s3_client: S3Client,
307328
config: Config,
308329
compression: super::Compression,
@@ -319,6 +340,7 @@ impl Ingestor {
319340

320341
s3_client,
321342
sqs_client,
343+
deferred_sqs_client,
322344

323345
compression,
324346
multiline,
@@ -896,8 +918,14 @@ impl IngestorProcess {
896918
entries: Vec<SendMessageBatchRequestEntry>,
897919
queue_url: String,
898920
) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
899-
self.state
900-
.sqs_client
921+
// Use deferred SQS client if available, otherwise use main client
922+
let client = self
923+
.state
924+
.deferred_sqs_client
925+
.as_ref()
926+
.unwrap_or(&self.state.sqs_client);
927+
928+
client
901929
.send_message_batch()
902930
.queue_url(queue_url.clone())
903931
.set_entries(Some(entries))
@@ -1299,3 +1327,121 @@ fn parse_sqs_config() {
12991327
);
13001328
assert!(test.is_err());
13011329
}
1330+
1331+
#[test]
1332+
fn parse_sqs_config_with_custom_auth() {
1333+
let config: Config = toml::from_str(
1334+
r#"
1335+
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1336+
[auth]
1337+
access_key_id = "AKIAIOSFODNN7EXAMPLE"
1338+
secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
1339+
"#,
1340+
)
1341+
.unwrap();
1342+
assert_eq!(
1343+
config.queue_url,
1344+
"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1345+
);
1346+
assert!(config.auth.is_some());
1347+
1348+
// Verify the auth was parsed correctly
1349+
match config.auth.unwrap() {
1350+
AwsAuthentication::AccessKey {
1351+
access_key_id,
1352+
secret_access_key,
1353+
..
1354+
} => {
1355+
assert_eq!(access_key_id.inner(), "AKIAIOSFODNN7EXAMPLE");
1356+
assert_eq!(
1357+
secret_access_key.inner(),
1358+
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
1359+
);
1360+
}
1361+
_ => panic!("Expected AccessKey auth variant"),
1362+
}
1363+
}
1364+
1365+
#[test]
1366+
fn parse_sqs_config_with_assume_role() {
1367+
let config: Config = toml::from_str(
1368+
r#"
1369+
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1370+
[auth]
1371+
assume_role = "arn:aws:iam::123456789012:role/SQSRole"
1372+
external_id = "external123"
1373+
"#,
1374+
)
1375+
.unwrap();
1376+
assert_eq!(
1377+
config.queue_url,
1378+
"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1379+
);
1380+
assert!(config.auth.is_some());
1381+
1382+
// Verify role assumption was parsed correctly
1383+
match config.auth.unwrap() {
1384+
AwsAuthentication::Role {
1385+
assume_role,
1386+
external_id,
1387+
..
1388+
} => {
1389+
assert_eq!(assume_role, "arn:aws:iam::123456789012:role/SQSRole");
1390+
assert_eq!(external_id, Some("external123".to_string()));
1391+
}
1392+
_ => panic!("Expected Role auth variant"),
1393+
}
1394+
}
1395+
1396+
#[test]
1397+
fn parse_sqs_config_without_auth_uses_default() {
1398+
let config: Config = toml::from_str(
1399+
r#"
1400+
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1401+
"#,
1402+
)
1403+
.unwrap();
1404+
assert_eq!(
1405+
config.queue_url,
1406+
"https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1407+
);
1408+
// Auth should be None, allowing fallback to main auth
1409+
assert!(config.auth.is_none());
1410+
}
1411+
1412+
#[test]
1413+
fn parse_deferred_config_with_custom_auth() {
1414+
let config: Config = toml::from_str(
1415+
r#"
1416+
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
1417+
[deferred]
1418+
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
1419+
max_age_secs = 3600
1420+
[deferred.auth]
1421+
access_key_id = "AKIADEFERREDEXAMPLE"
1422+
secret_access_key = "deferredSecretKeyExample"
1423+
"#,
1424+
)
1425+
.unwrap();
1426+
1427+
let deferred = config.deferred.expect("Expected deferred config");
1428+
assert_eq!(
1429+
deferred.queue_url,
1430+
"https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
1431+
);
1432+
assert_eq!(deferred.max_age_secs, 3600);
1433+
assert!(deferred.auth.is_some());
1434+
1435+
// Verify deferred auth was parsed correctly
1436+
match deferred.auth.unwrap() {
1437+
AwsAuthentication::AccessKey {
1438+
access_key_id,
1439+
secret_access_key,
1440+
..
1441+
} => {
1442+
assert_eq!(access_key_id.inner(), "AKIADEFERREDEXAMPLE");
1443+
assert_eq!(secret_access_key.inner(), "deferredSecretKeyExample");
1444+
}
1445+
_ => panic!("Expected AccessKey auth variant"),
1446+
}
1447+
}

0 commit comments

Comments
 (0)