Skip to content

Commit dd56d89

Browse files
committed
Version 1.3 - clean up some stuff; better, more reliable termination
1 parent 7f51577 commit dd56d89

File tree

4 files changed

+120
-76
lines changed

4 files changed

+120
-76
lines changed

PumpkinLB.py

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
#!/usr/bin/python2
22

3-
import os
3+
import math
44
import multiprocessing
5+
import os
56
import platform
67
import socket
78
import sys
89
import signal
10+
import threading
911
import traceback
1012
import time
1113

@@ -14,6 +16,7 @@
1416
from pumpkinlb.listener import PumpkinListener
1517

1618
if __name__ == '__main__':
19+
1720
configFilename = None
1821
for arg in sys.argv[1:]:
1922
if arg == '--help':
@@ -69,43 +72,70 @@ def handleSigTerm(*args):
6972
pass
7073
sys.stderr.write('Sent signal to children, waiting up to 4 seconds then trying to clean up\n')
7174
time.sleep(1)
75+
startTime = time.time()
7276
remainingListeners = listeners
73-
for i in xrange(3):
74-
remainingListeners2 = []
75-
for listener in remainingListeners:
76-
sys.stderr.write('Waiting on %d...\n' %(listener.pid,))
77-
sys.stderr.flush()
78-
listener.join(.005)
79-
if listener.is_alive() is True:
80-
remainingListeners2.append(listener)
81-
remainingListeners = remainingListeners2
82-
sys.stderr.write('Remaining (%d) listeners are: %s\n' %(len(remainingListeners), [listener.pid for listener in remainingListeners]))
77+
remainingListeners2 = []
78+
for listener in remainingListeners:
79+
sys.stderr.write('Waiting on %d...\n' %(listener.pid,))
8380
sys.stderr.flush()
84-
if len(remainingListeners) == 0:
85-
break
86-
time.sleep(1)
87-
88-
if len(remainingListeners) > 0:
89-
sys.stderr.write('After trying to clean up, %d listeners remain.\n' %(len(remainingListeners)))
90-
for listener in remainingListeners:
91-
try:
92-
os.kill(listener.pid, signal.SIGKILL)
93-
except:
94-
pass
95-
time.sleep(.1)
96-
sys.stderr.write('Starting final join\n')
97-
sys.stderr.flush()
98-
for listener in remainingListeners:
99-
listener.join()
100-
sys.stderr.write('Done\n')
101-
sys.stderr.flush()
102-
103-
if '_NT' in platform.system():
104-
# Some issue on Windows, or at least Cygwin on Windows, causes an infinite loop in this signal handler when trying to kill the process. The only way out is to cause an exception between here and the next call, so provide a function with invalid arguments.
105-
sys.stderr.write('You can ignore the below exception, it is used to quit on NT\n')
106-
signal.signal(signal.SIGTERM, sys.exit)
107-
signal.signal(signal.SIGINT, sys.exit)
81+
listener.join(.05)
82+
if listener.is_alive() is True:
83+
remainingListeners2.append(listener)
84+
remainingListeners = remainingListeners2
85+
sys.stderr.write('Remaining (%d) listeners are: %s\n' %(len(remainingListeners), [listener.pid for listener in remainingListeners]))
86+
sys.stderr.flush()
87+
88+
afterJoinTime = time.time()
89+
90+
if remainingListeners:
91+
delta = afterJoinTime - startTime
92+
remainingSleep = int(6 - math.floor(afterJoinTime - startTime))
93+
if remainingSleep > 0:
94+
anyAlive = False
95+
# If we still have time left, check whether we are already done or whether there are children to clean up using the remaining time allotment
96+
if threading.activeCount() > 1 or len(multiprocessing.active_children()) > 0:
97+
sys.stderr.write('Listener closed in %1.2f seconds. Waiting up to %d seconds before terminating.\n' %(delta, remainingSleep))
98+
sys.stderr.flush()
99+
thisThread = threading.current_thread()
100+
for i in range(remainingSleep):
101+
allThreads = threading.enumerate()
102+
anyAlive = False
103+
for thread in allThreads:
104+
if thread is thisThread or thread.name == 'MainThread':
105+
continue
106+
thread.join(.05)
107+
if thread.is_alive() == True:
108+
anyAlive = True
109+
110+
allChildren = multiprocessing.active_children()
111+
for child in allChildren:
112+
child.join(.05)
113+
if child.is_alive() == True:
114+
anyAlive = True
115+
if anyAlive is False:
116+
break
117+
time.sleep(1)
118+
119+
if anyAlive is True:
120+
sys.stderr.write('Could not kill in time.\n')
121+
else:
122+
sys.stderr.write('Shutdown successful after %1.2f seconds.\n' %( time.time() - startTime))
123+
sys.stderr.flush()
124+
125+
else:
126+
sys.stderr.write('Listener timed out in closing, exiting uncleanly.\n')
127+
sys.stderr.flush()
128+
time.sleep(.05) # Why not? :P
129+
130+
sys.stdout.write('exiting...\n')
131+
sys.stdout.flush()
132+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
133+
signal.signal(signal.SIGINT, signal.SIG_DFL)
108134
sys.exit(0)
135+
os.kill(os.getpid(), signal.SIGTERM)
136+
return 0
137+
# END handleSigTerm
138+
109139

