|
| 1 | +import os |
1 | 2 | import random |
2 | 3 | import re |
3 | 4 | from abc import ABC, abstractmethod |
4 | 5 | from datetime import datetime |
| 6 | +from pathlib import Path |
5 | 7 | from uuid import uuid4 |
6 | 8 |
|
7 | 9 | from domain.core.base import BaseModel |
8 | 10 | from domain.core.device_key import validate_key |
9 | 11 | from domain.core.error import InvalidKeyPattern |
10 | 12 | from domain.core.product_key import ProductKeyType |
| 13 | +from event.json import json_load |
11 | 14 | from pydantic import validator |
12 | 15 |
|
13 | 16 | FIRST_ASID = 200000099999 |
|
19 | 22 | PRODUCT_ID_PATTERN = re.compile( |
20 | 23 | rf"^P\.[{PRODUCT_ID_VALID_CHARS}]{{{PRODUCT_ID_PART_LENGTH}}}-[{PRODUCT_ID_VALID_CHARS}]{{{PRODUCT_ID_PART_LENGTH}}}$" |
21 | 24 | ) |
| 25 | + |
| 26 | +PATH_TO_CPM_SYSTEM_IDS = Path(__file__).parent |
| 27 | +PRODUCT_IDS_GENERATED_FILE = f"{PATH_TO_CPM_SYSTEM_IDS}/generated_ids/product_ids.json" |
22 | 28 | PRODUCT_TEAM_ID_PATTERN = re.compile( |
23 | 29 | r"^[a-zA-Z0-9]+\.([a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12})$" |
24 | 30 | ) |
@@ -105,19 +111,29 @@ def latest_number(self): |
105 | 111 | class ProductId(CpmSystemId): |
106 | 112 | @classmethod |
107 | 113 | def create(cls): |
108 | | - """No current_id needed, key is generated randomly.""" |
109 | | - rng = random.Random(datetime.now().timestamp()) |
110 | | - product_id = "-".join( |
111 | | - "".join(rng.choices(PRODUCT_ID_VALID_CHARS, k=PRODUCT_ID_PART_LENGTH)) |
112 | | - for _ in range(PRODUCT_ID_NUMBER_OF_PARTS) |
113 | | - ) |
114 | | - return cls(__root__=f"P.{product_id}") |
| 114 | + existing_ids = cls.load_existing_ids() |
| 115 | + while True: |
| 116 | + """No current_id needed, key is generated randomly.""" |
| 117 | + rng = random.Random(datetime.now().timestamp()) |
| 118 | + product_id = "-".join( |
| 119 | + "".join(rng.choices(PRODUCT_ID_VALID_CHARS, k=PRODUCT_ID_PART_LENGTH)) |
| 120 | + for _ in range(PRODUCT_ID_NUMBER_OF_PARTS) |
| 121 | + ) |
| 122 | + if f"P.{product_id}" not in existing_ids: |
| 123 | + return cls(__root__=f"P.{product_id}") |
115 | 124 |
|
116 | 125 | @classmethod |
117 | 126 | def validate_cpm_system_id(cls, cpm_system_id: str) -> bool: |
118 | 127 | """Validate that the ProductId has the correct format.""" |
119 | 128 | return PRODUCT_ID_PATTERN.match(cpm_system_id) is not None |
120 | 129 |
|
| 130 | + @staticmethod |
| 131 | + def load_existing_ids(): |
| 132 | + if os.path.exists(PRODUCT_IDS_GENERATED_FILE): |
| 133 | + with open(PRODUCT_IDS_GENERATED_FILE, "r") as file: |
| 134 | + return set(json_load(file)) |
| 135 | + return set() |
| 136 | + |
121 | 137 |
|
122 | 138 | class ProductTeamId(CpmSystemId): |
123 | 139 | @classmethod |
|
0 commit comments