Commit d196ead

committed: new kafka updates
1 parent 1aa9d4a

4 files changed (+46, -195 lines)

4 files changed

+46
-195
lines changed

cmd/migrate_valid.go

Lines changed: 1 addition & 1 deletion

@@ -104,7 +104,7 @@ type Migrator struct {
 	targetConn         *storage.ClickHouseConnector
 	migrationBatchSize int
 	rpcBatchSize       int
-	newkafka           *newkafka.Publisher
+	newkafka           *newkafka.NewKafka
 	psql               *storage.PostgresConnector
 }

cmd/root.go

Lines changed: 6 additions & 0 deletions

@@ -149,6 +149,9 @@ func init() {
 	rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
 	rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
 	rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
+	rootCmd.PersistentFlags().String("newKafka-brokers", "", "New Kafka brokers (comma-separated)")
+	rootCmd.PersistentFlags().String("newKafka-username", "", "New Kafka username")
+	rootCmd.PersistentFlags().String("newKafka-password", "", "New Kafka password")
 	rootCmd.PersistentFlags().Int("workMode-checkIntervalMinutes", 10, "How often to check work mode in minutes")
 	rootCmd.PersistentFlags().Int64("workMode-liveModeThreshold", 500, "How many blocks the indexer can be behind before switching to live mode")
 	rootCmd.PersistentFlags().String("validation-mode", "strict", "Validation mode. Strict will validate logsBloom and transactionsRoot. Minimal will validate transaction count and logs existence.")
@@ -265,6 +268,9 @@ func init() {
 	viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
 	viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
 	viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
+	viper.BindPFlag("newKafka.brokers", rootCmd.PersistentFlags().Lookup("newKafka-brokers"))
+	viper.BindPFlag("newKafka.username", rootCmd.PersistentFlags().Lookup("newKafka-username"))
+	viper.BindPFlag("newKafka.password", rootCmd.PersistentFlags().Lookup("newKafka-password"))
 	viper.BindPFlag("workMode.checkIntervalMinutes", rootCmd.PersistentFlags().Lookup("workMode-checkIntervalMinutes"))
 	viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
 	viper.BindPFlag("validation.mode", rootCmd.PersistentFlags().Lookup("validation-mode"))

configs/config.go

Lines changed: 7 additions & 0 deletions

@@ -182,6 +182,12 @@ type PublisherConfig struct {
 	Events EventPublisherConfig `mapstructure:"events"`
 }

+type NewKafkaConfig struct {
+	Brokers  string `mapstructure:"brokers"`
+	Username string `mapstructure:"username"`
+	Password string `mapstructure:"password"`
+}
+
 type WorkModeConfig struct {
 	CheckIntervalMinutes int   `mapstructure:"checkIntervalMinutes"`
 	LiveModeThreshold    int64 `mapstructure:"liveModeThreshold"`
@@ -201,6 +207,7 @@ type Config struct {
 	Storage    StorageConfig    `mapstructure:"storage"`
 	API        APIConfig        `mapstructure:"api"`
 	Publisher  PublisherConfig  `mapstructure:"publisher"`
+	NewKafka   NewKafkaConfig   `mapstructure:"newKafka"`
 	WorkMode   WorkModeConfig   `mapstructure:"workMode"`
 	Validation ValidationConfig `mapstructure:"validation"`
 }
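
These mapstructure tags are what let viper decode a newKafka block from the config file into the new struct. A minimal sketch of that decoding, with an illustrative inline YAML document and trimmed-down structs:

package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/viper"
)

type NewKafkaConfig struct {
	Brokers  string `mapstructure:"brokers"`
	Username string `mapstructure:"username"`
	Password string `mapstructure:"password"`
}

type Config struct {
	NewKafka NewKafkaConfig `mapstructure:"newKafka"`
}

func main() {
	yamlSrc := []byte("newKafka:\n  brokers: broker1:9092,broker2:9092\n  username: svc-user\n")

	v := viper.New()
	v.SetConfigType("yaml")
	if err := v.ReadConfig(bytes.NewReader(yamlSrc)); err != nil {
		panic(err)
	}

	var cfg Config
	// Unmarshal walks the mapstructure tags, so "newKafka.brokers" in the
	// file lands in cfg.NewKafka.Brokers.
	if err := v.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.NewKafka.Brokers) // broker1:9092,broker2:9092
}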

internal/publisher/newkafka/publisherNewKafka.go

Lines changed: 32 additions & 194 deletions

@@ -18,13 +18,13 @@ import (
 	"github.com/twmb/franz-go/pkg/sasl/plain"
 )

-type Publisher struct {
+type NewKafka struct {
 	client *kgo.Client
 	mu     sync.RWMutex
 }

 var (
-	instance *Publisher
+	instance *NewKafka
 	once     sync.Once
 )

@@ -34,31 +34,26 @@ type PublishableMessage[T common.BlockModel | common.TransactionModel | common.L
 }

 // GetInstance returns the singleton Publisher instance
-func GetInstance() *Publisher {
+func GetInstance() *NewKafka {
 	once.Do(func() {
-		instance = &Publisher{}
+		instance = &NewKafka{}
 		if err := instance.initialize(); err != nil {
 			log.Error().Err(err).Msg("Failed to initialize publisher")
 		}
 	})
 	return instance
 }

-func (p *Publisher) initialize() error {
-	if !config.Cfg.Publisher.Enabled {
-		log.Debug().Msg("Publisher is disabled, skipping initialization")
-		return nil
-	}
-
+func (p *NewKafka) initialize() error {
 	p.mu.Lock()
 	defer p.mu.Unlock()

-	if config.Cfg.Publisher.Brokers == "" {
+	if config.Cfg.NewKafka.Brokers == "" {
 		log.Info().Msg("No Kafka brokers configured, skipping publisher initialization")
 		return nil
 	}

-	brokers := strings.Split(config.Cfg.Publisher.Brokers, ",")
+	brokers := strings.Split(config.Cfg.NewKafka.Brokers, ",")
 	opts := []kgo.Opt{
 		kgo.SeedBrokers(brokers...),
 		kgo.AllowAutoTopicCreation(),
@@ -71,10 +66,10 @@ func (p *Publisher) initialize() error {
 		kgo.DialTimeout(10 * time.Second),
 	}

-	if config.Cfg.Publisher.Username != "" && config.Cfg.Publisher.Password != "" {
+	if config.Cfg.NewKafka.Username != "" && config.Cfg.NewKafka.Password != "" {
 		opts = append(opts, kgo.SASL(plain.Auth{
-			User: config.Cfg.Publisher.Username,
-			Pass: config.Cfg.Publisher.Password,
+			User: config.Cfg.NewKafka.Username,
+			Pass: config.Cfg.NewKafka.Password,
 		}.AsMechanism()))
 		tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
 		opts = append(opts, kgo.Dialer(tlsDialer.DialContext))
@@ -96,22 +91,11 @@ func (p *Publisher) initialize() error {
 	return nil
 }

-func (p *Publisher) PublishBlockData(blockData []common.BlockData) error {
+func (p *NewKafka) PublishBlockData(blockData []common.BlockData) error {
 	return p.publishBlockData(blockData, false)
 }

-func (p *Publisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error {
-	if err := p.publishBlockData(oldData, true); err != nil {
-		return fmt.Errorf("failed to publish old block data: %v", err)
-	}
-
-	if err := p.publishBlockData(newData, false); err != nil {
-		return fmt.Errorf("failed to publish new block data: %v", err)
-	}
-	return nil
-}
-
-func (p *Publisher) Close() error {
+func (p *NewKafka) Close() error {
 	p.mu.Lock()
 	defer p.mu.Unlock()

@@ -122,40 +106,7 @@ func (p *Publisher) Close() error {
 	return nil
 }

-func (p *Publisher) publishMessages(ctx context.Context, messages []*kgo.Record) error {
-	if len(messages) == 0 {
-		return nil
-	}
-
-	if !config.Cfg.Publisher.Enabled {
-		log.Debug().Msg("Publisher is disabled, skipping publish")
-		return nil
-	}
-
-	p.mu.RLock()
-	defer p.mu.RUnlock()
-
-	if p.client == nil {
-		return nil // Skip if no client configured
-	}
-
-	var wg sync.WaitGroup
-	wg.Add(len(messages))
-	// Publish to all configured producers
-	for _, msg := range messages {
-		p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
-			defer wg.Done()
-			if err != nil {
-				log.Error().Err(err).Msg("Failed to publish message to Kafka")
-			}
-		})
-	}
-	wg.Wait()
-
-	return nil
-}
-
-func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool) error {
+func (p *NewKafka) publishBlockData(blockData []common.BlockData, isReorg bool) error {
 	if p.client == nil || len(blockData) == 0 {
 		return nil
 	}
@@ -176,7 +127,7 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool)
 			return fmt.Errorf("failed to marshal block data: %v", err)
 		}
 		blockdataMessages[i] = &kgo.Record{
-			Topic: p.getTopicName("block_data"),
+			Topic: "block_data",
 			Key:   []byte(fmt.Sprintf("block-%s-%s-%s", status, data.Block.ChainId.String(), data.Block.Hash)),
 			Value: msgJson,
 		}
@@ -198,143 +149,30 @@ func (p *Publisher) publishBlockData(blockData []common.BlockData, isReorg bool)
 	return nil
 }

-func (p *Publisher) createBlockMessage(block common.Block, status string) (*kgo.Record, error) {
-	msg := PublishableMessage[common.BlockModel]{
-		Data:   block.Serialize(),
-		Status: status,
-	}
-	msgJson, err := json.Marshal(msg)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshal block data: %v", err)
-	}
-	return &kgo.Record{
-		Topic: p.getTopicName("blocks"),
-		Key:   []byte(fmt.Sprintf("block-%s-%s-%s", status, block.ChainId.String(), block.Hash)),
-		Value: msgJson,
-	}, nil
-}
-
-func (p *Publisher) createTransactionMessage(tx common.Transaction, status string) (*kgo.Record, error) {
-	msg := PublishableMessage[common.TransactionModel]{
-		Data:   tx.Serialize(),
-		Status: status,
-	}
-	msgJson, err := json.Marshal(msg)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshal transaction data: %v", err)
-	}
-	return &kgo.Record{
-		Topic: p.getTopicName("transactions"),
-		Key:   []byte(fmt.Sprintf("transaction-%s-%s-%s", status, tx.ChainId.String(), tx.Hash)),
-		Value: msgJson,
-	}, nil
-}
-
-func (p *Publisher) createTraceMessage(trace common.Trace, status string) (*kgo.Record, error) {
-	msg := PublishableMessage[common.TraceModel]{
-		Data:   trace.Serialize(),
-		Status: status,
-	}
-	msgJson, err := json.Marshal(msg)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshal trace data: %v", err)
-	}
-	traceAddressStr := make([]string, len(trace.TraceAddress))
-	for i, addr := range trace.TraceAddress {
-		traceAddressStr[i] = fmt.Sprint(addr)
-	}
-	return &kgo.Record{
-		Topic: p.getTopicName("traces"),
-		Key:   []byte(fmt.Sprintf("trace-%s-%s-%s-%v", status, trace.ChainID.String(), trace.TransactionHash, strings.Join(traceAddressStr, ","))),
-		Value: msgJson,
-	}, nil
-}
-
-func (p *Publisher) createEventMessage(event common.Log, status string) (*kgo.Record, error) {
-	msg := PublishableMessage[common.LogModel]{
-		Data:   event.Serialize(),
-		Status: status,
-	}
-	msgJson, err := json.Marshal(msg)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshal event data: %v", err)
-	}
-	return &kgo.Record{
-		Topic: p.getTopicName("events"),
-		Key:   []byte(fmt.Sprintf("event-%s-%s-%s-%d", status, event.ChainId.String(), event.TransactionHash, event.LogIndex)),
-		Value: msgJson,
-	}, nil
-}
-
-func (p *Publisher) shouldPublishEvent(event common.Log) bool {
-	if len(config.Cfg.Publisher.Events.AddressFilter) > 0 {
-		for _, addr := range config.Cfg.Publisher.Events.AddressFilter {
-			if addr == event.Address {
-				return true
-			}
-		}
-		return false
-	}
-
-	if len(config.Cfg.Publisher.Events.Topic0Filter) > 0 {
-		for _, topic0 := range config.Cfg.Publisher.Events.Topic0Filter {
-			if topic0 == event.Topic0 {
-				return true
-			}
-		}
-		return false
-	}
-	return true
-}
-
-func (p *Publisher) shouldPublishTransaction(tx common.Transaction) bool {
-	if len(config.Cfg.Publisher.Transactions.ToFilter) > 0 {
-		for _, addr := range config.Cfg.Publisher.Transactions.ToFilter {
-			if addr == tx.ToAddress {
-				return true
-			}
-		}
-		return false
-	}
-
-	if len(config.Cfg.Publisher.Transactions.FromFilter) > 0 {
-		for _, addr := range config.Cfg.Publisher.Transactions.FromFilter {
-			if addr == tx.FromAddress {
-				return true
-			}
-		}
-		return false
-	}
-	return true
-}
-
-func (p *Publisher) getTopicName(entity string) string {
-	chainIdSuffix := ""
-	if config.Cfg.RPC.ChainID != "" {
-		chainIdSuffix = fmt.Sprintf(".%s", config.Cfg.RPC.ChainID)
-	}
-	switch entity {
-	case "blocks":
-		if config.Cfg.Publisher.Blocks.TopicName != "" {
-			return config.Cfg.Publisher.Blocks.TopicName
-		}
-		return fmt.Sprintf("insight.blocks%s", chainIdSuffix)
-	case "transactions":
-		if config.Cfg.Publisher.Transactions.TopicName != "" {
-			return config.Cfg.Publisher.Transactions.TopicName
-		}
-		return fmt.Sprintf("insight.transactions%s", chainIdSuffix)
-	case "traces":
-		if config.Cfg.Publisher.Traces.TopicName != "" {
-			return config.Cfg.Publisher.Traces.TopicName
-		}
-		return fmt.Sprintf("insight.traces%s", chainIdSuffix)
-	case "events":
-		if config.Cfg.Publisher.Events.TopicName != "" {
-			return config.Cfg.Publisher.Events.TopicName
-		}
-		return fmt.Sprintf("insight.events%s", chainIdSuffix)
-	default:
-		return ""
-	}
-}
+func (p *NewKafka) publishMessages(ctx context.Context, messages []*kgo.Record) error {
+	if len(messages) == 0 {
+		return nil
+	}
+
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if p.client == nil {
+		return nil // Skip if no client configured
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(messages))
+	// Publish to all configured producers
+	for _, msg := range messages {
+		p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
+			defer wg.Done()
+			if err != nil {
+				log.Error().Err(err).Msg("Failed to publish message to Kafka")
+			}
+		})
+	}
+	wg.Wait()

+	return nil
+}
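
The retained publishMessages keeps franz-go's asynchronous produce-with-callback pattern: every record is fired off concurrently, and a WaitGroup blocks until each delivery promise has run. A self-contained sketch of that pattern; the broker address, topic, key, and payload are placeholders, whereas the real client is built in initialize from config.Cfg.NewKafka:

package main

import (
	"context"
	"sync"

	"github.com/rs/zerolog/log"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker; the committed code reads config.Cfg.NewKafka.Brokers.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.AllowAutoTopicCreation(),
	)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to create Kafka client")
	}
	defer client.Close()

	records := []*kgo.Record{
		{Topic: "block_data", Key: []byte("block-new-1-0xabc"), Value: []byte(`{"status":"new"}`)},
	}

	// Fire all produces asynchronously, then block until every delivery
	// callback has run -- the same shape as NewKafka.publishMessages.
	var wg sync.WaitGroup
	wg.Add(len(records))
	for _, r := range records {
		client.Produce(context.Background(), r, func(_ *kgo.Record, err error) {
			defer wg.Done()
			if err != nil {
				log.Error().Err(err).Msg("failed to publish message")
			}
		})
	}
	wg.Wait()
}

Two behavioral changes ride along with the type rename: the per-entity getTopicName routing is gone (block data now goes straight to the hardcoded "block_data" topic), and the Publisher.Enabled gate plus the PublishReorg entry point were removed, so this path publishes whenever NewKafka brokers are configured.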
