Skip to content

Commit 4388bfc

Browse files
authored
Merge pull request #7 from OpenMatchmaking/feature-migrate-to-amqp
Migration of HTTP API onto AMQP-based approach
2 parents 864ce50 + c9285d8 commit 4388bfc

27 files changed

+1273
-1117
lines changed

auth/app/__init__.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
from sanic_amqp_ext import AmqpExtension
66

77
from app.rabbitmq.workers import RegisterMicroserviceWorker
8-
from app.token.api.blueprints import token_bp
9-
from app.users.api.blueprints import users_bp_v1
8+
from app.token.api.workers.generate_token import GenerateTokenWorker
9+
from app.token.api.workers.refresh_token import RefreshTokenWorker
10+
from app.token.api.workers.verify_token import VerifyTokenWorker
11+
from app.users.api.workers.register_game_client import RegisterGameClientWorker
12+
from app.users.api.workers.user_profile import UserProfileWorker
1013

1114

1215
app = Sanic('microservice-auth')
@@ -21,13 +24,16 @@
2124

2225
# RabbitMQ workers
2326
app.amqp.register_worker(RegisterMicroserviceWorker(app))
27+
app.amqp.register_worker(GenerateTokenWorker(app))
28+
app.amqp.register_worker(RefreshTokenWorker(app))
29+
app.amqp.register_worker(VerifyTokenWorker(app))
30+
app.amqp.register_worker(RegisterGameClientWorker(app))
31+
app.amqp.register_worker(UserProfileWorker(app))
2432

2533

2634
# Public API
2735
async def health_check(request):
2836
return text('OK')
2937

3038

31-
app.blueprint(token_bp)
32-
app.blueprint(users_bp_v1)
3339
app.add_route(health_check, '/auth/api/health-check', methods=['GET', ], name='health-check')

auth/app/commands/run_tests.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class RunTestsCommand(Command):
1414

1515
option_list = (
1616
Option('--app-name', '-a', dest='application', default='app'),
17+
Option('--tests', '-t', dest='tests', default=None),
1718
)
1819

1920
def setup_environ_for_pytest_cov(self):
@@ -25,5 +26,9 @@ def setup_environ_for_pytest_cov(self):
2526
def run(self, *args, **kwargs):
2627
app = kwargs.get('application')
2728
self.setup_environ_for_pytest_cov()
28-
pytest.main(args=["-q", "-v", "--cov", app, "--cov-report",
29-
"term-missing", "--tb=native"])
29+
pytest_options = ["-q", "-v", "--cov", app, "--cov-report", "term-missing", "--tb=native"]
30+
31+
if kwargs.get('tests') is not None:
32+
pytest_options.insert(0, kwargs['tests'])
33+
34+
pytest.main(args=pytest_options)

auth/app/generic/views.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

auth/app/token/api/blueprints.py

Lines changed: 0 additions & 9 deletions
This file was deleted.

auth/app/token/api/schemas.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,41 @@ class Meta:
2626
)
2727

2828

29+
class VerifyTokenSchema(Schema):
30+
31+
access_token = String(
32+
load_only=True,
33+
required=True,
34+
allow_none=False,
35+
description='Access token.',
36+
validate=validate.Length(min=1, error='Field cannot be blank.')
37+
)
38+
39+
class Meta:
40+
fields = (
41+
'access_token',
42+
)
43+
44+
2945
class RefreshTokenSchema(Schema):
3046

47+
access_token = String(
48+
load_only=True,
49+
required=True,
50+
allow_none=False,
51+
description='Access token.',
52+
validate=validate.Length(min=1, error='Field cannot be blank.')
53+
)
3154
refresh_token = String(
3255
load_only=True,
3356
required=True,
3457
allow_none=False,
35-
description='Refresh token',
58+
description='Refresh token.',
3659
validate=validate.Length(min=1, error='Field cannot be blank.')
3760
)
3861

