Skip to content

Commit 1ab0a3f

Browse files
committed
Send urgent heartbeat when phone doesn't send notification
1 parent 26c1cd9 commit 1ab0a3f

File tree

9 files changed

+154
-79
lines changed

9 files changed

+154
-79
lines changed

android/app/src/main/java/com/httpsms/Constants.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ package com.httpsms
33
class Constants {
44
companion object {
55
const val KEY_MESSAGE_ID = "KEY_MESSAGE_ID"
6+
const val KEY_HEARTBEAT_ID = "KEY_HEARTBEAT_ID"
67
}
78
}

android/app/src/main/java/com/httpsms/FirebaseMessagingService.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import com.google.firebase.messaging.RemoteMessage
1010
import timber.log.Timber
1111
import java.time.ZoneOffset
1212
import java.time.ZonedDateTime
13+
import java.util.Timer
1314

1415

1516
class MyFirebaseMessagingService : FirebaseMessagingService() {
@@ -18,6 +19,12 @@ class MyFirebaseMessagingService : FirebaseMessagingService() {
1819
initTimber()
1920
Timber.d(MyFirebaseMessagingService::onMessageReceived.name)
2021

22+
if (remoteMessage.data.containsKey(Constants.KEY_HEARTBEAT_ID)) {
23+
Timber.w("received heartbeat message with ID [${remoteMessage.data[Constants.KEY_HEARTBEAT_ID]}] and priority [${remoteMessage.priority}] and original priority [${remoteMessage.originalPriority}]")
24+
sendHeartbeat()
25+
return
26+
}
27+
2128
val messageID = remoteMessage.data[Constants.KEY_MESSAGE_ID]
2229
if (messageID == null) {
2330
Timber.e("cannot get message id from notification data with key [${Constants.KEY_MESSAGE_ID}]")
@@ -45,6 +52,23 @@ class MyFirebaseMessagingService : FirebaseMessagingService() {
4552
}
4653
// [END on_new_token]
4754

55+
private fun sendHeartbeat() {
56+
Timber.d("sending heartbeat from FCM notification")
57+
if (!Settings.isLoggedIn(applicationContext)) {
58+
Timber.w("user is not logged in, not sending heartbeat")
59+
return
60+
}
61+
Thread {
62+
try {
63+
HttpSmsApiService.create(applicationContext).storeHeartbeat(Settings.getOwnerOrDefault(applicationContext))
64+
Settings.setHeartbeatTimestampAsync(applicationContext, System.currentTimeMillis())
65+
} catch (exception: Exception) {
66+
Timber.e(exception)
67+
}
68+
Timber.d("finished sending pulse")
69+
}.start()
70+
}
71+
4872
private fun scheduleJob(messageID: String) {
4973
// [START dispatch_job]
5074
val inputData: Data = workDataOf(Constants.KEY_MESSAGE_ID to messageID)

android/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ buildscript {
1212
}
1313

1414
plugins {
15-
id 'com.android.application' version '8.0.0' apply false
16-
id 'com.android.library' version '8.0.0' apply false
15+
id 'com.android.application' version '8.0.1' apply false
16+
id 'com.android.library' version '8.0.1' apply false
1717
id 'org.jetbrains.kotlin.android' version '1.6.21' apply false
1818
}
1919

api/cmd/fcm/main.go

Lines changed: 10 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,14 @@
11
package main
22

33
import (
4-
"bytes"
54
"context"
6-
"encoding/json"
7-
"fmt"
8-
"io"
95
"log"
10-
"math/rand"
11-
"net/http"
126
"os"
137
"time"
148

15-
"github.com/davecgh/go-spew/spew"
16-
179
"firebase.google.com/go/messaging"
10+
"github.com/NdoleStudio/httpsms/pkg/di"
1811
"github.com/joho/godotenv"
19-
"github.com/palantir/stacktrace"
20-
21-
firebase "firebase.google.com/go"
22-
"google.golang.org/api/option"
2312
)
2413

2514
func main() {
@@ -28,70 +17,21 @@ func main() {
2817
log.Fatal("Error loading .env file")
2918
}
3019

31-
opt := option.WithCredentialsFile("serviceAccountKey.json")
32-
app, err := firebase.NewApp(context.Background(), nil, opt)
33-
if err != nil {
34-
log.Fatal(stacktrace.Propagate(err, "cannot create firebase app"))
35-
}
36-
37-
client, err := app.Messaging(context.Background())
38-
if err != nil {
39-
log.Fatal(stacktrace.Propagate(err, "cannot create messaging client"))
40-
}
41-
42-
err = createSmsMessage()
43-
if err != nil {
44-
log.Fatal(stacktrace.Propagate(err, "cannot create sms message"))
45-
}
20+
container := di.NewContainer(os.Getenv("GCP_PROJECT_ID"), "")
21+
client := container.FirebaseMessagingClient()
4622

4723
result, err := client.Send(context.Background(), &messaging.Message{
4824
Data: map[string]string{
49-
"hello": "world",
25+
"KEY_HEARTBEAT_ID": time.Now().UTC().Format(time.RFC3339),
5026
},
51-
Token: os.Getenv("FCM_TOKEN"),
52-
})
53-
if err != nil {
54-
log.Fatal(stacktrace.Propagate(err, "cannot send FCM event"))
55-
}
56-
57-
log.Println(fmt.Sprintf("sent event with response [%s]", result))
58-
}
59-
60-
func createSmsMessage() error {
61-
payload, err := json.Marshal(map[string]string{
62-
"from": os.Getenv("PHONE_NUMBER"),
63-
"to": os.Getenv("PHONE_NUMBER"),
64-
"content": fmt.Sprintf("[%s] random message [%d]", time.Now().String(), rand.Int()),
27+
Android: &messaging.AndroidConfig{
28+
Priority: "high",
29+
},
30+
Token: os.Getenv("FIREBASE_TOKEN"),
6531
})
6632
if err != nil {
67-
return stacktrace.Propagate(err, "cannot convert message to map")
68-
}
69-
70-
client := http.Client{}
71-
req, err := http.NewRequest(http.MethodPost, "https://api.httpsms.com/v1/messages/send", bytes.NewBuffer(payload))
72-
if err != nil {
73-
return stacktrace.Propagate(err, "cannot do http request")
74-
}
75-
76-
req.Header = http.Header{
77-
"Content-Type": {"application/json"},
78-
"X-API-Key": {os.Getenv("API_KEY")},
79-
}
80-
81-
response, err := client.Do(req)
82-
if err != nil {
83-
return stacktrace.Propagate(err, "cannot do http request")
84-
}
85-
86-
body, err := io.ReadAll(response.Body)
87-
if err != nil {
88-
return stacktrace.Propagate(err, "cannot read response body")
89-
}
90-
91-
if response.StatusCode >= 400 {
92-
return stacktrace.NewError(fmt.Sprintf("[%s] %s", response.StatusCode, string(body)))
33+
container.Logger().Fatal(err)
9334
}
9435

95-
spew.Dump(string(body))
96-
return nil
36+
container.Logger().Info(result)
9737
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package events
2+
3+
import (
4+
"time"
5+
6+
"github.com/NdoleStudio/httpsms/pkg/entities"
7+
"github.com/google/uuid"
8+
)
9+
10+
// PhoneHeartbeatMissed is emitted when the phone is missing a heartbeat
11+
const PhoneHeartbeatMissed = "phone.heartbeat.missed"
12+
13+
// PhoneHeartbeatMissedPayload is the payload of the PhoneHeartbeatMissed event
14+
type PhoneHeartbeatMissedPayload struct {
15+
PhoneID uuid.UUID `json:"phone_id"`
16+
UserID entities.UserID `json:"user_id"`
17+
LastHeartbeatTimestamp time.Time `json:"last_heartbeat_timestamp"`
18+
Timestamp time.Time `json:"timestamp"`
19+
MonitorID uuid.UUID `json:"monitor_id"`
20+
Owner string `json:"owner"`
21+
}

api/pkg/listeners/phone_notification_listener.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func NewNotificationListener(
3636
events.EventTypeMessageAPISent: l.onMessageAPISent,
3737
events.EventTypeMessageSendRetry: l.onMessageSendRetry,
3838
events.EventTypeMessageNotificationSend: l.onMessageNotificationSend,
39+
events.PhoneHeartbeatMissed: l.onPhoneHeartbeatMissed,
3940
}
4041
}
4142

@@ -97,6 +98,25 @@ func (listener *PhoneNotificationListener) onMessageSendRetry(ctx context.Contex
9798
return nil
9899
}
99100

101+
// onPhoneHeartbeatMissed handles the events.PhoneHeartbeatMissed event
102+
func (listener *PhoneNotificationListener) onPhoneHeartbeatMissed(ctx context.Context, event cloudevents.Event) error {
103+
ctx, span := listener.tracer.Start(ctx)
104+
defer span.End()
105+
106+
payload := new(events.PhoneHeartbeatMissedPayload)
107+
if err := event.DataAs(payload); err != nil {
108+
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
109+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
110+
}
111+
112+
if err := listener.service.SendHeartbeatFCM(ctx, payload); err != nil {
113+
msg := fmt.Sprintf("cannot schedule send heartbeat FCM with params [%s] for event with ID [%s]", spew.Sdump(payload), event.ID())
114+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
115+
}
116+
117+
return nil
118+
}
119+
100120
// onMessageNotificationSend handles the events.EventTypeMessageNotificationSend event
101121
func (listener *PhoneNotificationListener) onMessageNotificationSend(ctx context.Context, event cloudevents.Event) error {
102122
ctx, span := listener.tracer.Start(ctx)

api/pkg/services/event_dispatcher_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (dispatcher *EventDispatcher) DispatchWithTimeout(ctx context.Context, even
9090
func (dispatcher *EventDispatcher) Dispatch(ctx context.Context, event cloudevents.Event) error {
9191
ctx, span := dispatcher.tracer.Start(ctx)
9292
defer span.End()
93-
_, err := dispatcher.DispatchWithTimeout(ctx, event, time.Nanosecond*-1)
93+
_, err := dispatcher.DispatchWithTimeout(ctx, event, time.Nanosecond*1)
9494
return err
9595
}
9696

api/pkg/services/heartbeat_service.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
const (
2121
// select id, a.timestamp, a.owner, a.timestamp - (SELECT timestamp from heartbeats b where b.timestamp < a.timestamp and a.owner = b.owner and a.user_id = b.user_id order by b.timestamp desc limit 1) as diff from heartbeats a;
22-
heartbeatCheckInterval = 1 * time.Hour
22+
heartbeatCheckInterval = 16 * time.Minute
2323
)
2424

2525
// HeartbeatService is handles heartbeat requests
@@ -202,21 +202,51 @@ func (service *HeartbeatService) Monitor(ctx context.Context, params *HeartbeatM
202202
return nil
203203
}
204204

205-
if time.Now().UTC().Sub(heartbeat.Timestamp) > heartbeatCheckInterval &&
206-
time.Now().UTC().Sub(heartbeat.Timestamp) < (heartbeatCheckInterval*2) {
205+
// send urgent FCM message if the last heartbeat is late
206+
if time.Now().UTC().Sub(heartbeat.Timestamp) > heartbeatCheckInterval && time.Now().UTC().Sub(heartbeat.Timestamp) < (heartbeatCheckInterval*5) {
207+
ctxLogger.Info(fmt.Sprintf("sending missed heartbeat notification for userID [%s] and owner [%s] and monitor ID [%s]", params.UserID, params.Owner, params.MonitorID))
208+
service.handleMissedMonitor(ctx, heartbeat.Timestamp, params)
209+
}
210+
211+
if time.Now().UTC().Sub(heartbeat.Timestamp) > (heartbeatCheckInterval*4) &&
212+
time.Now().UTC().Sub(heartbeat.Timestamp) < (heartbeatCheckInterval*5) {
207213
return service.handleFailedMonitor(ctx, heartbeat.Timestamp, params)
208214
}
209215

210216
return service.scheduleHeartbeatCheck(ctx, heartbeat.Timestamp, params)
211217
}
212218

219+
func (service *HeartbeatService) handleMissedMonitor(ctx context.Context, lastTimestamp time.Time, params *HeartbeatMonitorParams) {
220+
ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
221+
defer span.End()
222+
223+
event, err := service.createPhoneHeartbeatMissedEvent(params.Source, &events.PhoneHeartbeatMissedPayload{
224+
PhoneID: params.PhoneID,
225+
UserID: params.UserID,
226+
MonitorID: params.MonitorID,
227+
LastHeartbeatTimestamp: lastTimestamp,
228+
Timestamp: time.Now().UTC(),
229+
Owner: params.Owner,
230+
})
231+
if err != nil {
232+
msg := fmt.Sprintf("cannot create event when phone monitor [%s] missed heartbeat", params.MonitorID.String())
233+
ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
234+
return
235+
}
236+
237+
if _, err = service.dispatcher.DispatchWithTimeout(ctx, event, heartbeatCheckInterval); err != nil {
238+
msg := fmt.Sprintf("cannot dispatch event [%s] for heartbeat monitor with phone id [%s]", event.Type(), params.PhoneID)
239+
ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
240+
}
241+
}
242+
213243
func (service *HeartbeatService) handleFailedMonitor(ctx context.Context, lastTimestamp time.Time, params *HeartbeatMonitorParams) error {
214244
ctx, span := service.tracer.Start(ctx)
215245
defer span.End()
216246

217247
err := service.scheduleHeartbeatCheck(ctx, time.Now().UTC(), params)
218248
if err != nil {
219-
msg := fmt.Sprintf("canot schedule healthcheck for monitor with owner [%s] and userID [%s]", params.Owner, params.UserID)
249+
msg := fmt.Sprintf("cannot schedule healthcheck for monitor with owner [%s] and userID [%s]", params.Owner, params.UserID)
220250
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
221251
}
222252

@@ -233,7 +263,7 @@ func (service *HeartbeatService) handleFailedMonitor(ctx context.Context, lastTi
233263
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
234264
}
235265

236-
if _, err = service.dispatcher.DispatchWithTimeout(ctx, event, heartbeatCheckInterval); err != nil {
266+
if err = service.dispatcher.Dispatch(ctx, event); err != nil {
237267
msg := fmt.Sprintf("cannot dispatch event [%s] for heartbeat monitor with phone id [%s]", event.Type(), params.PhoneID)
238268
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
239269
}
@@ -271,6 +301,10 @@ func (service *HeartbeatService) scheduleHeartbeatCheck(ctx context.Context, las
271301
return nil
272302
}
273303

304+
func (service *HeartbeatService) createPhoneHeartbeatMissedEvent(source string, payload *events.PhoneHeartbeatMissedPayload) (cloudevents.Event, error) {
305+
return service.createEvent(events.PhoneHeartbeatMissed, source, payload)
306+
}
307+
274308
func (service *HeartbeatService) createPhoneHeartbeatDeadEvent(source string, payload *events.PhoneHeartbeatDeadPayload) (cloudevents.Event, error) {
275309
return service.createEvent(events.EventTypePhoneHeartbeatDead, source, payload)
276310
}

api/pkg/services/phone_notification_service.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,40 @@ func NewNotificationService(
4747
}
4848
}
4949

50+
// SendHeartbeatFCM sends a heartbeat message so the phone can request a heartbeat
51+
func (service *PhoneNotificationService) SendHeartbeatFCM(ctx context.Context, payload *events.PhoneHeartbeatMissedPayload) error {
52+
ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
53+
defer span.End()
54+
55+
phone, err := service.phoneRepository.LoadByID(ctx, payload.UserID, payload.PhoneID)
56+
if err != nil {
57+
msg := fmt.Sprintf("cannot load phone with userID [%s] and phoneID [%s]", payload.UserID, payload.PhoneID)
58+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
59+
}
60+
61+
if phone.FcmToken == nil {
62+
msg := fmt.Sprintf("phone with id [%s] has no FCM token", phone.ID)
63+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
64+
}
65+
66+
result, err := service.messagingClient.Send(ctx, &messaging.Message{
67+
Data: map[string]string{
68+
"KEY_HEARTBEAT_ID": time.Now().UTC().Format(time.RFC3339),
69+
},
70+
Android: &messaging.AndroidConfig{
71+
Priority: "high",
72+
},
73+
Token: *phone.FcmToken,
74+
})
75+
if err != nil {
76+
msg := fmt.Sprintf("cannot send heartbeat FCM to phone with id [%s]", phone.ID)
77+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
78+
}
79+
80+
ctxLogger.Info(fmt.Sprintf("successfully sent heartbeat FCM [%s] to phone with ID [%s] for user [%s] and monitor [%s]", result, payload.PhoneID, payload.UserID, payload.MonitorID))
81+
return nil
82+
}
83+
5084
// PhoneNotificationSendParams are parameters for sending a notification
5185
type PhoneNotificationSendParams struct {
5286
UserID entities.UserID
@@ -79,7 +113,8 @@ func (service *PhoneNotificationService) Send(ctx context.Context, params *Phone
79113
"KEY_MESSAGE_ID": params.MessageID.String(),
80114
},
81115
Android: &messaging.AndroidConfig{
82-
TTL: &ttl,
116+
Priority: "normal",
117+
TTL: &ttl,
83118
},
84119
Token: *phone.FcmToken,
85120
})

0 commit comments

Comments
 (0)