@@ -14,19 +14,21 @@ import (
1414 "time"
1515
1616 "github.com/linkedin/goavro"
17- kafka "github.com/segmentio/kafka-go"
17+ "github.com/segmentio/kafka-go"
1818
1919 appctx "github.com/brave-intl/bat-go/utils/context"
2020 errorutils "github.com/brave-intl/bat-go/utils/errors"
2121 "github.com/brave-intl/bat-go/utils/logging"
2222)
2323
24- type KafkaRead struct {
24+ // Reader - implements KafkaReader
25+ type Reader struct {
2526 kafkaReader * kafka.Reader
2627 kafkaDialer * kafka.Dialer
2728}
2829
29- func NewKafkaReader (ctx context.Context , groupID string , topic string ) (* KafkaRead , error ) {
30+ // NewKafkaReader - creates a new kafka reader for groupID and topic
31+ func NewKafkaReader (ctx context.Context , groupID string , topic string ) (* Reader , error ) {
3032 _ , logger := logging .SetupLogger (ctx )
3133
3234 dialer , x509Cert , err := TLSDialer ()
@@ -40,20 +42,23 @@ func NewKafkaReader(ctx context.Context, groupID string, topic string) (*KafkaRe
4042 kafkaBrokers := ctx .Value (appctx .KafkaBrokersCTXKey ).(string )
4143
4244 kafkaReader := kafka .NewReader (kafka.ReaderConfig {
43- Brokers : strings .Split (kafkaBrokers , "," ),
44- GroupID : groupID ,
45- Topic : topic ,
46- Dialer : dialer ,
47- Logger : kafka .LoggerFunc (logger .Printf ), // FIXME
45+ Brokers : strings .Split (kafkaBrokers , "," ),
46+ GroupID : groupID ,
47+ Topic : topic ,
48+ Dialer : dialer ,
49+ StartOffset : 0 ,
50+ RetentionTime : 2 * time .Hour ,
51+ Logger : kafka .LoggerFunc (logger .Printf ), // FIXME
4852 })
4953
50- return & KafkaRead {
54+ return & Reader {
5155 kafkaReader : kafkaReader ,
5256 kafkaDialer : dialer ,
5357 }, nil
5458}
5559
56- func (k * KafkaRead ) ReadMessage (ctx context.Context ) (kafka.Message , error ) {
60+ // ReadMessage - reads kafka messages
61+ func (k * Reader ) ReadMessage (ctx context.Context ) (kafka.Message , error ) {
5762 return k .kafkaReader .ReadMessage (ctx )
5863}
5964
0 commit comments