Skip to content

Commit 52abf36

Browse files
committed
Improve automatic tests
1 parent d96d545 commit 52abf36

File tree

2 files changed

+95
-56
lines changed

2 files changed

+95
-56
lines changed

test/mock_modbus_server.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,9 @@ def run_async_server(self):
8484
# store = ModbusSlaveContext(..., zero_mode=True)
8585
# ----------------------------------------------------------------------- #
8686
store = ModbusSlaveContext(
87-
di=ModbusSequentialDataBlock(0, [200]*3000),
88-
co=ModbusSequentialDataBlock(0, [200]*3000),
89-
hr=ModbusSequentialDataBlock(0, [200]*3000),
90-
ir=ModbusSequentialDataBlock(0, [200]*3000))
91-
context = ModbusServerContext(slaves=store, single=True)
87+
hr=ModbusSequentialDataBlock(0, [0]*3000),
88+
ir=ModbusSequentialDataBlock(0, [0]*3000))
89+
self.context = ModbusServerContext(slaves=store, single=True)
9290

9391
# ----------------------------------------------------------------------- #
9492
# initialize the server information
@@ -108,11 +106,49 @@ def run_async_server(self):
108106
# ----------------------------------------------------------------------- #
109107

110108
# TCP Server
111-
StartTcpServer(context, identity=identity, address=("localhost", 5020))
109+
StartTcpServer(self.context, identity=identity, address=("localhost", 5020))
112110

113111
def stop_async_server(self):
114112
StopServer()
115113

114+
def update_context(self, register, address, values):
115+
""" Update values of the active context. It should be noted
116+
that there is a race condition for the update.
117+
118+
:param register: Type of register to update,
119+
3: holding register
120+
4: input register
121+
:param address: The starting address of the value to be changed
122+
:param values: List of values
123+
"""
124+
assert register == 3 or register == 4
125+
slave_id = 0x00
126+
old_values = self.context[slave_id].getValues(register,
127+
address, count=1)
128+
self.log.debug("Change value at address {} from {} to {}".format(
129+
address, old_values, values))
130+
self.context[slave_id].setValues(register, address, values)
131+
132+
def update_holding_register(self, address, value):
133+
""" Update value of a holding register.
134+
135+
:param address: Address to update
136+
:param value: Value to save
137+
"""
138+
self.log.debug("Update holding register: {}:{}".format(address,
139+
int(value)))
140+
self.update_context(3, address, [int(value)])
141+
142+
def update_input_register(self, address, value):
143+
""" Update value of an input register.
144+
145+
:param address: Address to update
146+
:param value: Value to save
147+
"""
148+
self.log.debug("Update input register: {}:{}".format(address,
149+
int(value)))
150+
self.update_context(4, address, [int(value)])
151+
116152

117153
if __name__ == "__main__":
118154
mms = MockModbusServer()

test/test_pystiebeleltron.py

Lines changed: 53 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,13 @@ class TestStiebelEltronApi:
2121
#__slots__ = 'api'
2222

2323
@pytest.fixture(scope="module")
24-
def pyse_api(self, request):
25-
24+
def pymb_s(self, request):
2625
mb_s = ModbusServer()
27-
mb_c = ModbusClient(host=host_ip, port=host_port, timeout=2)
28-
api = pyse.StiebelEltronAPI(mb_c, slave, update_on_read=False)
2926

3027
# Cleanup after last test did run (will run as well, if something fails in setup).
3128
def fin():
32-
mb_c.close()
33-
#time.sleep(0.5)
34-
35-
stop_thread = Thread(target=mb_s.stop_async_server, name="StopReactor")
29+
stop_thread = Thread(target=mb_s.stop_async_server,
30+
name="StopReactor")
3631
stop_thread.start()
3732
if stop_thread.is_alive():
3833
stop_thread.join()
@@ -43,10 +38,26 @@ def fin():
4338
request.addfinalizer(fin)
4439

4540
# Start Mock Modbus server
46-
mms_thread = Thread(target=mb_s.run_async_server, name="MockModbusServer")
41+
mms_thread = Thread(target=mb_s.run_async_server,
42+
name="MockModbusServer")
4743
mms_thread.start()
4844
time.sleep(0.1)
4945

