1
+ import jwt
1
2
import pytest
2
3
from aiohttp import web
3
- from aiohttp_security import AbstractAuthorizationPolicy
4
+
4
5
from aiohttp_security import setup as _setup
5
- from aiohttp_security . jwt_identity import JWTIdentityPolicy
6
+ from aiohttp_security import AbstractAuthorizationPolicy
6
7
from aiohttp_security .api import IDENTITY_KEY
7
- import jwt
8
+ from aiohttp_security .jwt_identity import JWTIdentityPolicy
9
+
10
+
11
+ @pytest .fixture
12
+ def make_token ():
13
+ def factory (payload , secret ):
14
+ return jwt .encode (
15
+ payload ,
16
+ secret ,
17
+ algorithm = 'HS256' ,
18
+ )
19
+
20
+ return factory
8
21
9
22
10
23
class Autz (AbstractAuthorizationPolicy ):
@@ -22,35 +35,48 @@ async def test_no_pyjwt_installed(mocker):
22
35
JWTIdentityPolicy ('secret' )
23
36
24
37
25
- async def test_identify (loop , test_client ):
38
+ async def test_identify (loop , make_token , test_client ):
26
39
kwt_secret_key = 'Key'
27
40
28
- async def create (request ):
29
- response = web .Response ()
30
- data = await request .post ()
41
+ token = make_token ({'login' : 'Andrew' }, kwt_secret_key )
31
42
32
- encoded_identity = jwt .encode ({'identity' : data ['login' ]},
33
- kwt_secret_key ,
34
- algorithm = 'HS256' )
43
+ async def check (request ):
44
+ policy = request .app [IDENTITY_KEY ]
45
+ identity = await policy .identify (request )
46
+ assert 'Andrew' == identity ['login' ]
47
+ return web .Response ()
35
48
36
- response .text = encoded_identity .decode ('utf-8' )
37
- return response
49
+ app = web .Application (loop = loop )
50
+ _setup (app , JWTIdentityPolicy (kwt_secret_key ), Autz ())
51
+ app .router .add_route ('GET' , '/' , check )
52
+
53
+ client = await test_client (app )
54
+ headers = {'Authorization' : 'Bearer {}' .format (token .decode ('utf-8' ))}
55
+ resp = await client .get ('/' , headers = headers )
56
+ assert 200 == resp .status
57
+
58
+
59
+ async def test_identify_broken_scheme (loop , make_token , test_client ):
60
+ kwt_secret_key = 'Key'
61
+
62
+ token = make_token ({'login' : 'Andrew' }, kwt_secret_key )
38
63
39
64
async def check (request ):
40
65
policy = request .app [IDENTITY_KEY ]
41
- user_id = await policy .identify (request )
42
- assert 'Andrew' == user_id
66
+
67
+ try :
68
+ await policy .identify (request )
69
+ except ValueError as exc :
70
+ raise web .HTTPBadRequest (reason = exc )
71
+
43
72
return web .Response ()
44
73
45
74
app = web .Application (loop = loop )
46
75
_setup (app , JWTIdentityPolicy (kwt_secret_key ), Autz ())
47
76
app .router .add_route ('GET' , '/' , check )
48
- app . router . add_route ( 'POST' , '/' , create )
77
+
49
78
client = await test_client (app )
50
- resp = await client .post ('/' , data = {'login' : 'Andrew' })
51
- jwt_token = await resp .content .read ()
52
- assert 200 == resp .status
53
- await resp .release ()
54
- headers = {'Authorization' : str (jwt_token .decode ('utf-8' ))}
79
+ headers = {'Authorization' : 'Token {}' .format (token .decode ('utf-8' ))}
55
80
resp = await client .get ('/' , headers = headers )
56
- assert 200 == resp .status
81
+ assert 400 == resp .status
82
+ assert 'Invalid authorization scheme' in resp .reason
0 commit comments