@@ -83,7 +83,7 @@ def rpc(fn: Callable[..., Any]) -> Callable[..., Any]:
8383
8484
8585# ---------------------------------------------------------------------------
86- # Protocols
86+ # Protocols (work in progress)
8787# ---------------------------------------------------------------------------
8888
8989
@@ -102,7 +102,11 @@ def direct_msg(self, selfstream: Out, target: RemoteIn, value: T) -> None: ...
102102Transport = TransportProtocol | DirectTransportProtocol
103103
104104
105- class DaskTransport : ...
105+ class DaskTransport (DirectTransportProtocol ):
106+ def msg (self , selfstream : Out [T ], target : RemoteIn [T ], value : T ) -> None : ...
107+
108+
109+ daskTransport = DaskTransport () # singleton instance for use in Out/RemoteOut
106110
107111
108112# ---------------------------------------------------------------------------
@@ -120,9 +124,13 @@ class State(enum.Enum):
120124class Stream (Generic [T ]):
121125 """Base class shared by *In* and *Out* streams."""
122126
123- def __init__ (self , typ : type [T ], name : str , transport : Transport = DaskTransport ):
127+ transport : Transport = daskTransport # default transport
128+
129+ def __init__ (self , typ : type [T ], name : str , transport : Transport = None ):
124130 self .type : type [T ] = typ
125131 self .name : str = name
132+ if transport :
133+ self .transport = transport
126134
127135 # ------------------------------------------------------------------
128136 # Descriptor plumbing – auto-fill name when used as class attr
@@ -166,8 +174,8 @@ def state(self) -> State: # pragma: no cover – abstract
166174class BaseOut (Stream [T ]):
167175 """Common behaviour shared by *local* and *remote* outputs."""
168176
169- def __init__ (self , typ : type [T ], name : str = "Out" , owner : Any | None = None ):
170- super ().__init__ (typ , name )
177+ def __init__ (self , typ : type [T ], name : str = "Out" , owner : Any | None = None , ** kwargs ):
178+ super ().__init__ (typ , name , ** kwargs )
171179 self .owner : Any | None = owner
172180
173181 @property
@@ -194,6 +202,8 @@ def __str__(self) -> str: # noqa: D401
194202class Out (BaseOut [T ]):
195203 """Local *Out* – synchronous fan-out to subscribers."""
196204
205+ transport : Transport = daskTransport
206+
197207 def __init__ (self , typ : type [T ], name : str = "Out" , owner : Any | None = None ):
198208 super ().__init__ (typ , name , owner )
199209 self ._subscribers : List [In [T ]] = []
@@ -320,6 +330,9 @@ def __init__(self, typ: type[T], name: str, owner: Actor):
320330 def __str__ (self ) -> str : # noqa: D401
321331 return f"{ self .__class__ .__name__ } { super ().__str__ ()} @ { self .owner } "
322332
333+ def connect (self , source : Out [Any ]) -> None :
334+ self .owner .connect (self .name , source )
335+
323336
324337# ---------------------------------------------------------------------------
325338# Module infrastructure
@@ -336,13 +349,11 @@ class Module: # pylint: disable=too-few-public-methods
336349 # ------------------------------------------------------------------
337350 # Runtime helpers
338351 # ------------------------------------------------------------------
339- def bind (self , input_name : str , source : Out [Any ]) -> None :
352+ def connect (self , input_name : str , source : Out [Any ]) -> None :
340353 inp = In (source .type , input_name , self , source )
341354 self .inputs [input_name ] = inp
342355 setattr (self , input_name , inp )
343356
344- connect = bind # legacy alias
345-
346357 def subscribe (self , output_name : str , remote_input : In [Any ]) -> None : # noqa: D401
347358 getattr (self , output_name ).subscribe (remote_input )
348359
0 commit comments