46+
return mb_s
47+
48+
@pytest.fixture(scope="module")
49+
def pyse_api(self, request, pymb_s):
50+
# parameter pymb_s leads to call of fixture
51+
mb_c = ModbusClient(host=host_ip, port=host_port, timeout=2)
52+
api = pyse.StiebelEltronAPI(mb_c, slave, update_on_read=True)
53+
54+
# Cleanup after last test (will run as well, if setup fails).
55+
def fin():
56+
mb_c.close()
57+
time.sleep(0.5)
58+
59+
request.addfinalizer(fin)
60+
5061
# Connect Modbus client
5162
connected = mb_c.connect()
5263
assert connected
@@ -57,52 +68,44 @@ def fin():
5768

5869
return api
5970

60-
def test_temperature_read(self, pyse_api):
61-
temp = pyse_api.get_current_temp()
62-
assert temp >= 20.0 and temp < 25.0
71+
def test_temperature_read(self, pyse_api, pymb_s):
72+
pymb_s.update_input_register(0, 21.5*10)
73+
assert pyse_api.get_current_temp() == 21.5
6374

64-
temp = pyse_api.get_target_temp()
65-
assert temp >= 20.0 and temp < 25.0
75+
pymb_s.update_holding_register(1001, 22.5*10)
76+
assert pyse_api.get_target_temp() == 22.5
6677

67-
#@pytest.mark.skip
6878
def test_temperature_write(self, pyse_api):
69-
# Get old target temperature
70-
old_temp = pyse_api.get_target_temp()
71-
new_temp = 22.5
72-
73-
# Set new target temperature
74-
pyse_api.set_target_temp(new_temp)
79+
temperature = 22.5
80+
pyse_api.set_target_temp(temperature)
7581
time.sleep(3)
76-
pyse_api.update()
77-
78-
mod_temp = pyse_api.get_target_temp()
79-
assert mod_temp == new_temp
8082

81-
# Restore old target temperature
82-
if mod_temp != old_temp:
83-
pyse_api.set_target_temp(old_temp)
84-
time.sleep(3)
85-
pyse_api.update()
83+
assert pyse_api.get_target_temp() == temperature
8684

8785
def test_operation(self, pyse_api):
88-
pyse_api.set_operation('DHW')
86+
operation = 'DHW'
87+
pyse_api.set_operation(operation)
8988
time.sleep(3)
90-
pyse_api.update()
91-
oper = pyse_api.get_operation()
92-
assert oper == 'DHW'
93-
94-
def test_humidity(self, pyse_api):
95-
humidity = pyse_api.get_current_humidity()
96-
assert humidity == 20.0
97-
98-
def test_statuses(self, pyse_api):
99-
status = pyse_api.get_heating_status()
100-
assert status is False
101-
status = pyse_api.get_cooling_status()
102-
assert status is True
103-
status = pyse_api.get_filter_alarm_status()
104-
assert status is False
105-
106-
@pytest.mark.skip
107-
def test_fail(self):
108-
assert 0
89+
90+
assert pyse_api.get_operation() == operation
91+
92+
def test_humidity(self, pyse_api, pymb_s):
93+
humidity = 49.5
94+
pymb_s.update_input_register(2, humidity*10)
95+
assert pyse_api.get_current_humidity() == humidity
96+
97+
def test_statuses(self, pyse_api, pymb_s):
98+
pymb_s.update_input_register(2000, 0x0004)
99+
assert pyse_api.get_heating_status() is True
100+
assert pyse_api.get_cooling_status() is False
101+
assert pyse_api.get_filter_alarm_status() is False
102+
103+
pymb_s.update_input_register(2000, 0x0008)
104+
assert pyse_api.get_heating_status() is False
105+
assert pyse_api.get_cooling_status() is True
106+
assert pyse_api.get_filter_alarm_status() is False
107+
108+
pymb_s.update_input_register(2000, 0x2100)
109+
assert pyse_api.get_heating_status() is False
110+
assert pyse_api.get_cooling_status() is False
111+
assert pyse_api.get_filter_alarm_status() is True

0 commit comments

Comments
 (0)