Skip to content

Commit c818dd6

Browse files
committed
Add plunk for marketting emails
1 parent df0a9cd commit c818dd6

File tree

9 files changed

+134
-59
lines changed

9 files changed

+134
-59
lines changed

api/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.30.0
1212
github.com/NdoleStudio/go-otelroundtripper v0.0.13
1313
github.com/NdoleStudio/lemonsqueezy-go v1.2.4
14+
github.com/NdoleStudio/plunk-go v0.0.1
1415
github.com/avast/retry-go v3.0.0+incompatible
1516
github.com/carlmjohnson/requests v0.25.1
1617
github.com/cloudevents/sdk-go/v2 v2.16.2

api/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ github.com/NdoleStudio/go-otelroundtripper v0.0.13 h1:fDgdxcNJov4LTrMhXqJnF/E3jO
5858
github.com/NdoleStudio/go-otelroundtripper v0.0.13/go.mod h1:UIUQ22ErFoBUyLuPDrVNRRKmBHBTfzQO9GF1ztqDvqo=
5959
github.com/NdoleStudio/lemonsqueezy-go v1.2.4 h1:BhWlCUH+DIPfSn4g/V7f2nFkMCQuzno9DXKZ7YDrXXA=
6060
github.com/NdoleStudio/lemonsqueezy-go v1.2.4/go.mod h1:2uZlWgn9sbNxOx3JQWLlPrDOC6NT/wmSTOgL3U/fMMw=
61+
github.com/NdoleStudio/plunk-go v0.0.1 h1:nWPr5pcwFDvhYGZS5n3a3cKGkQvg5re9DSAiFMZCFvs=
62+
github.com/NdoleStudio/plunk-go v0.0.1/go.mod h1:pqG3zKhpn/A2bL1K+WsWzvfTpOeSkYgXhNk5H65uEc8=
6163
github.com/PuerkitoBio/goquery v1.10.3 h1:pFYcNSqHxBD06Fpj/KsbStFRsgRATgnf3LeXiUkhzPo=
6264
github.com/PuerkitoBio/goquery v1.10.3/go.mod h1:tMUX0zDMHXYlAQk6p35XxQMqMweEKB7iK7iLNd4RH4Y=
6365
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=

api/pkg/di/container.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strconv"
1010
"time"
1111

12+
plunk "github.com/NdoleStudio/plunk-go"
1213
"github.com/pusher/pusher-http-go/v5"
1314

1415
"github.com/NdoleStudio/httpsms/docs"
@@ -916,7 +917,7 @@ func (container *Container) MarketingService() (service *services.MarketingServi
916917
container.Logger(),
917918
container.Tracer(),
918919
container.FirebaseAuthClient(),
919-
os.Getenv("BREVO_API_KEY"),
920+
container.PlunkClient(),
920921
)
921922
}
922923

@@ -929,7 +930,6 @@ func (container *Container) UserService() (service *services.UserService) {
929930
container.UserRepository(),
930931
container.Mailer(),
931932
container.UserEmailFactory(),
932-
container.MarketingService(),
933933
container.LemonsqueezyClient(),
934934
container.EventDispatcher(),
935935
container.FirebaseAuthClient(),
@@ -1181,6 +1181,16 @@ func (container *Container) DiscordClient() (client *discord.Client) {
11811181
)
11821182
}
11831183

