Skip to content

Commit 8a3f6a0

Browse files
committed
added parallel multiprocessing support
1 parent 0bc8775 commit 8a3f6a0

File tree

10 files changed

+41
-14
lines changed

10 files changed

+41
-14
lines changed

probe.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
from miniprobe import MiniProbe
4040
import sensors
4141
import requests
42+
import worker
43+
import multiprocessing
4244
except Exception as e:
4345
print e
4446
#sys.exit()
@@ -111,16 +113,31 @@ def main():
111113
gc.collect()
112114
if str(json_response) != '[]':
113115
json_response_chunks = [json_response[i:i + 10] for i in range(0, len(json_response), 10)]
116+
procs = []
117+
out_queue = multiprocessing.Queue()
114118
for element in json_response_chunks:
115119
for part in element:
116120
if config['debug']:
117121
logging.debug(part)
118122
for sensor in sensor_list:
119123
if part['kind'] == sensor.get_kind():
120-
json_payload_data.append(sensor.get_data(part))
124+
#json_payload_data.append(sensor.get_data(part))
125+
p = multiprocessing.Process(target=sensor.get_data, args=(part, out_queue),
126+
name=part['kind'])
127+
procs.append(p)
128+
p.start()
129+
print p
121130
else:
122131
pass
123132
gc.collect()
133+
for i in range(len(procs)):
134+
json_payload_data.append(out_queue.get())
135+
for p in procs:
136+
#json_payload_data.append(out_queue.get())
137+
print p
138+
p.join()
139+
procs = []
140+
124141
url_data = mini_probe.create_url(config, 'data')
125142
try:
126143
request_data = requests.post(url_data, data=json.dumps(json_payload_data), verify=False)
@@ -136,6 +153,7 @@ def main():
136153
time.sleep((int(config['baseinterval']) * (9 / len(json_response))))
137154
else:
138155
time.sleep(int(config['baseinterval']) / 2)
156+
del out_queue
139157
else:
140158
logging.info("Nothing to do. Waiting for %s seconds." % (int(config['baseinterval']) / 3))
141159
time.sleep(int(config['baseinterval']) / 3)

sensors/cpuload.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def get_sensordef():
5252
return sensordefinition
5353

5454
@staticmethod
55-
def get_data(data):
55+
def get_data(data, out_queue):
5656
cpuload = CPULoad()
5757
logging.info("Running sensor: %s" % cpuload.get_kind())
5858
try:
@@ -77,6 +77,7 @@ def get_data(data):
7777
}
7878
del cpuload
7979
gc.collect()
80+
out_queue.put(data)
8081
return data
8182

8283
@staticmethod

sensors/diskspace.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def get_sensordef():
5353
return sensordefinition
5454

5555
@staticmethod
56-
def get_data(data):
56+
def get_data(data, out_queue):
5757
diskspace = Diskspace()
5858
try:
5959
disk = diskspace.read_disk()
@@ -76,6 +76,7 @@ def get_data(data):
7676
}
7777
del diskspace
7878
gc.collect()
79+
out_queue.put(data)
7980
return data
8081

8182
def read_disk(self):

sensors/http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def request(self, url, request_method=None, auth_method=None, timeout=None, post
173173
return data
174174

175175
@staticmethod
176-
def get_data(data):
176+
def get_data(data, out_queue):
177177
http = HTTP()
178178
try:
179179
http_data = http.request(data['url'], request_method=data["http_method"], auth_method=data["auth_method"],
@@ -204,4 +204,5 @@ def get_data(data):
204204
}
205205
del http
206206
gc.collect()
207+
out_queue.put(data)
207208
return data

sensors/memory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def get_sensordef():
5252
return sensordefinition
5353

5454
@staticmethod
55-
def get_data(data):
55+
def get_data(data, out_queue):
5656
memory = Memory()
5757
try:
5858
mem = memory.read_memory('/proc/meminfo')
@@ -77,6 +77,7 @@ def get_data(data):
7777
}
7878
del memory
7979
gc.collect()
80+
out_queue.put(data)
8081
return data
8182

8283
def read_memory(self, path):

sensors/ping.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def ping(self, target, count, timeout, packetsize):
135135
return channel_list
136136

137137
@staticmethod
138-
def get_data(data):
138+
def get_data(data, out_queue):
139139
ping = Ping()
140140
try:
141141
pingdata = ping.ping(data['host'], data['pingcount'], data['timeout'], data['packsize'])
@@ -157,4 +157,5 @@ def get_data(data):
157157
}
158158
del ping
159159
gc.collect()
160+
out_queue.put(data)
160161
return data

sensors/port.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,27 @@ def port(self, target, timeout, port):
112112
return channel_list
113113

114114
@staticmethod
115-
def get_data(data):
115+
def get_data(data, out_queue):
116116
port = Port()
117117
try:
118118
port_data = port.port(data['host'], data['timeout'], data['targetport'])
119119
logging.info("Running sensor: %s" % port.get_kind())
120120
except Exception as e:
121121
logging.error("Ooops Something went wrong with '%s' sensor %s. Error: %s" % (port.get_kind(),
122122
data['sensorid'], e))
123-
sensor_data = {
123+
data = {
124124
"sensorid": int(data['sensorid']),
125125
"error": "Exception",
126126
"code": 1,
127127
"message": "Port check failed. See log for details"
128128
}
129-
return sensor_data
130-
sensor_data = {
129+
return data
130+
data = {
131131
"sensorid": int(data['sensorid']),
132132
"message": "OK Port %s available" % data['targetport'],
133133
"channel": port_data
134134
}
135135
del port
136136
gc.collect()
137-
return sensor_data
137+
out_queue.put(data)
138+
return data

sensors/probehealth.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def get_sensordef():
5252
return sensordefinition
5353

5454
@staticmethod
55-
def get_data(data):
55+
def get_data(data, out_queue):
5656
probehealth = Probehealth()
5757
try:
5858
mem = probehealth.read_memory('/proc/meminfo')
@@ -78,6 +78,7 @@ def get_data(data):
7878
"message": "OK",
7979
"channel": probedata
8080
}
81+
out_queue.put(data)
8182
return data
8283

8384
def read_memory(self, path):

sensors/snmpcustom.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ def snmp_get(self, oid, target, snmp_type, community, port, unit, multiplication
171171
return channellist
172172

173173
@staticmethod
174-
def get_data(data):
174+
def get_data(data, out_queue):
175175
snmpcustom = SNMPCustom()
176176
try:
177177
snmp_data = snmpcustom.snmp_get(str(data['oid']), data['host'], data['value_type'],
@@ -197,4 +197,5 @@ def get_data(data):
197197
}
198198
del snmpcustom
199199
gc.collect()
200+
out_queue.put(data)
200201
return data

sensors/snmptraffic.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def snmp_get(self, target, countertype, community, port, ifindex):
165165
return channellist
166166

167167
@staticmethod
168-
def get_data(data):
168+
def get_data(data, out_queue):
169169
snmptraffic = SNMPTraffic()
170170
try:
171171
snmp_data = snmptraffic.snmp_get(data['host'], data['snmp_counter'],
@@ -190,4 +190,5 @@ def get_data(data):
190190
}
191191
del snmptraffic
192192
gc.collect()
193+
out_queue.put(data)
193194
return data

0 commit comments

Comments
 (0)