Skip to content

Commit a88a343

Browse files
committed
feat: add JobD helper
1 parent 7223e96 commit a88a343

File tree

3 files changed

+177
-0
lines changed

3 files changed

+177
-0
lines changed

sipgate_e2e_test_utils/jobd.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import os
2+
from typing import Any
3+
4+
import aiohttp
5+
import socket
6+
from http_request_recorder import HttpRequestRecorder
7+
8+
from sipgate_e2e_test_utils.rpc_matchers import xml_rpc
9+
from sipgate_e2e_test_utils.xml_rpc import XmlRpcRequest, XmlRpcResponse
10+
11+
with open(os.path.join(os.path.dirname(__file__), 'jobd_functions.xml')) as file:
12+
JOBD_FUNCTIONS_XML = file.read()
13+
14+
15+
class JobD:
16+
def __init__(self, system_hostname: str, system_port: int) -> None:
17+
self.notification_url = f"http://{socket.gethostname()}:8777/RPC2"
18+
self.system_url = f"http://{system_hostname}:{system_port}/RPC2"
19+
20+
async def __aenter__(self) -> "JobD":
21+
self.recorder = HttpRequestRecorder(name='JobD', port=8777)
22+
self.recorder.expect_path(path='/functions.xml',
23+
responses=(JOBD_FUNCTIONS_XML for _ in range(100)))
24+
25+
self.session = aiohttp.ClientSession()
26+
27+
await self.recorder.__aenter__()
28+
return self
29+
30+
async def trigger_job_and_record_answer(self, job_name: str, timeout: int = 10) -> bytes:
31+
expectation = self.recorder.expect(
32+
xml_rpc('jobd.updateEvent'), responses=XmlRpcResponse(200, 'ok').serialize(), timeout=timeout)
33+
34+
response = await self.session.post(self.system_url, data=XmlRpcRequest('cron.triggerJob', {
35+
'jobName': job_name,
36+
'notificationUrl': self.notification_url,
37+
'uniqueid': 42
38+
}).serialize())
39+
assert 200 == response.status
40+
41+
recorded_request: bytes = await expectation.wait()
42+
return recorded_request
43+
44+
async def __aexit__(self, *args: tuple[Any]) -> None:
45+
await self.session.close()
46+
await_aexit__: None = await self.recorder.__aexit__(*args)
47+
return await_aexit__
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<collection version="0.01">
3+
4+
<!-- NAGIOS PING FUNCTION -->
5+
<interface id="selfTest.testAll">
6+
<description>veranlasst selbsttest und liefert fehler-/erfolgsprotokoll</description>
7+
<input />
8+
<output>
9+
<scalar name="health" type="boolean" />
10+
<array name="protocol">
11+
<struct name="*">
12+
<scalar name="name" type="string" />
13+
<scalar name="exception" type="string" />
14+
<scalar name="success" type="boolean" />
15+
<scalar name="description" type="string" />
16+
<scalar name="critical" type="boolean" />
17+
</struct>
18+
</array>
19+
</output>
20+
</interface>
21+
22+
<interface id="daemon.shutdown">
23+
<input />
24+
<output />
25+
</interface><interface id="daemon.uptime">
26+
<input />
27+
<output>
28+
<struct name="uptime" optional="no">
29+
<scalar name="seconds" type="int" optional="no" />
30+
</struct>
31+
</output>
32+
</interface><interface id="jobd.updateEvent">
33+
<input>
34+
<scalar name="systemName" type="string" optional="no" />
35+
<scalar name="eventName" type="string" optional="no" />
36+
<scalar name="uniqueid" type="string" optional="no" />
37+
<scalar name="status" type="string" optional="no" />
38+
</input>
39+
<output />
40+
</interface><interface id="jobd.directTouch">
41+
<input>
42+
<scalar name="systemName" type="string" optional="no" />
43+
<scalar name="uniqueid" type="string" optional="no" />
44+
<scalar name="serviceIP" type="string" optional="yes" />
45+
</input>
46+
<output />
47+
</interface><interface id="jobd.listSystems">
48+
<input>
49+
</input>
50+
<output>
51+
<array name="systems" optional="no">
52+
<scalar type="string" />
53+
</array>
54+
</output>
55+
</interface><interface id="jobd.listCriticalSystems">
56+
<input>
57+
<scalar type="boolean" name="confirmJobs" />
58+
</input>
59+
<output>
60+
<struct name="systems" optional="no">
61+
<struct name="*">
62+
<array name="criticalJobs">
63+
<scalar type="string" />
64+
</array>
65+
</struct>
66+
</struct>
67+
</output>
68+
</interface></collection>

tests/test_jobd.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import asyncio
2+
import unittest
3+
from asyncio import create_task
4+
5+
from aiohttp import web, ClientSession
6+
7+
from sipgate_e2e_test_utils.jobd import JobD
8+
9+
from sipgate_e2e_test_utils.xml_rpc import XmlRpcRequest, XmlRpcResponse
10+
11+
12+
class TestJobD(unittest.IsolatedAsyncioTestCase):
13+
async def test_serves_functions_xml(self):
14+
any_port = 42
15+
async with (JobD('localhost', any_port), ClientSession() as http):
16+
response = await http.get('http://localhost:8777/functions.xml')
17+
18+
self.assertIn('jobd.updateEvent', await response.text())
19+
20+
async def test_returns_success(self):
21+
mock_service = MockService(46968)
22+
await mock_service.start()
23+
24+
async with (JobD('localhost', mock_service.port) as jobd):
25+
job_result = await jobd.trigger_job_and_record_answer('any_job')
26+
27+
self.assertIn(b'any_value', job_result)
28+
29+
await mock_service.stop()
30+
31+
32+
class MockService:
33+
def __init__(self, port: int) -> None:
34+
self.port = port
35+
36+
app = web.Application()
37+
app.add_routes([web.post('/RPC2', self.__handle_request)])
38+
self.runner = web.AppRunner(app)
39+
40+
async def start(self):
41+
await self.runner.setup()
42+
site = web.TCPSite(self.runner, '127.0.0.1', self.port)
43+
await site.start()
44+
45+
async def __handle_request(self, request: web.BaseRequest) -> web.Response:
46+
xml_rpc_request = XmlRpcRequest.parse(await request.text())
47+
assert 'cron.triggerJob' == xml_rpc_request.method_name
48+
49+
create_task(self.__send_result_to_jobd_after_some_time(xml_rpc_request.members['notificationUrl']))
50+
51+
return web.Response(status=200, body=XmlRpcResponse(200, 'OK', {}).serialize())
52+
53+
async def __send_result_to_jobd_after_some_time(self, notification_url: str):
54+
await asyncio.sleep(0.25)
55+
56+
async with ClientSession() as http:
57+
await http.post(notification_url, data=XmlRpcRequest('jobd.updateEvent', {
58+
'any_field': 'any_value'
59+
}).serialize())
60+
61+
async def stop(self):
62+
await self.runner.cleanup()

0 commit comments

Comments
 (0)