1
1
from request import sogs_get , sogs_post
2
- from sogs import crypto
3
-
4
- from base64 import b64encode
2
+ from sogs import crypto , config
3
+ from sogs .hashing import blake2b
4
+ from sogs .utils import encode_base64
5
+ from sogs .model .user import SystemUser
6
+ import nacl .bindings as salt
7
+ from nacl .utils import random
8
+ import time
5
9
6
10
7
11
def test_dm_default_empty (client , blind_user ):
@@ -15,34 +19,52 @@ def test_dm_banned_user(client, banned_user):
15
19
assert r .status_code == 403
16
20
17
21
18
- def make_post (data , user ):
19
- privkey = crypto .compute_derived_key_bytes (user .privkey .encode ())
20
- sig = crypto .xed25519_sign (privkey , data )
21
- return {'message' : b64encode (data ).decode ('ascii' ), 'signature' : b64encode (sig ).decode ('ascii' )}
22
+ def make_post (message , sender , to ):
23
+ assert sender .is_blinded
24
+ assert to .is_blinded
25
+ a = sender .ed_key .to_curve25519_private_key ().encode ()
26
+ kA = bytes .fromhex (sender .session_id [2 :])
27
+ kB = bytes .fromhex (to .session_id [2 :])
28
+ key = blake2b (salt .crypto_scalarmult_ed25519_noclamp (a , kB ) + kA + kB , digest_size = 32 )
29
+
30
+ # MESSAGE || UNBLINDED_ED_PUBKEY
31
+ plaintext = message + sender .ed_key .verify_key .encode ()
32
+ nonce = random (24 )
33
+ ciphertext = salt .crypto_aead_xchacha20poly1305_ietf_encrypt (
34
+ plaintext , aad = None , nonce = nonce , key = key )
35
+ data = b'\x00 ' + ciphertext + nonce
36
+ return {'message' : encode_base64 (data )}
22
37
23
38
24
- def test_dm_send_from_bannend_user (client , blind_user , banned_user ):
39
+ def test_dm_send_from_banned_user (client , blind_user , blind_user2 ):
40
+ blind_user2 .ban (banned_by = SystemUser ())
25
41
r = sogs_post (
26
- client , f'/inbox/{ blind_user .session_id } ' , make_post (b'beep' , banned_user ), banned_user
42
+ client , f'/inbox/{ blind_user .session_id } ' , make_post (b'beep' , sender = blind_user2 , to = blind_user ), blind_user2
27
43
)
28
44
assert r .status_code == 403
29
45
30
46
31
- def test_dm_send_to_bannend_user (client , blind_user , banned_user ):
47
+ def test_dm_send_to_banned_user (client , blind_user , blind_user2 ):
48
+ blind_user2 .ban (banned_by = SystemUser ())
32
49
r = sogs_post (
33
- client , f'/inbox/{ banned_user .session_id } ' , make_post (b'beep' , blind_user ), blind_user
50
+ client , f'/inbox/{ blind_user2 .session_id } ' , make_post (b'beep' , sender = blind_user , to = blind_user2 ), blind_user
34
51
)
35
52
assert r .status_code == 404
36
53
37
54
38
55
def test_dm_send (client , blind_user , blind_user2 ):
39
- post = make_post (b'bep' , blind_user )
56
+ post = make_post (b'bep' , sender = blind_user , to = blind_user2 )
40
57
r = sogs_post (client , f'/inbox/{ blind_user2 .session_id } ' , post , blind_user )
41
58
assert r .status_code == 201
42
59
r = sogs_get (client , '/inbox' , blind_user2 )
43
60
assert r .status_code == 200
44
- for entry in r .json :
45
- if entry ['message' ] == post ['message' ]:
46
- break
47
- else :
48
- assert False
61
+ assert len (r .json ) == 1
62
+ data = r .json [0 ]
63
+ now = time .time ()
64
+ assert - 1 < data .pop ('posted_at' ) - time .time () < 1
65
+ assert - 1 < data .pop ('expires_at' ) - config .DM_EXPIRY_DAYS * 86400 - time .time () < 1
66
+ assert data == {
67
+ 'id' : 1 ,
68
+ 'message' : post ['message' ],
69
+ 'sender' : blind_user .session_id ,
70
+ }
0 commit comments