@@ -47,7 +47,7 @@ class IOPubThread:
4747 whose IO is always run in a thread.
4848 """
4949
50- def __init__ (self , socket , pipe = False ):
50+ def __init__ (self , socket , session = None , pipe = False ):
5151 """Create IOPub thread
5252
5353 Parameters
@@ -59,6 +59,7 @@ def __init__(self, socket, pipe=False):
5959 piped from subprocesses.
6060 """
6161 self .socket = socket
62+ self .session = session
6263 self ._stopped = False
6364 self .background_socket = BackgroundSocket (self )
6465 self ._master_pid = os .getpid ()
@@ -73,12 +74,82 @@ def __init__(self, socket, pipe=False):
7374 self ._event_pipe_gc_seconds : float = 10
7475 self ._event_pipe_gc_task : asyncio .Task [Any ] | None = None
7576 self ._setup_event_pipe ()
77+ self ._setup_xpub_listener ()
7678 self .thread = threading .Thread (target = self ._thread_main , name = "IOPub" )
7779 self .thread .daemon = True
7880 self .thread .pydev_do_not_trace = True # type:ignore[attr-defined]
7981 self .thread .is_pydev_daemon_thread = True # type:ignore[attr-defined]
8082 self .thread .name = "IOPub"
8183
84+ def _setup_xpub_listener (self ):
85+ """Setup listener for XPUB subscription events"""
86+
87+ # Checks the socket is not a DummySocket
88+ if not hasattr (self .socket , "getsockopt" ):
89+ return
90+
91+ socket_type = self .socket .getsockopt (zmq .TYPE )
92+ if socket_type == zmq .XPUB :
93+ self ._xpub_stream = ZMQStream (self .socket , self .io_loop )
94+ self ._xpub_stream .on_recv (self ._handle_subscription )
95+
96+ def _handle_subscription (self , frames ):
97+ """Handle subscription/unsubscription events from XPUB socket
98+
99+ XPUB sockets receive:
100+ - subscribe: single frame with b'\\ x01' + topic
101+ - unsubscribe: single frame with b'\\ x00' + topic
102+ """
103+
104+ for frame in frames :
105+ event_type = frame [0 ]
106+ if event_type == 1 :
107+ subscription = frame [1 :] if len (frame ) > 1 else b""
108+ try :
109+ subscription_str = subscription .decode ("utf-8" )
110+ except UnicodeDecodeError :
111+ continue
112+ self ._send_welcome_message (subscription_str )
113+
114+ def _send_welcome_message (self , subscription ):
115+ """Send iopub_welcome message for new subscription
116+
117+ Parameters
118+ ----------
119+ subscription : str
120+ The subscription topic (UTF-8 decoded)
121+ """
122+
123+ # TODO: This early return is for backward-compatibility with ipyparallel.
124+ # This should be removed when ipykernel has been released with support of
125+ # xpub and ipyparallel has been updated to pass the session parameter
126+ # to IOPubThread upon construction.
127+ # (NB: the call to fix is here:
128+ # https://github.com/ipython/ipyparallel/blob/main/ipyparallel/engine/app.py#L679
129+ if self .session is None :
130+ return
131+
132+ content = {"subscription" : subscription }
133+
134+ header = self .session .msg_header ("iopub_welcome" )
135+ msg = {
136+ "header" : header ,
137+ "parent_header" : {},
138+ "metadata" : {},
139+ "content" : content ,
140+ "buffers" : [],
141+ }
142+
143+ msg_list = self .session .serialize (msg )
144+
145+ if subscription :
146+ identity = subscription .encode ("utf-8" )
147+ full_msg = [identity , * msg_list ]
148+ else :
149+ full_msg = msg_list
150+ # Send directly on socket (we're already in IO thread context)
151+ self .socket .send_multipart (full_msg )
152+
82153 def _thread_main (self ):
83154 """The inner loop that's actually run in a thread"""
84155
@@ -447,7 +518,7 @@ def __init__(
447518 DeprecationWarning ,
448519 stacklevel = 2 ,
449520 )
450- pub_thread = IOPubThread (pub_thread )
521+ pub_thread = IOPubThread (pub_thread , self . session )
451522 pub_thread .start ()
452523 self .pub_thread = pub_thread
453524 self .name = name
0 commit comments