|
3 | 3 | import re |
4 | 4 | from abc import ABC, abstractmethod |
5 | 5 | from datetime import datetime |
| 6 | +from functools import cache |
6 | 7 | from pathlib import Path |
7 | 8 | from uuid import uuid4 |
8 | 9 |
|
|
30 | 31 | ) |
31 | 32 |
|
32 | 33 |
|
| 34 | +@cache |
| 35 | +def _load_existing_ids(): |
| 36 | + if os.path.exists(PRODUCT_IDS_GENERATED_FILE): |
| 37 | + with open(PRODUCT_IDS_GENERATED_FILE, "r") as file: |
| 38 | + return set(json_load(file)) |
| 39 | + return set() |
| 40 | + |
| 41 | + |
33 | 42 | class CpmSystemId(BaseModel, ABC): |
34 | 43 | __root__: str = None |
35 | 44 |
|
@@ -111,28 +120,24 @@ def latest_number(self): |
111 | 120 | class ProductId(CpmSystemId): |
112 | 121 | @classmethod |
113 | 122 | def create(cls): |
114 | | - existing_ids = cls.load_existing_ids() |
115 | | - while True: |
116 | | - """No current_id needed, key is generated randomly.""" |
117 | | - rng = random.Random(datetime.now().timestamp()) |
118 | | - product_id = "-".join( |
119 | | - "".join(rng.choices(PRODUCT_ID_VALID_CHARS, k=PRODUCT_ID_PART_LENGTH)) |
120 | | - for _ in range(PRODUCT_ID_NUMBER_OF_PARTS) |
121 | | - ) |
122 | | - if f"P.{product_id}" not in existing_ids: |
123 | | - return cls(__root__=f"P.{product_id}") |
| 123 | + """No current_id needed, key is generated randomly.""" |
| 124 | + rng = random.Random(datetime.now().timestamp()) |
| 125 | + product_id = "-".join( |
| 126 | + "".join(rng.choices(PRODUCT_ID_VALID_CHARS, k=PRODUCT_ID_PART_LENGTH)) |
| 127 | + for _ in range(PRODUCT_ID_NUMBER_OF_PARTS) |
| 128 | + ) |
| 129 | + if f"P.{product_id}" in cls.load_existing_ids(): |
| 130 | + return cls.create() |
| 131 | + return cls(__root__=f"P.{product_id}") |
124 | 132 |
|
125 | 133 | @classmethod |
126 | 134 | def validate_cpm_system_id(cls, cpm_system_id: str) -> bool: |
127 | 135 | """Validate that the ProductId has the correct format.""" |
128 | 136 | return PRODUCT_ID_PATTERN.match(cpm_system_id) is not None |
129 | 137 |
|
130 | | - @staticmethod |
131 | | - def load_existing_ids(): |
132 | | - if os.path.exists(PRODUCT_IDS_GENERATED_FILE): |
133 | | - with open(PRODUCT_IDS_GENERATED_FILE, "r") as file: |
134 | | - return set(json_load(file)) |
135 | | - return set() |
| 138 | + @classmethod |
| 139 | + def load_existing_ids(cls): |
| 140 | + return _load_existing_ids() |
136 | 141 |
|
137 | 142 |
|
138 | 143 | class ProductTeamId(CpmSystemId): |
|
0 commit comments