Skip to content

Commit 2317825

Browse files
committed
Refactor message handling to remove GeneratorMessage and streamline session management.
1 parent 9e28789 commit 2317825

File tree

17 files changed

+241
-258
lines changed

17 files changed

+241
-258
lines changed

demo/python/generator/msg.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from iop import GeneratorMessage,Message
1+
from iop import Message
22
from dataclasses import dataclass
33

44
@dataclass
5-
class MyGenerator(GeneratorMessage):
5+
class MyGenerator(Message):
66
"""Base message to initialize generator function"""
77
my_string: str
88

src/iop/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from iop._business_service import _BusinessService
44
from iop._director import _Director
55
from iop._inbound_adapter import _InboundAdapter
6-
from iop._message import _Message, _PickleMessage, _PydanticMessage, _PydanticPickleMessage, _GeneratorMessage
6+
from iop._message import _Message, _PickleMessage, _PydanticMessage, _PydanticPickleMessage
77
from iop._outbound_adapter import _OutboundAdapter
88
from iop._private_session_duplex import _PrivateSessionDuplex
99
from iop._private_session_process import _PrivateSessionProcess
@@ -22,5 +22,4 @@ class Message(_Message): pass
2222
class PickleMessage(_PickleMessage): pass
2323
class PydanticMessage(_PydanticMessage): pass
2424
class PydanticPickleMessage(_PydanticPickleMessage): pass
25-
class GeneratorMessage(_GeneratorMessage): pass
2625
class Director(_Director): pass

src/iop/_business_host.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from . import _iris
55
from ._common import _Common
66
from ._message import _Message as Message
7-
from ._decorators import input_serializer_param, output_deserializer
8-
from ._dispatch import dispatch_serializer, dispatch_deserializer
7+
from ._decorators import input_serializer_param, output_deserializer, input_deserializer, output_serializer
8+
from ._dispatch import dispatch_serializer, dispatch_deserializer, dispach_message
99
from ._async_request import AsyncRequest
1010
from ._generator_request import _GeneratorRequest
1111

