Skip to content

Commit 3559108

Browse files
authored
Merge pull request #456 from eberle1080/main
Implement S3 Support
2 parents 4ee1c5d + b1eeebe commit 3559108

File tree

23 files changed

+1858
-38
lines changed

23 files changed

+1858
-38
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ SDKs:
4545

4646
</div>
4747

48-
Outpost is a self-hosted and open-source infrastructure that enables event producers to add outbound webhooks and [Event Destinations](https://eventdestinations.org?ref=github-outpost) to their platform with support for destination types such as Webhooks, Hookdeck Event Gateway, Amazon EventBridge, AWS SQS, AWS SNS, GCP Pub/Sub, RabbitMQ, and Kafka.
48+
Outpost is a self-hosted and open-source infrastructure that enables event producers to add outbound webhooks and [Event Destinations](https://eventdestinations.org?ref=github-outpost) to their platform with support for destination types such as Webhooks, Hookdeck Event Gateway, Amazon EventBridge, AWS SQS, AWS S3, GCP Pub/Sub, RabbitMQ, and Kafka.
4949

5050
The Outpost runtime has minimal dependencies (Redis, PostgreSQL or Clickhouse, and one of the supported message queues), is 100% backward compatible with your existing webhooks implementation and is optimized for high-throughput, low-cost operation.
5151

@@ -66,7 +66,7 @@ Read [Outpost Concepts](https://outpost.hookdeck.com/docs/concepts) to learn mor
6666
- **User portal**: Allow customers to view metrics, manage, debug, and observe their event destinations.
6767
- **Delivery failure alerts**: Manage event delivery failure alerts.
6868
- **OpenTelemetry**: OTel standardized traces, metrics, and logs.
69-
- **Event destination types**: Out of the box support for Webhooks, Hookdeck Event Gateway, Amazon EventBridge, AWS SQS, AWS SNS. GCP Pub/Sub, RabbitMQ, and Kafka.
69+
- **Event destination types**: Out of the box support for Webhooks, Hookdeck Event Gateway, Amazon EventBridge, AWS SQS, AWS S3, GCP Pub/Sub, RabbitMQ, and Kafka.
7070
- **Webhook best practices**: Opt-out webhook best practices, such as headers for idempotency, timestamp and signature, and signature rotation.
7171
- **SDKs and MCP server**: Go, Python, and TypeScript SDK are available. Outpost also ships with an MCP server. All generated by [Speakeasy](https://speakeasy.com).
7272

build/dev/deps/compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ services:
7373
aws:
7474
image: localstack/localstack:latest
7575
environment:
76-
- SERVICES=sns,sts,sqs,kinesis
76+
- SERVICES=s3,sns,sts,sqs,kinesis
7777
ports:
7878
- 4566:4566
7979
- 4571:4571

build/test/compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ services:
2828
aws:
2929
image: localstack/localstack:latest
3030
environment:
31-
- SERVICES=sns,sts,sqs,kinesis
31+
- SERVICES=s3,sns,sts,sqs,kinesis
3232
ports:
3333
- 34566:4566
3434
- 34571:4571

cmd/destinations/awss3/main.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"strings"
10+
"syscall"
11+
"time"
12+
13+
"github.com/aws/aws-sdk-go-v2/aws"
14+
"github.com/aws/aws-sdk-go-v2/config"
15+
"github.com/aws/aws-sdk-go-v2/credentials"
16+
"github.com/aws/aws-sdk-go-v2/service/s3"
17+
)
18+
19+
// Change these constants to match your AWS S3 configuration
20+
21+
const (
22+
S3Region = "us-east-1"
23+
AWSCredentials = "aws_key:aws_secret:aws_session"
24+
S3Bucket = "destination_s3_bucket"
25+
)
26+
27+
// This program monitors an AWS S3 bucket for new objects and logs them.
28+
// Note that this is meant for demonstration purposes and shouldn't be
29+
// used on large buckets or in production without proper error handling
30+
// and optimizations. Listing objects in a bucket is slow. For large
31+
// buckets, consider using S3 event notifications or a more efficient
32+
// mechanism to track new objects.
33+
func main() {
34+
if err := run(); err != nil {
35+
panic(err)
36+
}
37+
}
38+
39+
func run() error {
40+
ctx := context.Background()
41+
42+
credsParts := strings.Split(AWSCredentials, ":")
43+
if len(credsParts) != 3 {
44+
return fmt.Errorf("invalid AWS credentials format")
45+
}
46+
47+
// Set up AWS configuration with the provided credentials
48+
awsCfg, err := config.LoadDefaultConfig(ctx,
49+
config.WithRegion(S3Region),
50+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
51+
credsParts[0], credsParts[1], credsParts[2],
52+
)),
53+
)
54+
if err != nil {
55+
return err
56+
}
57+
58+
// Create an S3 client using the loaded configuration
59+
s3Client := s3.NewFromConfig(awsCfg)
60+
61+
// Listen for termination signals
62+
termChan := make(chan os.Signal, 1)
63+
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
64+
65+
// Maintain a list of seen objects to avoid logging duplicates
66+
seen := make(map[string]struct{})
67+
68+
// First pass to populate the seen map with existing objects
69+
log.Printf("[*] Initializing S3 bucket monitoring...\n\tBucket: %s", S3Bucket)
70+
if err := checkForNewObjects(ctx, s3Client, seen, false); err != nil {
71+
return err
72+
}
73+
74+
// Check for new objects every 10 seconds
75+
ticker := time.NewTicker(10 * time.Second)
76+
defer ticker.Stop()
77+
78+
// Start a goroutine to periodically check for new objects
79+
// and listen for termination signals.
80+
go func() {
81+
for {
82+
select {
83+
case <-ticker.C:
84+
if err := checkForNewObjects(ctx, s3Client, seen, true); err != nil {
85+
log.Printf("[*] error listing bucket: %v", err)
86+
}
87+
case <-termChan:
88+
return
89+
}
90+
}
91+
}()
92+
93+
log.Printf("[*] Ready to monitor S3 bucket.\n\tBucket: %s", S3Bucket)
94+
log.Printf("[*] Waiting for new objects. To exit press CTRL+C")
95+
96+
<-termChan
97+
98+
return nil
99+
}
100+
101+
// checkForNewObjects lists objects in the S3 bucket and logs new ones.
102+
func checkForNewObjects(ctx context.Context, client *s3.Client, seen map[string]struct{}, logNew bool) error {
103+
var token *string
104+
for {
105+
out, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
106+
Bucket: aws.String(S3Bucket),
107+
ContinuationToken: token,
108+
})
109+
if err != nil {
110+
return err
111+
}
112+
for _, obj := range out.Contents {
113+
key := aws.ToString(obj.Key)
114+
if _, ok := seen[key]; !ok {
115+
seen[key] = struct{}{}
116+
if logNew {
117+
log.Printf("[x] New object: %s (%d bytes)", key, obj.Size)
118+
}
119+
}
120+
}
121+
if out.IsTruncated == nil || !*out.IsTruncated {
122+
break
123+
}
124+
token = out.NextContinuationToken
125+
}
126+
return nil
127+
}

contributing/mq.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ This document will focus on the first two types of integrations. The third type,
2222
## Supported MQs
2323

2424
- [x] AWS SQS
25-
- [ ] GCP PubSub
26-
- [ ] Azure ServiceBus
25+
- [x] GCP PubSub
26+
- [x] Azure ServiceBus
2727
- [x] RabbitMQ (AMQP 0.9.1)
2828

2929
publishmq only:

0 commit comments

Comments
 (0)