Skip to content

Commit 4a661e0

Browse files
committed
[BACK-3478] Add email notifications work system processors to email users on specific events.
1 parent 3b13dea commit 4a661e0

File tree

9 files changed

+693
-27
lines changed

9 files changed

+693
-27
lines changed

clinics/service.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Client interface {
2828
ListEHREnabledClinics(ctx context.Context) ([]clinic.Clinic, error)
2929
SyncEHRData(ctx context.Context, clinicID string) error
3030
GetPatients(ctx context.Context, clinicId string, userToken string, params *clinic.ListPatientsParams, injectedParams url.Values) ([]clinic.Patient, error)
31+
GetPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error)
3132
}
3233

3334
type config struct {
@@ -155,7 +156,7 @@ func (d *defaultClient) SharePatientAccount(ctx context.Context, clinicID, patie
155156
}
156157
if response.StatusCode() == http.StatusConflict {
157158
// User is already shared with the clinic
158-
return d.getPatient(ctx, clinicID, patientID)
159+
return d.GetPatient(ctx, clinicID, patientID)
159160
}
160161
if response.StatusCode() != http.StatusOK {
161162
err = errors.Preparedf(ErrorCodeClinicClientFailure,
@@ -182,7 +183,7 @@ func (d *defaultClient) SyncEHRData(ctx context.Context, clinicID string) error
182183
return nil
183184
}
184185

185-
func (d *defaultClient) getPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
186+
func (d *defaultClient) GetPatient(ctx context.Context, clinicID, patientID string) (*clinic.Patient, error) {
186187
response, err := d.httpClient.GetPatientWithResponse(ctx, clinic.ClinicId(clinicID), clinic.PatientId(patientID))
187188
if err != nil {
188189
return nil, err
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package v1
2+
3+
import (
4+
"net/http"
5+
"time"
6+
7+
dataService "github.com/tidepool-org/platform/data/service"
8+
"github.com/tidepool-org/platform/request"
9+
serviceApi "github.com/tidepool-org/platform/service/api"
10+
11+
"github.com/tidepool-org/platform/work/service/emailnotificationsprocessor"
12+
)
13+
14+
func EmailNotificationRoutes() []dataService.Route {
15+
return []dataService.Route{
16+
dataService.Post("/v1/notifications/account/claims", queueClaimAccountNotification, serviceApi.RequireServer),
17+
dataService.Post("/v1/notifications/account/connections", queueConnectAccountNotification, serviceApi.RequireServer),
18+
dataService.Post("/v1/notifications/device/connections/issues", sendDeviceIssuesNotification, serviceApi.RequireServer),
19+
}
20+
}
21+
22+
func queueClaimAccountNotification(dataServiceContext dataService.Context) {
23+
res := dataServiceContext.Response()
24+
req := dataServiceContext.Request()
25+
26+
responder := request.MustNewResponder(res, req)
27+
28+
var data emailnotificationsprocessor.ClaimAccountReminderData
29+
if err := request.DecodeRequestBody(req.Request, &data); err != nil {
30+
request.MustNewResponder(res, req).Error(http.StatusBadRequest, err)
31+
return
32+
}
33+
34+
createDetails := emailnotificationsprocessor.NewClaimAccountWorkCreate(time.Now().Add(time.Hour*24*7), data)
35+
_, err := dataServiceContext.WorkClient().Create(req.Context(), createDetails)
36+
if err != nil {
37+
responder.Error(http.StatusInternalServerError, err)
38+
return
39+
}
40+
41+
responder.Empty(http.StatusCreated)
42+
}
43+
44+
func queueConnectAccountNotification(dataServiceContext dataService.Context) {
45+
res := dataServiceContext.Response()
46+
req := dataServiceContext.Request()
47+
48+
responder := request.MustNewResponder(res, req)
49+
50+
var data emailnotificationsprocessor.ConnectAccountReminderData
51+
if err := request.DecodeRequestBody(req.Request, &data); err != nil {
52+
request.MustNewResponder(res, req).Error(http.StatusBadRequest, err)
53+
return
54+
}
55+
56+
createDetails := emailnotificationsprocessor.NewConnectAccountWorkCreate(time.Now().Add(time.Hour*24*7), data)
57+
_, err := dataServiceContext.WorkClient().Create(req.Context(), createDetails)
58+
if err != nil {
59+
responder.Error(http.StatusInternalServerError, err)
60+
return
61+
}
62+
63+
responder.Empty(http.StatusCreated)
64+
}
65+
66+
func sendDeviceIssuesNotification(dataServiceContext dataService.Context) {
67+
res := dataServiceContext.Response()
68+
req := dataServiceContext.Request()
69+
70+
responder := request.MustNewResponder(res, req)
71+
72+
var data emailnotificationsprocessor.DeviceConnectionIssuesData
73+
if err := request.DecodeRequestBody(req.Request, &data); err != nil {
74+
request.MustNewResponder(res, req).Error(http.StatusBadRequest, err)
75+
return
76+
}
77+
78+
createDetails := emailnotificationsprocessor.NewDeviceConnectionIssuesWorkCreate(data)
79+
_, err := dataServiceContext.WorkClient().Create(req.Context(), createDetails)
80+
if err != nil {
81+
responder.Error(http.StatusInternalServerError, err)
82+
return
83+
}
84+
85+
responder.Empty(http.StatusCreated)
86+
}

data/service/api/v1/v1.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func Routes() []service.Route {
3333
routes = append(routes, SourcesRoutes()...)
3434
routes = append(routes, SummaryRoutes()...)
3535
routes = append(routes, AlertsRoutes()...)
36+
routes = append(routes, EmailNotificationRoutes()...)
3637
routes = append(routes, abbottServiceApiV1.Routes()...)
3738

3839
return routes

data/service/service/standard.go

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,21 @@ package service
33
import (
44
"context"
55
"log"
6+
"net/http"
67
"os"
78

89
"github.com/IBM/sarama"
10+
"github.com/kelseyhightower/envconfig"
911

1012
eventsCommon "github.com/tidepool-org/go-common/events"
1113

1214
abbottClient "github.com/tidepool-org/platform-plugin-abbott/abbott/client"
1315
abbottProvider "github.com/tidepool-org/platform-plugin-abbott/abbott/provider"
1416
abbottWork "github.com/tidepool-org/platform-plugin-abbott/abbott/work"
1517

18+
confirmationClient "github.com/tidepool-org/hydrophone/client"
1619
"github.com/tidepool-org/platform/application"
20+
"github.com/tidepool-org/platform/auth"
1721
"github.com/tidepool-org/platform/clinics"
1822
dataDeduplicatorDeduplicator "github.com/tidepool-org/platform/data/deduplicator/deduplicator"
1923
dataDeduplicatorFactory "github.com/tidepool-org/platform/data/deduplicator/factory"
@@ -29,6 +33,7 @@ import (
2933
"github.com/tidepool-org/platform/errors"
3034
"github.com/tidepool-org/platform/events"
3135
logInternal "github.com/tidepool-org/platform/log"
36+
"github.com/tidepool-org/platform/mailer"
3237
metricClient "github.com/tidepool-org/platform/metric/client"
3338
oauthProvider "github.com/tidepool-org/platform/oauth/provider"
3439
"github.com/tidepool-org/platform/permission"
@@ -41,8 +46,12 @@ import (
4146
summaryClient "github.com/tidepool-org/platform/summary/client"
4247
syncTaskMongo "github.com/tidepool-org/platform/synctask/store/mongo"
4348
"github.com/tidepool-org/platform/twiist"
49+
"github.com/tidepool-org/platform/user"
50+
userClient "github.com/tidepool-org/platform/user/client"
4451
workService "github.com/tidepool-org/platform/work/service"
4552
workStoreStructuredMongo "github.com/tidepool-org/platform/work/store/structured/mongo"
53+
54+
"github.com/tidepool-org/platform/work/service/emailnotificationsprocessor"
4655
)
4756

4857
type Standard struct {
@@ -55,13 +64,16 @@ type Standard struct {
5564
syncTaskStore *syncTaskMongo.Store
5665
workStructuredStore *workStoreStructuredMongo.Store
5766
dataDeduplicatorFactory *dataDeduplicatorFactory.Factory
58-
clinicsClient *clinics.Client
67+
clinicsClient clinics.Client
5968
dataClient *Client
6069
dataRawClient *dataRawService.Client
6170
dataSourceClient *dataSourceServiceClient.Client
71+
mailerClient mailer.Mailer
6272
summaryClient *summaryClient.Client
6373
workClient *workService.Client
6474
abbottClient *abbottClient.Client
75+
userClient user.Client
76+
confirmationClient confirmationClient.ClientWithResponsesInterface
6577
workCoordinator *workService.Coordinator
6678
userEventsHandler events.Runner
6779
twiistServiceAccountAuthorizer *twiist.ServiceAccountAuthorizer
@@ -116,9 +128,18 @@ func (s *Standard) Initialize(provider application.Provider) error {
116128
if err := s.initializeDataSourceClient(); err != nil {
117129
return err
118130
}
131+
if err := s.initializeMailerClient(); err != nil {
132+
return err
133+
}
134+
if err := s.initializeUserClient(); err != nil {
135+
return err
136+
}
119137
if err := s.initializeSummaryClient(); err != nil {
120138
return err
121139
}
140+
if err := s.initializeConfirmationClient(); err != nil {
141+
return err
142+
}
122143
if err := s.initializeWorkClient(); err != nil {
123144
return err
124145
}
@@ -470,7 +491,7 @@ func (s *Standard) initializeClinicsClient() error {
470491
if err != nil {
471492
return errors.Wrap(err, "unable to create clinics client")
472493
}
473-
s.clinicsClient = &clnt
494+
s.clinicsClient = clnt
474495

475496
return nil
476497
}
@@ -511,6 +532,65 @@ func (s *Standard) initializeDataSourceClient() error {
511532
return nil
512533
}
513534

535+
func (s *Standard) initializeMailerClient() error {
536+
s.Logger().Debug("Initializing mailer client")
537+
client, err := mailer.Client()
538+
if err != nil {
539+
return errors.Wrap(err, "unable to create mailer client")
540+
}
541+
s.mailerClient = client
542+
return nil
543+
}
544+
545+
func (s *Standard) initializeUserClient() error {
546+
s.Logger().Debug("Initializing user client")
547+
client, err := userClient.NewDefaultClient(userClient.Params{
548+
ConfigReporter: s.ConfigReporter(),
549+
Logger: s.Logger(),
550+
UserAgent: s.UserAgent(),
551+
})
552+
if err != nil {
553+
return errors.Wrap(err, "unable to create user client")
554+
}
555+
s.userClient = client
556+
return nil
557+
}
558+
559+
type confirmationClientConfig struct {
560+
ServiceAddress string `envconfig:"TIDEPOOL_CONFIRMATION_CLIENT_ADDRESS"`
561+
}
562+
563+
func (c *confirmationClientConfig) Load() error {
564+
return envconfig.Process("", c)
565+
}
566+
567+
func (s *Standard) initializeConfirmationClient() error {
568+
s.Logger().Debug("Initializing confirmation client")
569+
570+
cfg := &confirmationClientConfig{}
571+
if err := cfg.Load(); err != nil {
572+
return errors.Wrap(err, "unable to load confirmations client config")
573+
}
574+
575+
opts := confirmationClient.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
576+
token, err := s.AuthClient().ServerSessionToken()
577+
if err != nil {
578+
return err
579+
}
580+
581+
req.Header.Add(auth.TidepoolSessionTokenHeaderKey, token)
582+
return nil
583+
})
584+
585+
client, err := confirmationClient.NewClientWithResponses(cfg.ServiceAddress, opts)
586+
if err != nil {
587+
return errors.Wrap(err, "unable to create confirmations client")
588+
}
589+
s.confirmationClient = client
590+
591+
return nil
592+
}
593+
514594
func (s *Standard) initializeSummaryClient() error {
515595
s.Logger().Debug("Creating summarizer registry")
516596

@@ -618,6 +698,23 @@ func (s *Standard) initializeWorkCoordinator() error {
618698
return errors.Wrap(err, "unable to register abbott processors")
619699
}
620700

701+
emailDependencies := emailnotificationsprocessor.Dependencies{
702+
DataSources: s.dataSourceStructuredStore.NewDataSourcesRepository(),
703+
Mailer: s.mailerClient,
704+
Auth: s.AuthClient(),
705+
Users: s.userClient,
706+
Clinics: s.clinicsClient,
707+
Confirmations: s.confirmationClient,
708+
}
709+
emailNotifProcessors, err := emailnotificationsprocessor.NewProcessors(emailDependencies)
710+
if err != nil {
711+
return errors.Wrap(err, "unable to create email notifications processor")
712+
}
713+
714+
if err = s.workCoordinator.RegisterProcessors(emailNotifProcessors); err != nil {
715+
return errors.Wrap(err, "unable to register email notifications processor")
716+
}
717+
621718
s.Logger().Debug("Starting work coordinator")
622719

623720
s.workCoordinator.Start()

work/service/coordinator.go

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type Coordinator struct {
4545
managerContext context.Context
4646
managerCancelFunc context.CancelFunc
4747
managerWaitGroup sync.WaitGroup
48-
timer *time.Timer
4948
}
5049

5150
func NewCoordinator(logger log.Logger, serverSessionTokenProvider ServerSessionTokenProvider, workClient WorkClient) (*Coordinator, error) {
@@ -156,9 +155,6 @@ func (c *Coordinator) startManager() {
156155
go func() {
157156
defer c.managerWaitGroup.Done()
158157

159-
c.startTimer()
160-
defer c.stopTimer()
161-
162158
for {
163159
select {
164160
case <-c.managerContext.Done(): // Drain and complete any interrupted tasks
@@ -167,13 +163,10 @@ func (c *Coordinator) startManager() {
167163
}
168164
return
169165
case completion := <-c.workersCompletionChannel:
170-
c.stopTimer()
171166
c.completeWork(completion)
172167
c.requestAndDispatchWork()
173-
c.startTimer()
174-
case <-c.timer.C:
168+
case <-c.tick():
175169
c.requestAndDispatchWork()
176-
c.startTimer()
177170
}
178171
}
179172
}()
@@ -331,23 +324,10 @@ func (c *Coordinator) completeWork(completion *coordinatorProcessingCompletion)
331324
}
332325
}
333326

334-
func (c *Coordinator) startTimer() {
327+
func (c *Coordinator) tick() <-chan time.Time {
335328
jitter := int64(float64(c.frequency) * CoordinatorDelayJitter)
336329
frequencyWithJitter := c.frequency + time.Duration(rand.Int63n(jitter*2+1)-jitter)
337-
if c.timer == nil {
338-
c.timer = time.NewTimer(frequencyWithJitter)
339-
} else {
340-
c.timer.Reset(frequencyWithJitter)
341-
}
342-
}
343-
344-
func (c *Coordinator) stopTimer() {
345-
if c.timer != nil {
346-
if !c.timer.Stop() {
347-
<-c.timer.C
348-
}
349-
c.timer = nil
350-
}
330+
return time.After(frequencyWithJitter)
351331
}
352332

353333
type coordinatorProcessingIdentifier struct {

0 commit comments

Comments
 (0)