110140
signal.signal(signal.SIGTERM, handleSigTerm)
111141
signal.signal(signal.SIGINT, handleSigTerm)
@@ -116,4 +146,4 @@ def handleSigTerm(*args):
116146
except:
117147
os.kill(os.getpid(), signal.SIGTERM)
118148

119-
# vim: ts=4 sw=4 expandtab
149+
# vim: set ts=4 sw=4 expandtab

pumpkinlb/listener.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ def __init__(self, localAddr, localPort, workers):
2727

2828
self.listenSocket = None
2929

30+
self.cleanupThread = None
31+
3032
self.keepGoing = True
3133

3234
def cleanup(self):
@@ -40,9 +42,10 @@ def cleanup(self):
4042
time.sleep(1.5)
4143

4244
def closeWorkers(self, *args):
43-
sys.stdout.write("GOT SIGNAL on %s:%d\n" %(self.localAddr, self.localPort))
4445
self.keepGoing = False
4546

47+
time.sleep(1)
48+
4649
try:
4750
self.listenSocket.shutdown(socket.SHUT_RDWR)
4851
except:
@@ -53,6 +56,8 @@ def closeWorkers(self, *args):
5356
pass
5457

5558
if not self.pumpkinWorkers:
59+
self.cleanupThread and self.cleanupThread.join(3)
60+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
5661
sys.exit(0)
5762

5863
for pumpkinWorker in self.pumpkinWorkers:
@@ -64,6 +69,7 @@ def closeWorkers(self, *args):
6469

6570
time.sleep(1)
6671

72+
6773
remainingWorkers = []
6874
for pumpkinWorker in self.pumpkinWorkers:
6975
pumpkinWorker.join(.03)
@@ -75,6 +81,10 @@ def closeWorkers(self, *args):
7581
for pumpkinWorker in remainingWorkers:
7682
pumpkinWorker.join(.2)
7783

84+
self.cleanupThread and self.cleanupThread.join(2)
85+
86+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
87+
7888
sys.exit(0)
7989

8090
def retryFailedWorkers(self, *args):
@@ -122,7 +132,7 @@ def run(self):
122132

123133
listenSocket.listen(5)
124134

125-
cleanupThread = threading.Thread(target=self.cleanup)
135+
self.cleanupThread = cleanupThread = threading.Thread(target=self.cleanup)
126136
cleanupThread.start()
127137
if len(self.workers) > 1:
128138
retryThread = threading.Thread(target=self.retryFailedWorkers)
@@ -148,10 +158,9 @@ def run(self):
148158
except Exception as e:
149159
sys.stderr.write('Got exception: %s, shutting down worker on %s:%d\n' %(str(e), self.localAddr, self.localPort))
150160
self.closeWorkers()
161+
return
151162

152163

