Skip to content

Commit b560d40

Browse files
authored
Merge pull request #4 from OpenMatchmaking/feature-update-game-server
Updating available slots of the game server
2 parents 0eb0519 + da5408d commit b560d40

File tree

5 files changed

+320
-2
lines changed

5 files changed

+320
-2
lines changed

game-servers-pool/app/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from sanic_mongodb_ext import MongoDbExtension
44
from sanic_amqp_ext import AmqpExtension
55

6-
from app.workers import GetServerWorker, RegisterServerWorker
6+
from app.workers import GetServerWorker, RegisterServerWorker, UpdateServerWorker
77

88

99
app = Sanic('microservice-auth')
@@ -17,7 +17,7 @@
1717
# RabbitMQ workers
1818
app.amqp.register_worker(GetServerWorker(app))
1919
app.amqp.register_worker(RegisterServerWorker(app))
20-
20+
app.amqp.register_worker(UpdateServerWorker(app))
2121

2222
# Public API
2323
async def health_check(request):

game-servers-pool/app/game_servers/schemas.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@ class RequestGetServerSchema(Schema):
2727
]
2828
)
2929

30+
class Meta:
31+
ordered = True
32+
3033

3134
class RetrieveGameServerSchema(GameServer.schema.as_marshmallow_schema()):
3235

