|
1 | 1 | import logging |
2 | 2 | from typing import Literal, Optional, Union, cast |
3 | 3 |
|
| 4 | +from .amqp_consumer_handler import AMQPMessagingHandler |
4 | 5 | from .entities import StreamOptions |
5 | 6 | from .options import ( |
6 | 7 | ReceiverOptionUnsettled, |
7 | 8 | ReceiverOptionUnsettledWithFilters, |
8 | 9 | ) |
9 | | -from .qpid.proton._handlers import MessagingHandler |
10 | 10 | from .qpid.proton._message import Message |
11 | 11 | from .qpid.proton.utils import ( |
12 | 12 | BlockingConnection, |
@@ -37,7 +37,7 @@ def __init__( |
37 | 37 | self, |
38 | 38 | conn: BlockingConnection, |
39 | 39 | addr: str, |
40 | | - handler: Optional[MessagingHandler] = None, |
| 40 | + handler: Optional[AMQPMessagingHandler] = None, |
41 | 41 | stream_options: Optional[StreamOptions] = None, |
42 | 42 | credit: Optional[int] = None, |
43 | 43 | ): |
@@ -67,7 +67,23 @@ def _open(self) -> None: |
67 | 67 |
|
68 | 68 | def _update_connection(self, conn: BlockingConnection) -> None: |
69 | 69 | self._conn = conn |
70 | | - self._receiver = self._create_receiver(self._addr) |
| 70 | + if self._stream_options is None: |
| 71 | + print("creating new receiver without stream") |
| 72 | + self._receiver = self._conn.create_receiver( |
| 73 | + self._addr, |
| 74 | + options=ReceiverOptionUnsettled(self._addr), |
| 75 | + handler=self._handler, |
| 76 | + ) |
| 77 | + else: |
| 78 | + print("creating new stream receiver") |
| 79 | + self._stream_options.offset(self._handler.offset - 1) # type: ignore |
| 80 | + self._receiver = self._conn.create_receiver( |
| 81 | + self._addr, |
| 82 | + options=ReceiverOptionUnsettledWithFilters( |
| 83 | + self._addr, self._stream_options |
| 84 | + ), |
| 85 | + handler=self._handler, |
| 86 | + ) |
71 | 87 |
|
72 | 88 | def _set_consumers_list(self, consumers: []) -> None: # type: ignore |
73 | 89 | self._consumers = consumers |
@@ -149,3 +165,7 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: |
149 | 165 | def address(self) -> str: |
150 | 166 | """Get the current publisher address.""" |
151 | 167 | return self._addr |
| 168 | + |
| 169 | + @property |
| 170 | + def handler(self) -> Optional[AMQPMessagingHandler]: |
| 171 | + return self._handler |
0 commit comments