Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions internal/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,13 @@ func TestProvidePublisher(t *testing.T) {

return PublisherIn{
Config: publisher.Config{
Brokers: []string{"localhost:9092"},
Brokers: publisher.Brokers{
RestartOnConfigChange: false,
TargetRegion: "us-east-1",
Regions: map[string][]string{
"us-east-1": {"localhost:9092"},
},
},
TopicRoutes: []publisher.TopicRoute{
{
Topic: "wrp-events",
Expand Down Expand Up @@ -337,7 +343,13 @@ func TestProvidePublisher(t *testing.T) {

return PublisherIn{
Config: publisher.Config{
Brokers: []string{}, // Empty brokers should cause validation error
Brokers: publisher.Brokers{
RestartOnConfigChange: false,
TargetRegion: "nonexistent-region",
Regions: map[string][]string{
"us-east-1": {"localhost:9092"},
},
}, // Target region not in regions map should cause validation error
TopicRoutes: []publisher.TopicRoute{
{
Topic: "wrp-events",
Expand Down Expand Up @@ -382,7 +394,13 @@ func TestProvideConsumer(t *testing.T) {
metricEmitter := observe.NewSubject[metrics.Event]()

pubConfig := publisher.Config{
Brokers: []string{"localhost:9092"},
Brokers: publisher.Brokers{
RestartOnConfigChange: false,
TargetRegion: "us-east-1",
Regions: map[string][]string{
"us-east-1": {"localhost:9092"},
},
},
TopicRoutes: []publisher.TopicRoute{
{
Topic: "wrp-events",
Expand All @@ -394,7 +412,7 @@ func TestProvideConsumer(t *testing.T) {
pub, err := publisher.New(
publisher.WithLogEmitter(logEmitter),
publisher.WithMetricsEmitter(metricEmitter),
publisher.WithBrokers(pubConfig.Brokers...),
publisher.WithBrokers(pubConfig.Brokers),
publisher.WithTopicRoutes(pubConfig.ToWRPKafkaRoutes()...),
)
require.NoError(t, err, "Setup should create publisher successfully")
Expand Down
11 changes: 8 additions & 3 deletions internal/app/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,17 @@ buckets:
missing_partition_key_action: "include" # "include" (default), "drop"

producer:
brokers:
- "localhost:9092"
topic_routes:
- pattern: "*"
topic: "default-events"

brokers:
restart_on_config_change: false
target_region: us-east-1
regions:
us-east-1:
- "localhost:9092"
us-west-2:
- "localhost:9092"
# Kafka Consumer Configuration
consumer:
# Required: List of Kafka broker addresses
Expand Down
2 changes: 1 addition & 1 deletion internal/app/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func providePublisher(in PublisherIn) (PublisherOut, error) {
publisher.WithMetricsEmitter(in.MetricEmitter),

// Required options
publisher.WithBrokers(cfg.Brokers...),
publisher.WithBrokers(cfg.Brokers),
publisher.WithTopicRoutes(wrpRoutes...),

// Optional configurations
Expand Down
8 changes: 7 additions & 1 deletion internal/publisher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// It can be unmarshaled via goschtalt and converted to functional options.
type Config struct {
// Required fields
Brokers []string
Brokers Brokers

// Topic routes for WRP message routing
TopicRoutes []TopicRoute
Expand All @@ -33,6 +33,12 @@ type Config struct {
TLS *TLSConfig
}

type Brokers struct {
RestartOnConfigChange bool
TargetRegion string
Regions map[string][]string
}

// TopicRoute represents a WRP message routing configuration
type TopicRoute struct {
Topic string
Expand Down
62 changes: 44 additions & 18 deletions internal/publisher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,29 @@ import (
"github.com/xmidt-org/wrpkafka"
)

var testBroker = Brokers{
RestartOnConfigChange: false,
TargetRegion: "us-east-1",
Regions: map[string][]string{
"us-east-1": {"localhost:9092"},
},
}

var multiBroker = Brokers{
RestartOnConfigChange: false,
TargetRegion: "us-east-1",
Regions: map[string][]string{
"us-east-1": {"localhost:9092", "localhost:9093", "localhost:9094"},
"us-west-2": {"localhost:9095"},
},
}

var emptyBroker = Brokers{
RestartOnConfigChange: false,
TargetRegion: "us-east-1",
Regions: map[string][]string{},
}

// Test suite for Config
type ConfigTestSuite struct {
suite.Suite
Expand Down Expand Up @@ -147,7 +170,7 @@ func (suite *OptionsTestSuite) TestPublisherConfig_Validate() {
{
name: "valid_config",
config: &publisherConfig{
brokers: []string{"localhost:9092"},
brokers: testBroker,
topicRoutes: []wrpkafka.TopicRoute{
{Topic: "test", Pattern: ".*"},
},
Expand All @@ -158,31 +181,35 @@ func (suite *OptionsTestSuite) TestPublisherConfig_Validate() {
{
name: "missing_brokers",
config: &publisherConfig{
brokers: []string{},
brokers: emptyBroker,
topicRoutes: []wrpkafka.TopicRoute{
{Topic: "test", Pattern: ".*"},
},
},
expectError: true,
expectedErr: ErrMissingBrokers,
description: "Should return error when brokers are empty",
description: "Should return error when brokers regions are empty",
},
{
name: "nil_brokers",
name: "invalid_target_region",
config: &publisherConfig{
brokers: nil,
brokers: Brokers{
RestartOnConfigChange: false,
TargetRegion: "nonexistent-region",
Regions: map[string][]string{
"us-east-1": {"localhost:9092"},
},
},
topicRoutes: []wrpkafka.TopicRoute{
{Topic: "test", Pattern: ".*"},
},
},
expectError: true,
expectedErr: ErrMissingBrokers,
description: "Should return error when brokers are nil",
description: "Should return error when target region is not in regions map",
},
{
name: "missing_topic_routes",
config: &publisherConfig{
brokers: []string{"localhost:9092"},
brokers: testBroker,
topicRoutes: []wrpkafka.TopicRoute{},
},
expectError: true,
Expand All @@ -192,7 +219,7 @@ func (suite *OptionsTestSuite) TestPublisherConfig_Validate() {
{
name: "nil_topic_routes",
config: &publisherConfig{
brokers: []string{"localhost:9092"},
brokers: testBroker,
topicRoutes: nil,
},
expectError: true,
Expand Down Expand Up @@ -228,26 +255,25 @@ func (suite *OptionsTestSuite) TestOptions() {
}{
{
name: "WithBrokers_single",
option: WithBrokers("localhost:9092"),
option: WithBrokers(testBroker),
setupPub: func() *KafkaPublisher {
return &KafkaPublisher{config: &publisherConfig{}}
},
verifyPub: func(p *KafkaPublisher) {
suite.Equal([]string{"localhost:9092"}, p.config.brokers)
suite.Equal(testBroker, p.config.brokers)
},
description: "Should set single broker correctly",
description: "Should set broker configuration correctly",
},
{
name: "WithBrokers_multiple",
option: WithBrokers("localhost:9092", "localhost:9093", "localhost:9094"),
option: WithBrokers(multiBroker),
setupPub: func() *KafkaPublisher {
return &KafkaPublisher{config: &publisherConfig{}}
},
verifyPub: func(p *KafkaPublisher) {
expected := []string{"localhost:9092", "localhost:9093", "localhost:9094"}
suite.Equal(expected, p.config.brokers)
suite.Equal(multiBroker, p.config.brokers)
},
description: "Should set multiple brokers correctly",
description: "Should set multi-region broker configuration correctly",
},
{
name: "WithTopicRoutes_single",
Expand Down Expand Up @@ -628,7 +654,7 @@ func TestOptionsTestSuite(t *testing.T) {
// Benchmark tests for publisher operations
func BenchmarkPublisher_IsStarted(b *testing.B) {
pub, _ := New(
WithBrokers("localhost:9092"),
WithBrokers(testBroker),
WithTopicRoutes(wrpkafka.TopicRoute{Topic: "test", Pattern: ".*"}),
)
pub.started = true
Expand Down
20 changes: 17 additions & 3 deletions internal/publisher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (f optionFunc) apply(p *KafkaPublisher) error {
// publisherConfig holds the configuration for a Publisher.
type publisherConfig struct {
// Required options
brokers []string
brokers Brokers
topicRoutes []wrpkafka.TopicRoute

// Optional publisher config fields (applied directly to wrpkafka.Publisher)
Expand All @@ -61,17 +61,31 @@ type publisherConfig struct {

// validate ensures all required configuration is present.
func (c *publisherConfig) validate() error {
if len(c.brokers) == 0 {
regions := c.brokers.Regions
if len(regions) == 0 {
return ErrMissingBrokers
}
// Check if target_region exists in the regions map
if _, exists := regions[c.brokers.TargetRegion]; !exists {
return fmt.Errorf("target_region '%s' not found in regions map", c.brokers.TargetRegion)
}

// Check if target region has brokers configured
if brokers := regions[c.brokers.TargetRegion]; len(brokers) == 0 {
return fmt.Errorf("target_region '%s' has no brokers configured", c.brokers.TargetRegion)
}
if len(c.topicRoutes) == 0 {
return ErrMissingTopicRoutes
}
if c.brokers.TargetRegion == "" {
return fmt.Errorf("target_region cannot be empty")
}

return nil
}

// WithBrokers sets the Kafka broker addresses.
func WithBrokers(brokers ...string) Option {
func WithBrokers(brokers Brokers) Option {
return optionFunc(func(p *KafkaPublisher) error {
p.config.brokers = brokers
return nil
Expand Down
7 changes: 6 additions & 1 deletion internal/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(opts ...Option) (*KafkaPublisher, error) {

// Create the underlying wrpkafka publisher
wrpPublisher := &wrpkafka.Publisher{
Brokers: publisher.config.brokers,
Brokers: publisher.config.brokers.Regions[publisher.config.brokers.TargetRegion],
InitialDynamicConfig: wrpkafka.DynamicConfig{
TopicMap: publisher.config.topicRoutes,
},
Expand Down Expand Up @@ -99,6 +99,11 @@ func (p *KafkaPublisher) Start() error {
return ErrPublisherAlreadyStarted
}

// log the target broker at startup for visibility
p.logEmitter.Notify(log.NewEvent(log.LevelInfo, "Starting WRP publisher with configured target broker", map[string]any{
"brokers": p.config.brokers.TargetRegion,
}))

// Add event listener for all publish events (success and failure)
p.wrpPublisher.AddPublishEventListener(func(event *wrpkafka.PublishEvent) {

Expand Down
Loading
Loading