-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathcrypto_auth_provider.py
More file actions
62 lines (50 loc) · 2.16 KB
/
crypto_auth_provider.py
File metadata and controls
62 lines (50 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Put this file in a place where the Synapse server can import it.
# Enable it in the homeserver's config with:
#   password_providers:
#     - module: 'crypto_auth_provider.CryptoAuthProvider'
#       config:
#         enabled: true
# If desired, disable registration, to only allow auth through this provider:
#   enable_registration: false
import logging
import time
from twisted.internet import defer
import pysodium
# Version string of this password-provider module.
__version__ = "0.1"
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class CryptoAuthProvider:
    """Synapse password provider that accepts a signature instead of a password.

    The localpart of the user ID must be the hex encoding of the 20-byte
    ``pysodium.crypto_generichash`` of the user's public key.  The "password"
    is a colon-separated string of the form::

        <signature_type>:<signature_hex>:<public_key_hex>

    where the detached signature must cover the 32-byte generic hash of the
    text ``login:<window>`` and ``<window>`` is the current Unix time divided
    by the login window length (5 minutes), truncated to an integer.

    A user that authenticates successfully but does not exist yet is
    registered on the fly.
    """

    __version__ = "0.1"

    # Length, in seconds, of the time window a signed login message is bound to.
    _LOGIN_WINDOW_SECONDS = 5 * 60

    def __init__(self, config, account_handler):
        """Store the parsed config and the account handler passed in by Synapse.

        Args:
            config: Whatever ``parse_config`` returned.
            account_handler: Object used to look up and register users; its
                ``hs.hostname`` attribute is read to record the server name.
        """
        self.account_handler = account_handler
        self.config = config
        self.hs_hostname = self.account_handler.hs.hostname
        self.log = logging.getLogger(__name__)

    @defer.inlineCallbacks
    def check_password(self, user_id: str, password: str):
        """Check a signature-based login attempt.

        Args:
            user_id: Full user ID, e.g. ``@<hex digest>:<server>``.
            password: ``<signature_type>:<signature_hex>:<public_key_hex>``.

        Returns:
            A Deferred that fires with ``True`` when the public key matches
            the user ID and the signature verifies for the current time
            window, and with ``False`` in every other case (including
            credentials that are not in the expected format).
        """
        # Users with ordinary (non-hex) localparts or ordinary passwords are
        # simply not handled by this provider: reject instead of raising, so a
        # malformed credential never surfaces as an internal error.
        try:
            public_key_hash = bytes.fromhex(user_id.split(":", 1)[0][1:])
            # fields[0] is the signature type, which is currently unused.
            fields = password.split(":")
            signature = bytes.fromhex(fields[1])
            public_key = bytes.fromhex(fields[2])
        except (IndexError, ValueError) as exception:
            self.log.info(
                "Malformed signature login credentials: %s", exception)
            return False

        public_key_digest = pysodium.crypto_generichash(public_key, outlen=20)
        if public_key_hash != public_key_digest:
            self.log.info(
                "pubkey hash did not match pubkey")
            return False

        try:
            window = int(time.time() / self._LOGIN_WINDOW_SECONDS)
            message_digest = pysodium.crypto_generichash(
                "login:{}".format(window).encode(), outlen=32)
            # Raises when the signature does not verify.
            pysodium.crypto_sign_verify_detached(
                signature, message_digest, public_key)
            if not (yield self.account_handler.check_user_exists(user_id)):
                self.log.info(
                    "First user login, registering: user=%r", user_id.lower())
                yield self.account_handler.register(
                    localpart=public_key_digest.hex())
            return True
        except Exception as exception:
            # Deliberately broad: any failure while verifying (or while
            # registering a first-time user) must result in a denied login.
            self.log.info(
                "Got exception while verifying signature: %s", exception)
            return False

    @staticmethod
    def parse_config(config):
        """Return the provider's config section unchanged."""
        return config