@@ -68,8 +68,7 @@ async def send_request_async_ng(self, target: str, request: Union[Message, Any],
6868
Response from target component
6969
"""
7070
return await AsyncRequest(target, request, timeout, description, self)
71-
72-
@input_serializer_param(1, 'request')
71+
7372
def send_generator_request(self, target: str, request: Union[Message, Any],
7473
timeout: int = -1, description: Optional[str] = None) -> _GeneratorRequest:
7574
"""Send message as a generator request to target component.
@@ -81,7 +80,7 @@ def send_generator_request(self, target: str, request: Union[Message, Any],
8180
Returns:
8281
_GeneratorRequest: An instance of _GeneratorRequest to iterate over responses
8382
Raises:
84-
TypeError: If request is not of type Message or _GeneratorMessage
83+
TypeError: If request is not of type Message
8584
"""
8685
return _GeneratorRequest(self, target, request, timeout, description)
8786

@@ -196,7 +195,7 @@ def on_get_connections(self) -> List[str]:
196195
"""
197196
## Parse the class code to find all invocations of send_request_sync and send_request_async
198197
## and return the targets
199-
targer_list = []
198+
target_list = []
200199
# get the source code of the class
201200
source = getsource(self.__class__)
202201
# find all invocations of send_request_sync and send_request_async
@@ -211,29 +210,47 @@ def on_get_connections(self) -> List[str]:
211210
if target.find("=") != -1:
212211
# it's a keyword argument, remove the keyword
213212
target = target[target.find("=")+1:].strip()
214-
if target not in targer_list:
215-
targer_list.append(target)
213+
if target not in target_list:
214+
target_list.append(target)
216215
i = source.find(method, i+1)
217216

218-
for target in targer_list:
217+
for target in target_list:
219218
# if target is a string, remove the quotes
220219
if target[0] == "'" and target[-1] == "'":
221-
targer_list[targer_list.index(target)] = target[1:-1]
220+
target_list[target_list.index(target)] = target[1:-1]
222221
elif target[0] == '"' and target[-1] == '"':
223-
targer_list[targer_list.index(target)] = target[1:-1]
222+
target_list[target_list.index(target)] = target[1:-1]
224223
# if target is a variable, try to find the value of the variable
225224
else:
226225
self.on_init()
227226
try:
228227
if target.find("self.") != -1:
229228
# it's a class variable
230-
targer_list[targer_list.index(target)] = getattr(self, target[target.find(".")+1:])
229+
target_list[target_list.index(target)] = getattr(self, target[target.find(".")+1:])
231230
elif target.find(".") != -1:
232231
# it's a class variable
233-
targer_list[targer_list.index(target)] = getattr(getattr(self, target[:target.find(".")]), target[target.find(".")+1:])
232+
target_list[target_list.index(target)] = getattr(getattr(self, target[:target.find(".")]), target[target.find(".")+1:])
234233
else:
235-
targer_list[targer_list.index(target)] = getattr(self, target)
234+
target_list[target_list.index(target)] = getattr(self, target)
236235
except Exception as e:
237236
pass
238237

239-
return targer_list
238+
return target_list
239+
240+
@input_deserializer
241+
def _dispatch_generator_started(self, request: Any) -> Any:
242+
"""For internal use only."""
243+
self._gen = dispach_message(self, request)
244+
# check if self._gen is a generator
245+
if not hasattr(self._gen, '__iter__'):
246+
raise TypeError("Expected a generator or iterable object, got: {}".format(type(self._gen).__name__))
247+
248+
return _iris.get_iris().IOP.Generator.Message.Ack._New()
249+
250+
@output_serializer
251+
def _dispatch_generator_poll(self) -> Any:
252+
"""For internal use only."""
253+
try:
254+
return next(self._gen)
255+
except StopIteration:
256+
return _iris.get_iris().IOP.Generator.Message.Stop._New()

src/iop/_business_operation.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -73,33 +73,3 @@ def OnMessage(self, request: Any) -> Any:
7373
"""
7474
return
7575

76-
@input_deserializer
77-
@output_serializer
78-
def _dispatch_private_session_started(self, request: Any) -> Any:
79-
"""For internal use only."""
80-
self._gen = self.on_private_session_started(request)
81-
from . import _iris
82-
return _iris.get_iris().IOP.PrivateSession.Message.Ack._New()
83-
84-
def on_private_session_started(self, request: Any) -> Any:
85-
"""Called when a private session is started.
86-
87-
This method can be overridden to handle the start of a private session.
88-
89-
Args:
90-
request: The request object containing session details.
91-
92-
Returns:
93-
Any response object or None.
94-
"""
95-
return None
96-
97-
@input_serializer
98-
@output_deserializer
99-
def _dispatch_private_session_poll(self, request: Any) -> Any:
100-
"""For internal use only."""
101-
try:
102-
return next(self._gen)
103-
except StopIteration:
104-
import _iris
105-
return _iris.get_iris().IOP.PrivateSession.Message.Stop._New()

src/iop/_dispatch.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
from inspect import signature, Parameter
22
from typing import Any, List, Tuple, Callable
33

4-
from ._serialization import serialize_message, serialize_pickle_message, deserialize_message, deserialize_pickle_message
4+
from ._serialization import serialize_message, serialize_pickle_message, deserialize_message, deserialize_pickle_message, serialize_message_generator, serialize_pickle_message_generator
55
from ._message_validator import is_message_instance, is_pickle_message_instance, is_iris_object_instance
66

7-
from ._message import _GeneratorMessage
8-
9-
def dispatch_serializer(message: Any) -> Any:
7+
def dispatch_serializer(message: Any, is_generator: bool = False) -> Any:
108
"""Serializes the message based on its type.
119
1210
Args:
@@ -20,8 +18,12 @@ def dispatch_serializer(message: Any) -> Any:
2018
"""
2119
if message is not None:
2220
if is_message_instance(message):
21+
if is_generator:
22+
return serialize_message_generator(message)
2323
return serialize_message(message)
2424
elif is_pickle_message_instance(message):
25+
if is_generator:
26+
return serialize_pickle_message_generator(message)
2527
return serialize_pickle_message(message)
2628
elif is_iris_object_instance(message):
2729
return message
@@ -75,9 +77,6 @@ def dispach_message(host: Any, request: Any) -> Any:
7577
module = request.__class__.__module__
7678
classname = request.__class__.__name__
7779

78-
if isinstance(request, _GeneratorMessage):
79-
return getattr(host, '_dispatch_private_session_started')(request)
80-
8180
for msg, method in host.DISPATCH:
8281
if msg == module + "." + classname:
8382
call = method

src/iop/_generator_request.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,30 @@
11
from typing import Any, Optional, Union
22

33
from . import _iris
4-
from ._message import _GeneratorMessage
5-
4+
from ._dispatch import dispatch_serializer, dispatch_deserializer
65

76
class _GeneratorRequest:
87
"""Generator class to interetate over responses from a request.
98
This class is used to handle the responses from a request in a generator-like manner."""
109

11-
def __init__(self, host: Any, target: str, request: Union[_GeneratorMessage, Any],
10+
def __init__(self, host: Any, target: str, request: Any,
1211
timeout: int = -1, description: Optional[str] = None) -> None:
1312
self.host = host
1413
self.target = target
1514
self.request = request
16-
self.timeout = timeout
17-
self.description = description
18-
self._response = None
1915

20-
# if not isinstance(self.request, _GeneratorMessage):
21-
# raise TypeError("request must be of type Message or _GeneratorMessage")
22-
23-
ack_rsponse = self.host.send_request_sync(self.target, self.request)
16+
ack_response = self.host.send_request_sync(self.target, dispatch_serializer(self.request, is_generator=True),
17+
timeout=timeout, description=description)
2418

25-
if ack_rsponse is None:
19+
if ack_response is None or not ack_response._IsA("IOP.Generator.Message.Ack"):
2620
raise RuntimeError("Failed to send request, no acknowledgment received.")
2721

2822
def __iter__(self):
2923
return self
3024

3125
def __next__(self):
32-
poll = _iris.get_iris().IOP.PrivateSession.Message.Poll._New()
26+
poll = _iris.get_iris().IOP.Generator.Message.Poll._New()
3327
rsp = self.host.send_request_sync(self.target, poll)
34-
if rsp is None:
28+
if rsp is None or (hasattr(rsp, '_IsA') and rsp._IsA("IOP.Generator.Message.Stop")):
3529
raise StopIteration("No more responses available.")
3630
return rsp

src/iop/_message.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class _Message:
99
"""
1010
pass
1111

12-
class _PickleMessage:
12+
class _PickleMessage(_Message):
1313
""" The abstract class that is the superclass for persistent messages sent from one component to another.
1414
This class has no properties or methods. Users subclass Message and add properties.
1515
The IOP framework provides the persistence to objects derived from the Message class.
@@ -28,5 +28,3 @@ class _PydanticPickleMessage(BaseModel):
2828
def __init__(self, **data: Any):
2929
super().__init__(**data)
3030

31-
class _GeneratorMessage(_Message):
32-
""""Base message to initialize generator function"""

src/iop/_serialization.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from pydantic import BaseModel, TypeAdapter, ValidationError
1111

1212
from . import _iris
13-
from ._message import _PydanticPickleMessage, _Message, _GeneratorMessage
13+
from ._message import _PydanticPickleMessage, _Message
1414
from ._utils import _Utils
1515

1616
class SerializationError(Exception):
@@ -37,18 +37,18 @@ def _convert_to_json_safe(obj: Any) -> Any:
3737
raise SerializationError(f"Object {obj} must be a Pydantic model or dataclass Message")
3838

3939
@staticmethod
40-
def serialize(message: Any, use_pickle: bool = False) -> Any:
40+
def serialize(message: Any, use_pickle: bool = False, is_generator:bool = False) -> Any:
4141
"""Serializes a message to IRIS format."""
42-
if isinstance(message, _PydanticPickleMessage) or use_pickle:
43-
return MessageSerializer._serialize_pickle(message)
44-
return MessageSerializer._serialize_json(message)
42+
if use_pickle:
43+
return MessageSerializer._serialize_pickle(message, is_generator)
44+
return MessageSerializer._serialize_json(message, is_generator)
4545

4646
@staticmethod
47-
def _serialize_json(message: Any) -> Any:
47+
def _serialize_json(message: Any, is_generator: bool = False) -> Any:
4848
json_string = MessageSerializer._convert_to_json_safe(message)
4949

50-
if isinstance(json_string, _GeneratorMessage):
51-
msg = _iris.get_iris().cls('IOP.PrivateSession.Message.Start')._New()
50+
if is_generator:
51+
msg = _iris.get_iris().cls('IOP.Generator.Message.Start')._New()
5252
else:
5353
msg = _iris.get_iris().cls('IOP.Message')._New()
5454
msg.classname = f"{message.__class__.__module__}.{message.__class__.__name__}"
@@ -91,9 +91,12 @@ def _deserialize_json(serial: Any) -> Any:
9191
raise SerializationError(f"Failed to deserialize JSON: {str(e)}")
9292

9393
@staticmethod
94-
def _serialize_pickle(message: Any) -> Any:
94+
def _serialize_pickle(message: Any, is_generator: bool = False) -> Any:
9595
pickle_string = codecs.encode(pickle.dumps(message), "base64").decode()
96-
msg = _iris.get_iris().cls('IOP.PickleMessage')._New()
96+
if is_generator:
97+
msg = _iris.get_iris().cls('IOP.Generator.Message.StartPickle')._New()
98+
else:
99+
msg = _iris.get_iris().cls('IOP.PickleMessage')._New()
97100
msg.classname = f"{message.__class__.__module__}.{message.__class__.__name__}"
98101
msg.jstr = _Utils.string_to_stream(pickle_string)
99102
return msg
@@ -168,7 +171,9 @@ def dataclass_to_dict(instance: Any) -> Dict:
168171
return result
169172

170173
# Maintain backwards compatibility
171-
serialize_pickle_message = lambda msg: MessageSerializer.serialize(msg, use_pickle=True)
172-
serialize_message = lambda msg: MessageSerializer.serialize(msg, use_pickle=False)
174+
serialize_pickle_message = lambda msg: MessageSerializer.serialize(msg, use_pickle=True, is_generator=False)
175+
serialize_pickle_message_generator = lambda msg: MessageSerializer.serialize(msg, use_pickle=True, is_generator=True)
176+
serialize_message = lambda msg: MessageSerializer.serialize(msg, use_pickle=False, is_generator=False)
177+
serialize_message_generator = lambda msg: MessageSerializer.serialize(msg, use_pickle=False, is_generator=True)
173178
deserialize_pickle_message = lambda serial: MessageSerializer.deserialize(serial, use_pickle=True)
174179
deserialize_message = lambda serial: MessageSerializer.deserialize(serial, use_pickle=False)

0 commit comments

Comments
 (0)