3
3
import logging
4
4
import sys
5
5
from asyncio import get_event_loop
6
- from datetime import datetime , timezone
6
+ from datetime import datetime
7
7
from os import listdir , path
8
8
from os .path import isfile , join
9
9
10
- from sqlalchemy import Table , Column , String , DateTime , UniqueConstraint , text , select
10
+ from sqlalchemy import DateTime , String
11
11
from sqlalchemy .ext .asyncio import AsyncEngine , AsyncSession , async_sessionmaker
12
- from sqlalchemy .orm import registry , sessionmaker , Mapped , mapped_column
12
+ from sqlalchemy .orm import Mapped , mapped_column
13
13
14
14
from alembic import context
15
15
from common .bootstrap import application_init
40
40
db_names = target_metadata .keys ()
41
41
config .set_main_option ("databases" , "," .join (db_names ))
42
42
43
+
43
44
def generate_fixture_migration_model (declarative_base : type ):
44
45
class FixtureMigration (declarative_base ):
45
46
__tablename__ = "alembic_fixtures"
46
47
47
48
bind : Mapped [str ] = mapped_column (String (), primary_key = True )
48
49
filename : Mapped [str ] = mapped_column (String (), primary_key = True )
49
50
signature : Mapped [str ] = mapped_column (String (), nullable = False )
50
- processed_at : Mapped [datetime ] = mapped_column (DateTime (), nullable = False , default = datetime .now )
51
+ processed_at : Mapped [datetime ] = mapped_column (
52
+ DateTime (), nullable = False , default = datetime .now
53
+ )
54
+
51
55
return FixtureMigration
52
56
57
+
53
58
fixture_migration_models = {}
54
59
for name in db_names :
55
60
fixture_migration_models [name ] = generate_fixture_migration_model (
@@ -75,55 +80,64 @@ class FixtureMigration(declarative_base):
75
80
# my_important_option = config.get_main_option("my_important_option")
76
81
# ... etc.
77
82
78
- def calculate_md5 (file_path ):
79
- hasher = hashlib .md5 ()
80
- with open (file_path , 'rb' ) as file :
83
+
84
+ def calculate_signature (file_path ):
85
+ hasher = hashlib .sha256 ()
86
+ with open (file_path , "rb" ) as file :
81
87
hasher .update (file .read ())
82
88
return hasher .hexdigest ()
83
89
84
- async def a_migrate_fixtures (bind_name : str , Session : async_sessionmaker [AsyncSession ]) -> str :
90
+
91
+ async def a_migrate_fixtures (
92
+ bind_name : str , session : async_sessionmaker [AsyncSession ]
93
+ ) -> str :
85
94
alembic_path = path .dirname (path .realpath (__file__ ))
86
95
fixtures_path = alembic_path + "/fixtures"
87
96
sys .path .append (alembic_path )
88
97
module_names = [
89
- f [:- 3 ] for f in listdir (fixtures_path )
90
- if isfile (join (fixtures_path , f ))
91
- and f .endswith (".py" )
92
- and f != "__init__.py"
98
+ f [:- 3 ]
99
+ for f in listdir (fixtures_path )
100
+ if isfile (join (fixtures_path , f )) and f .endswith (".py" ) and f != "__init__.py"
93
101
]
94
- async with Session () as session :
102
+ async with session () as session :
95
103
for module_name in module_names :
96
104
logging .debug (f"Creating { module_name } fixtures for { bind_name } " )
97
105
m = importlib .import_module (f"fixtures.{ module_name } " )
98
106
fixture_migration = await session .get (
99
- fixture_migration_models [bind_name ],
100
- (bind_name , f"{ module_name } .py" )
107
+ fixture_migration_models [bind_name ], (bind_name , f"{ module_name } .py" )
101
108
)
102
- signature = calculate_md5 (f"{ fixtures_path } /{ module_name } .py" )
109
+ signature = calculate_signature (f"{ fixtures_path } /{ module_name } .py" )
103
110
if fixture_migration :
104
111
if signature != fixture_migration .signature :
105
- logging .warning (f"Signature mismatch for { fixture_migration .filename } fixture."
106
- f" The file has been already processed but has been modified since then."
107
- f" It will not be processed again." )
112
+ logging .warning (
113
+ f"Signature mismatch for { fixture_migration .filename } fixture."
114
+ f" The file has been already processed but has been modified"
115
+ f" since then. It will not be processed again."
116
+ )
108
117
else :
109
- logging .debug (f"{ module_name } fixtures already processed for { bind_name } " )
118
+ logging .debug (
119
+ f"{ module_name } fixtures already processed for { bind_name } "
120
+ )
110
121
continue
111
122
112
123
session .add_all (m .fixtures ().get (bind_name , []))
113
- session .add (fixture_migration_models [bind_name ](
114
- bind = bind_name ,
115
- filename = f"{ module_name } .py" ,
116
- signature = signature ,
117
- ))
124
+ session .add (
125
+ fixture_migration_models [bind_name ](
126
+ bind = bind_name ,
127
+ filename = f"{ module_name } .py" ,
128
+ signature = signature ,
129
+ )
130
+ )
118
131
try :
119
132
await session .commit ()
120
- logging .info (f"{ module_name } fixtures correctly created for { bind_name } " )
121
- except :
133
+ logging .info (
134
+ f"{ module_name } fixtures correctly created for { bind_name } "
135
+ )
136
+ except Exception :
122
137
await session .rollback ()
123
138
logging .error (f"{ module_name } fixtures failed to apply to { bind_name } " )
124
139
125
140
126
-
127
141
def run_migrations_offline () -> None :
128
142
"""Run migrations in 'offline' mode.
129
143
@@ -206,12 +220,14 @@ async def run_migrations_online() -> None:
206
220
for name , rec in engines .items ():
207
221
logger .info (f"Migrating database { name } " )
208
222
if isinstance (rec ["engine" ], AsyncEngine ):
223
+
209
224
def migration_callable (* args , ** kwargs ):
210
225
return do_run_migration (* args , name = name , ** kwargs )
211
226
212
227
await rec ["connection" ].run_sync (migration_callable )
213
- Session = async_sessionmaker (bind = rec ["connection" ])
214
- await a_migrate_fixtures (bind_name = name , Session = Session )
228
+ await a_migrate_fixtures (
229
+ bind_name = name , session = async_sessionmaker (bind = rec ["connection" ])
230
+ )
215
231
216
232
else :
217
233
do_run_migration (rec ["connection" ], name )
0 commit comments