Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions pysipp/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Launchers for invoking SIPp user agents
"""
import os
import select
import selectors
import shlex
import signal
import subprocess
Expand All @@ -11,7 +11,6 @@
from collections import namedtuple
from collections import OrderedDict
from pprint import pformat

from . import utils

log = utils.get_logger()
Expand All @@ -35,12 +34,12 @@ def __init__(
self,
subprocmod=subprocess,
osmod=os,
poller=select.epoll,
poller=None,
):
# these could optionally be rpyc proxy objs
self.spm = subprocmod
self.osm = osmod
self.poller = poller()
self.poller = poller or selectors.DefaultSelector()
# collector thread placeholder
self._waiter = None
# store proc results
Expand All @@ -67,8 +66,8 @@ def __call__(self, cmds, block=True, rate=300, **kwargs):
fd = proc.stderr.fileno()
log.debug("registering fd '{}' for pid '{}'".format(fd, proc.pid))
fds2procs[fd] = self._procs[cmd] = proc
# register for stderr hangup events
self.poller.register(proc.stderr.fileno(), select.EPOLLHUP)
# register for stderr read events (EOF when process closes)
self.poller.register(proc.stderr, selectors.EVENT_READ)
# limit launch rate
time.sleep(1.0 / rate)

Expand All @@ -85,11 +84,14 @@ def _wait(self, fds2procs):
left = len(fds2procs)
collected = 0
while collected < left:
pairs = self.poller.poll() # wait on hangup events
log.debug("received hangup for pairs '{}'".format(pairs))
for fd, status in pairs:
events = self.poller.select() # wait on hangup events
log.debug("received events for '{}'".format(events))
for key, event_mask in events:
collected += 1
fd = key.fd
proc = fds2procs[fd]
# unregister to avoid further events
self.poller.unregister(key.fileobj)
# attach streams so they can be read more then once
log.debug("collecting streams for {}".format(proc))
proc.streams = Streams(*proc.communicate()) # timeout=2))
Expand Down
Loading