Skip to content

Commit 501a598

Browse files
committed
Refactor kafka producer and add email templates
1 parent 4d0a1f1 commit 501a598

File tree

4 files changed

+114
-97
lines changed

4 files changed

+114
-97
lines changed

kafka/config.go

Lines changed: 0 additions & 14 deletions
This file was deleted.

kafka/producer.go

Lines changed: 0 additions & 83 deletions
This file was deleted.

mailer/kafka.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package mailer
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"github.com/confluentinc/confluent-kafka-go/kafka"
9+
)
10+
11+
type Config struct {
12+
KafkaBrokers []string `envconfig:"TIDEPOOL_KAFKA_BROKERS" validate:"required"`
13+
KafkaFlushTimeout int `envconfig:"TIDEPOOL_KAFKA_FLUSH_TIMEOUT" default:"30s" validate:"required"`
14+
KafkaTopic string `envconfig:"TIDEPOOL_KAFKA_EMAILS_TOPIC" validate:"required"`
15+
}
16+
17+
type KafkaMailer struct {
18+
cfg *Config
19+
deliveryChan chan kafka.Event
20+
producer *kafka.Producer
21+
}
22+
23+
var _ Mailer = &KafkaMailer{}
24+
25+
func NewKafkaMailer(cfg *Config, deliveryChan chan kafka.Event) (*KafkaMailer, error) {
26+
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": cfg.KafkaBrokers})
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
return &KafkaMailer{
32+
cfg: cfg,
33+
deliveryChan: deliveryChan,
34+
producer: producer,
35+
}, nil
36+
}
37+
38+
func (k *KafkaMailer) Send(ctx context.Context, email *Email) error {
39+
b, err := json.Marshal(email)
40+
if err != nil {
41+
return err
42+
}
43+
44+
err = k.producer.Produce(&kafka.Message{
45+
TopicPartition: kafka.TopicPartition{Topic: &k.cfg.KafkaTopic, Partition: kafka.PartitionAny},
46+
Value: b,
47+
}, k.deliveryChan)
48+
return err
49+
}
50+
51+
func (k *KafkaMailer) Close(timeoutMs int) (err error) {
52+
outstandingEvents := k.producer.Flush(timeoutMs)
53+
if outstandingEvents != 0 {
54+
err = errors.New(fmt.Sprintf("%v events were not delivered", outstandingEvents))
55+
}
56+
k.producer.Close()
57+
close(k.deliveryChan)
58+
return
59+
}

mailer/template.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package mailer
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"github.com/pkg/errors"
7+
"html/template"
8+
)
9+
10+
type EmailTemplate struct {
11+
body *template.Template
12+
name string
13+
subject *template.Template
14+
}
15+
16+
func NewEmailTemplate(name string, subject string, body string) (*EmailTemplate, error) {
17+
if name == "" {
18+
return nil, errors.New("email template name cannot be empty")
19+
}
20+
if subject == "" {
21+
return nil, errors.New("email template subject cannot be empty")
22+
}
23+
if body == "" {
24+
return nil, errors.New("email template body cannot be empty")
25+
}
26+
precompiledSubject, err := template.New(fmt.Sprintf("%s_subject", name)).Parse(subject)
27+
if err != nil {
28+
return nil, err
29+
}
30+
precompiledBody, err := template.New(fmt.Sprintf("%s_body", name)).Parse(body)
31+
if err != nil {
32+
return nil, err
33+
}
34+
return &EmailTemplate{
35+
body: precompiledBody,
36+
name: name,
37+
subject: precompiledSubject,
38+
}, nil
39+
}
40+
41+
func (e *EmailTemplate) RenderToEmail(params interface{}, email *Email) error {
42+
var subject bytes.Buffer
43+
var body bytes.Buffer
44+
45+
if err := e.subject.Execute(&subject, params); err != nil {
46+
return err
47+
}
48+
if err := e.body.Execute(&body, params); err != nil {
49+
return err
50+
}
51+
52+
email.Subject = subject.String()
53+
email.Body = body.String()
54+
return nil
55+
}

0 commit comments

Comments
 (0)