Skip to content

Commit 82053b7

Browse files
authored
[EventHub] handle lowest/"unset" timestamp value from service (Azure#40363)
* [EventHub/ServiceBus] update datetime from timestamp to account for min AMQP timestamp sentinel val * add sb tests * eh test * copilot comments * pylint * lint
1 parent 03e6d92 commit 82053b7

File tree

10 files changed

+85
-19
lines changed

10 files changed

+85
-19
lines changed

sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
### Bugs Fixed
3232

3333
- Fixed a bug where service errors were incorrectly required and expected to have info/description fields.
34+
- Fixed a bug where max number of messages was not being requested when receiving from the service due to an incorrect link credit calculation.
35+
- Fixed a bug where the lowest possible timestamp returned by the service to represent an unset time was not being parsed correctly.
3436

3537
## 5.14.0 (2025-02-13)
3638

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/utils.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,23 @@
1414
from ._encode import encode_payload
1515

1616
TZ_UTC: timezone = timezone.utc
17+
# Number of seconds between the Unix epoch (1/1/1970) and year 1 CE.
18+
# This is the lowest value that can be represented by an AMQP timestamp.
19+
CE_ZERO_SECONDS: int = -62_135_596_800
1720

1821

19-
def utc_from_timestamp(timestamp):
22+
def utc_from_timestamp(timestamp: float) -> datetime.datetime:
23+
"""
24+
:param float timestamp: Timestamp in seconds to be converted to datetime.
25+
:rtype: datetime.datetime
26+
:returns: A datetime object representing the timestamp in UTC.
27+
"""
28+
# The AMQP timestamp is the number of seconds since the Unix epoch.
29+
# AMQP brokers represent the lowest value as -62_135_596_800 (the
30+
# number of seconds between the Unix epoch (1/1/1970) and year 1 CE) as
31+
# a sentinel for a time which is not set.
32+
if timestamp == CE_ZERO_SECONDS:
33+
return datetime.datetime.min.replace(tzinfo=TZ_UTC)
2034
return datetime.datetime.fromtimestamp(timestamp, tz=TZ_UTC)
2135

2236

sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
PROP_LAST_ENQUEUED_OFFSET,
2323
)
2424

25-
2625
if TYPE_CHECKING:
2726
from ._transport._base import AmqpTransport
2827
from ._pyamqp.message import Message as pyamqp_Message
@@ -44,11 +43,24 @@
4443
_LOGGER = logging.getLogger(__name__)
4544

4645

47-
4846
TZ_UTC: timezone = timezone.utc
47+
# Number of seconds between the Unix epoch (1/1/1970) and year 1 CE.
48+
# This is the lowest value that can be represented by an AMQP timestamp.
49+
CE_ZERO_SECONDS: int = -62_135_596_800
4950

5051

51-
def utc_from_timestamp(timestamp):
52+
def utc_from_timestamp(timestamp: float) -> datetime.datetime:
53+
"""
54+
:param float timestamp: Timestamp in seconds to be converted to datetime.
55+
:rtype: datetime.datetime
56+
:returns: A datetime object representing the timestamp in UTC.
57+
"""
58+
# The AMQP timestamp is the number of seconds since the Unix epoch.
59+
# AMQP brokers represent the lowest value as -62_135_596_800 (the
60+
# number of seconds between the Unix epoch (1/1/1970) and year 1 CE) as
61+
# a sentinel for a time which is not set.
62+
if timestamp == CE_ZERO_SECONDS:
63+
return datetime.datetime.min.replace(tzinfo=TZ_UTC)
5264
return datetime.datetime.fromtimestamp(timestamp, tz=TZ_UTC)
5365

5466

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_properties_async.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,6 @@ async def test_get_partition_ids(auth_credentials_async, uamqp_transport):
100100
assert partition_ids == ["0", "1"]
101101

102102

103-
@pytest.mark.skipif(
104-
sys.platform.startswith("win"),
105-
reason="Large negative timestamp to datetime conversion fails on Windows with: https://bugs.python.org/issue36439",
106-
)
107103
@pytest.mark.liveTest
108104
@pytest.mark.asyncio
109105
async def test_get_partition_properties(auth_credentials_async, uamqp_transport):

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_properties.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ def test_get_partition_ids(auth_credentials, uamqp_transport):
9797
assert partition_ids == ["0", "1"]
9898

