3
3
from .. import config
4
4
from ..config import logger
5
5
from ..core import SUBSCRIBE
6
- from .util import encrypt_notify_payload , derive_notifier_key , warn_on_except , NotifyStats
6
+ from .util import (
7
+ encrypt_notify_payload ,
8
+ derive_notifier_key ,
9
+ warn_on_except ,
10
+ NotifyStats ,
11
+ )
7
12
8
- from pyfcm import FCMNotification
13
+ import firebase_admin
14
+ from firebase_admin import messaging
15
+ from firebase_admin .exceptions import *
9
16
10
17
import oxenc
11
18
from oxenmq import OxenMQ , Message , Address , AuthLevel
12
19
20
+ import asyncio
13
21
import datetime
14
22
import time
15
23
import json
16
24
import signal
17
25
import systemd .daemon
18
26
from threading import Lock
19
27
28
+ loop = None
20
29
omq = None
21
30
hivemind = None
22
31
firebase_app = None
@@ -75,18 +84,17 @@ def push_notification(msg: Message):
75
84
# data-too-big messages and config updates which definitely won't notify.
76
85
priority = "high" if b"~" in data and data [b"n" ] in (0 , 11 ) else "normal"
77
86
78
- device_token = data [b"&" ].decode () # unique service id, as we returned from validate
87
+ # unique service id, as we returned from validate
88
+ device_token = data [b"&" ].decode ()
79
89
80
- msg = {
81
- "fcm_token" : device_token ,
82
- "data_payload" : {
90
+ msg = messaging .Message (
91
+ data = {
83
92
"enc_payload" : oxenc .to_base64 (enc_payload ),
84
- "spns" : f"{ SPNS_FIREBASE_VERSION } "
93
+ "spns" : f"{ SPNS_FIREBASE_VERSION } " ,
85
94
},
86
- "android_config" : {
87
- "priority" : priority ,
88
- },
89
- }
95
+ token = device_token ,
96
+ android = messaging .AndroidConfig (priority = priority ),
97
+ )
90
98
91
99
global notify_queue , queue_lock
92
100
with queue_lock :
@@ -95,27 +103,38 @@ def push_notification(msg: Message):
95
103
96
104
@warn_on_except
97
105
def send_pending ():
98
- global notify_queue , queue_lock , firebase_app , stats
106
+ global notify_queue , queue_lock , firebase_app , loop , stats
99
107
with queue_lock :
100
108
queue , notify_queue = notify_queue , []
101
109
102
110
i = 0
111
+ results = []
103
112
while i < len (queue ):
104
- results = firebase_app .async_notify_multiple_devices (params_list = queue [i : i + MAX_NOTIFIES ])
105
- with stats .lock :
106
- stats .notifies += min (len (queue ) - i , MAX_NOTIFIES )
113
+ results .append (
114
+ asyncio .run_coroutine_threadsafe (
115
+ messaging .send_each_async (
116
+ messages = queue [i : i + MAX_NOTIFIES ], app = firebase_app
117
+ ),
118
+ loop ,
119
+ )
120
+ )
121
+ i += MAX_NOTIFIES
107
122
108
- # FIXME: process/reschedule failures?
123
+ results = [f .result () for f in results ]
124
+ # FIXME: process/reschedule failures?
109
125
110
- i += MAX_NOTIFIES
126
+ with stats .lock :
127
+ stats .notifies += len (queue )
111
128
112
129
113
130
@warn_on_except
114
131
def ping ():
115
132
"""Makes sure we are registered and reports updated stats to hivemind; called every few seconds"""
116
133
global omq , hivemind , stats
117
134
omq .send (hivemind , "admin.register_service" , "firebase" )
118
- omq .send (hivemind , "admin.service_stats" , "firebase" , oxenc .bt_serialize (stats .collect ()))
135
+ omq .send (
136
+ hivemind , "admin.service_stats" , "firebase" , oxenc .bt_serialize (stats .collect ())
137
+ )
119
138
systemd .daemon .notify (
120
139
f"WATCHDOG=1\n STATUS=Running; { stats .total_notifies } notifications, "
121
140
f"{ stats .total_retries } retries, { stats .total_failures } failures"
@@ -150,11 +169,13 @@ def start():
150
169
omq .start ()
151
170
152
171
hivemind = omq .connect_remote (
153
- Address (config .config .hivemind_sock ), auth_level = AuthLevel .basic , ephemeral_routing_id = False
172
+ Address (config .config .hivemind_sock ),
173
+ auth_level = AuthLevel .basic ,
174
+ ephemeral_routing_id = False ,
154
175
)
155
176
156
- firebase_app = FCMNotification (
157
- service_account_file = conf ["token_file" ], project_id = "loki-5a81e"
177
+ firebase_app = firebase_admin . initialize_app (
178
+ firebase_admin . credentials . Certificate ( conf ["token_file" ])
158
179
)
159
180
160
181
omq .send (hivemind , "admin.register_service" , "firebase" )
@@ -177,14 +198,15 @@ def disconnect(flush_pending=True):
177
198
def run (startup_delay = 4.0 ):
178
199
"""Runs the firebase notifier, forever."""
179
200
180
- global omq
201
+ global omq , loop
181
202
182
203
if startup_delay > 0 :
183
204
time .sleep (startup_delay )
184
205
185
206
logger .info ("Starting firebase notifier" )
186
207
systemd .daemon .notify ("STATUS=Initializing firebase notifier..." )
187
208
try :
209
+ loop = asyncio .new_event_loop ()
188
210
start ()
189
211
except Exception as e :
190
212
logger .critical (f"Failed to start firebase notifier: { e } " )
@@ -194,16 +216,17 @@ def run(startup_delay=4.0):
194
216
systemd .daemon .notify ("READY=1\n STATUS=Started" )
195
217
196
218
def sig_die (signum , frame ):
219
+ loop .stop ()
197
220
raise OSError (f"Caught signal { signal .Signals (signum ).name } " )
198
221
199
222
try :
200
223
signal .signal (signal .SIGHUP , sig_die )
201
224
signal .signal (signal .SIGINT , sig_die )
202
225
203
- while omq is not None :
204
- time . sleep ( 3600 )
226
+ loop . run_forever ()
227
+
205
228
except Exception as e :
206
- logger .error (f"firebase notifier mule died via exception: { e } " )
229
+ logger .error (f"firebase notifier died via exception: { e } " )
207
230
208
231
209
232
if __name__ == "__main__" :
0 commit comments