@@ -1024,11 +1024,11 @@ class Receiver(object):
10241024 routed to the context due to disconnection, and ignores messages that
10251025 did not originate from the respondent context.
10261026 """
1027- #: If not :data:`None`, a reference to a function invoked as
1028- #: `notify(receiver)` when a new message is delivered to this receiver. The
1029- #: function is invoked on the broker thread, therefore it must not block.
1030- #: Used by :class:`mitogen.select.Select` to implement waiting on multiple
1031- #: receivers .
1027+ #: If not :data:`None`, a function invoked as `notify(receiver)` after a
1028+ #: message has been received. The function is invoked on :class:`Broker`
1029+ #: thread, therefore it must not block. Used by
1030+ #: :class:`mitogen.select.Select` to efficiently implement waiting on
1031+ #: multiple event sources .
10321032 notify = None
10331033
10341034 raise_channelerror = True
@@ -1513,6 +1513,22 @@ def get_source(self, fullname):
15131513
15141514
15151515class LogHandler (logging .Handler ):
1516+ """
1517+ A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG`
1518+ messages to be sent to a parent context in response to logging messages
1519+ generated by the current context. This is installed by default in child
1520+ contexts during bootstrap, so that :mod:`logging` events can be viewed and
1521+ managed centrally in the master process.
1522+
1523+ The handler is initially *corked* after construction, such that it buffers
1524+ messages until :meth:`uncork` is called. This allows logging to be
1525+ installed prior to communication with the target being available, and
1526+ avoids any possible race where early log messages might be dropped.
1527+
1528+ :param mitogen.core.Context context:
1529+ The context to send log messages towards. At present this is always
1530+ the master process.
1531+ """
15161532 def __init__ (self , context ):
15171533 logging .Handler .__init__ (self )
15181534 self .context = context
@@ -1549,6 +1565,9 @@ def _send(self, msg):
15491565 self ._buffer_lock .release ()
15501566
15511567 def emit (self , rec ):
1568+ """
1569+ Send a :data:`FORWARD_LOG` message towards the target context.
1570+ """
15521571 if rec .name == 'mitogen.io' or \
15531572 getattr (self .local , 'in_emit' , False ):
15541573 return
@@ -1566,6 +1585,30 @@ def emit(self, rec):
15661585
15671586
15681587class Stream (object ):
1588+ """
1589+ A :class:`Stream` is one readable and optionally one writeable file
1590+ descriptor (represented by :class:`Side`) aggregated alongside an
1591+ associated :class:`Protocol` that knows how to respond to IO readiness
1592+ events for those descriptors.
1593+
1594+ Streams are registered with :class:`Broker`, and callbacks are invoked on
1595+ the broker thread in response to IO activity. When registered using
1596+ :meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker
1597+ may call any of :meth:`on_receive`, :meth:`on_transmit`,
1598+ :meth:`on_shutdown` or :meth:`on_disconnect`.
1599+
1600+ It is expected that the :class:`Protocol` associated with a stream will
1601+ change over its life. For example during connection setup, the initial
1602+ protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to
1603+ enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to
1604+ the target, before handing off to :class:`MitogenProtocol` when the target
1605+ process is initialized.
1606+
1607+ Streams connecting to children are in turn aggregated by
1608+ :class:`mitogen.parent.Connection`, which contains additional logic for
1609+ managing any child process, and a reference to any separate ``stderr``
1610+ :class:`Stream` connected to that process.
1611+ """
15691612 #: A :class:`Side` representing the stream's receive file descriptor.
15701613 receive_side = None
15711614
@@ -1578,21 +1621,38 @@ class Stream(object):
15781621 #: In parents, the :class:`mitogen.parent.Connection` instance.
15791622 conn = None
15801623
1624+ #: The stream name. This is used in the :meth:`__repr__` output in any log
1625+ #: messages, it may be any descriptive string.
15811626 name = u'default'
15821627
15831628 def set_protocol (self , protocol ):
15841629 """
1585- Bind a protocol to this stream, by updating :attr:`Protocol.stream` to
1586- refer to this stream, and updating this stream's
1587- :attr:`Stream.protocol` to the refer to the protocol. Any prior
1588- protocol's :attr:`Protocol.stream` is set to :data:`None`.
1630+ Bind a :class:`Protocol` to this stream, by updating
1631+ :attr:`Protocol.stream` to refer to this stream, and updating this
1632+ stream's :attr:`Stream.protocol` to the refer to the protocol. Any
1633+ prior protocol's :attr:`Protocol.stream` is set to :data:`None`.
15891634 """
15901635 if self .protocol :
15911636 self .protocol .stream = None
15921637 self .protocol = protocol
15931638 self .protocol .stream = self
15941639
15951640 def accept (self , rfp , wfp ):
1641+ """
1642+ Attach a pair of file objects to :attr:`receive_side` and
1643+ :attr:`transmit_side`, after wrapping them in :class:`Side` instances.
1644+ :class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec`
1645+ on the underlying file descriptors during construction.
1646+
1647+ The same file object may be used for both sides. The default
1648+ :meth:`on_disconnect` is handles the possibility that only one
1649+ descriptor may need to be closed.
1650+
1651+ :param file rfp:
1652+ The file object to receive from.
1653+ :param file wfp:
1654+ The file object to transmit to.
1655+ """
15961656 self .receive_side = Side (self , rfp )
15971657 self .transmit_side = Side (self , wfp )
15981658
@@ -1601,13 +1661,17 @@ def __repr__(self):
16011661
16021662 def on_receive (self , broker ):
16031663 """
1604- Called by :class:`Broker` when the stream's :attr:`receive_side` has
1664+ Invoked by :class:`Broker` when the stream's :attr:`receive_side` has
16051665 been marked readable using :meth:`Broker.start_receive` and the broker
16061666 has detected the associated file descriptor is ready for reading.
16071667
1608- Subclasses must implement this if :meth:`Broker.start_receive` is ever
1609- called on them, and the method must call :meth:`on_disconect` if
1610- reading produces an empty string.
1668+ Subclasses must implement this if they are registered using
1669+ :meth:`Broker.start_receive`, and the method must invoke
1670+ :meth:`on_disconnect` if reading produces an empty string.
1671+
1672+ The default implementation reads :attr:`Protocol.read_size` bytes and
1673+ passes the resulting bytestring to :meth:`Protocol.on_receive`. If the
1674+ bytestring is 0 bytes, invokes :meth:`on_disconnect` instead.
16111675 """
16121676 buf = self .receive_side .read (self .protocol .read_size )
16131677 if not buf :
@@ -1618,30 +1682,39 @@ def on_receive(self, broker):
16181682
16191683 def on_transmit (self , broker ):
16201684 """
1621- Called by :class:`Broker` when the stream's :attr:`transmit_side`
1622- has been marked writeable using :meth:`Broker._start_transmit` and
1623- the broker has detected the associated file descriptor is ready for
1685+ Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has
1686+ been marked writeable using :meth:`Broker._start_transmit` and the
1687+ broker has detected the associated file descriptor is ready for
16241688 writing.
16251689
1626- Subclasses must implement this if :meth:`Broker._start_transmit` is
1627- ever called on them.
1690+ Subclasses must implement they are ever registerd with
1691+ :meth:`Broker._start_transmit`.
1692+
1693+ The default implementation invokes :meth:`Protocol.on_transmit`.
16281694 """
16291695 self .protocol .on_transmit (broker )
16301696
16311697 def on_shutdown (self , broker ):
16321698 """
1633- Called by :meth:`Broker.shutdown` to allow the stream time to
1634- gracefully shutdown. The base implementation simply called
1635- :meth:`on_disconnect`.
1699+ Invoked by :meth:`Broker.shutdown` to allow the stream time to
1700+ gracefully shutdown.
1701+
1702+ The default implementation emits a ``shutdown`` signal before
1703+ invoking :meth:`on_disconnect`.
16361704 """
16371705 fire (self , 'shutdown' )
16381706 self .protocol .on_shutdown (broker )
16391707
16401708 def on_disconnect (self , broker ):
16411709 """
1642- Called by :class:`Broker` to force disconnect the stream. The base
1643- implementation simply closes :attr:`receive_side` and
1644- :attr:`transmit_side` and unregisters the stream from the broker.
1710+ Invoked by :class:`Broker` to force disconnect the stream during
1711+ shutdown, invoked by the default :meth:`on_shutdown` implementation,
1712+ and usually invoked by any subclass :meth:`on_receive` implementation
1713+ in response to a 0-byte read.
1714+
1715+ The base implementation fires a ``disconnect`` event, then closes
1716+ :attr:`receive_side` and :attr:`transmit_side` after unregistering the
1717+ stream from the broker.
16451718 """
16461719 fire (self , 'disconnect' )
16471720 self .protocol .on_disconnect (broker )
@@ -1666,6 +1739,8 @@ class Protocol(object):
16661739 #: :data:`None`.
16671740 stream = None
16681741
1742+ #: The size of the read buffer used by :class:`Stream` when this is the
1743+ #: active protocol for the stream.
16691744 read_size = CHUNK_SIZE
16701745
16711746 @classmethod
@@ -2369,8 +2444,18 @@ class Latch(object):
23692444
23702445 See :ref:`waking-sleeping-threads` for further discussion.
23712446 """
2447+ #: The :class:`Poller` implementation to use for waiting. Since the poller
2448+ #: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller`
2449+ #: if it is available, or :class:`mitogen.core.Poller` otherwise, since
2450+ #: these implementations require no system calls to create, configure or
2451+ #: destroy.
23722452 poller_class = Poller
23732453
2454+ #: If not :data:`None`, a function invoked as `notify(latch)` after a
2455+ #: successful call to :meth:`put`. The function is invoked on the
2456+ #: :meth:`put` caller's thread, which may be the :class:`Broker` thread,
2457+ #: therefore it must not block. Used by :class:`mitogen.select.Select` to
2458+ #: efficiently implement waiting on multiple event sources.
23742459 notify = None
23752460
23762461 # The _cls_ prefixes here are to make it crystal clear in the code which
@@ -2725,15 +2810,22 @@ def defer(self, func, *args, **kwargs):
27252810
27262811class IoLoggerProtocol (DelimitedProtocol ):
27272812 """
2728- Handle redirection of standard IO into the :mod:`logging` package.
2813+ Attached to one end of a socket pair whose other end overwrites one of the
2814+ standard ``stdout`` or ``stderr`` file descriptors in a child context.
2815+ Received data is split up into lines, decoded as UTF-8 and logged to the
2816+ :mod:`logging` package as either the ``stdout`` or ``stderr`` logger.
2817+
2818+ Logging in child contexts is in turn forwarded to the master process using
2819+ :class:`LogHandler`.
27292820 """
27302821 @classmethod
27312822 def build_stream (cls , name , dest_fd ):
27322823 """
2733- Even though the descriptor `dest_fd` will hold the opposite end of the
2734- socket open, we must keep a separate dup() of it (i.e. wsock) in case
2735- some code decides to overwrite `dest_fd` later, which would thus break
2736- :meth:`on_shutdown`.
2824+ Even though the file descriptor `dest_fd` will hold the opposite end of
2825+ the socket open, we must keep a separate dup() of it (i.e. wsock) in
2826+ case some code decides to overwrite `dest_fd` later, which would
2827+ prevent break :meth:`on_shutdown` from calling :meth:`shutdown()
2828+ <socket.socket.shutdown>` on it.
27372829 """
27382830 rsock , wsock = socket .socketpair ()
27392831 os .dup2 (wsock .fileno (), dest_fd )
0 commit comments