153-
# If we got here, we must have been signaled to terminate. Stay open for a short bit to try to close children, then tank.
154-
time.sleep(6)
155-
sys.exit(0)
164+
self.closeWorkers()
156165

157-
# vim: ts=4 sw=4 expandtab
166+

pumpkinlb/worker.py

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def closeConnections(self):
4242
self.clientSocket.close()
4343
except:
4444
pass
45+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
4546

4647
def closeConnectionsAndExit(self, *args):
4748
self.closeConnections()
@@ -61,44 +62,47 @@ def run(self):
6162

6263
signal.signal(signal.SIGTERM, self.closeConnectionsAndExit)
6364

64-
dataToClient = ''
65-
dataFromClient = ''
66-
while True:
67-
waitingToWrite = []
65+
try:
66+
dataToClient = ''
67+
dataFromClient = ''
68+
while True:
69+
waitingToWrite = []
6870

69-
if dataToClient:
70-
waitingToWrite.append(clientSocket)
71-
if dataFromClient:
72-
waitingToWrite.append(workerSocket)
73-
71+
if dataToClient:
72+
waitingToWrite.append(clientSocket)
73+
if dataFromClient:
74+
waitingToWrite.append(workerSocket)
75+
7476

75-
(hasDataForRead, readyForWrite, hasError) = select.select( [clientSocket, workerSocket], waitingToWrite, [clientSocket, workerSocket], .3)
77+
(hasDataForRead, readyForWrite, hasError) = select.select( [clientSocket, workerSocket], waitingToWrite, [clientSocket, workerSocket], .3)
7678

77-
if hasError:
78-
break
79-
80-
if clientSocket in hasDataForRead:
81-
nextData = clientSocket.recv(4096)
82-
if not nextData:
79+
if hasError:
8380
break
84-
dataFromClient += nextData
81+
82+
if clientSocket in hasDataForRead:
83+
nextData = clientSocket.recv(4096)
84+
if not nextData:
85+
break
86+
dataFromClient += nextData
87+
88+
if workerSocket in hasDataForRead:
89+
nextData = workerSocket.recv(4096)
90+
if not nextData:
91+
break
92+
dataToClient += nextData
93+
94+
if workerSocket in readyForWrite:
95+
while dataFromClient:
96+
workerSocket.send(dataFromClient[:4096])
97+
dataFromClient = dataFromClient[4096:]
8598

86-
if workerSocket in hasDataForRead:
87-
nextData = workerSocket.recv(4096)
88-
if not nextData:
89-
break
90-
dataToClient += nextData
91-
92-
if workerSocket in readyForWrite:
93-
while dataFromClient:
94-
workerSocket.send(dataFromClient[:4096])
95-
dataFromClient = dataFromClient[4096:]
99+
if clientSocket in readyForWrite:
100+
while dataToClient:
101+
clientSocket.send(dataToClient[:4096])
102+
dataToClient = dataToClient[4096:]
96103

97-
if clientSocket in readyForWrite:
98-
while dataToClient:
99-
clientSocket.send(dataToClient[:4096])
100-
dataToClient = dataToClient[4096:]
104+
except Exception as e:
105+
sys.stderr.write('Error: ' + str(e) + '\n')
101106

102-
self.closeConnections()
107+
self.closeConnectionsAndExit()
103108

104-
# vim: ts=4 sw=4 expandtab

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"""
3838

3939
setup(name='PumpkinLB',
40-
version='1.2.1',
40+
version='1.3',
4141
scripts=['PumpkinLB.py'],
4242
packages=['pumpkinlb'],
4343
author='Tim Savannah',
@@ -46,6 +46,7 @@
4646
maintainer_email='kata198@gmail.com',
4747
provides=['PumpkinLB'],
4848
description='A simple, fast, pure-python load balancer',
49+
url='https://github.com/kata198/PumpkinLB',
4950
long_description=long_description,
5051
license='GPLv3',
5152
keywords=['load balancer', 'load balance', 'python', 'balance', 'lb', 'http', 'socket', 'port', 'forward', 'tcp', 'fast', 'server', 'network'],

0 commit comments

Comments
 (0)