Skip to content

Commit 101628d

Browse files
authored
Types and lib location feature (#255)
* added small int support * change order * spaces between functions * changed functions small_int to usint (unsigned small int) * change in db_row get value y set value changed comparation from sint to usint (0-255) * added to db_row get_value and set_value comparation to sint (-128 to 127) * added support for SMALL INT (-127 to 128) * changes in test_util in bytearray, added numbers to ocuppe all bytes of the data structure in DTL * test passed | changed _bytearray in test to match the lenght for the test * added doc * change space * arreglos de espacios en blanco * added s7areaDB for default * added default to db and db_row -> area=S7AreaDB * same * added read area | write area --- TODO tests * correction to the db1['test'].read() -> added client * read area debe corregirse * correction to client.read_area of function write in class db_row * corrections to def write in class db_row * Small changes - Types.py: * Added enum class to handle areas - Utils: * Added some missing types * Added area parameter to the DB class, this way you could handle the inputs, outputs, or marks as a DB for reading it TODO: add tests - Common.py: * Missing types - Client.py: * Added an optional init parameter for the lib location, this way people who can't instal properly the library, they can specify the path to the .dll file. * Types.py readd S7Areas as alone values * Test fixes 1 * Typing errors fix 2 * White spaces fixes * White spaces * Remove enum Areas, so we stick to the ADict class * Remove it TODOs in util.py * Utils type export fix * _type parameter switch to type_ * Fix typing for areas Adict * Migration to Enum for Areas and WordLen * switch assert date * whitespaces * Fix typing errors in util file * ignore typing * fix test * whitespaces * Added find locally snap7.dll Fix logo errors * fix assert error in date * types.py: * remove it string multiline methods added- common.py * Remove it type ignore from ctypes import * Added common test and change common find_locally from using os to pathlib * Pathlib * Path lib fix to find in the current working directory
1 parent 687e848 commit 101628d

File tree

10 files changed

+341
-203
lines changed

10 files changed

+341
-203
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,9 @@ doc/_build/
5353
venv/
5454

5555
.DS_Store
56+
57+
# VSCode
58+
.vscode/
59+
60+
# DLL
61+
snap7.dll

snap7/client.py

Lines changed: 71 additions & 69 deletions
Large diffs are not rendered by default.

snap7/common.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import logging
2+
import os
23
import platform
34
from ctypes import c_char
45
from ctypes.util import find_library
6+
from typing import Optional, Union
7+
import pathlib
58

69
from snap7.exceptions import Snap7Exception
710

@@ -21,7 +24,7 @@ class ADict(dict):
2124
Accessing dict keys like an attribute.
2225
"""
2326
__getattr__ = dict.__getitem__
24-
__setattr__ = dict.__setitem__
27+
__setattr__ = dict.__setitem__ # type: ignore
2528

2629

2730
class Snap7Library:
@@ -30,6 +33,7 @@ class Snap7Library:
3033
sure the library is loaded only once.
3134
"""
3235
_instance = None
36+
lib_location: Optional[str]
3337

3438
def __new__(cls, *args, **kwargs):
3539
if not cls._instance:
@@ -38,17 +42,17 @@ def __new__(cls, *args, **kwargs):
3842
cls._instance.cdll = None
3943
return cls._instance
4044

41-
def __init__(self, lib_location=None):
42-
if self.cdll:
45+
def __init__(self, lib_location: Optional[str] = None):
46+
if self.cdll: # type: ignore
4347
return
44-
self.lib_location = lib_location or self.lib_location or find_library('snap7')
48+
self.lib_location = lib_location or self.lib_location or find_library('snap7') or find_locally('snap7')
4549
if not self.lib_location:
4650
msg = "can't find snap7 library. If installed, try running ldconfig"
4751
raise Snap7Exception(msg)
4852
self.cdll = cdll.LoadLibrary(self.lib_location)
4953

5054

51-
def load_library(lib_location: str = None):
55+
def load_library(lib_location: Optional[str] = None):
5256
"""
5357
:returns: a ctypes cdll object with the snap7 shared library loaded.
5458
"""
@@ -87,3 +91,10 @@ def error_text(error, context: str = "client") -> bytes:
8791
elif context == "partner":
8892
library.Par_ErrorText(error, text, len_)
8993
return text.value
94+
95+
96+
def find_locally(fname):
97+
file = pathlib.Path.cwd() / f"{fname}.dll"
98+
if file.exists():
99+
return str(file)
100+
return None

snap7/logo.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from snap7 import types
1212
from snap7.common import check_error, load_library, ipv4
1313
from snap7.exceptions import Snap7Exception
14-
from snap7.types import S7Object
14+
from snap7.types import S7Object, WordLen
1515
from snap7.types import param_types
1616

1717
logger = logging.getLogger(__name__)
@@ -88,11 +88,11 @@ def read(self, vm_address: str):
8888
:param vm_address: of Logo memory (e.g. V30.1, VW32, V24)
8989
:returns: integer
9090
"""
91-
area = types.S7AreaDB
91+
area = types.Areas.DB
9292
db_number = 1
9393
size = 1
9494
start = 0
95-
wordlen = 0
95+
wordlen: WordLen
9696
logger.debug(f"read, vm_address:{vm_address}")
9797
if re.match(r"V[0-9]{1,4}\.[0-7]", vm_address):
9898
# bit value
@@ -102,42 +102,42 @@ def read(self, vm_address: str):
102102
address_byte = int(address[0])
103103
address_bit = int(address[1])
104104
start = (address_byte * 8) + address_bit
105-
wordlen = types.S7WLBit
105+
wordlen = WordLen.Bit
106106
elif re.match("V[0-9]+", vm_address):
107107
# byte value
108108
logger.info(f"Byte address: {vm_address}")
109109
start = int(vm_address[1:])
110-
wordlen = types.S7WLByte
110+
wordlen = WordLen.Byte
111111
elif re.match("VW[0-9]+", vm_address):
112112
# byte value
113113
logger.info(f"Word address: {vm_address}")
114114
start = int(vm_address[2:])
115-
wordlen = types.S7WLWord
115+
wordlen = WordLen.Word
116116
elif re.match("VD[0-9]+", vm_address):
117117
# byte value
118118
logger.info(f"DWord address: {vm_address}")
119119
start = int(vm_address[2:])
120-
wordlen = types.S7WLDWord
120+
wordlen = WordLen.DWord
121121
else:
122122
logger.info("Unknown address format")
123123
return 0
124124

125-
type_ = snap7.types.wordlen_to_ctypes[wordlen]
125+
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
126126
data = (type_ * size)()
127127

128-
logger.debug(f"start:{start}, wordlen:{wordlen}, data-length:{len(data)}")
128+
logger.debug(f"start:{start}, wordlen:{wordlen.name}={wordlen.value}, data-length:{len(data)}")
129129

130-
result = self.library.Cli_ReadArea(self.pointer, area, db_number, start,
131-
size, wordlen, byref(data))
130+
result = self.library.Cli_ReadArea(self.pointer, area.value, db_number, start,
131+
size, wordlen.value, byref(data))
132132
check_error(result, context="client")
133133
# transform result to int value
134-
if wordlen == types.S7WLBit:
134+
if wordlen == WordLen.Bit:
135135
return data[0]
136-
if wordlen == types.S7WLByte:
136+
if wordlen == WordLen.Byte:
137137
return struct.unpack_from(">B", data)[0]
138-
if wordlen == types.S7WLWord:
138+
if wordlen == WordLen.Word:
139139
return struct.unpack_from(">h", data)[0]
140-
if wordlen == types.S7WLDWord:
140+
if wordlen == WordLen.DWord:
141141
return struct.unpack_from(">l", data)[0]
142142

143143
def write(self, vm_address: str, value: int) -> int:
@@ -148,11 +148,11 @@ def write(self, vm_address: str, value: int) -> int:
148148
:param vm_address: write offset
149149
:param value: integer
150150
"""
151-
area = types.S7AreaDB
151+
area = types.Areas.DB
152152
db_number = 1
153153
start = 0
154154
amount = 1
155-
wordlen = 0
155+
wordlen: WordLen
156156
data = bytearray(0)
157157
logger.debug(f"write, vm_address:{vm_address}, value:{value}")
158158
if re.match(r"^V[0-9]{1,4}\.[0-7]$", vm_address):
@@ -163,7 +163,7 @@ def write(self, vm_address: str, value: int) -> int:
163163
address_byte = int(address[0])
164164
address_bit = int(address[1])
165165
start = (address_byte * 8) + address_bit
166-
wordlen = types.S7WLBit
166+
wordlen = WordLen.Bit
167167
if value > 0:
168168
data = bytearray([1])
169169
else:
@@ -172,34 +172,34 @@ def write(self, vm_address: str, value: int) -> int:
172172
# byte value
173173
logger.info(f"Byte address: {vm_address}")
174174
start = int(vm_address[1:])
175-
wordlen = types.S7WLByte
175+
wordlen = WordLen.Byte
176176
data = bytearray(struct.pack(">B", value))
177177
elif re.match("^VW[0-9]+$", vm_address):
178178
# byte value
179179
logger.info(f"Word address: {vm_address}")
180180
start = int(vm_address[2:])
181-
wordlen = types.S7WLWord
181+
wordlen = WordLen.Word
182182
data = bytearray(struct.pack(">h", value))
183183
elif re.match("^VD[0-9]+$", vm_address):
184184
# byte value
185185
logger.info(f"DWord address: {vm_address}")
186186
start = int(vm_address[2:])
187-
wordlen = types.S7WLDWord
187+
wordlen = WordLen.DWord
188188
data = bytearray(struct.pack(">l", value))
189189
else:
190190
logger.info(f"write, Unknown address format: {vm_address}")
191191
return 1
192192

193-
if wordlen == types.S7WLBit:
194-
type_ = snap7.types.wordlen_to_ctypes[types.S7WLByte]
193+
if wordlen == WordLen.Bit:
194+
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
195195
else:
196-
type_ = snap7.types.wordlen_to_ctypes[wordlen]
196+
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
197197

198198
cdata = (type_ * amount).from_buffer_copy(data)
199199

200200
logger.debug(f"write, vm_address:{vm_address} value:{value}")
201201

202-
result = self.library.Cli_WriteArea(self.pointer, area, db_number, start, amount, wordlen, byref(cdata))
202+
result = self.library.Cli_WriteArea(self.pointer, area.value, db_number, start, amount, wordlen.value, byref(cdata))
203203
check_error(result, context="client")
204204
return result
205205

@@ -214,7 +214,7 @@ def db_read(self, db_number: int, start: int, size: int) -> bytearray:
214214
"""
215215
logger.debug(f"db_read, db_number:{db_number}, start:{start}, size:{size}")
216216

217-
type_ = snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte]
217+
type_ = snap7.types.wordlen_to_ctypes[WordLen.Byte.value]
218218
data = (type_ * size)()
219219
result = (self.library.Cli_DBRead(
220220
self.pointer, db_number, start, size,
@@ -230,8 +230,8 @@ def db_write(self, db_number: int, start: int, data: bytearray) -> int:
230230
:param start: start address for Logo7 0..951 / Logo8 0..1469
231231
:param data: bytearray
232232
"""
233-
wordlen = snap7.types.S7WLByte
234-
type_ = snap7.types.wordlen_to_ctypes[wordlen]
233+
wordlen = WordLen.Byte
234+
type_ = snap7.types.wordlen_to_ctypes[wordlen.value]
235235
size = len(data)
236236
cdata = (type_ * size).from_buffer_copy(data)
237237
logger.debug(f"db_write db_number:{db_number} start:{start} size:{size} data:{data}")

snap7/server.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,10 @@ def clear_events(self) -> int:
293293
def mainloop(tcpport: int = 1102):
294294
server = snap7.server.Server()
295295
size = 100
296-
DBdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
297-
PAdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
298-
TMdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
299-
CTdata = (snap7.types.wordlen_to_ctypes[snap7.types.S7WLByte] * size)()
296+
DBdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
297+
PAdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
298+
TMdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
299+
CTdata = (snap7.types.wordlen_to_ctypes[snap7.types.WordLen.Byte.value] * size)()
300300
server.register_area(snap7.types.srvAreaDB, 1, DBdata)
301301
server.register_area(snap7.types.srvAreaPA, 1, PAdata)
302302
server.register_area(snap7.types.srvAreaTM, 1, TMdata)

snap7/types.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import ctypes
55

66
from snap7.common import ADict
7+
from enum import Enum
78

89
S7Object = ctypes.c_void_p
910
buffer_size = 65536
@@ -51,14 +52,26 @@
5152
mkEvent = 0
5253
mkLog = 1
5354

55+
5456
# Area ID
57+
class Areas(Enum):
58+
PE = 0x81
59+
PA = 0x82
60+
MK = 0x83
61+
DB = 0x84
62+
CT = 0x1C
63+
TM = 0x1D
64+
65+
66+
# Leave it for now
5567
S7AreaPE = 0x81
5668
S7AreaPA = 0x82
5769
S7AreaMK = 0x83
5870
S7AreaDB = 0x84
5971
S7AreaCT = 0x1C
6072
S7AreaTM = 0x1D
6173

74+
6275
areas = ADict({
6376
'PE': 0x81,
6477
'PA': 0x82,
@@ -68,7 +81,19 @@
6881
'TM': 0x1D,
6982
})
7083

84+
7185
# Word Length
86+
class WordLen(Enum):
87+
Bit = 0x01
88+
Byte = 0x02
89+
Word = 0x04
90+
DWord = 0x06
91+
Real = 0x08
92+
Counter = 0x1C
93+
Timer = 0x1D
94+
95+
96+
# Leave it for now
7297
S7WLBit = 0x01
7398
S7WLByte = 0x02
7499
S7WLWord = 0x04
@@ -212,6 +237,10 @@ class S7DataItem(ctypes.Structure):
212237
('pData', ctypes.POINTER(ctypes.c_uint8))
213238
]
214239

