Skip to content

Commit 146bcc8

Browse files
authored
chore: support timestamp-ms delivery metadata + fix load test scripts (#507)
* chore: support timestamp-ms delivery metadata * fix: update loadtest script to use ms timestamp * fix: loadtest event verification * docs: loadtest contributing documentation * fix: destawss3 publisher
1 parent 0551d43 commit 146bcc8

31 files changed

+303
-97
lines changed

contributing/loadtest/overview.md

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# Load Testing Overview
2+
3+
## Prerequisites
4+
5+
- k6 installed
6+
- Docker running
7+
- Outpost deployment with API key
8+
- Node.js (for TypeScript compilation)
9+
10+
## Two-Phase Load Test
11+
12+
### 1. Throughput Test
13+
Creates a tenant with one webhook destination and publishes events at a configured rate. Event IDs are stored in Redis for verification.
14+
15+
### 2. Verification Test
16+
Queries the mock webhook to confirm delivery and measure latency metrics:
17+
- End-to-end latency (publish to delivery)
18+
- Receive latency (publish to Outpost receipt)
19+
- Internal latency (Outpost processing time)
20+
21+
## Setup
22+
23+
### Start Supporting Services
24+
25+
```bash
26+
cd loadtest
27+
docker-compose up -d
28+
```
29+
30+
This starts:
31+
- **Redis** (`localhost:46379`): Coordinates test state between throughput and verification phases
32+
- **Mock Webhook** (`localhost:48080`): Receives webhook deliveries and stores them for verification
33+
34+
### Configure Environment
35+
36+
Use `loadtest/config/environments/local.json` or create a new one (e.g., `staging.json`):
37+
38+
```json
39+
{
40+
"name": "local",
41+
"api": {
42+
"baseUrl": "http://localhost:3333",
43+
"timeout": "30s"
44+
},
45+
"mockWebhook": {
46+
"url": "http://localhost:48080",
47+
"destinationUrl": "http://host.docker.internal:48080",
48+
"verificationPollTimeout": "5s"
49+
},
50+
"redis": "redis://localhost:46379"
51+
}
52+
```
53+
54+
**Critical:** `mockWebhook.destinationUrl` must be accessible from your Outpost deployment:
55+
- **Local Outpost in Docker**: `http://host.docker.internal:48080`
56+
- **Local Outpost in Kubernetes**: `http://host.docker.internal:48080`
57+
- **Remote Outpost**: Expose mock webhook publicly (e.g., ngrok tunnel) and use that URL
58+
59+
The mock webhook must be reachable by Outpost for event delivery to succeed.
60+
61+
### Configure Scenario
62+
63+
Use the default `basic.json` scenario, edit it locally, or create a new one.
64+
65+
Default scenario at `loadtest/config/scenarios/events-throughput/basic.json`:
66+
67+
```json
68+
{
69+
"options": {
70+
"scenarios": {
71+
"events": {
72+
"rate": 100,
73+
"timeUnit": "1s",
74+
"duration": "30s",
75+
"preAllocatedVUs": 20
76+
}
77+
}
78+
}
79+
}
80+
```
81+
82+
To create a new scenario, add a file (e.g., `high-load.json`) in the same directory and reference it with `--scenario high-load`.
83+
84+
## Running Tests
85+
86+
### Throughput Test
87+
88+
```bash
89+
export API_KEY=your-api-key
90+
export TESTID=$(date +%s)
91+
92+
./run-test.sh events-throughput --environment local --scenario basic
93+
```
94+
95+
### Verification Test
96+
97+
```bash
98+
# Use same TESTID from throughput test
99+
# MAX_ITERATIONS = rate × duration (e.g., 100 × 30 = 3000)
100+
MAX_ITERATIONS=3000 ./run-test.sh events-verify --environment local --scenario basic
101+
```
102+
103+
## Mock Webhook
104+
105+
The mock webhook service provides:
106+
- `POST /webhook`: Receives event deliveries from Outpost
107+
- `GET /events/{eventId}`: Returns event details for verification
108+
- `GET /health`: Service status
109+
110+
Events are stored in an LRU cache with 10-minute expiration.
111+
112+
**Network Requirements:**
113+
- k6 must reach mock webhook at `mockWebhook.url` to verify deliveries
114+
- Outpost must reach mock webhook at `mockWebhook.destinationUrl` to deliver events
115+
- For remote Outpost deployments, expose the mock webhook via tunnel or public endpoint
116+
117+
## Cleanup
118+
119+
```bash
120+
docker-compose down
121+
```

internal/config/destinations.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99

1010
// DestinationsConfig is the main configuration for all destination types
1111
type DestinationsConfig struct {
12-
MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set." required:"N"`
13-
Webhook DestinationWebhookConfig `yaml:"webhook" desc:"Configuration specific to webhook destinations."`
14-
AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis" desc:"Configuration specific to AWS Kinesis destinations."`
12+
MetadataPath string `yaml:"metadata_path" env:"DESTINATIONS_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set." required:"N"`
13+
IncludeMillisecondTimestamp bool `yaml:"include_millisecond_timestamp" env:"DESTINATIONS_INCLUDE_MILLISECOND_TIMESTAMP" desc:"If true, includes a 'timestamp-ms' field with millisecond precision in destination metadata. Useful for load testing and debugging." required:"N"`
14+
Webhook DestinationWebhookConfig `yaml:"webhook" desc:"Configuration specific to webhook destinations."`
15+
AWSKinesis DestinationAWSKinesisConfig `yaml:"aws_kinesis" desc:"Configuration specific to AWS Kinesis destinations."`
1516
}
1617

1718
func (c *DestinationsConfig) ToConfig(cfg *Config) destregistrydefault.RegisterDefaultDestinationOptions {
@@ -25,9 +26,10 @@ func (c *DestinationsConfig) ToConfig(cfg *Config) destregistrydefault.RegisterD
2526
}
2627

2728
return destregistrydefault.RegisterDefaultDestinationOptions{
28-
UserAgent: userAgent,
29-
Webhook: c.Webhook.toConfig(),
30-
AWSKinesis: c.AWSKinesis.toConfig(),
29+
UserAgent: userAgent,
30+
IncludeMillisecondTimestamp: c.IncludeMillisecondTimestamp,
31+
Webhook: c.Webhook.toConfig(),
32+
AWSKinesis: c.AWSKinesis.toConfig(),
3133
}
3234
}
3335

