Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.4.0
3.5.0-test1
11 changes: 11 additions & 0 deletions assets/docs/configuration/monitoring/metadata-full-example.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
monitoring {
metadata_reporter {
# An actual HTTP endpoint where metadata events would be sent
endpoint = "https://webhook.metadata.com"

# Set of arbitrary key-value pairs attached to the payload
tags = {
pipeline = "production"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
monitoring {
metadata_reporter {
# An actual HTTP endpoint where metadata events would be sent
endpoint = "https://webhook.metadata.com"
}
}
4 changes: 4 additions & 0 deletions assets/docs/configuration/targets/pubsub-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,9 @@ target {

# Name of the topic to send data into
topic_name = "some-acme-topic"

# Optional: Path to service account JSON credentials file
# If not provided, uses Google Application Default Credentials
credentials_path = "/path/to/service-account.json"
}
}
3 changes: 2 additions & 1 deletion cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup
if err != nil {
return err
}
observer, err := cfg.GetObserver(tags)

observer, err := cfg.GetObserver(cmd.AppName, cmd.AppVersion, tags)
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func run(input []*models.Message, targetMocks targetMocks, transformation transf
filterTarget := testTarget{results: targetMocks.filterTarget}

failure, _ := failure.NewSnowplowFailure(&failureTarget, "test-processor", "test-version")
obs := observer.New(&testStatsReceiver{}, time.Minute, time.Second)
obs := observer.New(&testStatsReceiver{}, time.Minute, time.Second, &testMetadataReporter{})

f := sourceWriteFunc(&goodTarget, failure, &filterTarget, transformation, obs, config, nil)
err := f(input)
Expand Down Expand Up @@ -427,3 +427,11 @@ type testStatsReceiver struct {
func (r *testStatsReceiver) Send(buffer *models.ObserverBuffer) {
r.stats = append(r.stats, buffer)
}

type testMetadataReporter struct {
stats []*models.ObserverBuffer
}

func (r *testMetadataReporter) Send(buffer *models.ObserverBuffer, _, _ time.Time) {
r.stats = append(r.stats, buffer)
}
2 changes: 1 addition & 1 deletion cmd/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package cmd

const (
// AppVersion is the current version of the app
AppVersion = "3.4.0"
AppVersion = "3.5.0-test1"

// AppName is the name of the application to use in logging / places that require the artifact
AppName = "snowbridge"
Expand Down
10 changes: 5 additions & 5 deletions config/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ package config
type ComponentConfigurable interface {
// ProvideDefault returns a pointer to a structure that will be
// written with the decoded configuration.
ProvideDefault() (interface{}, error)
ProvideDefault() (any, error)
}

// ComponentCreator is the interface that wraps the Create method.
type ComponentCreator interface {
// Create returns a pointer to an output structure given a pointer
// to an input structure. This interface is expected to be implemented
// by components that are creatable through a configuration.
Create(i interface{}) (interface{}, error)
Create(i any) (any, error)
}

// Pluggable is the interface that groups
Expand All @@ -35,17 +35,17 @@ type Pluggable interface {

// decodingHandler is the type of any function that, given a ComponentConfigurable
// and a Decoder, returns a pointer to a structure that was decoded.
type decodingHandler func(c ComponentConfigurable, d Decoder) (interface{}, error)
type decodingHandler func(c ComponentConfigurable, d Decoder) (any, error)

// withDecoderOptions returns a decodingHandler closed over some DecoderOptions.
func withDecoderOptions(opts *DecoderOptions) decodingHandler {
return func(c ComponentConfigurable, d Decoder) (interface{}, error) {
return func(c ComponentConfigurable, d Decoder) (any, error) {
return configure(c, d, opts)
}
}

// Configure returns the decoded target.
func configure(c ComponentConfigurable, d Decoder, opts *DecoderOptions) (interface{}, error) {
func configure(c ComponentConfigurable, d Decoder, opts *DecoderOptions) (any, error) {
target, err := c.ProvideDefault() // target is ptr
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
testCases := []struct {
File string
Plug Pluggable
Expected interface{}
Expected any
}{
{
File: "targets/sqs-minimal-example.hcl",
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestCreateObserverComponentHCL(t *testing.T) {
testCases := []struct {
File string
Plug Pluggable
Expected interface{}
Expected any
}{
{
File: "observer.hcl",
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestCreateObserverComponentHCL(t *testing.T) {
// Test Helpers
// SQS
func testSQSTargetAdapter(f func(c *target.SQSTargetConfig) (*target.SQSTargetConfig, error)) target.SQSTargetAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*target.SQSTargetConfig)
if !ok {
return nil, errors.New("invalid input, expected SQSTargetConfig")
Expand All @@ -331,7 +331,7 @@ func testSQSTargetFunc(c *target.SQSTargetConfig) (*target.SQSTargetConfig, erro

// EventHub
func testEventHubTargetAdapter(f func(c *target.EventHubConfig) (*target.EventHubConfig, error)) target.EventHubTargetAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*target.EventHubConfig)
if !ok {
return nil, errors.New("invalid input, expected EventHubTargetConfig")
Expand All @@ -349,7 +349,7 @@ func testEventHubTargetFunc(c *target.EventHubConfig) (*target.EventHubConfig, e

// HTTP
func testHTTPTargetAdapter(f func(c *target.HTTPTargetConfig) (*target.HTTPTargetConfig, error)) target.HTTPTargetAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*target.HTTPTargetConfig)
if !ok {
return nil, errors.New("invalid input, expected HTTPTargetConfig")
Expand All @@ -367,7 +367,7 @@ func testHTTPTargetFunc(c *target.HTTPTargetConfig) (*target.HTTPTargetConfig, e

// Kafka
func testKafkaTargetAdapter(f func(c *target.KafkaConfig) (*target.KafkaConfig, error)) target.KafkaTargetAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*target.KafkaConfig)
if !ok {
return nil, errors.New("invalid input, expected KafkaTargetConfig")
Expand All @@ -385,7 +385,7 @@ func testKafkaTargetFunc(c *target.KafkaConfig) (*target.KafkaConfig, error) {

// Kinesis
func testKinesisTargetAdapter(f func(c *target.KinesisTargetConfig) (*target.KinesisTargetConfig, error)) target.KinesisTargetAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*target.KinesisTargetConfig)
if !ok {
return nil, errors.New("invalid input, expected KinesisTargetConfig")
Expand All @@ -403,7 +403,7 @@ func testKinesisTargetFunc(c *target.KinesisTargetConfig) (*target.KinesisTarget

// PubSub
func testPubSubTargetAdapter(f func(c *target.PubSubTargetConfig) (*target.PubSubTargetConfig, error)) target.PubSubTargetAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*target.PubSubTargetConfig)
if !ok {
return nil, errors.New("invalid input, expected PubSubTargetConfig")
Expand All @@ -421,7 +421,7 @@ func testPubSubTargetFunc(c *target.PubSubTargetConfig) (*target.PubSubTargetCon

// StatsD
func testStatsDAdapter(f func(c *statsreceiver.StatsDStatsReceiverConfig) (*statsreceiver.StatsDStatsReceiverConfig, error)) statsreceiver.StatsDStatsReceiverAdapter {
return func(i interface{}) (interface{}, error) {
return func(i any) (any, error) {
cfg, ok := i.(*statsreceiver.StatsDStatsReceiverConfig)
if !ok {
return nil, errors.New("invalid input, expected StatsDStatsReceiverConfig")
Expand Down
41 changes: 36 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ type metricsConfig struct {
}

type monitoringConfig struct {
Webhook *webhookConfig `hcl:"webhook,block"`
Webhook *webhookConfig `hcl:"webhook,block"`
MetadataReporter *metadataReporterConfig `hcl:"metadata_reporter,block"`
}

type webhookConfig struct {
Expand All @@ -120,6 +121,11 @@ type webhookConfig struct {
HeartbeatInterval int `hcl:"heartbeat_interval_seconds,optional"`
}

type metadataReporterConfig struct {
Endpoint string `hcl:"endpoint"`
Tags map[string]string `hcl:"tags,optional"`
}

type transientRetryConfig struct {
Delay int `hcl:"delay_ms,optional"`
MaxAttempts int `hcl:"max_attempts,optional"`
Expand Down Expand Up @@ -171,6 +177,9 @@ func defaultConfigData() *configurationData {
Tags: map[string]string{},
HeartbeatInterval: 300,
},
MetadataReporter: &metadataReporterConfig{
Tags: map[string]string{},
},
},
}
}
Expand Down Expand Up @@ -226,7 +235,7 @@ func NewHclConfig(fileContents []byte, filename string) (*Config, error) {
}

// CreateComponent creates a pluggable component given the Decoder options.
func (c *Config) CreateComponent(p Pluggable, opts *DecoderOptions) (interface{}, error) {
func (c *Config) CreateComponent(p Pluggable, opts *DecoderOptions) (any, error) {
componentConfigure := withDecoderOptions(opts)

decodedConfig, err := componentConfigure(p, c.Decoder)
Expand Down Expand Up @@ -430,13 +439,19 @@ func (c *Config) GetTags() (map[string]string, error) {
}

// GetObserver builds and returns the observer with the embedded
// optional stats receiver
func (c *Config) GetObserver(tags map[string]string) (*observer.Observer, error) {
// optional stats receiver & metadata reporter
func (c *Config) GetObserver(appName, appVersion string, tags map[string]string) (*observer.Observer, error) {
sr, err := c.getStatsReceiver(tags)
if err != nil {
return nil, err
}
return observer.New(sr, time.Duration(c.Data.StatsReceiver.TimeoutSec)*time.Second, time.Duration(c.Data.StatsReceiver.BufferSec)*time.Second), nil

metadataReporter, err := c.getMetadataReporter(appName, appVersion)
if err != nil {
return nil, err
}

return observer.New(sr, time.Duration(c.Data.StatsReceiver.TimeoutSec)*time.Second, time.Duration(c.Data.StatsReceiver.BufferSec)*time.Second, metadataReporter), nil
}

func (c *Config) GetWebhookMonitoring(appName, appVersion string) (*monitoring.WebhookMonitoring, chan error, error) {
Expand Down Expand Up @@ -486,3 +501,19 @@ func (c *Config) getStatsReceiver(tags map[string]string) (statsreceiveriface.St
return nil, errors.New(fmt.Sprintf("Invalid stats receiver found; expected one of 'statsd' and got '%s'", useReceiver.Name))
}
}

func (c *Config) getMetadataReporter(appName, appVersion string) (monitoring.MetadataReporterer, error) {
if c.Data.Monitoring.MetadataReporter.Endpoint == "" {
return nil, nil
}

if err := common.CheckURL(c.Data.Monitoring.MetadataReporter.Endpoint); err != nil {
return nil, err
}

client := http.DefaultClient
endpoint := c.Data.Monitoring.MetadataReporter.Endpoint
tags := c.Data.Monitoring.MetadataReporter.Tags

return monitoring.NewMetadataReporter(appName, appVersion, client, endpoint, tags), nil
}
17 changes: 14 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,13 @@ func TestNewConfig_Hcl_invalids(t *testing.T) {
})

t.Run("invalid_stats_receiver", func(t *testing.T) {
statsReceiver, err := c.GetObserver(map[string]string{})
statsReceiver, err := c.GetObserver("testAppName", "0.0.0", map[string]string{})
assert.Nil(statsReceiver)
assert.NotNil(err)
if err != nil {
assert.Equal("Invalid stats receiver found; expected one of 'statsd' and got 'fakeHCL'", err.Error())
}
})

}

func TestNewConfig_Hcl_NoExt_defaults(t *testing.T) {
Expand Down Expand Up @@ -220,7 +219,7 @@ func TestNewConfig_HclTransformationOrder(t *testing.T) {
assert.Equal("five", c.Data.Transformations[4].Use.Name)
}

func TestNewConfig_GetWebhookMonitoring(t *testing.T) {
func TestNewConfig_GetMonitoring(t *testing.T) {
assert := assert.New(t)

filename := filepath.Join(assets.AssetsRootDir, "test", "config", "configs", "empty.hcl")
Expand Down Expand Up @@ -250,4 +249,16 @@ func TestNewConfig_GetWebhookMonitoring(t *testing.T) {
assert.NotNil(monitoring)
assert.NotNil(alertChan)
assert.Nil(err)

// Should be able to build observer with metadata reporter
c.Data.Monitoring.MetadataReporter.Endpoint = "http://example.com"
observer, err := c.GetObserver("", "", map[string]string{})
assert.NotNil(observer)
assert.Nil(err)

// Should fail to build observer with metadata reporter
c.Data.Monitoring.MetadataReporter.Endpoint = "http:/example.com"
observer, err = c.GetObserver("", "", map[string]string{})
assert.Nil(observer)
assert.NotNil(err)
}
6 changes: 3 additions & 3 deletions config/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type Decoder interface {
// Decode decodes onto target given DecoderOptions.
// The target argument must be a pointer to an allocated structure.
Decode(opts *DecoderOptions, target interface{}) error
Decode(opts *DecoderOptions, target any) error
}

// DecoderOptions represent the options for a Decoder.
Expand All @@ -45,7 +45,7 @@ type hclDecoder struct {
// The target argument must be a pointer to an allocated structure.
// If the HCL input is nil, we assume there is nothing to do and the target
// stays unaffected. If the target is nil, we assume is not decodable.
func (h *hclDecoder) Decode(opts *DecoderOptions, target interface{}) error {
func (h *hclDecoder) Decode(opts *DecoderOptions, target any) error {
// Decoder Options cannot be missing
if opts == nil {
return errors.New("missing DecoderOptions for hclDecoder")
Expand Down Expand Up @@ -155,6 +155,6 @@ type defaultsDecoder struct{}
// Decode for defaultsDecoder leaves the target unaffected.
// The target argument must be a pointer to an allocated structure.
// If the target is nil, we assume is not decodable.
func (d *defaultsDecoder) Decode(opts *DecoderOptions, target interface{}) error {
func (d *defaultsDecoder) Decode(opts *DecoderOptions, target any) error {
return nil
}
8 changes: 4 additions & 4 deletions config/decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ test_string = "ateststring"
testCases := []struct {
TestName string
DecoderOpts *DecoderOptions
Target interface{}
Expected interface{}
Target any
Expected any
}{
{
"nil_target",
Expand Down Expand Up @@ -114,8 +114,8 @@ test_int = env("TEST_INT")
testCases := []struct {
TestName string
DecoderOpts *DecoderOptions
Target interface{}
Expected interface{}
Target any
Expected any
}{
{
"Hcl_eval_context_with_env_fun_and_var",
Expand Down
2 changes: 1 addition & 1 deletion docs/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMain(m *testing.M) {

var jsScriptPath = filepath.Join(assets.AssetsRootDir, "docs", "configuration", "transformations", "custom-scripts", "create-a-script-filter-example.js")

func checkComponentForZeros(t *testing.T, component interface{}) {
func checkComponentForZeros(t *testing.T, component any) {
assert := assert.New(t)

// Indirect dereferences the pointer for us
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration_source_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func testSourceConfig(t *testing.T, filepath string, fullExample bool) {

use := c.Data.Source.Use

var configObject interface{}
var configObject any
switch use.Name {
case "kafka":
configObject = &kafkasource.Configuration{}
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration_target_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func testFilterTargetConfig(t *testing.T, filepath string, fullExample bool) {

func testTargetComponent(t *testing.T, name string, body hcl.Body, fullExample bool) {
assert := assert.New(t)
var configObject interface{}
var configObject any
switch name {
case "eventhub":
configObject = &target.EventHubConfig{}
Expand Down
Loading
Loading