Skip to content

Commit b0f2ca0

Browse files
authored
S3 datasource: add support for SNS format (#3716)
1 parent 4528ed6 commit b0f2ca0

File tree

2 files changed

+74
-6
lines changed

2 files changed

+74
-6
lines changed

pkg/acquisition/modules/s3/s3.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,18 @@ type S3Event struct {
9393
} `json:"detail"`
9494
}
9595

96+
// For events that are published to SQS by SNS
97+
// We only care about the message itself, the other SNS metadata are not needed
98+
type SNSEvent struct {
99+
Message string `json:"Message"`
100+
}
101+
96102
const (
97103
PollMethodList = "list"
98104
PollMethodSQS = "sqs"
99105
SQSFormatEventBridge = "eventbridge"
100106
SQSFormatS3Notification = "s3notification"
107+
SQSFormatSNS = "sns"
101108
)
102109

103110
var linesRead = prometheus.NewCounterVec(
@@ -296,6 +303,16 @@ func extractBucketAndPrefixFromS3Notif(message *string) (string, string, error)
296303
return s3notifBody.Records[0].S3.Bucket.Name, s3notifBody.Records[0].S3.Object.Key, nil
297304
}
298305

306+
func extractBucketAndPrefixFromSNSNotif(message *string) (string, string, error) {
307+
snsBody := SNSEvent{}
308+
err := json.Unmarshal([]byte(*message), &snsBody)
309+
if err != nil {
310+
return "", "", err
311+
}
312+
//It's just a SQS message wrapped in SNS
313+
return extractBucketAndPrefixFromS3Notif(&snsBody.Message)
314+
}
315+
299316
func (s *S3Source) extractBucketAndPrefix(message *string) (string, string, error) {
300317
switch s.Config.SQSFormat {
301318
case SQSFormatEventBridge:
@@ -310,6 +327,12 @@ func (s *S3Source) extractBucketAndPrefix(message *string) (string, string, erro
310327
return "", "", err
311328
}
312329
return bucket, key, nil
330+
case SQSFormatSNS:
331+
bucket, key, err := extractBucketAndPrefixFromSNSNotif(message)
332+
if err != nil {
333+
return "", "", err
334+
}
335+
return bucket, key, nil
313336
default:
314337
bucket, key, err := extractBucketAndPrefixFromEventBridge(message)
315338
if err == nil {
@@ -321,6 +344,11 @@ func (s *S3Source) extractBucketAndPrefix(message *string) (string, string, erro
321344
s.Config.SQSFormat = SQSFormatS3Notification
322345
return bucket, key, nil
323346
}
347+
bucket, key, err = extractBucketAndPrefixFromSNSNotif(message)
348+
if err == nil {
349+
s.Config.SQSFormat = SQSFormatSNS
350+
return bucket, key, nil
351+
}
324352
return "", "", errors.New("SQS message format not supported")
325353
}
326354
}
@@ -506,8 +534,8 @@ func (s *S3Source) UnmarshalConfig(yamlConfig []byte) error {
506534
return errors.New("bucket_name is required")
507535
}
508536

509-
if s.Config.SQSFormat != "" && s.Config.SQSFormat != SQSFormatEventBridge && s.Config.SQSFormat != SQSFormatS3Notification {
510-
return fmt.Errorf("invalid sqs_format %s, must be empty, %s or %s", s.Config.SQSFormat, SQSFormatEventBridge, SQSFormatS3Notification)
537+
if s.Config.SQSFormat != "" && s.Config.SQSFormat != SQSFormatEventBridge && s.Config.SQSFormat != SQSFormatS3Notification && s.Config.SQSFormat != SQSFormatSNS {
538+
return fmt.Errorf("invalid sqs_format %s, must be empty, %s, %s or %s", s.Config.SQSFormat, SQSFormatEventBridge, SQSFormatS3Notification, SQSFormatSNS)
511539
}
512540

513541
return nil

pkg/acquisition/modules/s3/s3_test.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,31 @@ func (msqs mockSQSClientNotif) DeleteMessage(input *sqs.DeleteMessageInput) (*sq
210210
return &sqs.DeleteMessageOutput{}, nil
211211
}
212212

213+
type mockSQSClientSNS struct {
214+
sqsiface.SQSAPI
215+
counter *int32
216+
}
217+
218+
func (msqs mockSQSClientSNS) ReceiveMessageWithContext(ctx context.Context, input *sqs.ReceiveMessageInput, options ...request.Option) (*sqs.ReceiveMessageOutput, error) {
219+
if atomic.LoadInt32(msqs.counter) == 1 {
220+
return &sqs.ReceiveMessageOutput{}, nil
221+
}
222+
atomic.AddInt32(msqs.counter, 1)
223+
return &sqs.ReceiveMessageOutput{
224+
Messages: []*sqs.Message{
225+
{
226+
Body: aws.String(`
227+
{"Type":"Notification","MessageId":"95f3b5d2-c347-577e-b07d-d535ff80d9c4","TopicArn":"arn:aws:sns:eu-west-1:309081560286:s3-notif","Subject":"Amazon S3 Notification","Message":"{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2025-07-08T15:34:31.272Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:xxx:[email protected]\"},\"requestParameters\":{\"sourceIPAddress\":\"1.1.1.1\"},\"responseElements\":{\"x-amz-request-id\":\"F8PK5SP9MC5R76F5\",\"x-amz-id-2\":\"dEZVAhJ9ufBn3ufcJH8wzRw2bfiwGzqaq4iQ9rYKkScQ3o4fGjbqe4dWCAPNwc1khCVKRSbfRwD9HXgDElOHcZazOIBxVY1l\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"test\",\"bucket\":{\"name\":\"my_bucket\",\"ownerIdentity\":{\"principalId\":\"A2BHZN7P6G2N16\"},\"arn\":\"arn:aws:s3:::my_bucket\"},\"object\":{\"key\":\"foo.log\",\"size\":3,\"eTag\":\"50a2fabfdd276f573ff97ace8b11c5f4\",\"sequencer\":\"00686D3A8738EE3CA0\"}}}]}","Timestamp":"2025-07-08T15:34:31.803Z","SignatureVersion":"1","Signature":"lkkFr7lYAUEBl6CPPDUDg1D1/zRToR2a9M1MnAmzC8pN33VQf1m+lUQJAgAOKUNxHfIUx1grFyxFQa+84/+edpE4tdhwr0bJ3QELlmJd0xot2pdoc2syrBC1Yq/3IsGc3ZIIIyyG9FXW0Q60aQeZAkx9XQC0tUQDwc8d3kef8CzN5i+ys3QXtX+7KUzj1tNoWQSCcjzqid3JSSyJzRZRD1/0Zkvnd3XVBXaM/QTtin1/Ja8uEObHw9AOy+oi/CygjREBaRzYUBdQHY7/kiA1sdDiSqkyEZ0uSu36aA8A4LO1O6ltP/h4avN8LARmgkdcVbGoPKZIu6Xe5tYvOdJKeA==","SigningCertURL":"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-9c6465fa7f48f5cacd23014631ec1136.pem","UnsubscribeURL":"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:309081560286:s3-notif:acfdadc0-43d0-48ba-81c9-052bd253febe"}
228+
`),
229+
},
230+
},
231+
}, nil
232+
}
233+
234+
func (msqs mockSQSClientSNS) DeleteMessage(input *sqs.DeleteMessageInput) (*sqs.DeleteMessageOutput, error) {
235+
return &sqs.DeleteMessageOutput{}, nil
236+
}
237+
213238
func TestDSNAcquis(t *testing.T) {
214239
ctx := t.Context()
215240
tests := []struct {
@@ -368,7 +393,7 @@ polling_method: sqs
368393
sqs_name: test
369394
`,
370395
expectedCount: 2,
371-
notifType: "eventbridge",
396+
notifType: SQSFormatEventBridge,
372397
},
373398
{
374399
name: "notification",
@@ -378,7 +403,17 @@ polling_method: sqs
378403
sqs_name: test
379404
`,
380405
expectedCount: 2,
381-
notifType: "notification",
406+
notifType: SQSFormatS3Notification,
407+
},
408+
{
409+
name: "sns",
410+
config: `
411+
source: s3
412+
polling_method: sqs
413+
sqs_name: test
414+
`,
415+
expectedCount: 2,
416+
notifType: SQSFormatSNS,
382417
},
383418
}
384419
for _, test := range tests {
@@ -396,10 +431,15 @@ sqs_name: test
396431

397432
counter := int32(0)
398433
f.s3Client = mockS3Client{}
399-
if test.notifType == "eventbridge" {
434+
switch test.notifType {
435+
case SQSFormatEventBridge:
400436
f.sqsClient = mockSQSClient{counter: &counter}
401-
} else {
437+
case SQSFormatS3Notification:
402438
f.sqsClient = mockSQSClientNotif{counter: &counter}
439+
case SQSFormatSNS:
440+
f.sqsClient = mockSQSClientSNS{counter: &counter}
441+
default:
442+
t.Fatalf("unknown notification type %s", test.notifType)
403443
}
404444

405445
out := make(chan types.Event)

0 commit comments

Comments
 (0)