|
18 | 18 | from core.trad_crypto import ( |
19 | 19 | decrypt_xchacha20poly1305 |
20 | 20 | ) |
21 | | -from base64 import urlsafe_b64encode |
| 21 | +from base64 import b64decode, urlsafe_b64encode |
22 | 22 | import copy |
23 | 23 | import logging |
24 | 24 |
|
@@ -67,17 +67,43 @@ def parse_blobs(blobs: list[bytes]) -> dict: |
67 | 67 | return parsed_messages |
68 | 68 |
|
69 | 69 | def background_worker(user_data, user_data_lock, ui_queue, stop_flag): |
70 | | - # Incase we received a SMP question request last time and user did not answer it. |
71 | | - # NOTE: this is not needed anymore, as we have implemented acknowlegements |
72 | | - # smp_unanswered_questions(user_data, user_data_lock, ui_queue) |
73 | 70 |
|
74 | 71 | # Acknowledgements |
75 | 72 | acks = {} |
76 | 73 |
|
77 | 74 | while not stop_flag.is_set(): |
| 75 | + |
78 | 76 | with user_data_lock: |
79 | 77 | server_url = user_data["server_url"] |
80 | 78 | auth_token = user_data["token"] |
| 79 | + session_headers = user_data["tmp"]["session_headers"] |
| 80 | + |
| 81 | + user_data_copied = copy.deepcopy(user_data) |
| 82 | + |
| 83 | + |
| 84 | + for i, v in user_data_copied["contacts"].items(): |
| 85 | + for msg_payload in v["staged_messages"]: |
| 86 | + try: |
| 87 | + http_request(f"{server_url}/data/send", "POST", metadata = { |
| 88 | + "recipient": i |
| 89 | + }, |
| 90 | + blob = b64decode(msg_payload), |
| 91 | + headers = session_headers, |
| 92 | + auth_token = auth_token |
| 93 | + ) |
| 94 | + logger.info("Successfuly recovered and sent the message to contact (%s)", i) |
| 95 | + |
| 96 | + with user_data_lock: |
| 97 | + user_data["contacts"][i]["staged_messages"].pop(0) |
| 98 | + |
| 99 | + if not user_data["contacts"][i]["staged_messages"]: |
| 100 | + user_data["contacts"][i]["locked"] = False |
| 101 | + |
| 102 | + except Exception as e: |
| 103 | + logger.error("Failed to send recovered message to contact (%s), error: %s", i, str(e)) |
| 104 | + break |
| 105 | + |
| 106 | + |
81 | 107 |
|
82 | 108 | try: |
83 | 109 | # Random longpoll number to help obfsucate traffic against analysis |
@@ -109,20 +135,28 @@ def background_worker(user_data, user_data_lock, ui_queue, stop_flag): |
109 | 135 | sender = message["sender"] |
110 | 136 | blob = message["blob"] |
111 | 137 |
|
112 | | - ack_id = urlsafe_b64encode(message["ack_id"]).decode().rstrip("=") |
113 | | - if "acks" not in acks: |
114 | | - acks["acks"] = [ack_id] |
115 | | - else: |
116 | | - acks["acks"].append(ack_id) |
117 | | - |
118 | 138 | with user_data_lock: |
119 | 139 | try: |
120 | | - user_data["contacts"][sender]["locked"] = True |
| 140 | + |
| 141 | + if user_data["contacts"][sender]["locked"] is False: |
| 142 | + user_data["contacts"][sender]["locked"] = True |
| 143 | + else: |
| 144 | + logger.info("Skipping data message from contact (%s) as contact is locked.", contact_id) |
| 145 | + continue |
| 146 | + |
121 | 147 | except Exception: |
122 | 148 | pass |
123 | 149 |
|
124 | 150 | user_data_copied = copy.deepcopy(user_data) |
125 | 151 |
|
| 152 | + |
| 153 | + |
| 154 | + ack_id = urlsafe_b64encode(message["ack_id"]).decode().rstrip("=") |
| 155 | + if "acks" not in acks: |
| 156 | + acks["acks"] = [ack_id] |
| 157 | + else: |
| 158 | + acks["acks"].append(ack_id) |
| 159 | + |
126 | 160 | # Everything from here is not validated by server |
127 | 161 |
|
128 | 162 | blob_plaintext = None |
|
0 commit comments