diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..11e9c19 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,25 @@ +variables: + PDM_PYPI_URL: https://gitlab.gitguardian.ovh/api/v4/projects/435/packages/pypi/simple + PDM_PYPI_USERNAME: $PYPI_USERNAME + PDM_PYPI_PASSWORD: $PYPI_PASSWORD + +stages: + - tests + - release + +workflow: + rules: + - if: $CI_PIPELINE_SOURCE == "merge_request_event" + - if: $CI_COMMIT_TAG + - if: $CI_COMMIT_BRANCH == "main" + - when: never # no pipeline on feature branch pushes, only MRs + +publish: + image: 513715405986.dkr.ecr.us-west-2.amazonaws.com/basics/python:3.11-slim-bullseye + stage: release + only: + - tags + script: + - pip install twine build + - python -m build + - twine upload --verbose --repository-url https://gitlab.gitguardian.ovh/api/v4/projects/435/packages/pypi/ --username ${PYPI_USERNAME} --password ${PYPI_PASSWORD} dist/* diff --git a/LICENSE b/LICENSE index f6f65d8..dd3a1df 100644 --- a/LICENSE +++ b/LICENSE @@ -187,6 +187,7 @@ identification within third-party archives. Copyright 2019 3YOURMIND GmbH + Copyright 2024 GitGuardian Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index bfd1344..2d8775d 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ # django-replace-migrations -This package is an extension to djangos `makemigrations.py`. -It can be used to get rid of old migrations as an alternative to djangos `squashmigration` command. +This package offers a new django command: `replace_all_migrations`. +It can be use to get rid of old migrations as an alternative to django's `squashmigration` command. ## Reasoning In big django projects, migration files easily pile up and get an increasing problem. -Django comes with the squashmigration command - however, it is hard to handle because of multiple reasons. -Especially, it can not handle circular dependencies - they must be resolved [manually and with great care](https://stackoverflow.com/questions/37711402/circular-dependency-when-squashing-django-migrations). + +Django comes with the squashmigration command - however, it is hard to handle because of multiple reasons. Especially, it can not handle circular dependencies - they must be resolved [manually and with great care](https://stackoverflow.com/questions/37711402/circular-dependency-when-squashing-django-migrations). One possible solution is to: @@ -17,13 +17,11 @@ One possible solution is to: This workflow might work fine, if you have only few (production) servers - however, it becomes hard, when you have many environments with different versions of your application. -With django-replace-migrations also creates new initial migrations, but also, additionally, adds the already existing migrations to the `replace` list of the new migration -(That list is used by `squashmigrations` as well). By doing that, faking migrations is not needed anymore. +django-replace-migrations also creates new initial migrations, but also, additionally, it adds the already existing migrations to the `replace` list of the new migration (That list is used by `squashmigrations` as well). By doing that, faking migrations is not needed anymore. ## Warning -The new replacing migrations will not consider any `RunPython` or `RunSQL` operations. -That might be acceptable depending on your use of those operations and if you need those to prepare a fresh database. +The new replacing migrations will add not elidable special operations (`RunPython`, `RunSQL` or `SeparateDatabaseAndState`) at the end of the squash files. You will have to manually add them when suitable. ## Installation @@ -35,17 +33,23 @@ Run pip install django-replace-migrations ``` -and add `django_replace_migrations` to your list of installed apps. +and add `gg_django_replace_migrations` to your list of installed apps. ## Simple Workflow -If your apps are not depending on each other, you can use django-replace-migrations like this: +If your apps are not depending on each other, you can use gg-django-replace-migrations like this: ``` -./manage.py makemigratons --replace-all --name replace [app1, app2, ...] +./manage.py replace_all_migrations --name replace [app1, app2, ...] +``` + +Note, that you will need to list all of your apps explicitly - otherwise django will also try to replace migrations from dependencies: + +``` +from django.apps import apps +print(" ".join(map(str, sorted({model._meta.app_label for model in apps.get_models()})))) ``` -Note, that you will need to [list all of your apps](https://stackoverflow.com/questions/4111244/get-a-list-of-all-installed-applications-in-django-and-their-attributes) explicitly - otherwise django will also try to replace migrations from dependencies. While `--name` could be omitted, it is highly recommended to use it so that you can easily recognize the new migrations. If for any of your apps there are not one but two or more migrations created, your apps are depending on each other (see below). @@ -71,7 +75,7 @@ The workflow for this would be: - `git checkout 2.0` - create a new branch `git checkout -b 2-0-replace-migrations` - Install `django-replace-migration` here. -- run `./manage.py makemigrations --replace-all --name replace_2_0 app1, app2, ...` ([How to get all apps](https://stackoverflow.com/questions/4111244/get-a-list-of-all-installed-applications-in-django-and-their-attributes)) +- run `./manage.py replace_all_migrations --name replace_2_0 app1, app2, ...` ([How to get all apps](https://stackoverflow.com/questions/4111244/get-a-list-of-all-installed-applications-in-django-and-their-attributes)) - commit and note the commit hash - `git checkout [your main/feature branch]` - `git cherry-pick [commit-hash from 2-0-delete-migrations]` @@ -87,7 +91,3 @@ If your app is below 2.0 and you want to update to something after 2.0, you firs - upgrading from 1.0 to 1.5 will be possible - upgrading from 2.0 to 3.0 will be possible - upgrading from 1.0 to 3.0 will be **not** possible - -## `makemigration.py` compatibility - -This package requires deep integration into `makemigrations.py` so that I needed to copy the whole `makemigrations.py` here. Currently the version of `makemigrations.py` is copied from Django 2.1, however it is also tested with Django 3.0 and works there as well. If you encounter problems, please write what version of Django you are using. diff --git a/django_replace_migrations/management/commands/makemigrations.py b/django_replace_migrations/management/commands/makemigrations.py deleted file mode 100644 index 4de5c93..0000000 --- a/django_replace_migrations/management/commands/makemigrations.py +++ /dev/null @@ -1,348 +0,0 @@ -import os -import sys -from itertools import takewhile - -from django.apps import apps -from django.conf import settings -from django.core.management.base import ( - BaseCommand, CommandError, no_translations, -) -from django.db import DEFAULT_DB_ALIAS, connections, router -from django.db.migrations import Migration -from django.db.migrations.autodetector import MigrationAutodetector -from django.db.migrations.loader import MigrationLoader -from django.db.migrations.questioner import ( - InteractiveMigrationQuestioner, MigrationQuestioner, - NonInteractiveMigrationQuestioner, -) -from django.db.migrations.state import ProjectState -from django.db.migrations.utils import get_migration_name_timestamp -from django.db.migrations.writer import MigrationWriter - - -class Command(BaseCommand): - help = "Creates new migration(s) for apps." - - def add_arguments(self, parser): - parser.add_argument( - 'args', metavar='app_label', nargs='*', - help='Specify the app label(s) to create migrations for.', - ) - parser.add_argument( - '--dry-run', action='store_true', dest='dry_run', - help="Just show what migrations would be made; don't actually write them.", - ) - parser.add_argument( - '--merge', action='store_true', dest='merge', - help="Enable fixing of migration conflicts.", - ) - parser.add_argument( - '--empty', action='store_true', dest='empty', - help="Create an empty migration.", - ) - parser.add_argument( - '--noinput', '--no-input', action='store_false', dest='interactive', - help='Tells Django to NOT prompt the user for input of any kind.', - ) - parser.add_argument( - '-n', '--name', action='store', dest='name', default=None, - help="Use this name for migration file(s).", - ) - parser.add_argument( - '--check', action='store_true', dest='check_changes', - help='Exit with a non-zero status if model changes are missing migrations.', - ) - parser.add_argument( - '--replace-all', action='store_true', dest='replace_all', - help='Create fresh migrations that replaces existing ones.', - ) - - @no_translations - def handle(self, *app_labels, **options): - self.verbosity = options['verbosity'] - self.interactive = options['interactive'] - self.dry_run = options['dry_run'] - self.merge = options['merge'] - self.empty = options['empty'] - self.migration_name = options['name'] - self.replace_all = options['replace_all'] - check_changes = options['check_changes'] - - # Make sure the app they asked for exists - app_labels = set(app_labels) - bad_app_labels = set() - for app_label in app_labels: - try: - apps.get_app_config(app_label) - except LookupError: - bad_app_labels.add(app_label) - if bad_app_labels: - for app_label in bad_app_labels: - if '.' in app_label: - self.stderr.write( - "'%s' is not a valid app label. Did you mean '%s'?" % ( - app_label, - app_label.split('.')[-1], - ) - ) - else: - self.stderr.write("App '%s' could not be found. Is it in INSTALLED_APPS?" % app_label) - sys.exit(2) - - # Load the current graph state. Pass in None for the connection so - # the loader doesn't try to resolve replaced migrations from DB. - loader = MigrationLoader(None, ignore_no_migrations=True) - - # Raise an error if any migrations are applied before their dependencies. - consistency_check_labels = {config.label for config in apps.get_app_configs()} - # Non-default databases are only checked if database routers used. - aliases_to_check = connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS] - for alias in sorted(aliases_to_check): - connection = connections[alias] - if (connection.settings_dict['ENGINE'] != 'django.db.backends.dummy' and any( - # At least one model must be migrated to the database. - router.allow_migrate(connection.alias, app_label, model_name=model._meta.object_name) - for app_label in consistency_check_labels - for model in apps.get_app_config(app_label).get_models() - )): - loader.check_consistent_history(connection) - - # Before anything else, see if there's conflicting apps and drop out - # hard if there are any and they don't want to merge - conflicts = loader.detect_conflicts() - - # If app_labels is specified, filter out conflicting migrations for unspecified apps - if app_labels: - conflicts = { - app_label: conflict for app_label, conflict in conflicts.items() - if app_label in app_labels - } - - if conflicts and not self.merge: - name_str = "; ".join( - "%s in %s" % (", ".join(names), app) - for app, names in conflicts.items() - ) - raise CommandError( - "Conflicting migrations detected; multiple leaf nodes in the " - "migration graph: (%s).\nTo fix them run " - "'python manage.py makemigrations --merge'" % name_str - ) - - # If they want to merge and there's nothing to merge, then politely exit - if self.merge and not conflicts: - self.stdout.write("No conflicts detected to merge.") - return - - # If they want to merge and there is something to merge, then - # divert into the merge code - if self.merge and conflicts: - return self.handle_merge(loader, conflicts) - - if self.interactive: - questioner = InteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run) - else: - questioner = NonInteractiveMigrationQuestioner(specified_apps=app_labels, dry_run=self.dry_run) - - if self.replace_all: - replace_list = [migration for migration in loader.graph.nodes.values()] - temp_nodes = loader.graph.nodes - - loader.graph.nodes = {k: v for (k, v) in loader.graph.nodes.items() if k[0] not in app_labels} - - autodetector = MigrationAutodetector( - loader.project_state(), - ProjectState.from_apps(apps), - questioner, - ) - - loader.graph.nodes = temp_nodes - - else: - autodetector = MigrationAutodetector( - loader.project_state(), - ProjectState.from_apps(apps), - questioner, - ) - - # If they want to make an empty migration, make one for each app - if self.empty: - if not app_labels: - raise CommandError("You must supply at least one app label when using --empty.") - # Make a fake changes() result we can pass to arrange_for_graph - changes = { - app: [Migration("custom", app)] - for app in app_labels - } - changes = autodetector.arrange_for_graph( - changes=changes, - graph=loader.graph, - migration_name=self.migration_name, - ) - self.write_migration_files(changes) - return - - # Detect changes - changes = autodetector.changes( - graph=loader.graph, - trim_to_apps=app_labels or None, - convert_apps=app_labels or None, - migration_name=self.migration_name, - ) - - if not changes: - # No changes? Tell them. - if self.verbosity >= 1: - if app_labels: - if len(app_labels) == 1: - self.stdout.write("No changes detected in app '%s'" % app_labels.pop()) - else: - self.stdout.write("No changes detected in apps '%s'" % ("', '".join(app_labels))) - else: - self.stdout.write("No changes detected") - else: - if self.replace_all: - for app_label, app_migrations in changes.items(): - for app_migration in app_migrations: - app_migration.replaces = \ - [ - (migration.app_label, migration.name) - for migration in replace_list - if migration.app_label == app_label - ] - app_migration.dependencies = [dependency for dependency in app_migration.dependencies if dependency not in app_migration.replaces] - - self.write_migration_files(changes) - if check_changes: - sys.exit(1) - - def write_migration_files(self, changes): - """ - Take a changes dict and write them out as migration files. - """ - directory_created = {} - for app_label, app_migrations in changes.items(): - if self.verbosity >= 1: - self.stdout.write(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label) + "\n") - for migration in app_migrations: - # Describe the migration - writer = MigrationWriter(migration) - if self.verbosity >= 1: - # Display a relative path if it's below the current working - # directory, or an absolute path otherwise. - try: - migration_string = os.path.relpath(writer.path) - except ValueError: - migration_string = writer.path - if migration_string.startswith('..'): - migration_string = writer.path - self.stdout.write(" %s\n" % (self.style.MIGRATE_LABEL(migration_string),)) - if self.replace_all: - self.stdout.write( - " Replaces '%s'." % migration.replaces - ) - for operation in migration.operations: - self.stdout.write(" - %s\n" % operation.describe()) - if not self.dry_run: - # Write the migrations file to the disk. - migrations_directory = os.path.dirname(writer.path) - if not directory_created.get(app_label): - if not os.path.isdir(migrations_directory): - os.mkdir(migrations_directory) - init_path = os.path.join(migrations_directory, "__init__.py") - if not os.path.isfile(init_path): - open(init_path, "w").close() - # We just do this once per app - directory_created[app_label] = True - migration_string = writer.as_string() - with open(writer.path, "w", encoding='utf-8') as fh: - fh.write(migration_string) - elif self.verbosity == 3: - # Alternatively, makemigrations --dry-run --verbosity 3 - # will output the migrations to stdout rather than saving - # the file to the disk. - self.stdout.write(self.style.MIGRATE_HEADING( - "Full migrations file '%s':" % writer.filename) + "\n" - ) - self.stdout.write("%s\n" % writer.as_string()) - - def handle_merge(self, loader, conflicts): - """ - Handles merging together conflicted migrations interactively, - if it's safe; otherwise, advises on how to fix it. - """ - if self.interactive: - questioner = InteractiveMigrationQuestioner() - else: - questioner = MigrationQuestioner(defaults={'ask_merge': True}) - - for app_label, migration_names in conflicts.items(): - # Grab out the migrations in question, and work out their - # common ancestor. - merge_migrations = [] - for migration_name in migration_names: - migration = loader.get_migration(app_label, migration_name) - migration.ancestry = [ - mig for mig in loader.graph.forwards_plan((app_label, migration_name)) - if mig[0] == migration.app_label - ] - merge_migrations.append(migration) - - def all_items_equal(seq): - return all(item == seq[0] for item in seq[1:]) - - merge_migrations_generations = zip(*(m.ancestry for m in merge_migrations)) - common_ancestor_count = sum(1 for common_ancestor_generation - in takewhile(all_items_equal, merge_migrations_generations)) - if not common_ancestor_count: - raise ValueError("Could not find common ancestor of %s" % migration_names) - # Now work out the operations along each divergent branch - for migration in merge_migrations: - migration.branch = migration.ancestry[common_ancestor_count:] - migrations_ops = (loader.get_migration(node_app, node_name).operations - for node_app, node_name in migration.branch) - migration.merged_operations = sum(migrations_ops, []) - # In future, this could use some of the Optimizer code - # (can_optimize_through) to automatically see if they're - # mergeable. For now, we always just prompt the user. - if self.verbosity > 0: - self.stdout.write(self.style.MIGRATE_HEADING("Merging %s" % app_label)) - for migration in merge_migrations: - self.stdout.write(self.style.MIGRATE_LABEL(" Branch %s" % migration.name)) - for operation in migration.merged_operations: - self.stdout.write(" - %s\n" % operation.describe()) - if questioner.ask_merge(app_label): - # If they still want to merge it, then write out an empty - # file depending on the migrations needing merging. - numbers = [ - MigrationAutodetector.parse_number(migration.name) - for migration in merge_migrations - ] - try: - biggest_number = max(x for x in numbers if x is not None) - except ValueError: - biggest_number = 1 - subclass = type("Migration", (Migration,), { - "dependencies": [(app_label, migration.name) for migration in merge_migrations], - }) - migration_name = "%04i_%s" % ( - biggest_number + 1, - self.migration_name or ("merge_%s" % get_migration_name_timestamp()) - ) - new_migration = subclass(migration_name, app_label) - writer = MigrationWriter(new_migration) - - if not self.dry_run: - # Write the merge migrations file to the disk - with open(writer.path, "w", encoding='utf-8') as fh: - fh.write(writer.as_string()) - if self.verbosity > 0: - self.stdout.write("\nCreated new merge migration %s" % writer.path) - elif self.verbosity == 3: - # Alternatively, makemigrations --merge --dry-run --verbosity 3 - # will output the merge migrations to stdout rather than saving - # the file to the disk. - self.stdout.write(self.style.MIGRATE_HEADING( - "Full merge migrations file '%s':" % writer.filename) + "\n" - ) - self.stdout.write("%s\n" % writer.as_string()) diff --git a/django_replace_migrations/__init__.py b/gg_django_replace_migrations/__init__.py similarity index 100% rename from django_replace_migrations/__init__.py rename to gg_django_replace_migrations/__init__.py diff --git a/django_replace_migrations/management/__init__.py b/gg_django_replace_migrations/management/__init__.py similarity index 100% rename from django_replace_migrations/management/__init__.py rename to gg_django_replace_migrations/management/__init__.py diff --git a/django_replace_migrations/management/commands/__init__.py b/gg_django_replace_migrations/management/commands/__init__.py similarity index 100% rename from django_replace_migrations/management/commands/__init__.py rename to gg_django_replace_migrations/management/commands/__init__.py diff --git a/gg_django_replace_migrations/management/commands/replace_all_migrations.py b/gg_django_replace_migrations/management/commands/replace_all_migrations.py new file mode 100644 index 0000000..d27237b --- /dev/null +++ b/gg_django_replace_migrations/management/commands/replace_all_migrations.py @@ -0,0 +1,307 @@ +import os +import sys +import warnings + +from django.apps import apps +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError, no_translations +from django.core.management.utils import run_formatters +from django.db import DEFAULT_DB_ALIAS, OperationalError, connections, router +from django.db.migrations.loader import MigrationLoader +from django.db.migrations.operations.special import ( + RunPython, + RunSQL, + SeparateDatabaseAndState, +) +from django.db.migrations.questioner import ( + InteractiveMigrationQuestioner, + NonInteractiveMigrationQuestioner, +) +from django.db.migrations.state import ProjectState + +from .replace_migration_autodetector import ReplaceMigrationAutodetector +from .replace_migration_loader import ReplaceMigrationLoader +from .replace_migration_writer import ReplaceMigrationWriter + + +class Command(BaseCommand): + help = "Replace all migration(s) for apps." + + def add_arguments(self, parser): + parser.add_argument( + "args", + metavar="app_label", + nargs="*", + help="Specify the app label(s) to create migrations for.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Just show what migrations would be made; don't actually write them.", + ) + parser.add_argument( + "--noinput", + "--no-input", + action="store_false", + dest="interactive", + help="Tells Django to NOT prompt the user for input of any kind.", + ) + parser.add_argument( + "-n", + "--name", + help="Use this name for migration file(s).", + ) + parser.add_argument( + "--no-header", + action="store_false", + dest="include_header", + help="Do not add header comments to new migration file(s).", + ) + parser.add_argument( + "--scriptable", + action="store_true", + dest="scriptable", + help=( + "Divert log output and input prompts to stderr, writing only " + "paths of generated migration files to stdout." + ), + ) + + @property + def log_output(self): + return self.stderr if self.scriptable else self.stdout + + def log(self, msg): + self.log_output.write(msg) + + @no_translations + def handle(self, *app_labels, **options): + self.written_files = [] + self.verbosity = options["verbosity"] + self.interactive = options["interactive"] + self.dry_run = options["dry_run"] + self.migration_name = options["name"] + if self.migration_name and not self.migration_name.isidentifier(): + raise CommandError("The migration name must be a valid Python identifier.") + self.include_header = options["include_header"] + self.scriptable = options["scriptable"] + # If logs and prompts are diverted to stderr, remove the ERROR style. + if self.scriptable: + self.stderr.style_func = None + + # Make sure the app they asked for exists + app_labels = set(app_labels) + has_bad_labels = False + for app_label in app_labels: + try: + apps.get_app_config(app_label) + except LookupError as err: + self.stderr.write(str(err)) + has_bad_labels = True + if has_bad_labels: + sys.exit(2) + + loader_from_disk = MigrationLoader(None, ignore_no_migrations=True) + loader_from_disk.load_disk() + + # Load the current graph state. Pass in None for the connection so + # the loader doesn't try to resolve replaced migrations from DB. + loader = ReplaceMigrationLoader(None, ignore_no_migrations=True) + + # Raise an error if any migrations are applied before their dependencies. + consistency_check_labels = {config.label for config in apps.get_app_configs()} + # Non-default databases are only checked if database routers used. + aliases_to_check = ( + connections if settings.DATABASE_ROUTERS else [DEFAULT_DB_ALIAS] + ) + for alias in sorted(aliases_to_check): + connection = connections[alias] + if connection.settings_dict["ENGINE"] != "django.db.backends.dummy" and any( + # At least one model must be migrated to the database. + router.allow_migrate( + connection.alias, app_label, model_name=model._meta.object_name + ) + for app_label in consistency_check_labels + for model in apps.get_app_config(app_label).get_models() + ): + try: + loader.check_consistent_history(connection) + except OperationalError as error: + warnings.warn( + "Got an error checking a consistent migration history " + "performed for database connection '%s': %s" % (alias, error), + RuntimeWarning, + ) + # Before anything else, see if there's conflicting apps and drop out + # hard if there are any and they don't want to merge + conflicts = loader.detect_conflicts() + + # If app_labels is specified, filter out conflicting migrations for + # unspecified apps. + if app_labels: + conflicts = { + app_label: conflict + for app_label, conflict in conflicts.items() + if app_label in app_labels + } + + if conflicts and not self.merge: + name_str = "; ".join( + "%s in %s" % (", ".join(names), app) for app, names in conflicts.items() + ) + raise CommandError( + "Conflicting migrations detected; multiple leaf nodes in the " + "migration graph: (%s).\nTo fix them run " + "'python manage.py makemigrations --merge'" % name_str + ) + + if self.interactive: + questioner = InteractiveMigrationQuestioner( + specified_apps=app_labels, + dry_run=self.dry_run, + prompt_output=self.log_output, + ) + else: + questioner = NonInteractiveMigrationQuestioner( + specified_apps=app_labels, + dry_run=self.dry_run, + verbosity=self.verbosity, + log=self.log, + ) + # Set up autodetector + replace_list = [migration for migration in loader.graph.nodes.values()] + temp_nodes = loader.graph.nodes + + loader.graph.nodes = { + k: v for (k, v) in loader.graph.nodes.items() if k[0] not in app_labels + } + + autodetector = ReplaceMigrationAutodetector( + from_state=loader.project_state(), + to_state=ProjectState.from_apps(apps), + questioner=questioner, + ) + + loader.graph.nodes = temp_nodes + + # Detect changes + changes = autodetector.changes( + graph=loader.graph, + trim_to_apps=app_labels or None, + convert_apps=app_labels or None, + migration_name=self.migration_name, + ) + + if not changes: + # No changes? Tell them. + if self.verbosity >= 1: + if app_labels: + if len(app_labels) == 1: + self.log("No changes detected in app '%s'" % app_labels.pop()) + else: + self.log( + "No changes detected in apps '%s'" + % ("', '".join(app_labels)) + ) + else: + self.log("No changes detected") + else: + for app_label, app_migrations in changes.items(): + for app_migration in app_migrations: + app_migration.replaces = [ + (migration.app_label, migration.name) + for migration in replace_list + if migration.app_label == app_label + ] + app_migration.dependencies = [ + dependency + for dependency in app_migration.dependencies + if dependency not in app_migration.replaces + ] + for app_label, name in app_migration.replaces: + app_migration.operations += [ + operation + for operation in loader_from_disk.get_migration( + app_label, name + ).operations + if ( + isinstance(operation, RunPython) + or isinstance(operation, RunSQL) + or isinstance(operation, SeparateDatabaseAndState) + ) + ] + + self.write_migration_files(changes) + + def write_migration_files(self, changes, update_previous_migration_paths=None): + """ + Take a changes dict and write them out as migration files. + """ + directory_created = {} + for app_label, app_migrations in changes.items(): + if self.verbosity >= 1: + self.log(self.style.MIGRATE_HEADING("Migrations for '%s':" % app_label)) + for migration in app_migrations: + # Describe the migration + writer = ReplaceMigrationWriter(migration, self.include_header) + if self.verbosity >= 1: + # Display a relative path if it's below the current working + # directory, or an absolute path otherwise. + migration_string = self.get_relative_path(writer.path) + self.log(" %s\n" % self.style.MIGRATE_LABEL(migration_string)) + self.stdout.write(" Replaces '%s'." % migration.replaces) + for operation in migration.operations: + self.log(" - %s" % operation.describe()) + if self.scriptable: + self.stdout.write(migration_string) + if not self.dry_run: + # Write the migrations file to the disk. + migrations_directory = os.path.dirname(writer.path) + if not directory_created.get(app_label): + os.makedirs(migrations_directory, exist_ok=True) + init_path = os.path.join(migrations_directory, "__init__.py") + if not os.path.isfile(init_path): + open(init_path, "w").close() + # We just do this once per app + directory_created[app_label] = True + migration_string = writer.as_string() + with open(writer.path, "w", encoding="utf-8") as fh: + fh.write(migration_string) + self.written_files.append(writer.path) + if update_previous_migration_paths: + prev_path = update_previous_migration_paths[app_label] + rel_prev_path = self.get_relative_path(prev_path) + if writer.needs_manual_porting: + migration_path = self.get_relative_path(writer.path) + self.log( + self.style.WARNING( + f"Updated migration {migration_path} requires " + f"manual porting.\n" + f"Previous migration {rel_prev_path} was kept and " + f"must be deleted after porting functions manually." + ) + ) + else: + os.remove(prev_path) + self.log(f"Deleted {rel_prev_path}") + elif self.verbosity == 3: + # Alternatively, replaceallmigrations --dry-run --verbosity 3 + # will log the migrations rather than saving the file to + # the disk. + self.log( + self.style.MIGRATE_HEADING( + "Full migrations file '%s':" % writer.filename + ) + ) + self.log(writer.as_string()) + run_formatters(self.written_files) + + @staticmethod + def get_relative_path(path): + try: + migration_string = os.path.relpath(path) + except ValueError: + migration_string = path + if migration_string.startswith(".."): + migration_string = path + return migration_string diff --git a/gg_django_replace_migrations/management/commands/replace_migration_autodetector.py b/gg_django_replace_migrations/management/commands/replace_migration_autodetector.py new file mode 100644 index 0000000..d238226 --- /dev/null +++ b/gg_django_replace_migrations/management/commands/replace_migration_autodetector.py @@ -0,0 +1,16 @@ +import re + +from django.db.migrations.autodetector import MigrationAutodetector + + +class ReplaceMigrationAutodetector(MigrationAutodetector): + @classmethod + def parse_number(cls, name): + """ + Given a migration name, try to extract a number from the beginning of + it. For a squashed migration such as '0001_squashed_0004…', return the + second number. If no number is found, return None. + """ + if squashed_match := re.search(r"(\d+)_squashed_.*", name): + return int(squashed_match[1]) + return None diff --git a/gg_django_replace_migrations/management/commands/replace_migration_graph.py b/gg_django_replace_migrations/management/commands/replace_migration_graph.py new file mode 100644 index 0000000..bdf8a72 --- /dev/null +++ b/gg_django_replace_migrations/management/commands/replace_migration_graph.py @@ -0,0 +1,26 @@ +from django.db.migrations.graph import MigrationGraph +from django.db.migrations.state import ProjectState + + +class ReplaceMigrationGraph(MigrationGraph): + def make_state(self, nodes=None, at_end=True, real_apps=None): + """ + Given a migration node or nodes, return a complete ProjectState for it. + If at_end is False, return the state before the migration has run. + If nodes is not provided, return the overall most current project state. + """ + if nodes is None: + nodes = list(self.leaf_nodes()) + if not nodes: + return ProjectState() + if not isinstance(nodes[0], tuple): + nodes = [nodes] + plan = self._generate_plan(nodes, at_end) + project_state = ProjectState(real_apps=real_apps) + for node in plan: + # We have dependencies between the contrib and our migrations + # if a node is not found, do not link / use it + if node not in self.nodes: + continue + project_state = self.nodes[node].mutate_state(project_state, preserve=False) + return project_state diff --git a/gg_django_replace_migrations/management/commands/replace_migration_loader.py b/gg_django_replace_migrations/management/commands/replace_migration_loader.py new file mode 100644 index 0000000..f54b7fd --- /dev/null +++ b/gg_django_replace_migrations/management/commands/replace_migration_loader.py @@ -0,0 +1,94 @@ +from django.db.migrations.exceptions import NodeNotFoundError +from django.db.migrations.loader import MigrationLoader +from django.db.migrations.recorder import MigrationRecorder + +from .replace_migration_graph import ReplaceMigrationGraph + + +class ReplaceMigrationLoader(MigrationLoader): + def build_graph(self): + """ + Build a migration dependency graph using both the disk and database. + You'll need to rebuild the graph if you apply migrations. This isn't + usually a problem as generally migration stuff runs in a one-shot process. + """ + # Load disk data + self.load_disk() + # Load database data + if self.connection is None: + self.applied_migrations = {} + else: + recorder = MigrationRecorder(self.connection) + self.applied_migrations = recorder.applied_migrations() + # To start, populate the migration graph with nodes for ALL migrations + # and their dependencies. Also make note of replacing migrations at this step. + self.graph = ReplaceMigrationGraph() + self.replacements = {} + for key, migration in self.disk_migrations.items(): + self.graph.add_node(key, migration) + # Replacing migrations. + if migration.replaces: + self.replacements[key] = migration + for key, migration in self.disk_migrations.items(): + # Internal (same app) dependencies. + self.add_internal_dependencies(key, migration) + # Add external dependencies now that the internal ones have been resolved. + for key, migration in self.disk_migrations.items(): + self.add_external_dependencies(key, migration) + # Carry out replacements where possible and if enabled. + if self.replace_migrations: + for key, migration in self.replacements.items(): + # Get applied status of each of this migration's replacement + # targets. + applied_statuses = [ + (target in self.applied_migrations) for target in migration.replaces + ] + # The replacing migration is only marked as applied if all of + # its replacement targets are. + if all(applied_statuses): + self.applied_migrations[key] = migration + else: + self.applied_migrations.pop(key, None) + # A replacing migration can be used if either all or none of + # its replacement targets have been applied. + if all(applied_statuses) or (not any(applied_statuses)): + self.graph.remove_replaced_nodes(key, migration.replaces) + else: + # This replacing migration cannot be used because it is + # partially applied. Remove it from the graph and remap + # dependencies to it (#25945). + self.graph.remove_replacement_node(key, migration.replaces) + # Ensure the graph is consistent. + try: + self.graph.validate_consistency() + except NodeNotFoundError as exc: + # Check if the missing node could have been replaced by any squash + # migration but wasn't because the squash migration was partially + # applied before. In that case raise a more understandable exception + # (#23556). + # Get reverse replacements. + reverse_replacements = {} + for key, migration in self.replacements.items(): + for replaced in migration.replaces: + reverse_replacements.setdefault(replaced, set()).add(key) + # Try to reraise exception with more detail. + if exc.node in reverse_replacements: + candidates = reverse_replacements.get(exc.node, set()) + is_replaced = any( + candidate in self.graph.nodes for candidate in candidates + ) + if not is_replaced: + tries = ", ".join( + f"{key}.{migration}" for key, migration in candidates + ) + raise NodeNotFoundError( + "Migration {0} depends on nonexistent node ('{1}', '{2}'). " + "Django tried to replace migration {1}.{2} with any of [{3}] " + "but wasn't able to because some of the replaced migrations " + "are already applied.".format( + exc.origin, exc.node[0], exc.node[1], tries + ), + exc.node, + ) from exc + raise + self.graph.ensure_not_cyclic() diff --git a/gg_django_replace_migrations/management/commands/replace_migration_writer.py b/gg_django_replace_migrations/management/commands/replace_migration_writer.py new file mode 100644 index 0000000..e113d92 --- /dev/null +++ b/gg_django_replace_migrations/management/commands/replace_migration_writer.py @@ -0,0 +1,148 @@ +import re + +from django import get_version +from django.db.migrations.operations.special import ( + RunPython, + RunSQL, + SeparateDatabaseAndState, +) +from django.db.migrations.writer import MigrationWriter, OperationWriter +from django.utils.timezone import now + + +class ReplaceMigrationWriter(MigrationWriter): + def as_string(self): + """Return a string of the file contents.""" + items = { + "replaces_str": "", + "initial_str": "", + } + + imports = set() + + # Deconstruct operations + operations = [] + for operation in self.migration.operations: + if isinstance(operation, (RunPython, RunSQL, SeparateDatabaseAndState)): + continue + operation_string, operation_imports = OperationWriter(operation).serialize() + imports.update(operation_imports) + operations.append(operation_string) + items["operations"] = "\n".join(operations) + "\n" if operations else "" + + # Deconstruct special_operations + special_operations = [] + for special_op in self.migration.operations: + if ( + isinstance(special_op, (RunPython, RunSQL, SeparateDatabaseAndState)) + and not special_op.elidable + ): + operation_string, operation_imports = OperationWriter( + special_op + ).serialize() + imports.update(operation_imports) + special_operations.append(operation_string) + special_ops = ( + "\n".join(special_operations) + "\n" if special_operations else None + ) + items["special_operations"] = ( + "\n # /!\\ PRINT ALL THE SPECIAL OPERATIONS\n" + + " # /!\\ MUST BE MANUALLY REVIEWED\n\n" + + " special_operations = [\n" + + special_ops + + " ]\n" + if special_ops + else "" + ) + + # Format dependencies and write out swappable dependencies right + dependencies = [] + for dependency in self.migration.dependencies: + if dependency[0] == "__setting__": + dependencies.append( + f" migrations.swappable_dependency(settings.{dependency[1]})," + ) + imports.add("from django.conf import settings") + else: + dependencies.append(f" {self.serialize(dependency)[0]},") + items["dependencies"] = ( + "\n".join(sorted(dependencies)) + "\n" if dependencies else "" + ) + + # Format imports nicely, swapping imports of functions from migration files + # for comments + migration_imports = set() + for line in list(imports): + if re.match(r"^import (.*)\.\d+[^\s]*$", line): + migration_imports.add(line.split("import")[1].strip()) + imports.remove(line) + self.needs_manual_porting = True + + # django.db.migrations is always used, but models import may not be. + # If models import exists, merge it with migrations import. + if "from django.db import models" in imports: + imports.discard("from django.db import models") + imports.add("from django.db import migrations, models") + else: + imports.add("from django.db import migrations") + + # Sort imports by the package / module to be imported (the part after + # "from" in "from ... import ..." or after "import" in "import ..."). + sorted_imports = sorted(imports, key=lambda i: i.split()[1]) + items["imports"] = "\n".join(sorted_imports) + "\n" if imports else "" + if migration_imports: + items["imports"] += ( + "\n\n# Functions from the following migrations need manual " + "copying.\n# Move them and any dependencies into this file, " + "then update the\n# RunPython operations to refer to the local " + "versions:\n# %s" + ) % "\n# ".join(sorted(migration_imports)) + # If there's a replaces, make a string for it + if self.migration.replaces: + items[ + "replaces_str" + ] = f"\n replaces = {self.serialize(sorted(self.migration.replaces))[0]}\n" + # Hinting that goes into comment + if self.include_header: + items["migration_header"] = MIGRATION_HEADER_TEMPLATE % { + "version": get_version(), + "timestamp": now().strftime("%Y-%m-%d %H:%M"), + } + else: + items["migration_header"] = "" + + if self.migration.initial: + items["initial_str"] = "\n initial = True\n" + + return MIGRATION_TEMPLATE % items + + +MIGRATION_HEADER_TEMPLATE = """\ +# Generated by Django %(version)s on %(timestamp)s + +""" + + +MIGRATION_TEMPLATE = """\ +%(migration_header)s%(imports)s +from phased_migrations.constants import DeployPhase + + +class Migration(migrations.Migration): + # Note: deploy_phase was added to ensure consistency with no down time + # it is possible that this migration in not really compatible with pre-deploy + deploy_phase = DeployPhase.pre_deploy + + squashed_with_gg_script = True + +%(replaces_str)s%(initial_str)s + dependencies = [ +%(dependencies)s\ + ] + + operations = [ +%(operations)s\ + ] + +%(special_operations)s +""" diff --git a/setup.py b/setup.py index 133d139..f26ef2a 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ # Copyright 2019 3YOURMIND GmbH +# Copyright 2024 GitGuardian # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,25 +19,30 @@ PROJECT_DIR = path.abspath(path.dirname(__file__)) -VERSION = '0.0.2' +VERSION = "0.0.4" from os import path + this_directory = path.abspath(path.dirname(__file__)) -with open(path.join(this_directory, 'README.md'), encoding='utf-8') as f: +with open(path.join(this_directory, "README.md"), encoding="utf-8") as f: long_description = f.read() setup( - name="django-replace-migrations", + name="gg-django-replace-migrations", version=VERSION, - description="This package is an extension to djangos makemigrations.py. It can be used to get rid of old migrations as an alternative to djangos squashmigration command.", + description=( + "This package offers a new command: replace_all_migrations. It can be" + " used to get rid of old migrations as an alternative to djangos" + " squashmigration command." + ), long_description=long_description, - url="https://github.com/3YOURMIND/django-replace-migrations", - author="3YOURMIND GmbH", + url="https://gitlab.gitguardian.ovh/gg-code/gg-django-replace-migrations", + author="GitGuardian", license="Apache License 2.0", packages=find_packages(exclude=["tests/"]), - install_requires=["django>=2.1"], + install_requires=["django>=4.2"], extras_require={}, keywords="django migration replace squash squashmigrations database", classifiers=[ @@ -44,13 +50,10 @@ "Intended Audience :: Developers", "Environment :: Web Environment", "Framework :: Django", - "Framework :: Django :: 2.1", - "Framework :: Django :: 2.2", - "Framework :: Django :: 3.0", + "Framework :: Django :: 4.2", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", ], )