internal/destregistry/baseprovider.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,28 @@ func ObfuscateValue(value string) string {
2525

2626
// BaseProvider provides common functionality for all destination providers
2727
type BaseProvider struct {
28-
metadata *metadata.ProviderMetadata
28+
metadata *metadata.ProviderMetadata
29+
basePublisherOpts []BasePublisherOption
2930
}
3031

3132
// NewBaseProvider creates a new base provider with loaded metadata
32-
func NewBaseProvider(loader metadata.MetadataLoader, providerType string) (*BaseProvider, error) {
33+
func NewBaseProvider(loader metadata.MetadataLoader, providerType string, opts ...BasePublisherOption) (*BaseProvider, error) {
3334
meta, err := loader.Load(providerType)
3435
if err != nil {
3536
return nil, fmt.Errorf("loading provider metadata: %w", err)
3637
}
3738

3839
return &BaseProvider{
39-
metadata: meta,
40+
metadata: meta,
41+
basePublisherOpts: opts,
4042
}, nil
4143
}
4244

45+
// NewPublisher creates a BasePublisher with provider-configured options
46+
func (p *BaseProvider) NewPublisher() *BasePublisher {
47+
return NewBasePublisher(p.basePublisherOpts...)
48+
}
49+
4350
// Metadata returns the provider metadata
4451
func (p *BaseProvider) Metadata() *metadata.ProviderMetadata {
4552
return p.metadata

internal/destregistry/basepublisher.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,28 @@ import (
1111

1212
// BasePublisher provides common publisher functionality
1313
type BasePublisher struct {
14-
active sync.WaitGroup
15-
closed atomic.Bool
14+
active sync.WaitGroup
15+
closed atomic.Bool
16+
includeMillisecondTimestamp bool
17+
}
18+
19+
// BasePublisherOption is a functional option for configuring BasePublisher
20+
type BasePublisherOption func(*BasePublisher)
21+
22+
// WithMillisecondTimestamp enables millisecond-precision timestamp in metadata
23+
func WithMillisecondTimestamp(enabled bool) BasePublisherOption {
24+
return func(p *BasePublisher) {
25+
p.includeMillisecondTimestamp = enabled
26+
}
27+
}
28+
29+
// NewBasePublisher creates a new BasePublisher with the given options
30+
func NewBasePublisher(opts ...BasePublisherOption) *BasePublisher {
31+
p := &BasePublisher{}
32+
for _, opt := range opts {
33+
opt(p)
34+
}
35+
return p
1636
}
1737

1838
// StartPublish returns error if publisher is closed, otherwise adds to waitgroup
@@ -41,6 +61,12 @@ func (p *BasePublisher) MakeMetadata(event *models.Event, timestamp time.Time) m
4161
"event-id": event.ID,
4262
"topic": event.Topic,
4363
}
64+
65+
// Add millisecond timestamp if enabled
66+
if p.includeMillisecondTimestamp {
67+
systemMetadata["timestamp-ms"] = fmt.Sprintf("%d", timestamp.UnixMilli())
68+
}
69+
4470
metadata := make(map[string]string)
4571
for k, v := range systemMetadata {
4672
metadata[k] = v

internal/destregistry/providers/default.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ type DestAWSKinesisConfig struct {
2828
}
2929

3030
type RegisterDefaultDestinationOptions struct {
31-
UserAgent string
32-
Webhook *DestWebhookConfig
33-
AWSKinesis *DestAWSKinesisConfig
31+
UserAgent string
32+
IncludeMillisecondTimestamp bool
33+
Webhook *DestWebhookConfig
34+
AWSKinesis *DestAWSKinesisConfig
3435
}
3536

3637
// RegisterDefault registers the default destination providers with the registry.
@@ -39,6 +40,12 @@ type RegisterDefaultDestinationOptions struct {
3940
func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestinationOptions) error {
4041
loader := registry.MetadataLoader()
4142

43+
// Build base publisher options that apply to all providers
44+
basePublisherOpts := []destregistry.BasePublisherOption{}
45+
if opts.IncludeMillisecondTimestamp {
46+
basePublisherOpts = append(basePublisherOpts, destregistry.WithMillisecondTimestamp(opts.IncludeMillisecondTimestamp))
47+
}
48+
4249
webhookOpts := []destwebhook.Option{
4350
destwebhook.WithUserAgent(opts.UserAgent),
4451
}
@@ -55,20 +62,20 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina
5562
destwebhook.WithSignatureAlgorithm(opts.Webhook.SignatureAlgorithm),
5663
)
5764
}
58-
webhook, err := destwebhook.New(loader, webhookOpts...)
65+
webhook, err := destwebhook.New(loader, basePublisherOpts, webhookOpts...)
5966
if err != nil {
6067
return err
6168
}
6269
registry.RegisterProvider("webhook", webhook)
6370

64-
hookdeck, err := desthookdeck.New(loader,
71+
hookdeck, err := desthookdeck.New(loader, basePublisherOpts,
6572
desthookdeck.WithUserAgent(opts.UserAgent))
6673
if err != nil {
6774
return err
6875
}
6976
registry.RegisterProvider("hookdeck", hookdeck)
7077

71-
awsSQS, err := destawssqs.New(loader)
78+
awsSQS, err := destawssqs.New(loader, basePublisherOpts)
7279
if err != nil {
7380
return err
7481
}
@@ -80,25 +87,25 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina
8087
destawskinesis.WithMetadataInPayload(opts.AWSKinesis.MetadataInPayload),
8188
)
8289
}
83-
awsKinesis, err := destawskinesis.New(loader, awsKinesisOpts...)
90+
awsKinesis, err := destawskinesis.New(loader, basePublisherOpts, awsKinesisOpts...)
8491
if err != nil {
8592
return err
8693
}
8794
registry.RegisterProvider("aws_kinesis", awsKinesis)
8895

89-
awsS3, err := destawss3.New(loader)
96+
awsS3, err := destawss3.New(loader, basePublisherOpts)
9097
if err != nil {
9198
return err
9299
}
93100
registry.RegisterProvider("aws_s3", awsS3)
94101

95-
azureServiceBus, err := destazureservicebus.New(loader)
102+
azureServiceBus, err := destazureservicebus.New(loader, basePublisherOpts)
96103
if err != nil {
97104
return err
98105
}
99106
registry.RegisterProvider("azure_servicebus", azureServiceBus)
100107

101-
rabbitmq, err := destrabbitmq.New(loader)
108+
rabbitmq, err := destrabbitmq.New(loader, basePublisherOpts)
102109
if err != nil {
103110
return err
104111
}

internal/destregistry/providers/destawskinesis/destawskinesis.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ func WithMetadataInPayload(include bool) Option {
5151
}
5252

5353
// Constructor
54-
func New(loader metadata.MetadataLoader, opts ...Option) (*AWSKinesisProvider, error) {
55-
base, err := destregistry.NewBaseProvider(loader, "aws_kinesis")
54+
func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption, opts ...Option) (*AWSKinesisProvider, error) {
55+
base, err := destregistry.NewBaseProvider(loader, "aws_kinesis", basePublisherOpts...)
5656
if err != nil {
5757
return nil, err
5858
}
@@ -102,7 +102,7 @@ func (p *AWSKinesisProvider) CreatePublisher(ctx context.Context, destination *m
102102
})
103103

104104
return &AWSKinesisPublisher{
105-
BasePublisher: &destregistry.BasePublisher{},
105+
BasePublisher: p.BaseProvider.NewPublisher(),
106106
client: kinesisClient,
107107
streamName: config.StreamName,
108108
partitionKeyTemplate: config.PartitionKeyTemplate,

internal/destregistry/providers/destawskinesis/destawskinesis_publish_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (s *AWSKinesisSuite) SetupSuite() {
326326
require.NoError(t, err)
327327

328328
// Create provider
329-
provider, err := destawskinesis.New(testutil.Registry.MetadataLoader())
329+
provider, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil)
330330
require.NoError(t, err)
331331

332332
// Create destination with partition key template

internal/destregistry/providers/destawskinesis/destawskinesis_validate_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestAWSKinesisDestination_Validate(t *testing.T) {
2929
}),
3030
)
3131

32-
awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader())
32+
awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil)
3333
require.NoError(t, err)
3434

