|
| 1 | +import datetime |
| 2 | + |
| 3 | +from betterproto2.lib.std.google.protobuf import Timestamp as VanillaTimestamp |
| 4 | + |
| 5 | + |
| 6 | +class Timestamp(VanillaTimestamp): |
| 7 | + @classmethod |
| 8 | + def from_datetime(cls, dt: datetime.datetime) -> "Timestamp": |
| 9 | + # manual epoch offset calulation to avoid rounding errors, |
| 10 | + # to support negative timestamps (before 1970) and skirt |
| 11 | + # around datetime bugs (apparently 0 isn't a year in [0, 9999]??) |
| 12 | + offset = dt - datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) |
| 13 | + # below is the same as timedelta.total_seconds() but without dividing by 1e6 |
| 14 | + # so we end up with microseconds as integers instead of seconds as float |
| 15 | + offset_us = (offset.days * 24 * 60 * 60 + offset.seconds) * 10**6 + offset.microseconds |
| 16 | + seconds, us = divmod(offset_us, 10**6) |
| 17 | + return cls(seconds, us * 1000) |
| 18 | + |
| 19 | + def to_datetime(self) -> datetime.datetime: |
| 20 | + # datetime.fromtimestamp() expects a timestamp in seconds, not microseconds |
| 21 | + # if we pass it as a floating point number, we will run into rounding errors |
| 22 | + # see also #407 |
| 23 | + offset = datetime.timedelta(seconds=self.seconds, microseconds=self.nanos // 1000) |
| 24 | + return datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + offset |
| 25 | + |
| 26 | + @staticmethod |
| 27 | + def timestamp_to_json(dt: datetime.datetime) -> str: |
| 28 | + nanos = dt.microsecond * 1e3 |
| 29 | + if dt.tzinfo is not None: |
| 30 | + # change timezone aware datetime objects to utc |
| 31 | + dt = dt.astimezone(datetime.timezone.utc) |
| 32 | + copy = dt.replace(microsecond=0, tzinfo=None) |
| 33 | + result = copy.isoformat() |
| 34 | + if (nanos % 1e9) == 0: |
| 35 | + # If there are 0 fractional digits, the fractional |
| 36 | + # point '.' should be omitted when serializing. |
| 37 | + return f"{result}Z" |
| 38 | + if (nanos % 1e6) == 0: |
| 39 | + # Serialize 3 fractional digits. |
| 40 | + return f"{result}.{int(nanos // 1e6) :03d}Z" |
| 41 | + if (nanos % 1e3) == 0: |
| 42 | + # Serialize 6 fractional digits. |
| 43 | + return f"{result}.{int(nanos // 1e3) :06d}Z" |
| 44 | + # Serialize 9 fractional digits. |
| 45 | + return f"{result}.{nanos:09d}" |
0 commit comments