Skip to content

Commit a758a15

Browse files
committed
Implemented register microservice worker for testing
1 parent aa1919a commit a758a15

File tree

1 file changed

+63
-1
lines changed

1 file changed

+63
-1
lines changed

tests/fixtures.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import os
2+
import json
3+
4+
from sage_utils.amqp.base import AmqpWorker
5+
from sage_utils.constants import VALIDATION_ERROR
6+
from sage_utils.wrappers import Response
27

38

49
class FakeConfig(object):
@@ -11,4 +16,61 @@ class Application(object):
1116

1217
def __init__(self, config=None):
1318
super(Application, self).__init__()
14-
self.config = config
19+
self.config = config
20+
21+
22+
class BaseWorker(AmqpWorker):
23+
QUEUE_NAME = 'process-request'
24+
REQUEST_EXCHANGE_NAME = 'amqp.direct'
25+
RESPONSE_EXCHANGE_NAME = 'amqp.direct'
26+
CONTENT_TYPE = 'application/json'
27+
28+
def generate_response(self, data):
29+
raise NotImplementedError('`generate_response(data)` method must be implemented.')
30+
31+
async def process_request(self, channel, body, envelope, properties):
32+
response = self.generate_response(body)
33+
await channel.publish(
34+
json.dumps(response.data),
35+
exchange_name=self.RESPONSE_EXCHANGE_NAME,
36+
routing_key=properties.reply_to,
37+
properties={
38+
'content_type': self.CONTENT_TYPE,
39+
'delivery_mode': 2,
40+
'correlation_id': properties.correlation_id
41+
}
42+
)
43+
44+
await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
45+
46+
async def consume_callback(self, channel, body, envelope, properties):
47+
self.app.loop.create_task(self.process_request(channel, body, envelope, properties))
48+
49+
async def run(self, *args, **kwargs):
50+
_transport, protocol = await self.connect()
51+
52+
channel = await protocol.channel()
53+
await channel.queue_declare(
54+
queue_name=self.QUEUE_NAME,
55+
durable=True,
56+
passive=False,
57+
auto_delete=False
58+
)
59+
await channel.queue_bind(
60+
queue_name=self.QUEUE_NAME,
61+
exchange_name=self.REQUEST_EXCHANGE_NAME,
62+
routing_key=self.QUEUE_NAME
63+
)
64+
await channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
65+
await channel.basic_consume(self.consume_callback, queue_name=self.QUEUE_NAME)
66+
67+
68+
class FakeRegisterMicroserviceWorker(BaseWorker):
69+
QUEUE_NAME = 'microservice.register'
70+
71+
def generate_response(self, raw_data):
72+
data = json.loads(raw_data.strip())
73+
74+
if 'name' not in data.keys() and 'version' not in data.keys():
75+
return Response.from_error(VALIDATION_ERROR, "Name and version must be specified.")
76+
return Response.with_content("OK")

0 commit comments

Comments
 (0)