3535
t.Run("should validate valid destination", func(t *testing.T) {
@@ -122,7 +122,7 @@ func TestAWSKinesisDestination_Validate(t *testing.T) {
122122
func TestAWSKinesisDestination_ComputeTarget(t *testing.T) {
123123
t.Parallel()
124124

125-
awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader())
125+
awsKinesisDestination, err := destawskinesis.New(testutil.Registry.MetadataLoader(), nil)
126126
require.NoError(t, err)
127127

128128
t.Run("should return stream and region as target", func(t *testing.T) {

internal/destregistry/providers/destawss3/destawss3.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ type AWSS3Provider struct {
4949
var _ destregistry.Provider = (*AWSS3Provider)(nil)
5050

5151
// New creates a new AWSS3Provider
52-
func New(loader metadata.MetadataLoader) (*AWSS3Provider, error) {
53-
base, err := destregistry.NewBaseProvider(loader, "aws_s3")
52+
func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*AWSS3Provider, error) {
53+
base, err := destregistry.NewBaseProvider(loader, "aws_s3", basePublisherOpts...)
5454
if err != nil {
5555
return nil, err
5656
}
@@ -102,6 +102,7 @@ func (p *AWSS3Provider) CreatePublisher(ctx context.Context, destination *models
102102
}
103103

104104
return NewAWSS3Publisher(
105+
p.BaseProvider.NewPublisher(),
105106
client,
106107
cfg.Bucket,
107108
cfg.KeyTemplate,
@@ -349,6 +350,7 @@ func parseStorageClass(storageClass string) (types.StorageClass, error) {
349350

350351
// NewAWSS3Publisher exposed for testing
351352
func NewAWSS3Publisher(
353+
basePublisher *destregistry.BasePublisher,
352354
client *s3.Client,
353355
bucket, keyTemplateStr, storageClass string,
354356
) *AWSS3Publisher {
@@ -360,7 +362,7 @@ func NewAWSS3Publisher(
360362
}
361363

362364
return &AWSS3Publisher{
363-
BasePublisher: &destregistry.BasePublisher{},
365+
BasePublisher: basePublisher,
364366
client: client,
365367
bucket: bucket,
366368
keyTemplate: tmpl,

0 commit comments

Comments
 (0)