|
| 1 | +import dataclasses |
| 2 | +import functools |
| 3 | +import json |
| 4 | +from argparse import FileType |
| 5 | +from sys import stdin |
| 6 | + |
| 7 | +from dacite import from_dict |
| 8 | +from django.conf import settings |
| 9 | +from django.core.management.base import BaseCommand |
| 10 | + |
| 11 | +from apps.project.models import Client, Contractor, Project |
| 12 | +from apps.track.models import Contract, Task |
| 13 | +from apps.user.models import User |
| 14 | + |
| 15 | + |
| 16 | +@dataclasses.dataclass |
| 17 | +class ImportTask: |
| 18 | + name: str |
| 19 | + hours: float |
| 20 | + |
| 21 | + |
| 22 | +@dataclasses.dataclass |
| 23 | +class ImportContract: |
| 24 | + name: str |
| 25 | + hours: float |
| 26 | + tasks: list[ImportTask] |
| 27 | + |
| 28 | + |
| 29 | +@dataclasses.dataclass |
| 30 | +class ImportProject: |
| 31 | + name: str |
| 32 | + project_manager_email: str |
| 33 | + client: str |
| 34 | + contractor: str |
| 35 | + contracts: list[ImportContract] |
| 36 | + |
| 37 | + |
| 38 | +@dataclasses.dataclass |
| 39 | +class ImportData: |
| 40 | + projects: list[ImportProject] |
| 41 | + user_emails: list[str] |
| 42 | + |
| 43 | + |
| 44 | +def cache_with_args(*cache_args): |
| 45 | + cache = {} |
| 46 | + |
| 47 | + def decorator(func): |
| 48 | + @functools.wraps(func) |
| 49 | + def wrapper(*args, **kwargs): |
| 50 | + # Extract the cache-relevant arguments |
| 51 | + cache_key = tuple(kwargs[arg] if arg in kwargs else args[arg] for arg in cache_args) |
| 52 | + if cache_key in cache: |
| 53 | + return cache[cache_key] |
| 54 | + result = func(*args, **kwargs) |
| 55 | + cache[cache_key] = result |
| 56 | + return result |
| 57 | + |
| 58 | + return wrapper |
| 59 | + |
| 60 | + return decorator |
| 61 | + |
| 62 | + |
| 63 | +# NOTE: This is not meant to be run on PRODUCTION!! |
| 64 | +class Command(BaseCommand): |
| 65 | + help = "Generate data for development" |
| 66 | + |
| 67 | + def add_arguments(self, parser): |
| 68 | + parser.add_argument("input_file", nargs="?", type=FileType("r"), default=stdin) |
| 69 | + # TODO: Generate time track data |
| 70 | + # TODO: Generate journal data |
| 71 | + |
| 72 | + def handle(self, **options): |
| 73 | + if not (settings.DEBUG and settings.ALLOW_DUMMY_DATA_SCRIPT): |
| 74 | + self.stdout.write( |
| 75 | + self.style.ERROR( |
| 76 | + "Enable DJANGO_DEBUG and ALLOW_DUMMY_DATA_SCRIPT using environment variable to run this", |
| 77 | + ) |
| 78 | + ) |
| 79 | + return |
| 80 | + |
| 81 | + try: |
| 82 | + import_raw_data = json.load(options["input_file"]) |
| 83 | + except json.decoder.JSONDecodeError as e: |
| 84 | + self.stdout.write( |
| 85 | + self.style.ERROR( |
| 86 | + f"Invalid JSON file: {e}", |
| 87 | + ) |
| 88 | + ) |
| 89 | + return |
| 90 | + |
| 91 | + import_data: ImportData = from_dict( |
| 92 | + data_class=ImportData, |
| 93 | + data=import_raw_data, |
| 94 | + ) |
| 95 | + self.run(import_data) |
| 96 | + |
| 97 | + def get_user_resource_kwargs(self, user) -> dict: |
| 98 | + return { |
| 99 | + "created_by": user, |
| 100 | + "modified_by": user, |
| 101 | + } |
| 102 | + |
| 103 | + def create_user(self, email: str, is_admin: bool = False) -> User: |
| 104 | + insecure_password = "password123" # XXX: Insecure password - Don't use this in production |
| 105 | + if user := User.objects.filter(email=email).first(): |
| 106 | + return user |
| 107 | + self.stdout.write( |
| 108 | + self.style.SUCCESS( |
| 109 | + f"Creating user with email: {email=} {insecure_password=} as {"Admin" if is_admin else "Normal"} User", |
| 110 | + ) |
| 111 | + ) |
| 112 | + if is_admin: |
| 113 | + return User.objects.create_superuser( |
| 114 | + email=email, |
| 115 | + password=insecure_password, |
| 116 | + ) |
| 117 | + return User.objects.create_user( |
| 118 | + email=email, |
| 119 | + password=insecure_password, |
| 120 | + ) |
| 121 | + |
| 122 | + @cache_with_args(1) |
| 123 | + def get_user(self, email: str) -> User | None: |
| 124 | + if user := User.objects.filter(email=email).first(): |
| 125 | + return user |
| 126 | + |
| 127 | + @cache_with_args(2) |
| 128 | + def get_or_create_client(self, creator: User, name: str) -> Client: |
| 129 | + client, created = Client.objects.get_or_create( |
| 130 | + name=name, |
| 131 | + defaults=self.get_user_resource_kwargs(creator), |
| 132 | + ) |
| 133 | + if created: |
| 134 | + self.stdout.write(self.style.SUCCESS(f"Created client {name=}")) |
| 135 | + return client |
| 136 | + |
| 137 | + @cache_with_args(2) |
| 138 | + def get_or_create_contractor(self, creator: User, name: str) -> Contractor: |
| 139 | + contractor, created = Contractor.objects.get_or_create( |
| 140 | + name=name, |
| 141 | + defaults=self.get_user_resource_kwargs(creator), |
| 142 | + ) |
| 143 | + if created: |
| 144 | + self.stdout.write(self.style.SUCCESS(f"Created contractor {name=}")) |
| 145 | + return contractor |
| 146 | + |
| 147 | + @cache_with_args(2, 3, 4) |
| 148 | + def get_or_create_project(self, creator: User, name: str, client: Client, contractor: Contractor) -> Project: |
| 149 | + project, created = Project.objects.get_or_create( |
| 150 | + name=name, |
| 151 | + client=client, |
| 152 | + contractor=contractor, |
| 153 | + defaults=self.get_user_resource_kwargs(creator), |
| 154 | + ) |
| 155 | + if created: |
| 156 | + self.stdout.write(self.style.SUCCESS(f"Created Project {name=} {client=} {contractor=}")) |
| 157 | + return project |
| 158 | + |
| 159 | + @cache_with_args(2, 3) |
| 160 | + def get_or_create_contract(self, creator: User, name: str, project: Project, hours: float) -> Contract: |
| 161 | + contract, created = Contract.objects.get_or_create( |
| 162 | + name=name, |
| 163 | + project=project, |
| 164 | + total_estimated_hours=hours, |
| 165 | + defaults=self.get_user_resource_kwargs(creator), |
| 166 | + ) |
| 167 | + if created: |
| 168 | + self.stdout.write(self.style.SUCCESS(f"Created Contract {name=} {project=}")) |
| 169 | + return contract |
| 170 | + |
| 171 | + @cache_with_args(2, 3) |
| 172 | + def get_or_create_task(self, creator: User, name: str, contract: Contract, hours: float) -> Task: |
| 173 | + task, created = Task.objects.get_or_create( |
| 174 | + name=name, |
| 175 | + contract=contract, |
| 176 | + estimated_hours=hours, |
| 177 | + defaults=self.get_user_resource_kwargs(creator), |
| 178 | + ) |
| 179 | + if created: |
| 180 | + self.stdout.write(self.style.SUCCESS(f"Created Task {name=} {contract=}")) |
| 181 | + return task |
| 182 | + |
| 183 | + def run(self, import_data: ImportData): |
| 184 | + self.stdout.write("---- Generating tasks") |
| 185 | + pm_not_admin_users = set() |
| 186 | + |
| 187 | + for project_data in import_data.projects: |
| 188 | + pm_user = self.get_user(project_data.project_manager_email) |
| 189 | + if pm_user is None: |
| 190 | + pm_user = self.create_user(project_data.project_manager_email, is_admin=True) |
| 191 | + |
| 192 | + if not pm_user.is_superuser: |
| 193 | + pm_not_admin_users.add(pm_user) |
| 194 | + |
| 195 | + project = self.get_or_create_project( |
| 196 | + pm_user, |
| 197 | + project_data.name, |
| 198 | + self.get_or_create_client(pm_user, project_data.client), |
| 199 | + self.get_or_create_contractor(pm_user, project_data.contractor), |
| 200 | + ) |
| 201 | + for contract_data in project_data.contracts: |
| 202 | + contract = self.get_or_create_contract( |
| 203 | + pm_user, |
| 204 | + contract_data.name, |
| 205 | + project, |
| 206 | + contract_data.hours, |
| 207 | + ) |
| 208 | + for task_data in contract_data.tasks: |
| 209 | + self.get_or_create_task( |
| 210 | + pm_user, |
| 211 | + task_data.name, |
| 212 | + contract, |
| 213 | + contract_data.hours, |
| 214 | + ) |
| 215 | + |
| 216 | + self.stdout.write("---- Generating Dev users") |
| 217 | + for email in import_data.user_emails: |
| 218 | + self.create_user(email, is_admin=False) |
| 219 | + |
| 220 | + self.stdout.write("---- Grant missing admin access to new PM") |
| 221 | + for user in pm_not_admin_users: |
| 222 | + print(f"- {user.email}") |
| 223 | + user.is_superuser = True |
| 224 | + user.save(update_fields=("is_superuser",)) |
0 commit comments