diff --git a/pysipp/launch.py b/pysipp/launch.py index d6098cd..5a9ebfe 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -2,7 +2,7 @@ Launchers for invoking SIPp user agents """ import os -import select +import selectors import shlex import signal import subprocess @@ -11,7 +11,6 @@ from collections import namedtuple from collections import OrderedDict from pprint import pformat - from . import utils log = utils.get_logger() @@ -35,12 +34,12 @@ def __init__( self, subprocmod=subprocess, osmod=os, - poller=select.epoll, + poller=None, ): # these could optionally be rpyc proxy objs self.spm = subprocmod self.osm = osmod - self.poller = poller() + self.poller = poller or selectors.DefaultSelector() # collector thread placeholder self._waiter = None # store proc results @@ -67,8 +66,8 @@ def __call__(self, cmds, block=True, rate=300, **kwargs): fd = proc.stderr.fileno() log.debug("registering fd '{}' for pid '{}'".format(fd, proc.pid)) fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + # register for stderr read events (EOF when process closes) + self.poller.register(proc.stderr, selectors.EVENT_READ) # limit launch rate time.sleep(1.0 / rate) @@ -85,11 +84,14 @@ def _wait(self, fds2procs): left = len(fds2procs) collected = 0 while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: + events = self.poller.select() # wait on hangup events + log.debug("received events for '{}'".format(events)) + for key, event_mask in events: collected += 1 + fd = key.fd proc = fds2procs[fd] + # unregister to avoid further events + self.poller.unregister(key.fileobj) # attach streams so they can be read more then once log.debug("collecting streams for {}".format(proc)) proc.streams = Streams(*proc.communicate()) # timeout=2))