3962
class Meta:
4063
fields = (
64+
'access_token',
4165
'refresh_token',
4266
)

auth/app/token/api/views.py

Lines changed: 0 additions & 122 deletions
This file was deleted.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import json
2+
3+
from aioamqp import AmqpClosedConnection
4+
from marshmallow import ValidationError
5+
from sanic_amqp_ext import AmqpWorker
6+
from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR
7+
from sage_utils.wrappers import Response
8+
9+
10+
from app.token.json_web_token import build_payload, generate_token_pair
11+
12+
13+
class GenerateTokenWorker(AmqpWorker):
14+
QUEUE_NAME = 'auth.token.new'
15+
REQUEST_EXCHANGE_NAME = 'open-matchmaking.auth.token.new.direct'
16+
RESPONSE_EXCHANGE_NAME = 'open-matchmaking.responses.direct'
17+
CONTENT_TYPE = 'application/json'
18+
19+
def __init__(self, app, *args, **kwargs):
20+
super(GenerateTokenWorker, self).__init__(app, *args, **kwargs)
21+
from app.users.documents import User
22+
from app.token.api.schemas import LoginSchema
23+
self.user_document = User
24+
self.schema = LoginSchema
25+
26+
def validate_data(self, raw_data):
27+
try:
28+
data = json.loads(raw_data.strip())
29+
except json.decoder.JSONDecodeError:
30+
data = {}
31+
32+
deserializer = self.schema()
33+
result = deserializer.load(data)
34+
if result.errors:
35+
raise ValidationError(result.errors)
36+
37+
return result.data
38+
39+
async def generate_token(self, raw_data):
40+
try:
41+
data = self.validate_data(raw_data)
42+
except ValidationError as exc:
43+
return Response.from_error(VALIDATION_ERROR, exc.normalized_messages())
44+
45+
user = await self.user_document.find_one({"username": data["username"]})
46+
if not user or (user and not user.verify_password(data["password"])):
47+
return Response.from_error(
48+
NOT_FOUND_ERROR, "User wasn't found or specified an invalid password."
49+
)
50+
51+
payload = build_payload(self.app, extra_data={"user_id": str(user.pk)})
52+
response = await generate_token_pair(self.app, payload, user.username)
53+
return Response.with_content(response)
54+
55+
async def process_request(self, channel, body, envelope, properties):
56+
response = await self.generate_token(body)
57+
response.data[Response.EVENT_FIELD_NAME] = properties.correlation_id
58+
59+
if properties.reply_to:
60+
await channel.publish(
61+
json.dumps(response.data),
62+
exchange_name=self.RESPONSE_EXCHANGE_NAME,
63+
routing_key=properties.reply_to,
64+
properties={
65+
'content_type': self.CONTENT_TYPE,
66+
'delivery_mode': 2,
67+
'correlation_id': properties.correlation_id
68+
},
69+
mandatory=True
70+
)
71+
72+
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
73+
74+
async def consume_callback(self, channel, body, envelope, properties):
75+
self.app.loop.create_task(self.process_request(channel, body, envelope, properties))
76+
77+
async def run(self, *args, **kwargs):
78+
try:
79+
_transport, protocol = await self.connect()
80+
except AmqpClosedConnection as exc:
81+
print(exc)
82+
return
83+
84+
channel = await protocol.channel()
85+
await channel.queue_declare(
86+
queue_name=self.QUEUE_NAME,
87+
durable=True,
88+
passive=False,
89+
auto_delete=False
90+
)
91+
await channel.queue_bind(
92+
queue_name=self.QUEUE_NAME,
93+
exchange_name=self.REQUEST_EXCHANGE_NAME,
94+
routing_key=self.QUEUE_NAME
95+
)
96+
await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
97+
await channel.basic_consume(self.consume_callback, queue_name=self.QUEUE_NAME)

0 commit comments

Comments
 (0)