diff --git a/kirovy/constants/__init__.py b/kirovy/constants/__init__.py index 2162621..7e79504 100644 --- a/kirovy/constants/__init__.py +++ b/kirovy/constants/__init__.py @@ -90,12 +90,18 @@ def is_messiah(cls, user_group: str) -> bool: class MigrationUser: - ID = -1 + CNCNET_ID = -1 USERNAME = "MobileConstructionVehicle_Migrator" GROUP = CncnetUserGroup.USER -class GameSlugs(enum.StrEnum): +class LegacyUploadUser: + CNCNET_ID = -2 + USERNAME = "Spy_ShapeShifting_LegacyUploader" + GROUP = CncnetUserGroup.USER + + +class GameSlugs(str, enum.Enum): """The slugs for each game / total conversion mod. These **must** be unique. They are in constants because we need them to determine which parser to use diff --git a/kirovy/constants/api_codes.py b/kirovy/constants/api_codes.py index 567a181..f94f19f 100644 --- a/kirovy/constants/api_codes.py +++ b/kirovy/constants/api_codes.py @@ -3,8 +3,19 @@ class UploadApiCodes(enum.StrEnum): GAME_SLUG_DOES_NOT_EXIST = "game-slug-does-not-exist" + GAME_DOES_NOT_EXIST = "game-does-not-exist" MISSING_GAME_SLUG = "missing-game-slug" FILE_TO_LARGE = "file-too-large" EMPTY_UPLOAD = "where-file" DUPLICATE_MAP = "duplicate-map" FILE_EXTENSION_NOT_SUPPORTED = "file-extension-not-supported" + + +class LegacyUploadApiCodes(enum.StrEnum): + NOT_A_VALID_ZIP_FILE = "invalid-zipfile" + BAD_ZIP_STRUCTURE = "invalid-zip-structure" + MAP_TOO_LARGE = "map-file-too-large" + NO_VALID_MAP_FILE = "no-valid-map-file" + HASH_MISMATCH = "file-hash-does-not-match-zip-name" + INVALID_FILE_TYPE = "invalid-file-type-in-zip" + GAME_NOT_SUPPORTED = "game-not-supported" diff --git a/kirovy/models/cnc_user.py b/kirovy/models/cnc_user.py index 4c22fb1..dfcb4cc 100644 --- a/kirovy/models/cnc_user.py +++ b/kirovy/models/cnc_user.py @@ -15,7 +15,8 @@ class CncUserManager(models.Manager): use_in_migrations = True _SYSTEM_CNCNET_IDS = { - constants.MigrationUser.ID, + constants.MigrationUser.CNCNET_ID, + constants.LegacyUploadUser.CNCNET_ID, } def find_by_cncnet_id(self, cncnet_id: int) -> t.Tuple["CncUser"]: @@ -27,10 +28,10 @@ def get_or_create_migration_user(self) -> "CncUser": :return: The user for running migrations. """ - mcv = self.find_by_cncnet_id(constants.MigrationUser.ID) + mcv = self.find_by_cncnet_id(constants.MigrationUser.CNCNET_ID) if not mcv: mcv = CncUser( - cncnet_id=constants.MigrationUser.ID, + cncnet_id=constants.MigrationUser.CNCNET_ID, username=constants.MigrationUser.USERNAME, group=constants.MigrationUser.GROUP, ) @@ -39,6 +40,30 @@ def get_or_create_migration_user(self) -> "CncUser": return mcv + def get_or_create_legacy_upload_user(self) -> "CncUser": + """Gets or creates a system-user to represent anonymous uploads from a CnCNet client. + + .. warning:: + + This should **only** be used for the legacy upload URLs for clients + that CnCNet doesn't have the source for. + + :return: + User for legacy uploads. + """ + # If we copy and paste this again then it should be DRY'd up. + spy = self.find_by_cncnet_id(constants.LegacyUploadUser.CNCNET_ID) + if not spy: + spy = CncUser( + cncnet_id=constants.LegacyUploadUser.CNCNET_ID, + username=constants.LegacyUploadUser.USERNAME, + group=constants.LegacyUploadUser.GROUP, + ) + spy.save() + spy.refresh_from_db() + + return spy + def get_queryset(self) -> models.QuerySet: """Makes ``CncUser.object.all()`` filter out the system users by default. @@ -61,9 +86,7 @@ class CncUser(AbstractBaseUser): help_text=_("The user ID from the CNCNet ladder API."), ) - username = models.CharField( - null=True, help_text=_("The name from the CNCNet ladder API."), blank=False - ) + username = models.CharField(null=True, help_text=_("The name from the CNCNet ladder API."), blank=False) """:attr: The username for debugging purposes. Don't rely on this field for much else.""" verified_map_uploader = models.BooleanField(null=False, default=False) @@ -77,23 +100,15 @@ class CncUser(AbstractBaseUser): blank=False, ) - is_banned = models.BooleanField( - default=False, help_text="If true, user was banned for some reason." - ) - ban_reason = models.CharField( - default=None, null=True, help_text="If banned, the reason the user was banned." - ) - ban_date = models.DateTimeField( - default=None, null=True, help_text="If banned, when the user was banned." - ) + is_banned = models.BooleanField(default=False, help_text="If true, user was banned for some reason.") + ban_reason = models.CharField(default=None, null=True, help_text="If banned, the reason the user was banned.") + ban_date = models.DateTimeField(default=None, null=True, help_text="If banned, when the user was banned.") ban_expires = models.DateTimeField( default=None, null=True, help_text="If banned, when the ban expires, if temporary.", ) - ban_count = models.IntegerField( - default=0, help_text="How many times this user has been banned." - ) + ban_count = models.IntegerField(default=0, help_text="How many times this user has been banned.") USERNAME_FIELD = "cncnet_id" """:attr: @@ -113,9 +128,7 @@ def can_upload(self) -> bool: :return: True if user can upload maps / mixes / big, or edit their existing uploads. """ - self.refresh_from_db( - fields=["verified_map_uploader", "verified_email", "is_banned"] - ) + self.refresh_from_db(fields=["verified_map_uploader", "verified_email", "is_banned"]) can_upload = self.verified_map_uploader or self.verified_email or self.is_staff return can_upload and not self.is_banned @@ -142,9 +155,7 @@ def create_or_update_from_cncnet(user_dto: CncnetUserInfo) -> "CncUser": :return: The user object in Kirovy's database, updated with the data from CnCNet. """ - kirovy_user: t.Optional[CncUser] = CncUser.objects.filter( - cncnet_id=user_dto.id - ).first() + kirovy_user: t.Optional[CncUser] = CncUser.objects.filter(cncnet_id=user_dto.id).first() if not kirovy_user: kirovy_user = CncUser.objects.create( cncnet_id=user_dto.id, diff --git a/kirovy/models/file_base.py b/kirovy/models/file_base.py index 2217530..2ab2b7b 100644 --- a/kirovy/models/file_base.py +++ b/kirovy/models/file_base.py @@ -89,9 +89,12 @@ def validate_file_extension(self, file_extension: game_models.CncFileExtension) def save(self, *args, **kwargs): self.validate_file_extension(self.file_extension) - self.hash_md5 = file_utils.hash_file_md5(self.file) - self.hash_sha512 = file_utils.hash_file_sha512(self.file) - self.hash_sha1 = file_utils.hash_file_sha1(self.file) + if not self.hash_md5: + self.hash_md5 = file_utils.hash_file_md5(self.file) + if not self.hash_sha512: + self.hash_sha512 = file_utils.hash_file_sha512(self.file) + if not self.hash_sha1: + self.hash_sha1 = file_utils.hash_file_sha1(self.file) super().save(*args, **kwargs) @staticmethod diff --git a/kirovy/services/legacy_upload/__init__.py b/kirovy/services/legacy_upload/__init__.py new file mode 100644 index 0000000..5fa2631 --- /dev/null +++ b/kirovy/services/legacy_upload/__init__.py @@ -0,0 +1,23 @@ +from kirovy.constants import GameSlugs +from kirovy.constants.api_codes import LegacyUploadApiCodes +from kirovy.exceptions import view_exceptions +from kirovy.services.legacy_upload import westwood, dune_2000 +from kirovy.services.legacy_upload.base import LegacyMapServiceBase +from kirovy import typing as t + + +_GAME_LEGACY_SERVICE_MAP: t.Dict[str, t.Type[LegacyMapServiceBase]] = { + GameSlugs.yuris_revenge.value: westwood.YurisRevengeLegacyMapService, + GameSlugs.dune_2000.value: dune_2000.Dune2000LegacyMapService, +} + + +def get_legacy_service_for_slug(game_slug: str) -> t.Type[LegacyMapServiceBase]: + if service := _GAME_LEGACY_SERVICE_MAP.get(game_slug): + return service + + raise view_exceptions.KirovyValidationError( + "Game not supported on legacy endpoint", + code=LegacyUploadApiCodes.GAME_NOT_SUPPORTED, + additional={"supported": _GAME_LEGACY_SERVICE_MAP.keys()}, + ) diff --git a/kirovy/services/legacy_upload/base.py b/kirovy/services/legacy_upload/base.py new file mode 100644 index 0000000..7af4a86 --- /dev/null +++ b/kirovy/services/legacy_upload/base.py @@ -0,0 +1,219 @@ +import dataclasses +import functools +import io +import pathlib +import zipfile + +from cryptography.utils import cached_property +from django.core.files.base import ContentFile +from django.core.files.uploadedfile import UploadedFile + +from kirovy import typing as t, constants +from kirovy.constants.api_codes import LegacyUploadApiCodes +from kirovy.exceptions import view_exceptions +from kirovy.services.cnc_gen_2_services import CncGen2MapParser +from kirovy.utils import file_utils +from kirovy.utils.file_utils import ByteSized + + +@dataclasses.dataclass +class ExpectedFile: + possible_extensions: t.Set[str] + file_validator: t.Callable[[str, ContentFile, zipfile.ZipInfo], bool] + required: bool = True + """attr: If false, this file is not required to be present.""" + + +class LegacyMapServiceBase: + game_slug: t.ClassVar[constants.GameSlugs] + _file: zipfile.ZipFile + ini_extensions: t.ClassVar[t.Set[str]] + + def __init__(self, file: UploadedFile): + """Initializes the class and runs the validation for the expected files. + + :param file: + The raw uploaded file from the view. + :raises view_exceptions.KirovyValidationError: + Raised if the file is invalid in any way. + """ + if not file_utils.is_zipfile(file): + raise view_exceptions.KirovyValidationError( + detail="Your zipfile is invalid", code=LegacyUploadApiCodes.NOT_A_VALID_ZIP_FILE + ) + + self._file = zipfile.ZipFile(file) + if len(self.expected_files) == 1: + self.single_file_validator() + else: + self.multi_file_validator() + + @cached_property + def expected_files(self) -> t.List[ExpectedFile]: + raise NotImplementedError("This Game's map validator hasn't implemented the expectd file structure.") + + def multi_file_validator(self): + file_list = self._file.infolist() + min_files = len([x for x in self.expected_files if x.required]) + max_files = len(self.expected_files) + if min_files > len(file_list) > max_files: + raise view_exceptions.KirovyValidationError( + "Incorrect file count", code=LegacyUploadApiCodes.BAD_ZIP_STRUCTURE + ) + + for file_info in file_list: + expected_file = self._get_expected_file_for_extension(file_info) + expected_file.file_validator(self._file.filename, ContentFile(self._file.read(file_info)), file_info) + + def single_file_validator(self): + first_file = self._file.infolist()[0] + if pathlib.Path(first_file.filename).suffix not in self.expected_files[0].possible_extensions: + raise view_exceptions.KirovyValidationError( + "Map file was not the first Zip entry.", code=LegacyUploadApiCodes.BAD_ZIP_STRUCTURE + ) + self.expected_files[0].file_validator(self._file.filename, ContentFile(self._file.read(first_file)), first_file) + + @cached_property + def map_sha1_from_filename(self) -> str: + """Legacy upload endpoints need the hash for the map file, not the full zip hash. + + This should only be used after ``__init__`` has completed and the hash from the filename has been verified + to match the internal map file's hash. + """ + return pathlib.Path(self._file.filename).stem + + @cached_property + def file_contents_merged(self) -> io.BytesIO: + """The legacy map DB checks hash by extracting all zip-file contents, appending them, then hashing. + + We will mirror that behavior for legacy endpoints so that the file hashes in the database match + what the legacy clients expect. + + This will not match the logic for the new UI upload endpoints at all, but that's fine. + + .. note:: + + The order of ``self.expected_files`` will control the order that files + are appended in. This order matter and needs to match the client, or the hahes will not match. + + :return: + All file contents merged into one stream. Order depends on ``self.expected_files`` + """ + output = io.BytesIO() + for expected_file in self.expected_files: + file_info = self._find_file_info_by_extension(expected_file.possible_extensions) + output.write(self._file.read(file_info)) + output.seek(0) + return output + + @cached_property + def map_name(self) -> str: + ini_file_info = self._find_file_info_by_extension(self.ini_extensions) + fallback = f"legacy_client_upload_{self.map_sha1_from_filename}" + ini_file = ContentFile(self._file.read(ini_file_info)) + return CncGen2MapParser(ini_file).ini.get("Basic", "Name", fallback=fallback) + + def _find_file_info_by_extension(self, extensions: t.Set[str]) -> zipfile.ZipInfo: + """Find the zipinfo object for a file by a set of possible file extensions. + + This is meant to be used to find specific files in the zip. + e.g. finding the ``.ini`` file to extract the map name from. + + It's hacky, but filenames and file order within the zip aren't predictable. + + :param extensions: + A set of possible extensions to look for. + e.g. ``{".map", ".yro", ".yrm"}`` to find the map ini for Yuri's Revenge. + :return: + The zipinfo for the matching file in the archive. + :raises view_exceptions.KirovyValidationError: + Raised when no file matching the possible extensions was found in the zip archive. + """ + for file in self._file.infolist(): + if pathlib.Path(file.filename).suffix in extensions: + return file + raise view_exceptions.KirovyValidationError( + "No file matching the expected extensions was found", + LegacyUploadApiCodes.NO_VALID_MAP_FILE, + {"expected": extensions}, + ) + + def _get_expected_file_for_extension(self, zip_info: zipfile.ZipInfo) -> ExpectedFile: + """Get the ``expected_file`` class instance corresponding to the file in the zipfile. + + This is used to find the validator for a file in the uploaded zip file. + This is hacky, but the order and filenames aren't predictable in uploaded zips, + so it will have to do. + + :param zip_info: + The info for a file in the uploaded zipfile. We will try to find the + :class:`kirovy.services.legacy_upload.base.ExpectedFile` for it. + :return: + The :class:`kirovy.services.legacy_upload.base.ExpectedFile` for this uploaded file. + :raises view_exceptions.KirovyValidationError: + Raised if no corresponding `kirovy.services.legacy_upload.base.ExpectedFile` is found. + This has a security benefit of not allowing non-c&c-map-files to sneak their way in with a legitimate map. + """ + extension = pathlib.Path(zip_info.filename).suffix + for expected in self.expected_files: + if extension in expected.possible_extensions: + return expected + raise view_exceptions.KirovyValidationError( + "Unexpected file type in zip file", LegacyUploadApiCodes.INVALID_FILE_TYPE + ) + + def processed_zip_file(self) -> ContentFile: + """Returns a file to save to the database. + + This file has been processed to match the format that legacy CnCNet clients expect. + + :return: + A django-compatible file for a legacy CnCNet-client-compatible zip file. + """ + files_info = self._file.infolist() + zip_bytes = io.BytesIO() + processed_zip = zipfile.ZipFile(zip_bytes, mode="w", compresslevel=5, allowZip64=False) + map_hash = pathlib.Path(self._file.filename).stem + + for file_info_original in files_info: + file_name_original = pathlib.Path(file_info_original.filename) + with self._file.open(file_info_original, mode="r") as original_file_data: + # All files must have the map hash as the filename. + processed_zip.writestr( + f"{map_hash}{file_name_original.suffix}", data=original_file_data.read(), compresslevel=4 + ) + + processed_zip.close() + zip_bytes.seek(0) + return ContentFile(zip_bytes.read(), name=self._file.filename) + + +def default_map_file_validator(zip_file_name: str, file_content: ContentFile, zip_info: zipfile.ZipInfo): + """Legacy map file validator that works for most Westwood games. + + Won't work for Dune 2000, or Tiberian Dawn. + + :param zip_file_name: + The name of the uploaded zip file. + :param file_content: + The contents of the map file extracted from the zip file. + For RA2 and YR this will be a ``.map`` file, or one of its aliases like ``.yrm``. + :param zip_info: + The zip metadata for the extracted map file. + :return: + None + :raises view_exceptions.KirovyValidationError: + Raised for any validation errors. + """ + if ByteSized(zip_info.file_size) > ByteSized(mega=2): + raise view_exceptions.KirovyValidationError( + "Map file larger than expected.", code=LegacyUploadApiCodes.MAP_TOO_LARGE + ) + + # Reaching into the new map parsers is kind of dirty, but this legacy endpoint + # is (hopefully) just to tide us over until we can migrate everyone to the new endpoints. + if not CncGen2MapParser.is_text(file_content): + raise view_exceptions.KirovyValidationError("No valid map file.", code=LegacyUploadApiCodes.NO_VALID_MAP_FILE) + + if file_utils.hash_file_sha1(file_content) != pathlib.Path(zip_file_name).stem: + raise view_exceptions.KirovyValidationError("Map file checksum differs from Zip name, rejected.") diff --git a/kirovy/services/legacy_upload/dune_2000.py b/kirovy/services/legacy_upload/dune_2000.py new file mode 100644 index 0000000..a71014c --- /dev/null +++ b/kirovy/services/legacy_upload/dune_2000.py @@ -0,0 +1,6 @@ +from kirovy import constants +from kirovy.services.legacy_upload.base import LegacyMapServiceBase + + +class Dune2000LegacyMapService(LegacyMapServiceBase): + game_slug = constants.GameSlugs.dune_2000 diff --git a/kirovy/services/legacy_upload/westwood.py b/kirovy/services/legacy_upload/westwood.py new file mode 100644 index 0000000..99e403f --- /dev/null +++ b/kirovy/services/legacy_upload/westwood.py @@ -0,0 +1,17 @@ +from functools import cached_property + +from kirovy import constants, typing as t +from kirovy.services.legacy_upload.base import LegacyMapServiceBase, ExpectedFile, default_map_file_validator + + +class YurisRevengeLegacyMapService(LegacyMapServiceBase): + ini_extensions = {".map", ".yro", ".yrm"} + game_slug = constants.GameSlugs.yuris_revenge + + @cached_property + def expected_files(self) -> t.List[ExpectedFile]: + return [ + ExpectedFile( + possible_extensions=self.ini_extensions, file_validator=default_map_file_validator, required=True + ) + ] diff --git a/kirovy/utils/file_utils.py b/kirovy/utils/file_utils.py index 884259c..434646e 100644 --- a/kirovy/utils/file_utils.py +++ b/kirovy/utils/file_utils.py @@ -1,6 +1,8 @@ import collections import functools import hashlib +import pathlib +import zipfile from django.core.files import File @@ -131,3 +133,21 @@ def __le__(self, other: "ByteSized") -> bool: def __eq__(self, other: "ByteSized") -> bool: return self.total_bytes == other.total_bytes + + +def is_zipfile(file_or_path: File | pathlib.Path) -> bool: + """Checks if a file is a zip file. + + :param file_or_path: + The path to a file, or the file itself, to check. + :returns: + ``True`` if the file is a zip file. + """ + try: + with zipfile.ZipFile(file_or_path, "r") as zf: + # zf.getinfo("") # check if zipfile is valid. + return True + except zipfile.BadZipfile: + return False + except Exception as e: + raise e diff --git a/kirovy/views/map_upload_views.py b/kirovy/views/map_upload_views.py index b263493..fc2581b 100644 --- a/kirovy/views/map_upload_views.py +++ b/kirovy/views/map_upload_views.py @@ -5,6 +5,7 @@ from cryptography.utils import cached_property from django.conf import settings from django.core.exceptions import ObjectDoesNotExist +from django.core.files.base import ContentFile, File from django.core.files.uploadedfile import UploadedFile, InMemoryUploadedFile from django.db.models import Q, QuerySet from rest_framework import status, serializers @@ -15,11 +16,12 @@ from kirovy import typing as t, permissions, exceptions, constants, logging from kirovy.constants.api_codes import UploadApiCodes from kirovy.exceptions.view_exceptions import KirovyValidationError -from kirovy.models import cnc_map, CncGame, CncFileExtension, MapCategory, map_preview +from kirovy.models import cnc_map, CncGame, CncFileExtension, MapCategory, map_preview, CncUser from kirovy.objects.ui_objects import ResultResponseData from kirovy.request import KirovyRequest from kirovy.response import KirovyResponse from kirovy.serializers import cnc_map_serializers +from kirovy.services import legacy_upload from kirovy.services.cnc_gen_2_services import CncGen2MapParser, CncGen2MapSections from kirovy.utils import file_utils @@ -53,7 +55,9 @@ def post(self, request: KirovyRequest, format=None) -> KirovyResponse: # todo: make validation less trash uploaded_file: UploadedFile = request.data["file"] - game_id = self.get_game_id_from_request(request) + game = self.get_game_from_request(request) + if not game: + raise KirovyValidationError(detail="Game doesnot exist", code=UploadApiCodes.GAME_DOES_NOT_EXIST) extension_id = self.get_extension_id_for_upload(uploaded_file) self.verify_file_size_is_allowed(uploaded_file) @@ -65,7 +69,7 @@ def post(self, request: KirovyRequest, format=None) -> KirovyResponse: # Make the map that we will attach the map file too. new_map = cnc_map.CncMap( map_name=map_parser.ini.map_name, - cnc_game_id=game_id, + cnc_game_id=game.id, is_published=False, incomplete_upload=True, cnc_user=request.user, @@ -199,14 +203,14 @@ def user_log_attrs(self) -> t.DictStrAny: } @staticmethod - def _get_file_hashes(uploaded_file: UploadedFile) -> MapHashes: + def _get_file_hashes(uploaded_file: File) -> MapHashes: map_hash_sha512 = file_utils.hash_file_sha512(uploaded_file) map_hash_md5 = file_utils.hash_file_md5(uploaded_file) map_hash_sha1 = file_utils.hash_file_sha1(uploaded_file) # legacy ban list support return MapHashes(md5=map_hash_md5, sha1=map_hash_sha1, sha512=map_hash_sha512) - def get_game_id_from_request(self, request: KirovyRequest) -> str | None: + def get_game_from_request(self, request: KirovyRequest) -> CncGame | None: """Get the game_id from the request. This is a method, rather than a direct lookup, so that the client can use ``game_slug``. @@ -307,15 +311,18 @@ class MapFileUploadView(_BaseMapFileUploadView): permission_classes = [permissions.CanUpload] upload_is_temporary = False - def get_game_id_from_request(self, request: KirovyRequest) -> str | None: - return request.data.get("game_id") + def get_game_from_request(self, request: KirovyRequest) -> CncGame | None: + game_id = request.data.get("game_id") + return CncGame.objects.filter(id=game_id).first() class CncnetClientMapUploadView(_BaseMapFileUploadView): + """DO NOT USE THIS FOR NOW. Use""" + permission_classes = [AllowAny] upload_is_temporary = True - def get_game_id_from_request(self, request: KirovyRequest) -> str | None: + def get_game_from_request(self, request: KirovyRequest) -> CncGame | None: """Get the game ID for a CnCNet client upload. The client currently sends a slug in ``request.data["game"]``. The game table has a unique constraint @@ -347,7 +354,7 @@ def get_game_id_from_request(self, request: KirovyRequest) -> str | None: additional={"attempted_slug": game_slug}, ) - return str(game.id) + return game class CncNetBackwardsCompatibleUploadView(CncnetClientMapUploadView): @@ -362,23 +369,24 @@ class CncNetBackwardsCompatibleUploadView(CncnetClientMapUploadView): def post(self, request: KirovyRequest, format=None) -> KirovyResponse: uploaded_file: UploadedFile = request.data["file"] - game_id = self.get_game_id_from_request(request) + game = self.get_game_from_request(request) extension_id = self.get_extension_id_for_upload(uploaded_file) self.verify_file_size_is_allowed(uploaded_file) - map_hashes = self._get_file_hashes(uploaded_file) - self.verify_file_does_not_exist(map_hashes) + # Will raise validation errors if the upload is invalid + legacy_map_service = legacy_upload.get_legacy_service_for_slug(game.slug.lower())(uploaded_file) - if uploaded_file.name != f"{map_hashes.sha1}.zip": - return KirovyResponse(status=status.HTTP_400_BAD_REQUEST) + # These hashes are for the full zip file and won't match + map_hashes = self._get_file_hashes(ContentFile(legacy_map_service.file_contents_merged.read())) + self.verify_file_does_not_exist(map_hashes) - # Make the map that we will attach the map file too. + # Make the map that we will attach the map file to. new_map = cnc_map.CncMap( - map_name=f"backwards_compatible_{map_hashes.sha1}", - cnc_game_id=game_id, + map_name=legacy_map_service.map_name, + cnc_game=game, is_published=False, incomplete_upload=True, - cnc_user=request.user, + cnc_user=CncUser.objects.get_or_create_legacy_upload_user(), parent=None, ) new_map.save() @@ -388,9 +396,9 @@ def post(self, request: KirovyRequest, format=None) -> KirovyResponse: width=-1, height=-1, cnc_map_id=new_map.id, - file=uploaded_file, + file=legacy_map_service.processed_zip_file(), file_extension_id=extension_id, - cnc_game_id=new_map.cnc_game_id, + cnc_game_id=game.id, hash_md5=map_hashes.md5, hash_sha512=map_hashes.sha512, hash_sha1=map_hashes.sha1, diff --git a/kirovy/zip_storage.py b/kirovy/zip_storage.py index 832f4cd..0f5d378 100644 --- a/kirovy/zip_storage.py +++ b/kirovy/zip_storage.py @@ -10,7 +10,7 @@ class ZipFileStorage(FileSystemStorage): def save(self, name: str, content: File, max_length: int | None = None): - if is_zipfile(content): + if file_utils.is_zipfile(content): return super().save(name, content, max_length) internal_extension = pathlib.Path(name).suffix @@ -21,21 +21,3 @@ def save(self, name: str, content: File, max_length: int | None = None): zf.writestr(internal_filename, content.read()) return super().save(f"{name}.zip", zip_buffer, max_length=max_length) - - -def is_zipfile(file_or_path: File | pathlib.Path) -> bool: - """Checks if a file is a zip file. - - :param file_or_path: - The path to a file, or the file itself, to check. - :returns: - ``True`` if the file is a zip file. - """ - try: - with zipfile.ZipFile(file_or_path, "r") as zf: - # zf.getinfo("") # check if zipfile is valid. - return True - except zipfile.BadZipfile: - return False - except Exception as e: - raise e diff --git a/tests/fixtures/file_fixtures.py b/tests/fixtures/file_fixtures.py index e365145..3a7aee7 100644 --- a/tests/fixtures/file_fixtures.py +++ b/tests/fixtures/file_fixtures.py @@ -7,6 +7,13 @@ import pytest from django.core.files import File +from kirovy.utils import file_utils + + +class ReadOnlyFile(File): + def write(self, s, /): + raise Exception("Don't tamper with the test files.") + @pytest.fixture def test_data_path() -> pathlib.Path: @@ -28,7 +35,7 @@ def _inner(relative_path: t.Union[str, pathlib.Path], read_mode: str = "rb") -> """ full_path = test_data_path / relative_path - return File(open(full_path, read_mode)) + return ReadOnlyFile(open(full_path, read_mode)) return _inner @@ -85,3 +92,24 @@ def file_map_unfair(load_test_file) -> Generator[File, Any, None]: file = load_test_file("totally_fair_map.map") yield file file.close() + + +@pytest.fixture +def file_map_dune2k(load_test_file) -> Generator[File, Any, None]: + """Return a valid zip file for a dune 2k mpa.""" + file = load_test_file("valid_dune_map.zip") + yield file + file.close() + + +@pytest.fixture +def rename_file_for_legacy_upload(): + """Returns a function to rename a file for upload to a legacy endpoint.""" + + def _inner(file: File) -> File: + assert file.name.endswith(".zip"), "Only zip files can be sent to legacy endpoints." + # Uploads to legacy endpoints need the hash name + file.name = file_utils.hash_file_sha1(file) + ".zip" + return file + + return _inner diff --git a/tests/fixtures/game_fixtures.py b/tests/fixtures/game_fixtures.py index 76c11e9..1a4270b 100644 --- a/tests/fixtures/game_fixtures.py +++ b/tests/fixtures/game_fixtures.py @@ -1,5 +1,5 @@ import pytest -from kirovy import typing as t, models as k_models +from kirovy import typing as t, models as k_models, constants @pytest.fixture @@ -11,4 +11,16 @@ def game_yuri(db) -> k_models.CncGame: :return: The game object for Yuri's revenge. """ - return k_models.CncGame.objects.get(slug="yr") + return k_models.CncGame.objects.get(slug=constants.GameSlugs.yuris_revenge) + + +@pytest.fixture +def game_dune2k(db) -> k_models.CncGame: + """Get the Dune 2000 ``CncGame``. + + This fixture depends on the game data migration being run. + + :return: + The game object for Dune 2000. + """ + return k_models.CncGame.objects.get(slug__iexact=constants.GameSlugs.dune_2000) diff --git a/tests/test_data/valid_dune_map.zip b/tests/test_data/valid_dune_map.zip new file mode 100644 index 0000000..2046165 Binary files /dev/null and b/tests/test_data/valid_dune_map.zip differ diff --git a/tests/test_views/test_backwards_compatibility.py b/tests/test_views/test_backwards_compatibility.py index f465639..eef1a60 100644 --- a/tests/test_views/test_backwards_compatibility.py +++ b/tests/test_views/test_backwards_compatibility.py @@ -1,12 +1,16 @@ import hashlib +import pathlib import zipfile import io import pytest +from django.core.files.base import ContentFile from django.http import FileResponse from rest_framework import status from kirovy.models import CncGame +from kirovy.response import KirovyResponse +from kirovy.utils import file_utils @pytest.mark.parametrize("game_slug", ["td", "ra", "ts", "dta", "yr", "d2"]) @@ -28,3 +32,45 @@ def test_map_download_backwards_compatible( map_from_zip = zip_file.read(f"{map_file.hash_sha1}.map") downloaded_map_hash = hashlib.sha1(map_from_zip).hexdigest() assert downloaded_map_hash == map_file.hash_sha1 + + +# def test_map_upload_dune2k_backwards_compatible( +# client_anonymous, rename_file_for_legacy_upload, file_map_dune2k_valid, game_dune2k +# ): +# url = "/upload" +# file = rename_file_for_legacy_upload(file_map_dune2k_valid) +# +# response: KirovyResponse = client_anonymous.post( +# url, {"file": file, "game": game_dune2k.slug}, format="multipart", content_type=None +# ) +# +# assert response.status_code == status.HTTP_200_OK + + +def test_map_upload_yuri_backwards_compatible(client_anonymous, file_map_desert, game_yuri): + url = "/upload" + file_sha1 = file_utils.hash_file_sha1(file_map_desert) + extension = pathlib.Path(file_map_desert.name).suffix + zip_bytes = io.BytesIO() + zip_file = zipfile.ZipFile(zip_bytes, mode="w") + zip_file.writestr(file_map_desert.name, file_map_desert.read()) + zip_file.close() + zip_bytes.seek(0) + upload_file = ContentFile(zip_bytes.read(), f"{file_sha1}.zip") + + upload_response: KirovyResponse = client_anonymous.post( + url, {"file": upload_file, "game": game_yuri.slug}, format="multipart", content_type=None + ) + + assert upload_response.status_code == status.HTTP_200_OK + + response: FileResponse = client_anonymous.get(f"/{game_yuri.slug}/{file_sha1}") + assert response.status_code == status.HTTP_200_OK + + file_content_io = io.BytesIO(response.getvalue()) + zip_file = zipfile.ZipFile(file_content_io) + assert zip_file + + map_from_zip = zip_file.read(f"{file_sha1}.map") + downloaded_map_hash = hashlib.sha1(map_from_zip).hexdigest() + assert downloaded_map_hash == file_sha1