-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
112 lines (84 loc) · 3.55 KB
/
client.py
File metadata and controls
112 lines (84 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from snap7.client import Client as SnapClient
from opcua import Client
from threading import Thread
from snap7.snap7types import areas
from snap7.util import *
import sys,time
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console when IPython is not installed.

        The console runs directly on this module's global namespace, so
        every module-level name is reachable from the prompt and
        assignments made at the prompt rebind the module globals.
        """
        # Hand the globals dict over unchanged: this function has no locals
        # worth exposing, and adding any would pollute the module namespace
        # (a key named ``vars`` would hide the builtin of the same name).
        shell = code.InteractiveConsole(globals())
        shell.interact()
# Endpoint URL handed to the OPC UA client below.
opc_url = "opc.tcp://localhost:4840/elevator/pid/"
opc_client = Client(opc_url)

# Namespace URI; the script entry point resolves it to a namespace index.
opc_uri = "http://thaintersect.com"

# NOTE(review): presumably the run flag for the updates() worker thread --
# confirm against how the thread loop is expected to be stopped.
updateThread = True

# Snap7 client shared by the worker thread and the script entry point.
# (A ``global`` statement is meaningless at module scope, so none is used.)
plc = SnapClient()
def updates():
    """Synchronise PLC marker memory with the OPC UA nodes until stopped.

    Every cycle (roughly every 10 ms plus I/O time):

    * a REAL is decoded from the 4 marker bytes starting at offset 40 and
      published on the ``lspd`` node;
    * marker bit 20.1 is read and published on the ``goingup`` node;
    * the values of ``f_floor``, ``s_floor`` and ``t_floor`` are copied into
      bits 0, 1 and 2 of marker byte 12.

    The loop runs for as long as the module-level ``updateThread`` flag is
    truthy; setting it to ``False`` lets the current cycle finish and then
    ends the thread.

    Relies on the module globals ``plc``, ``lspd``, ``goingup``, ``f_floor``,
    ``s_floor`` and ``t_floor`` being bound before the function is started.
    Writing the displacement (MD0) and PID setpoint (MD8) back to the PLC is
    not performed here.
    """
    while updateThread:
        marker = areas['MK']

        # Speed value for the OPC server.
        # NOTE(review): this reads offset 40 (MD40) although the original
        # comment said "MD4" -- confirm which address is intended.
        speed_raw = plc.read_area(marker, 0, 40, 4)
        lspd.set_value(get_real(speed_raw, 0))

        # Direction flag: M20.1.
        direction = get_bool(plc.read_area(marker, 0, 20, 1), 0, 1)
        goingup.set_value(direction)

        # The floor handling assumes that a limit switch is used for each
        # individual floor.  Fetch the three OPC values first so the PLC
        # read-modify-write below is as short as possible, then write the
        # byte back once instead of once per bit.
        floor_states = (
            f_floor.get_value(),
            s_floor.get_value(),
            t_floor.get_value(),
        )
        read_b12 = plc.read_area(marker, 0, 12, 1)
        for bit_index, state in enumerate(floor_states):
            set_bool(read_b12, 0, bit_index, state)
        plc.write_area(marker, 0, 12, read_b12)

        time.sleep(.01)
if __name__ == "__main__":
    # Script entry point: connect to the OPC UA server and to the PLC, start
    # the background synchronisation thread, then open an interactive shell.
    try:
        opc_client.connect()
    except Exception as identifier:
        print("OPCUA Error:")
        print(identifier)
    else:
        try:
            opc_client.load_type_definitions()
            root = opc_client.get_root_node()
            objects = opc_client.get_objects_node()
            idx = opc_client.get_namespace_index(opc_uri)
            obj = root.get_child(["0:Objects", "{}:PLC_S71200".format(idx)])

            # Variable nodes are addressed by fixed numeric id.  They are
            # bound at module level because updates() reads several of them
            # as globals.  The browse-path form would be e.g.
            # root.get_child(["0:Objects", "<idx>:PLC_S71200", "<idx>:Destination"]).
            dest = opc_client.get_node("ns=2;i=6")      # Destination
            disp = opc_client.get_node("ns=2;i=3")      # Displacement
            aspd = opc_client.get_node("ns=2;i=4")
            lspd = opc_client.get_node("ns=2;i=5")
            f_floor = opc_client.get_node("ns=2;i=13")
            s_floor = opc_client.get_node("ns=2;i=14")
            t_floor = opc_client.get_node("ns=2;i=15")
            goingup = opc_client.get_node("ns=2;i=7")

            try:
                plc.connect('192.168.0.1', 0, 1)
                # Do not discard the connection state: treat "not connected"
                # the same way as a failed connect().
                if not plc.get_connected():
                    raise RuntimeError("PLC connection could not be established")
            except Exception as msg:
                print(msg)
                sys.exit()
            else:
                t = Thread(target=updates, args=())
                # Daemon thread: a worker that is still looping must not keep
                # the process alive once the shell has been closed.
                t.daemon = True
                t.start()
                try:
                    embed()
                finally:
                    # Ask the worker to stop and give it a moment to finish
                    # its current cycle.
                    updateThread = False
                    t.join(1.0)
                    # Only close the PLC link once the worker no longer
                    # uses it.
                    if not t.is_alive():
                        plc.disconnect()
        finally:
            # Always release the OPC UA session, including on sys.exit().
            # NOTE(review): the opcua client presumably keeps background
            # threads running until disconnect() -- confirm.
            opc_client.disconnect()