From 9ef842deff6015b12200f7aa2dc084235186c432 Mon Sep 17 00:00:00 2001 From: lo5twind Date: Thu, 23 Nov 2017 14:23:03 +0800 Subject: [PATCH 1/2] add threadpool server which server with a fixed size pool of threads which service requests. --- thriftpy/server.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/thriftpy/server.py b/thriftpy/server.py index 664e4ec..4792524 100644 --- a/thriftpy/server.py +++ b/thriftpy/server.py @@ -4,6 +4,7 @@ import logging import threading +from six.moves import queue from thriftpy.protocol import TBinaryProtocolFactory from thriftpy.transport import ( @@ -103,3 +104,74 @@ def handle(self, client): def close(self): self.closed = True + + +class TThreadPoolServer(TServer): + """Server with a fixed size pool of threads which service requests.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.clients = queue.Queue() + self.threads = 10 + self.daemon = kwargs.get("daemon", False) + self.closed = False + + def setNumThreads(self, num): + """Set the number of worker threads that should be created""" + self.threads = num + + def serveThread(self): + """Loop around getting clients from the queue and process them.""" + while True: + if self.closed: + break + try: + client = self.clients.get() + self.serveClient(client) + except Exception as x: + logger.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.itrans_factory.get_transport(client) + otrans = self.otrans_factory.get_transport(client) + iprot = self.iprot_factory.get_protocol(itrans) + oprot = self.oprot_factory.get_protocol(otrans) + try: + while True: + if self.closed: + break + self.processor.process(iprot, oprot) + except TTransportException as x: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start a fixed number of threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target=self.serveThread) + t.setDaemon(self.daemon) + t.start() + except Exception as x: + logger.exception(x) + + # Pump the socket for clients + self.trans.listen() + while True: + if self.closed: + break + try: + client = self.trans.accept() + if not client: + continue + self.clients.put(client) + except Exception as x: + logger.exception(x) + + def close(self): + self.closed = True From dfefb8552b82e5bdcaa0344da104895284fff5b4 Mon Sep 17 00:00:00 2001 From: lo5twind Date: Thu, 7 Dec 2017 11:13:22 +0800 Subject: [PATCH 2/2] fix Travis-CI bug: 1.inet6 support, 2.dose not support python 3.3.x --- .travis.yml | 6 ++++-- tests/test_socket.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 0d2878e..f4ae948 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,9 +2,7 @@ sudo: false language: python python: - - 2.6 - 2.7 - - 3.3 - 3.4 - 3.5 - pypy @@ -20,3 +18,7 @@ install: script: - tox -e py + +env: + global: + - TRAVIS="true" \ No newline at end of file diff --git a/tests/test_socket.py b/tests/test_socket.py index f0015b8..762f015 100644 --- a/tests/test_socket.py +++ b/tests/test_socket.py @@ -39,6 +39,8 @@ def test_inet_socket(): _test_socket(server_socket, client_socket) +@pytest.mark.skipif(os.getenv('TRAVIS', '') != 'true', + reason='Travis CI dose not support IPv6') def test_inet6_socket(): server_socket = TServerSocket(host="::1", port=12345, socket_family=socket.AF_INET6)