-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathksuid.py
More file actions
133 lines (90 loc) · 3.98 KB
/
ksuid.py
File metadata and controls
133 lines (90 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import math
import secrets
import typing as t
from datetime import datetime as datetime_lib
from datetime import timezone
from functools import total_ordering
from baseconv import base62
# KSUID's epoch starts more recently so that the 32-bit number space gives a
# significantly higher useful lifetime of around 136 years from March 2017.
# This number (14e8) was picked to be easy to remember.
EPOCH_STAMP = 1400000000
class ByteArrayLengthException(Exception):
pass
SelfT = t.TypeVar("SelfT", bound="Ksuid")
@total_ordering
class Ksuid:
"""Ksuid class inspired by https://github.com/segmentio/ksuid"""
# Timestamp is a uint32
TIMESTAMP_LENGTH_IN_BYTES = 4
# Payload is 16-bytes
PAYLOAD_LENGTH_IN_BYTES = 16
# The length in bytes
BYTES_LENGTH = TIMESTAMP_LENGTH_IN_BYTES + PAYLOAD_LENGTH_IN_BYTES
# The length of the base64 representation (str)
BASE62_LENGTH = math.ceil(BYTES_LENGTH * 4 / 3)
_uid: bytes
@classmethod
def from_base62(cls: t.Type[SelfT], data: str) -> SelfT:
"""initializes Ksuid from base62 encoding"""
return cls.from_bytes(int.to_bytes(int(base62.decode(data)), cls.BYTES_LENGTH, "big"))
@classmethod
def from_bytes(cls: t.Type[SelfT], value: bytes) -> SelfT:
"""initializes Ksuid from bytes"""
if len(value) != cls.TIMESTAMP_LENGTH_IN_BYTES + cls.PAYLOAD_LENGTH_IN_BYTES:
raise ByteArrayLengthException()
res = cls()
res._uid = value
return res
def __init__(self, datetime: t.Optional[datetime_lib] = None, payload: t.Optional[bytes] = None):
if payload is not None and len(payload) != self.PAYLOAD_LENGTH_IN_BYTES:
raise ByteArrayLengthException()
_payload = secrets.token_bytes(self.PAYLOAD_LENGTH_IN_BYTES) if payload is None else payload
datetime = datetime.astimezone(timezone.utc) if datetime is not None else datetime_lib.now(tz=timezone.utc)
self._uid = self._inner_init(datetime, _payload)
def __str__(self) -> str:
"""Creates a base62 string representation"""
return base62.encode(int.from_bytes(bytes(self), "big")).zfill(self.BASE62_LENGTH)
def __repr__(self) -> str:
return str(self)
def __bytes__(self) -> bytes:
return self._uid
def __eq__(self, other: object) -> bool:
assert isinstance(other, self.__class__)
return self._uid == other._uid
def __lt__(self: SelfT, other: SelfT) -> bool:
return self._uid < other._uid
def __hash__(self) -> int:
return int.from_bytes(self._uid, "big")
def _inner_init(self, datetime: datetime_lib, payload: bytes) -> bytes:
timestamp = int(datetime.timestamp() - EPOCH_STAMP)
return int.to_bytes(timestamp, self.TIMESTAMP_LENGTH_IN_BYTES, "big") + payload
@property
def datetime(self) -> datetime_lib:
unix_time = self.timestamp
return datetime_lib.fromtimestamp(unix_time, tz=timezone.utc)
@property
def timestamp(self) -> float:
return float(int.from_bytes(self._uid[: self.TIMESTAMP_LENGTH_IN_BYTES], "big") + EPOCH_STAMP)
@property
def payload(self) -> bytes:
"""Returns the payload of the Ksuid with the timestamp encoded portion removed"""
return self._uid[self.TIMESTAMP_LENGTH_IN_BYTES :]
class KsuidMs(Ksuid):
"""
Ksuid class with increased (millisecond) accuracy
"""
# Timestamp is a uint32
TIMESTAMP_LENGTH_IN_BYTES = 5
# Payload is 16-bytes
PAYLOAD_LENGTH_IN_BYTES = 15
TIMESTAMP_MULTIPLIER = 256
def _inner_init(self, datetime: datetime_lib, payload: bytes) -> bytes:
timestamp = round((datetime.timestamp() - EPOCH_STAMP) * self.TIMESTAMP_MULTIPLIER)
return int.to_bytes(timestamp, self.TIMESTAMP_LENGTH_IN_BYTES, "big") + payload
@property
def timestamp(self) -> float:
return (
float(int.from_bytes(self._uid[: self.TIMESTAMP_LENGTH_IN_BYTES], "big")) / self.TIMESTAMP_MULTIPLIER
+ EPOCH_STAMP
)