44from logic .message import messages_data_handler
55from core .constants import (
66 LONGPOLL_MIN ,
7- LONGPOLL_MAX
7+ LONGPOLL_MAX ,
8+ COLDWIRE_LEN_OFFSET ,
9+ SMP_TYPE ,
10+ PFS_TYPE ,
11+ MSG_TYPE ,
12+ XCHACHA20POLY1305_NONCE_LEN
813)
914from core .crypto import random_number_range
15+ from core .trad_crypto import (
16+ encrypt_xchacha20poly1305 ,
17+ decrypt_xchacha20poly1305
18+ )
19+ from base64 import b64decode
1020import copy
1121import logging
1222import json
1323
1424logger = logging .getLogger (__name__ )
1525
26+
27+ def decode_blob_stream (data : bytes ) -> list :
28+ messages = []
29+
30+ offset = 0
31+ while offset < len (data ):
32+ if offset + COLDWIRE_LEN_OFFSET > len (data ):
33+ raise ValueError ("Incomplete length prefix, malformed or corrupted data." )
34+
35+ msg_len = int .from_bytes (data [offset : offset + COLDWIRE_LEN_OFFSET ], "big" )
36+ offset += COLDWIRE_LEN_OFFSET
37+ if offset + msg_len > len (data ):
38+ raise ValueError ("Incomplete message data" )
39+
40+ messages .append (data [offset :offset + msg_len ])
41+ offset += msg_len
42+ return messages
43+
44+
45+ def parse_blobs (blobs : list [bytes ]) -> dict :
46+ parsed_messages = []
47+
48+ for raw in blobs :
49+ try :
50+ sender , blob = raw .split (b"\0 " , 1 )
51+ sender = sender .decode ("utf-8" )
52+ parsed_messages .append ({
53+ "sender" : sender ,
54+ "blob" : blob
55+ })
56+ except ValueError as e :
57+ logger .error ("Invalid message format! Error: %s" , str (e ))
58+ continue
59+
60+ return parsed_messages
61+
1662def background_worker (user_data , user_data_lock , ui_queue , stop_flag ):
1763 # Incase we received a SMP question request last time right before the background worker was about to exit
1864 smp_unanswered_questions (user_data , user_data_lock , ui_queue )
@@ -24,15 +70,22 @@ def background_worker(user_data, user_data_lock, ui_queue, stop_flag):
2470
2571 try :
2672 # Random longpoll number to help obfsucate traffic against analysis
27- response = http_request (f"{ server_url } /data/longpoll" , "GET" , auth_token = auth_token , longpoll = random_number_range (LONGPOLL_MIN , LONGPOLL_MAX ))
73+ response = http_request (
74+ f"{ server_url } /data/longpoll" ,
75+ "GET" ,
76+ auth_token = auth_token ,
77+ longpoll = random_number_range (LONGPOLL_MIN , LONGPOLL_MAX )
78+ )
2879 except TimeoutError :
2980 logger .debug ("Data longpoll request has timed out, retrying..." )
3081 continue
3182
32- # logger.debug("Data received: %s", json.dumps(response, indent = 2)[:2000])
83+ data = decode_blob_stream (response )
84+ data = parse_blobs (data )
3385
34- for message in response ["messages" ]:
35- logger .debug ("Received data message: %s" , json .dumps (message , indent = 2 )[:5000 ])
86+
87+ for message in data :
88+ logger .debug ("Received data: %s" , str (message )[:3000 ])
3689
3790 # Sanity check universal message fields
3891 if (not "sender" in message ) or (not message ["sender" ].isdigit ()) or (len (message ["sender" ]) != 16 ):
@@ -43,25 +96,78 @@ def background_worker(user_data, user_data_lock, ui_queue, stop_flag):
4396
4497 continue
4598
99+ sender = message ["sender" ]
100+ blob = message ["blob" ]
101+
46102 with user_data_lock :
47103 user_data_copied = copy .deepcopy (user_data )
48104
49- if message ["data_type" ] == "smp" :
50- smp_data_handler (user_data , user_data_lock , user_data_copied , ui_queue , message )
105+ # Everything from here is not validated by server
106+
107+ blob_plaintext = None
108+
109+ if sender in user_data_copied ["contacts" ]:
110+ chacha_key = user_data ["contacts" ][sender ]["lt_sign_key_smp" ]["tmp_key" ]
111+ contact_next_strand_nonce = user_data ["contacts" ][sender ]["contact_next_strand_nonce" ]
112+
113+ if chacha_key is not None :
114+ chacha_key = b64decode (user_data ["contacts" ][sender ]["lt_sign_key_smp" ]["tmp_key" ])
115+
116+ try :
117+ try :
118+ blob_plaintext = decrypt_xchacha20poly1305 (chacha_key , blob [:XCHACHA20POLY1305_NONCE_LEN ], blob [XCHACHA20POLY1305_NONCE_LEN :])
119+ except Exception as e :
120+ logger .debug ("Failed to decrypt blob from contact (%s) probably due to invalid nonce: %s" , sender , str (e ))
121+ if contact_next_strand_nonce is None :
122+ raise Exception ("Unable to decrypt apparent SMP request due to missing contact strand nonce." )
123+
124+ blob_plaintext = decrypt_xchacha20poly1305 (chacha_key , contact_next_strand_nonce , blob )
125+
126+ except Exception as e :
127+ logger .error ("Failed to decrypt blob from contact (%s) with error: %s" , sender , str (e ))
128+ else :
129+ chacha_key = user_data ["contacts" ][sender ]["contact_strand_key" ]
130+
131+ if (chacha_key is None ) and (contact_next_strand_nonce is None ):
132+ # just assume at this point that it's not encrypted.
133+ blob_plaintext = blob
134+ else :
135+ # Under known laws of physics, this should never fail. Unless the contact is acting funny on purpose / invalid implementation of Coldwire + strandlock protocol.
136+ try :
137+ blob_plaintext = decrypt_xchacha20poly1305 (chacha_key , contact_next_strand_nonce , blob )
138+ except Exception as e :
139+ logger .error (
140+ "Impossible error: Failed to decrypt blob from contact (%s)"
141+ "We dont know what caused this, maybe the contact is trying to denial-of-service you"
142+ ". Skipping data because of error: %s" , sender , str (e )
143+ )
144+ continue
145+ else :
146+ logger .debug ("Contact (%s) not saved.. we just gonna assume blob_plaintext = blob" , sender )
147+ blob_plaintext = blob
148+
149+
150+ # SMP
151+ if bytes ([blob_plaintext [0 ]]) == SMP_TYPE :
152+ smp_data_handler (user_data , user_data_lock , user_data_copied , ui_queue , sender , blob_plaintext [1 :])
51153
52- elif message ["data_type" ] == "pfs" :
53- pfs_data_handler (user_data , user_data_lock , user_data_copied , ui_queue , message )
154+ # PFS
155+ elif bytes ([blob_plaintext [0 ]]) == PFS_TYPE :
156+ pfs_data_handler (user_data , user_data_lock , user_data_copied , ui_queue , sender , blob_plaintext [1 :])
157+
158+ # MSG
159+ elif bytes ([blob_plaintext [0 ]]) == MSG_TYPE :
160+ messages_data_handler (user_data , user_data_lock , user_data_copied , ui_queue , sender , blob_plaintext [1 :])
54161
55- elif message ["data_type" ] == "message" :
56- messages_data_handler (user_data , user_data_lock , user_data_copied , ui_queue , message )
57162 else :
58163 logger .error (
59- "Impossible condition, either you have discovered a bug in Coldwire, or the server is attempting to denial-of-service you. Skipping data message with unknown data type (%s)..." ,
60- message ["data_type" ]
164+ "Skipping data with unknown data type (%d) from contact (%s)..." ,
165+ bytes ([blob_plaintext [0 ]]),
166+ sender
61167 )
62168
63169 # *Sigh* I had to put this here because if we rotate before finishing reading all of the messages
64- # we would literally overwrite our own key.
170+ # we would overwrite our own key.
65171 # TODO: We need to keep the last used key and use it when decapsulation with new key gives invalid output
66172 # because it might actually take some time for our keys to be uploaded to server + other servers, and to the contact.
67173 #
0 commit comments