1184+
// PlunkClient creates a new instance of plunk.Client
1185+
func (container *Container) PlunkClient() (client *plunk.Client) {
1186+
container.logger.Debug(fmt.Sprintf("creating %T", client))
1187+
return plunk.New(
1188+
plunk.WithHTTPClient(container.HTTPClient("plunk")),
1189+
plunk.WithSecretKey(os.Getenv("PLUNK_SECRET_KEY")),
1190+
plunk.WithPublicKey(os.Getenv("PLUNK_PUBLIC_KEY")),
1191+
)
1192+
}
1193+
11841194
// RegisterLemonsqueezyRoutes registers routes for the /lemonsqueezy prefix
11851195
func (container *Container) RegisterLemonsqueezyRoutes() {
11861196
container.logger.Debug(fmt.Sprintf("registering %T routes", &handlers.LemonsqueezyHandler{}))
@@ -1306,6 +1316,12 @@ func (container *Container) RegisterDiscordListeners() {
13061316
// RegisterMarketingListeners registers event listeners for listeners.MarketingListener
13071317
func (container *Container) RegisterMarketingListeners() {
13081318
container.logger.Debug(fmt.Sprintf("registering listeners for %T", listeners.MarketingListener{}))
1319+
1320+
if os.Getenv("PLUNK_SECRET_KEY") == "" {
1321+
container.logger.Debug("skipping marketing listeners because the PLUNK_SECRET_KEY env variable is not set")
1322+
return
1323+
}
1324+
13091325
_, routes := listeners.NewMarketingListener(
13101326
container.Logger(),
13111327
container.Tracer(),
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package events
2+
3+
import (
4+
"time"
5+
6+
"github.com/NdoleStudio/httpsms/pkg/entities"
7+
)
8+
9+
// UserAccountCreated is raised when a user's account is created.
10+
const UserAccountCreated = "user.account.created"
11+
12+
// UserAccountCreatedPayload stores the data for the UserAccountCreated event
13+
type UserAccountCreatedPayload struct {
14+
UserID entities.UserID `json:"user_id"`
15+
Timestamp time.Time `json:"timestamp"`
16+
}

api/pkg/events/user_account_deleted_event.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import (
99
// UserAccountDeleted is raised when a user's account is deleted.
1010
const UserAccountDeleted = "user.account.deleted"
1111

12-
// UserAccountDeletedPayload stores the data for the UserAccountDeletedPayload event
12+
// UserAccountDeletedPayload stores the data for the UserAccountDeleted event
1313
type UserAccountDeletedPayload struct {
1414
UserID entities.UserID `json:"user_id"`
15+
UserEmail string `json:"user_email"`
1516
Timestamp time.Time `json:"timestamp"`
1617
}

api/pkg/handlers/user_handler.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,11 @@ func (h *UserHandler) RegisterRoutes(router fiber.Router, middlewares ...fiber.H
6262
// @Failure 500 {object} responses.InternalServerError
6363
// @Router /users/me [get]
6464
func (h *UserHandler) Show(c *fiber.Ctx) error {
65-
ctx, span := h.tracer.StartFromFiberCtx(c)
65+
ctx, span, ctxLogger := h.tracer.StartFromFiberCtxWithLogger(c, h.logger)
6666
defer span.End()
6767

68-
ctxLogger := h.tracer.CtxLogger(h.logger, span)
69-
7068
authUser := h.userFromContext(c)
71-
72-
user, err := h.service.Get(ctx, authUser)
69+
user, err := h.service.Get(ctx, c.OriginalURL(), authUser)
7370
if err != nil {
7471
msg := fmt.Sprintf("cannot get user with ID [%s]", authUser.ID)
7572
ctxLogger.Error(stacktrace.Propagate(err, msg))
@@ -112,7 +109,7 @@ func (h *UserHandler) Update(c *fiber.Ctx) error {
112109
return h.responseUnprocessableEntity(c, errors, "validation errors while updating user")
113110
}
114111

115-
user, err := h.service.Update(ctx, h.userFromContext(c), request.ToUpdateParams())
112+
user, err := h.service.Update(ctx, c.OriginalURL(), h.userFromContext(c), request.ToUpdateParams())
116113
if err != nil {
117114
msg := fmt.Sprintf("cannot update user with params [%+#v]", request)
118115
ctxLogger.Error(stacktrace.Propagate(err, msg))

api/pkg/listeners/marketing_listener.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,28 @@ func NewMarketingListener(
3232

3333
return l, map[string]events.EventListener{
3434
events.UserAccountDeleted: l.onUserAccountDeleted,
35+
events.UserAccountCreated: l.onUserAccountCreated,
3536
}
3637
}
3738

39+
func (listener *MarketingListener) onUserAccountCreated(ctx context.Context, event cloudevents.Event) error {
40+
ctx, span := listener.tracer.Start(ctx)
41+
defer span.End()
42+
43+
var payload events.UserAccountCreatedPayload
44+
if err := event.DataAs(&payload); err != nil {
45+
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
46+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
47+
}
48+
49+
if err := listener.service.CreateContact(ctx, payload.UserID); err != nil {
50+
msg := fmt.Sprintf("cannot create [contact] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID())
51+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
52+
}
53+
54+
return nil
55+
}
56+
3857
func (listener *MarketingListener) onUserAccountDeleted(ctx context.Context, event cloudevents.Event) error {
3958
ctx, span := listener.tracer.Start(ctx)
4059
defer span.End()
@@ -45,8 +64,8 @@ func (listener *MarketingListener) onUserAccountDeleted(ctx context.Context, eve
4564
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
4665
}
4766

48-
if err := listener.service.DeleteUser(ctx, payload.UserID); err != nil {
49-
msg := fmt.Sprintf("cannot delete [sendgrid contact] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID())
67+
if err := listener.service.DeleteContact(ctx, payload.UserEmail); err != nil {
68+
msg := fmt.Sprintf("cannot delete [contact] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID())
5069
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
5170
}
5271

api/pkg/services/marketting_service.go

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ import (
55
"fmt"
66
"strings"
77

8-
"github.com/carlmjohnson/requests"
8+
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
99

1010
"firebase.google.com/go/auth"
1111
"github.com/NdoleStudio/httpsms/pkg/entities"
1212
"github.com/NdoleStudio/httpsms/pkg/telemetry"
13+
plunk "github.com/NdoleStudio/plunk-go"
1314
"github.com/gofiber/fiber/v2"
1415
"github.com/palantir/stacktrace"
1516
)
@@ -19,90 +20,93 @@ type MarketingService struct {
1920
logger telemetry.Logger
2021
tracer telemetry.Tracer
2122
authClient *auth.Client
22-
brevoAPIKey string
23+
plunkClient *plunk.Client
2324
}
2425

2526
// NewMarketingService creates a new instance of the MarketingService
2627
func NewMarketingService(
2728
logger telemetry.Logger,
2829
tracer telemetry.Tracer,
2930
authClient *auth.Client,
30-
brevoAPIKey string,
31+
plunkClient *plunk.Client,
3132
) *MarketingService {
3233
return &MarketingService{
3334
logger: logger.WithService(fmt.Sprintf("%T", &MarketingService{})),
3435
tracer: tracer,
3536
authClient: authClient,
36-
brevoAPIKey: brevoAPIKey,
37+
plunkClient: plunkClient,
3738
}
3839
}
3940

40-
// DeleteUser a user if exists in the sendgrid list
41-
func (service *MarketingService) DeleteUser(ctx context.Context, userID entities.UserID) error {
41+
// DeleteContact a user if exists as a contact
42+
func (service *MarketingService) DeleteContact(ctx context.Context, email string) error {
4243
ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
4344
defer span.End()
4445

45-
err := requests.URL(fmt.Sprintf("https://api.brevo.com/v3/contacts/%s?identifierType=ext_id", userID)).
46-
Header("api-key", service.brevoAPIKey).
47-
Delete().
48-
CheckStatus(fiber.StatusNoContent).
49-
Fetch(ctx)
46+
response, _, err := service.plunkClient.Contacts.List(ctx, map[string]string{"limit": "1", "search": email})
5047
if err != nil {
51-
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot delete user with id [%s] from brevo list", userID)))
48+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot search for contact with email [%s]", email)))
5249
}
5350

54-
ctxLogger.Info(fmt.Sprintf("deleted user with ID [%s] from brevo list with status [%s]", userID, fiber.StatusNoContent))
51+
if len(response.Contacts) == 0 {
52+
ctxLogger.Info(fmt.Sprintf("no contact found with email [%s], skipping deletion", email))
53+
return nil
54+
}
55+
56+
contact := response.Contacts[0]
57+
if _, err = service.plunkClient.Contacts.Delete(ctx, contact.ID); err != nil {
58+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, fmt.Sprintf("cannot delete user with ID [%s] from contacts", contact.Data[string(semconv.EnduserIDKey)])))
59+
}
60+
61+
ctxLogger.Info(fmt.Sprintf("deleted user with ID [%s] from as marketting contact with ID [%s]", contact.Data[string(semconv.EnduserIDKey)], contact.ID))
5562
return nil
5663
}
5764

58-
// AddToList adds a new user on the onboarding automation.
59-
func (service *MarketingService) AddToList(ctx context.Context, user *entities.User) {
65+
// CreateContact adds a new user on the onboarding automation.
66+
func (service *MarketingService) CreateContact(ctx context.Context, userID entities.UserID) error {
6067
ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
6168
defer span.End()
6269

63-
userRecord, err := service.authClient.GetUser(ctx, string(user.ID))
70+
userRecord, err := service.authClient.GetUser(ctx, userID.String())
6471
if err != nil {
65-
msg := fmt.Sprintf("cannot get auth user with id [%s]", user.ID)
66-
ctxLogger.Error(stacktrace.Propagate(err, msg))
67-
return
72+
msg := fmt.Sprintf("cannot get auth user with id [%s]", userID)
73+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
6874
}
6975

70-
var response string
71-
err = requests.URL("https://api.brevo.com/v3/contacts").
72-
Header("api-key", service.brevoAPIKey).
73-
Post().
74-
BodyJSON(fiber.Map{
75-
"email": userRecord.Email,
76-
"ext_id": userRecord.UID,
77-
"attributes": service.brevoAttributes(userRecord),
78-
"listIds": []int64{9},
79-
"updateEnabled": true,
80-
}).
81-
CheckStatus(fiber.StatusCreated, fiber.StatusNoContent).
82-
ToString(&response).
83-
Fetch(ctx)
76+
data := service.attributes(userRecord)
77+
data[string(semconv.ServiceNameKey)] = "httpsms.com"
78+
data[string(semconv.EnduserIDKey)] = userRecord.UID
79+
80+
event, _, err := service.plunkClient.Tracker.TrackEvent(ctx, &plunk.TrackEventRequest{
81+
Email: userRecord.Email,
82+
Event: "contact.created",
83+
Subscribed: true,
84+
Data: data,
85+
})
8486
if err != nil {
85-
msg := fmt.Sprintf("cannot add user with id [%s] to brevo list", user.ID)
86-
ctxLogger.Error(stacktrace.Propagate(err, msg))
87-
return
87+
msg := fmt.Sprintf("cannot create contact for user with id [%s]", userID)
88+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
8889
}
8990

90-
ctxLogger.Info(fmt.Sprintf("user [%s] added to list brevo list with brevo response [%s]", user.ID, response))
91+
ctxLogger.Info(fmt.Sprintf("user [%s] added to marketting list with contact ID [%s] and event ID [%s]", userID, event.Data.Contact, event.Data.Event))
92+
return nil
9193
}
9294

93-
func (service *MarketingService) brevoAttributes(user *auth.UserRecord) map[string]any {
95+
func (service *MarketingService) attributes(user *auth.UserRecord) map[string]any {
9496
name := strings.TrimSpace(user.DisplayName)
9597
if name == "" {
9698
return fiber.Map{}
9799
}
98100

99101
parts := strings.Split(name, " ")
100102
if len(parts) == 1 {
101-
return fiber.Map{"FIRSTNAME": name}
103+
return fiber.Map{
104+
"firstName": name,
105+
}
102106
}
103107

104108
return fiber.Map{
105-
"FIRSTNAME": strings.Join(parts[0:len(parts)-1], " "),
106-
"LASTNAME": parts[len(parts)-1],
109+
"firstName": strings.Join(parts[0:len(parts)-1], " "),
110+
"lastName": parts[len(parts)-1],
107111
}
108112
}

0 commit comments

Comments
 (0)