Skip to content

Commit 4775d57

Browse files
committed
Error on uninitialize brokers
1 parent cbbea07 commit 4775d57

File tree

2 files changed

+8
-13
lines changed

2 files changed

+8
-13
lines changed

internal/storage/kafka_postgres.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,9 @@ func NewKafkaPostgresConnector(cfg *config.KafkaConfig) (*KafkaPostgresConnector
5555
}
5656

5757
// Initialize Kafka publisher if enabled
58-
var kafkaPublisher *KafkaPublisher
59-
if cfg.Brokers != "" {
60-
kafkaPublisher, err = NewKafkaPublisher(cfg)
61-
if err != nil {
62-
log.Warn().Err(err).Msg("Failed to initialize Kafka publisher, continuing without publishing")
63-
kafkaPublisher = nil
64-
}
58+
kafkaPublisher, err := NewKafkaPublisher(cfg)
59+
if err != nil {
60+
return nil, err
6561
}
6662

6763
return &KafkaPostgresConnector{
@@ -502,11 +498,9 @@ func (kp *KafkaPostgresConnector) ReplaceBlockData(data []common.BlockData) ([]c
502498
oldBlocks := []common.BlockData{}
503499

504500
// Publish reorg event to Kafka
505-
if kp.kafkaPublisher != nil {
506-
// Publish new blocks (the reorg handler will mark old ones as reverted)
507-
if err := kp.kafkaPublisher.PublishBlockData(data); err != nil {
508-
return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
509-
}
501+
// TODO: Publish new blocks (the reorg handler will mark old ones as reverted)
502+
if err := kp.kafkaPublisher.PublishBlockData(data); err != nil {
503+
return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
510504
}
511505

512506
// Update cursor to track the highest block number

internal/storage/kafka_publisher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
126126
}
127127

128128
func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isReorg bool) error {
129-
if p.client == nil || len(blockData) == 0 {
129+
if len(blockData) == 0 {
130130
return nil
131131
}
132132

@@ -135,6 +135,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isReorg
135135
// Prepare messages for blocks, events, transactions and traces
136136
blockMessages := make([]*kgo.Record, len(blockData))
137137

138+
// TODO: handle reorg
138139
status := "new"
139140
if isReorg {
140141
status = "reverted"

0 commit comments

Comments
 (0)