3336
class Meta:
3437
model = GameServer
38+
ordered = True
3539
fields = (
3640
'host',
3741
'port',
@@ -87,6 +91,7 @@ def validate_id(self, value):
8791

8892
class Meta:
8993
model = GameServer
94+
ordered = True
9095
fields = (
9196
'id',
9297
'host',
@@ -95,3 +100,51 @@ class Meta:
95100
'credentials',
96101
'game_mode',
97102
)
103+
104+
105+
class UpdateGameServerSchema(Schema):
106+
id = fields.String(
107+
required=True
108+
)
109+
freed_slots = fields.Integer(
110+
load_from="freed-slots",
111+
allow_none=False,
112+
required=True,
113+
validate=[
114+
validate.Range(min=1, error="The value must be positive integer.")
115+
]
116+
)
117+
118+
@validates('id')
119+
def validate_id(self, value):
120+
if not ObjectId.is_valid(value):
121+
raise ValidationError(
122+
"'{}' is not a valid ObjectId, it must be a 12-byte "
123+
"input or a 24-character hex string.".format(value)
124+
)
125+
126+
class Meta:
127+
model = GameServer
128+
ordered = True
129+
fields = (
130+
'id',
131+
'freed_slots',
132+
)
133+
134+
135+
class SimpleGameServerSchema(Schema):
136+
id = fields.String(
137+
dump_only=True
138+
)
139+
available_slots = fields.Integer(
140+
dump_only=True,
141+
dump_to="available-slots",
142+
)
143+
144+
class Meta:
145+
model = GameServer
146+
ordered = True
147+
fields = (
148+
'id',
149+
'available_slots',
150+
)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from app.workers.get_server import GetServerWorker # NOQA
22
from app.workers.microservice_register import MicroserviceRegisterWorker # NOQA
33
from app.workers.register_server import RegisterServerWorker # NOQA
4+
from app.workers.update_server import UpdateServerWorker # NOQA
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import json
2+
3+
from aioamqp import AmqpClosedConnection
4+
from bson import ObjectId
5+
from marshmallow import ValidationError
6+
from sanic_amqp_ext import AmqpWorker
7+
from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR
8+
from sage_utils.wrappers import Response
9+
10+
11+
class UpdateServerWorker(AmqpWorker):
12+
QUEUE_NAME = 'game-servers-pool.server.update'
13+
REQUEST_EXCHANGE_NAME = 'open-matchmaking.game-server-pool.server.update.direct'
14+
RESPONSE_EXCHANGE_NAME = 'open-matchmaking.responses.direct'
15+
CONTENT_TYPE = 'application/json'
16+
17+
def __init__(self, app, *args, **kwargs):
18+
super(UpdateServerWorker, self).__init__(app, *args, **kwargs)
19+
from app.game_servers.documents import GameServer
20+
from app.game_servers.schemas import UpdateGameServerSchema, SimpleGameServerSchema
21+
self.game_server_document = GameServer
22+
self.schema = UpdateGameServerSchema
23+
self.response_schema = SimpleGameServerSchema
24+
25+
async def validate_data(self, raw_data):
26+
try:
27+
data = json.loads(raw_data.strip())
28+
except json.decoder.JSONDecodeError:
29+
data = {}
30+
deserializer = self.schema()
31+
result = deserializer.load(data)
32+
if result.errors:
33+
raise ValidationError(result.errors)
34+
35+
return result.data
36+
37+
async def update_game_server(self, raw_data):
38+
try:
39+
data = await self.validate_data(raw_data)
40+
except ValidationError as exc:
41+
return Response.from_error(VALIDATION_ERROR, exc.normalized_messages())
42+
43+
document_id = ObjectId(data['id'])
44+
document = await self.game_server_document.find_one({'_id': document_id})
45+
46+
if not document:
47+
return Response.from_error(
48+
NOT_FOUND_ERROR,
49+
"The requested game server was not found."
50+
)
51+
52+
document.available_slots += data['freed_slots']
53+
await document.commit()
54+
55+
serializer = self.response_schema()
56+
return Response.with_content(serializer.dump(document).data)
57+
58+
async def process_request(self, channel, body, envelope, properties):
59+
response = await self.update_game_server(body)
60+
response.data[Response.EVENT_FIELD_NAME] = properties.correlation_id
61+
62+
if properties.reply_to:
63+
await channel.publish(
64+
json.dumps(response.data),
65+
exchange_name=self.RESPONSE_EXCHANGE_NAME,
66+
routing_key=properties.reply_to,
67+
properties={
68+
'content_type': self.CONTENT_TYPE,
69+
'delivery_mode': 2,
70+
'correlation_id': properties.correlation_id
71+
},
72+
mandatory=True
73+
)
74+
75+
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
76+
77+
async def consume_callback(self, channel, body, envelope, properties):
78+
self.app.loop.create_task(self.process_request(channel, body, envelope, properties))
79+
80+
async def run(self, *args, **kwargs):
81+
try:
82+
_transport, protocol = await self.connect()
83+
except AmqpClosedConnection as exc:
84+
print(exc)
85+
return
86+
87+
channel = await protocol.channel()
88+
await channel.queue_declare(
89+
queue_name=self.QUEUE_NAME,
90+
durable=True,
91+
passive=False,
92+
auto_delete=False
93+
)
94+
await channel.queue_bind(
95+
queue_name=self.QUEUE_NAME,
96+
exchange_name=self.REQUEST_EXCHANGE_NAME,
97+
routing_key=self.QUEUE_NAME
98+
)
99+
await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
100+
await channel.basic_consume(self.consume_callback, queue_name=self.QUEUE_NAME)
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import pytest
2+
from sage_utils.amqp.clients import RpcAmqpClient
3+
from sage_utils.constants import VALIDATION_ERROR, NOT_FOUND_ERROR
4+
from sage_utils.wrappers import Response
5+
6+
from app.game_servers.documents import GameServer
7+
from app.workers.update_server import UpdateServerWorker
8+
9+
10+
REQUEST_QUEUE = UpdateServerWorker.QUEUE_NAME
11+
REQUEST_EXCHANGE = UpdateServerWorker.REQUEST_EXCHANGE_NAME
12+
RESPONSE_EXCHANGE = UpdateServerWorker.RESPONSE_EXCHANGE_NAME
13+
14+
15+
@pytest.mark.asyncio
16+
async def test_worker_returns_an_updated_information_about_slots(sanic_server):
17+
await GameServer.collection.delete_many({})
18+
19+
game_server = GameServer(**{
20+
'host': '127.0.0.1',
21+
'port': 9000,
22+
'available_slots': 100,
23+
'credentials': {
24+
'token': 'super_secret_token'
25+
},
26+
'game_mode': '1v1'
27+
})
28+
await game_server.commit()
29+
30+
client = RpcAmqpClient(
31+
sanic_server.app,
32+
routing_key=REQUEST_QUEUE,
33+
request_exchange=REQUEST_EXCHANGE,
34+
response_queue='',
35+
response_exchange=RESPONSE_EXCHANGE
36+
)
37+
data = {
38+
'id': str(game_server.id),
39+
'freed-slots': 10
40+
}
41+
response = await client.send(payload=data)
42+
43+
assert Response.EVENT_FIELD_NAME in response.keys()
44+
assert Response.CONTENT_FIELD_NAME in response.keys()
45+
content = response[Response.CONTENT_FIELD_NAME]
46+
47+
assert set(content.keys()) == {'id', 'available-slots'}
48+
assert content['id'] == str(game_server.id)
49+
assert content['available-slots'] == game_server.available_slots + data['freed-slots']
50+
51+
servers_count = await GameServer.collection.count_documents({})
52+
assert servers_count == 1
53+
54+
await GameServer.collection.delete_many({})
55+
56+
57+
@pytest.mark.asyncio
58+
async def test_worker_returns_not_found_error_for_non_existing_game_server(sanic_server):
59+
await GameServer.collection.delete_many({})
60+
61+
client = RpcAmqpClient(
62+
sanic_server.app,
63+
routing_key=REQUEST_QUEUE,
64+
request_exchange=REQUEST_EXCHANGE,
65+
response_queue='',
66+
response_exchange=RESPONSE_EXCHANGE
67+
)
68+
data = {
69+
'id': '5b6a085123cf24aef53b4c78',
70+
'freed-slots': 10
71+
}
72+
response = await client.send(payload=data)
73+
74+
assert Response.ERROR_FIELD_NAME in response.keys()
75+
error = response[Response.ERROR_FIELD_NAME]
76+
77+
assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
78+
assert error[Response.ERROR_TYPE_FIELD_NAME] == NOT_FOUND_ERROR
79+
80+
assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
81+
assert error[Response.ERROR_DETAILS_FIELD_NAME] == 'The requested game server ' \
82+
'was not found.'
83+
84+
servers_count = await GameServer.collection.count_documents({})
85+
assert servers_count == 0
86+
87+
await GameServer.collection.delete_many({})
88+
89+
90+
@pytest.mark.asyncio
91+
async def test_worker_returns_a_validation_error_for_missing_fields(sanic_server):
92+
await GameServer.collection.delete_many({})
93+
94+
client = RpcAmqpClient(
95+
sanic_server.app,
96+
routing_key=REQUEST_QUEUE,
97+
request_exchange=REQUEST_EXCHANGE,
98+
response_queue='',
99+
response_exchange=RESPONSE_EXCHANGE
100+
)
101+
response = await client.send(payload={})
102+
103+
assert Response.ERROR_FIELD_NAME in response.keys()
104+
error = response[Response.ERROR_FIELD_NAME]
105+
106+
assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
107+
assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR
108+
109+
assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
110+
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2
111+
112+
for field in ['id', 'freed-slots']:
113+
assert field in error[Response.ERROR_DETAILS_FIELD_NAME]
114+
assert len(error[Response.ERROR_DETAILS_FIELD_NAME][field]) == 1
115+
assert error[Response.ERROR_DETAILS_FIELD_NAME][field][0] == 'Missing data for ' \
116+
'required field.'
117+
118+
servers_count = await GameServer.collection.count_documents({})
119+
assert servers_count == 0
120+
121+
await GameServer.collection.delete_many({})
122+
123+
124+
@pytest.mark.asyncio
125+
async def test_worker_returns_a_validation_error_for_invalid_id_and_slots_type(sanic_server):
126+
await GameServer.collection.delete_many({})
127+
128+
client = RpcAmqpClient(
129+
sanic_server.app,
130+
routing_key=REQUEST_QUEUE,
131+
request_exchange=REQUEST_EXCHANGE,
132+
response_queue='',
133+
response_exchange=RESPONSE_EXCHANGE
134+
)
135+
data = {
136+
'id': 'INVALID_OBJECT_ID',
137+
'freed-slots': 'INVALID_VALUE'
138+
}
139+
response = await client.send(payload=data)
140+
141+
assert Response.ERROR_FIELD_NAME in response.keys()
142+
error = response[Response.ERROR_FIELD_NAME]
143+
144+
assert Response.ERROR_TYPE_FIELD_NAME in error.keys()
145+
assert error[Response.ERROR_TYPE_FIELD_NAME] == VALIDATION_ERROR
146+
147+
assert Response.ERROR_DETAILS_FIELD_NAME in error.keys()
148+
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]) == 2
149+
150+
assert 'id' in error[Response.ERROR_DETAILS_FIELD_NAME]
151+
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]['id']) == 1
152+
assert error[Response.ERROR_DETAILS_FIELD_NAME]['id'][0] == "'INVALID_OBJECT_ID' is not a " \
153+
"valid ObjectId, it must be a " \
154+
"12-byte input or a " \
155+
"24-character hex string."
156+
157+
assert 'freed-slots' in error[Response.ERROR_DETAILS_FIELD_NAME]
158+
assert len(error[Response.ERROR_DETAILS_FIELD_NAME]['freed-slots']) == 1
159+
assert error[Response.ERROR_DETAILS_FIELD_NAME]['freed-slots'][0] == 'Not a valid integer.'
160+
161+
servers_count = await GameServer.collection.count_documents({})
162+
assert servers_count == 0
163+
164+
await GameServer.collection.delete_many({})

0 commit comments

Comments
 (0)