9999

100-
@pytest.mark.skipif(
101-
sys.platform.startswith("win"),
102-
reason="Large negative timestamp to datetime conversion fails on Windows with: https://bugs.python.org/issue36439",
103-
)
104100
@pytest.mark.liveTest
105101
def test_get_partition_properties(auth_credentials, uamqp_transport):
106102
fully_qualified_namespace, eventhub_name, credential = auth_credentials

sdk/eventhub/azure-eventhub/tests/unittest/test_event_data.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
UamqpTransport = None
1919
from azure.eventhub._transport._pyamqp_transport import PyamqpTransport
2020
from azure.eventhub._pyamqp.message import Message, Properties, Header
21+
from azure.eventhub._utils import CE_ZERO_SECONDS
22+
from azure.eventhub._constants import PROP_TIMESTAMP
2123
from azure.eventhub.amqp import AmqpAnnotatedMessage, AmqpMessageHeader, AmqpMessageProperties
2224

2325
from azure.eventhub import _common
@@ -101,7 +103,7 @@ def test_sys_properties(uamqp_transport):
101103
properties.group_sequence = 1
102104
properties.reply_to_group_id = "reply_to_group_id"
103105
message = uamqp.message.Message(properties=properties)
104-
message.annotations = {_common.PROP_OFFSET: "@latest"}
106+
message.annotations = {_common.PROP_OFFSET: "@latest", PROP_TIMESTAMP: CE_ZERO_SECONDS * 1000}
105107
else:
106108
properties = Properties(
107109
message_id="message_id",
@@ -118,7 +120,7 @@ def test_sys_properties(uamqp_transport):
118120
group_sequence=1,
119121
reply_to_group_id="reply_to_group_id",
120122
)
121-
message_annotations = {_common.PROP_OFFSET: "@latest"}
123+
message_annotations = {_common.PROP_OFFSET: "@latest", PROP_TIMESTAMP: CE_ZERO_SECONDS * 1000}
122124
message = Message(properties=properties, message_annotations=message_annotations)
123125
ed = EventData._from_message(message) # type: EventData
124126

@@ -136,6 +138,7 @@ def test_sys_properties(uamqp_transport):
136138
assert ed.system_properties[_common.PROP_GROUP_ID] == properties.group_id
137139
assert ed.system_properties[_common.PROP_GROUP_SEQUENCE] == properties.group_sequence
138140
assert ed.system_properties[_common.PROP_REPLY_TO_GROUP_ID] == properties.reply_to_group_id
141+
assert ed.enqueued_time == datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
139142

140143

