1
+ import hashlib
2
+ import importlib
1
3
import logging
4
+ import sys
2
5
from asyncio import get_event_loop
3
- from datetime import datetime
6
+ from datetime import datetime , timezone
7
+ from os import listdir , path
8
+ from os .path import isfile , join
4
9
5
- from sqlalchemy import Table , Column , String , DateTime
6
- from sqlalchemy .ext .asyncio import AsyncEngine
7
- from sqlalchemy .orm import registry
10
+ from sqlalchemy import Table , Column , String , DateTime , UniqueConstraint , text , select
11
+ from sqlalchemy .ext .asyncio import AsyncEngine , AsyncSession , async_sessionmaker
12
+ from sqlalchemy .orm import registry , sessionmaker , Mapped , mapped_column
8
13
9
14
from alembic import context
10
15
from common .bootstrap import application_init
35
40
db_names = target_metadata .keys ()
36
41
config .set_main_option ("databases" , "," .join (db_names ))
37
42
38
- def inject_fixture_tables (registry_mapper : registry ):
39
- Table (
40
- "alembic_fixtures" ,
41
- registry_mapper .metadata ,
42
- Column ("filename" , String (), primary_key = True ),
43
- Column ("signature" , String (), nullable = False ),
44
- Column ("processed_at" , DateTime (timezone = True ), nullable = False , default = datetime .now ),
45
- )
43
+ def generate_fixture_migration_model (declarative_base : type ):
44
+ class FixtureMigration (declarative_base ):
45
+ __tablename__ = "alembic_fixtures"
46
+
47
+ bind : Mapped [str ] = mapped_column (String (), primary_key = True )
48
+ filename : Mapped [str ] = mapped_column (String (), primary_key = True )
49
+ signature : Mapped [str ] = mapped_column (String (), nullable = False )
50
+ processed_at : Mapped [datetime ] = mapped_column (DateTime (), nullable = False , default = datetime .now )
51
+ return FixtureMigration
46
52
53
+ fixture_migration_models = {}
47
54
for name in db_names :
48
- inject_fixture_tables (sa_manager .get_bind (name ).registry_mapper )
55
+ fixture_migration_models [name ] = generate_fixture_migration_model (
56
+ sa_manager .get_bind (name ).declarative_base
57
+ )
49
58
50
59
51
60
# add your model's MetaData objects here
@@ -66,6 +75,51 @@ def inject_fixture_tables(registry_mapper: registry):
66
75
# my_important_option = config.get_main_option("my_important_option")
67
76
# ... etc.
68
77
78
+ def calculate_md5 (file_path ):
79
+ hasher = hashlib .md5 ()
80
+ with open (file_path , 'rb' ) as file :
81
+ hasher .update (file .read ())
82
+ return hasher .hexdigest ()
83
+
84
+ async def a_migrate_fixtures (bind_name : str , Session : async_sessionmaker [AsyncSession ]) -> str :
85
+ alembic_path = path .dirname (path .realpath (__file__ ))
86
+ fixtures_path = alembic_path + "/fixtures"
87
+ sys .path .append (alembic_path )
88
+ module_names = [
89
+ f [:- 3 ] for f in listdir (fixtures_path )
90
+ if isfile (join (fixtures_path , f ))
91
+ and f .endswith (".py" )
92
+ and f != "__init__.py"
93
+ ]
94
+ async with Session () as session :
95
+ for module_name in module_names :
96
+ logging .debug (f"Creating { module_name } fixtures for { bind_name } " )
97
+ m = importlib .import_module (f"fixtures.{ module_name } " )
98
+ fixture_migration = await session .get (
99
+ fixture_migration_models [bind_name ],
100
+ (bind_name , f"{ module_name } .py" )
101
+ )
102
+ signature = calculate_md5 (f"{ fixtures_path } /{ module_name } .py" )
103
+ if fixture_migration :
104
+ if signature != fixture_migration .signature :
105
+ logging .warning (f"Signature mismatch for { fixture_migration .filename } fixture. The file has been modified after being processed." )
106
+ logging .debug (f"{ module_name } fixtures already migrated for { bind_name } " )
107
+ continue
108
+
109
+ session .add_all (m .fixtures ().get (bind_name , []))
110
+ session .add (fixture_migration_models [bind_name ](
111
+ bind = bind_name ,
112
+ filename = f"{ module_name } .py" ,
113
+ signature = signature ,
114
+ ))
115
+ try :
116
+ await session .commit ()
117
+ logging .info (f"{ module_name } fixtures correctly created for { bind_name } " )
118
+ except :
119
+ await session .rollback ()
120
+ logging .error (f"{ module_name } fixtures failed to apply to { bind_name } " )
121
+
122
+
69
123
70
124
def run_migrations_offline () -> None :
71
125
"""Run migrations in 'offline' mode.
@@ -149,13 +203,17 @@ async def run_migrations_online() -> None:
149
203
for name , rec in engines .items ():
150
204
logger .info (f"Migrating database { name } " )
151
205
if isinstance (rec ["engine" ], AsyncEngine ):
152
-
153
206
def migration_callable (* args , ** kwargs ):
154
207
return do_run_migration (* args , name = name , ** kwargs )
155
208
156
209
await rec ["connection" ].run_sync (migration_callable )
210
+ Session = async_sessionmaker (bind = rec ["connection" ])
211
+ await a_migrate_fixtures (bind_name = name , Session = Session )
212
+
157
213
else :
158
214
do_run_migration (rec ["connection" ], name )
215
+ # Session = sessionmaker(bind=rec["connection"])
216
+ # session = Session()
159
217
160
218
if USE_TWOPHASE :
161
219
for rec in engines .values ():
0 commit comments