240+
def __str__(self) -> str:
241+
return f"<S7DataItem Area: {self.Area} WordLen: {self.WordLen} Result: {self.Result} "\
242+
f"DBNumber: {self.DBNumber} Start: {self.Start} Amount: {self.Amount} pData: {self.pData}>"
243+
215244

216245
class S7CpuInfo(ctypes.Structure):
217246
_fields_ = [
@@ -222,6 +251,10 @@ class S7CpuInfo(ctypes.Structure):
222251
('ModuleName', ctypes.c_char * 25)
223252
]
224253

254+
def __str__(self):
255+
return f"<S7CpuInfo ModuleTypeName: {self.ModuleTypeName} SerialNumber: {self.SerialNumber} "\
256+
f"ASName: {self.ASName} Copyright: {self.Copyright} ModuleName: {self.ModuleName}>"
257+
225258

226259
class S7SZLHeader(ctypes.Structure):
227260
"""
@@ -233,6 +266,9 @@ class S7SZLHeader(ctypes.Structure):
233266
('NDR', ctypes.c_uint16)
234267
]
235268

269+
def __str__(self) -> str:
270+
return f"<S7SZLHeader LengthDR: {self.LengthDR}, NDR: {self.NDR}>"
271+
236272

237273
class S7SZL(ctypes.Structure):
238274
"""See §33.1 of System Software for S7-300/400 System and Standard Functions"""
@@ -241,6 +277,9 @@ class S7SZL(ctypes.Structure):
241277
('Data', ctypes.c_byte * (0x4000 - 4))
242278
]
243279

280+
def __str__(self) -> str:
281+
return f"<S7SZL Header: {self.S7SZHeader}, Data: {self.Data}>"
282+
244283

245284
class S7SZLList(ctypes.Structure):
246285
_fields_ = [
@@ -266,6 +305,10 @@ class S7CpInfo(ctypes.Structure):
266305
('MaxBusRate', ctypes.c_uint16)
267306
]
268307

308+
def __str__(self) -> str:
309+
return f"<S7CpInfo MaxPduLength: {self.MaxPduLength} MaxConnections: {self.MaxConnections} "\
310+
f"MaxMpiRate: {self.MaxMpiRate} MaxBusRate: {self.MaxBusRate}>"
311+
269312

270313
class S7Protection(ctypes.Structure):
271314
"""See §33.19 of System Software for S7-300/400 System and Standard Functions"""

0 commit comments

Comments
 (0)