141144
def test_event_data_batch(uamqp_transport):

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
- Fixed a bug where service errors were incorrectly expected to have info/description fields set in all cases.
88
- Fixed a bug where the type in azure.servicebus.management.AuthorizationRule was not being correctly passed to the request.
9+
- Fixed a bug where max number of messages was not being requested when receiving from the service due to an incorrect link credit calculation. ([#40156](https://github.com/Azure/azure-sdk-for-python/issues/40156))
910

1011
## 7.14.0 (2025-02-13)
1112

sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,24 @@
6565

6666
_log = logging.getLogger(__name__)
6767

68+
TZ_UTC: timezone = timezone.utc
69+
# Number of seconds between the Unix epoch (1/1/1970) and year 1 CE.
70+
# This is the lowest value that can be represented by an AMQP timestamp.
71+
CE_ZERO_SECONDS: int = -62_135_596_800
6872

69-
def utc_from_timestamp(timestamp):
70-
return datetime.datetime.fromtimestamp(timestamp, tz=timezone.utc)
73+
def utc_from_timestamp(timestamp: float) -> datetime.datetime:
74+
"""
75+
:param float timestamp: Timestamp in seconds to be converted to datetime.
76+
:rtype: datetime.datetime
77+
:returns: A datetime object representing the timestamp in UTC.
78+
"""
79+
# The AMQP timestamp is the number of seconds since the Unix epoch.
80+
# AMQP brokers represent the lowest value as -62_135_596_800 (the
81+
# number of seconds between the Unix epoch (1/1/1970) and year 1 CE) as
82+
# a sentinel for a time which is not set.
83+
if timestamp == CE_ZERO_SECONDS:
84+
return datetime.datetime.min.replace(tzinfo=TZ_UTC)
85+
return datetime.datetime.fromtimestamp(timestamp, tz=TZ_UTC)
7186

7287

7388
def utc_now():

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/utils.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,22 @@
1414
from ._encode import encode_payload
1515

1616
TZ_UTC: timezone = timezone.utc
17+
# Number of seconds between the Unix epoch (1/1/1970) and year 1 CE.
18+
# This is the lowest value that can be represented by an AMQP timestamp.
19+
CE_ZERO_SECONDS: int = -62_135_596_800
1720

18-
19-
def utc_from_timestamp(timestamp):
21+
def utc_from_timestamp(timestamp: float) -> datetime.datetime:
22+
"""
23+
:param float timestamp: Timestamp in seconds to be converted to datetime.
24+
:rtype: datetime.datetime
25+
:returns: A datetime object representing the timestamp in UTC.
26+
"""
27+
# The AMQP timestamp is the number of seconds since the Unix epoch.
28+
# AMQP brokers represent the lowest value as -62_135_596_800 (the
29+
# number of seconds between the Unix epoch (1/1/1970) and year 1 CE) as
30+
# a sentinel for a time which is not set.
31+
if timestamp == CE_ZERO_SECONDS:
32+
return datetime.datetime.min.replace(tzinfo=TZ_UTC)
2033
return datetime.datetime.fromtimestamp(timestamp, tz=TZ_UTC)
2134

2235

sdk/servicebus/azure-servicebus/tests/test_message.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from azure.servicebus._transport._uamqp_transport import UamqpTransport
88
except (ModuleNotFoundError, ImportError):
99
uamqp = None
10-
from datetime import datetime, timedelta
10+
from datetime import datetime, timedelta, timezone
1111
from azure.servicebus import (
1212
ServiceBusClient,
1313
ServiceBusMessage,
@@ -20,6 +20,10 @@
2020
_X_OPT_PARTITION_KEY,
2121
_X_OPT_VIA_PARTITION_KEY,
2222
_X_OPT_SCHEDULED_ENQUEUE_TIME,
23+
_X_OPT_ENQUEUED_TIME,
24+
)
25+
from azure.servicebus._common.utils import (
26+
CE_ZERO_SECONDS,
2327
)
2428
from azure.servicebus.amqp import AmqpAnnotatedMessage, AmqpMessageBodyType, AmqpMessageProperties, AmqpMessageHeader
2529
from azure.servicebus._pyamqp.message import Message
@@ -69,6 +73,15 @@ def test_servicebus_message_repr_with_props():
6973
in message.__repr__()
7074
)
7175

76+
def test_servicebus_message_min_timestamp():
77+
received_message = Message(
78+
data=[b"data"],
79+
message_annotations={
80+
_X_OPT_ENQUEUED_TIME: CE_ZERO_SECONDS*1000,
81+
},
82+
)
83+
received_message = ServiceBusReceivedMessage(received_message, receiver=None)
84+
assert received_message.enqueued_time_utc == datetime.min.replace(tzinfo=timezone.utc)
7285

7386
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
7487
def test_servicebus_received_message_repr(uamqp_transport):
@@ -80,6 +93,7 @@ def test_servicebus_received_message_repr(uamqp_transport):
8093
_X_OPT_PARTITION_KEY: b"r_key",
8194
_X_OPT_VIA_PARTITION_KEY: b"r_via_key",
8295
_X_OPT_SCHEDULED_ENQUEUE_TIME: 123424566,
96+
_X_OPT_ENQUEUED_TIME: CE_ZERO_SECONDS * 1000,
8397
},
8498
properties={},
8599
)

0 commit comments

Comments
 (0)