@@ -16,6 +16,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Various metadata migrations for v10."""
+
 import io
 import json
 import os
@@ -26,6 +27,7 @@
 
 import zstandard as zstd
 
+from renku.command.checks.activities import fix_activity_dates
 from renku.command.checks.workflow import fix_plan_dates
 from renku.command.command_builder import inject
 from renku.core.interface.activity_gateway import IActivityGateway
@@ -53,9 +55,12 @@ def migrate(migration_context: MigrationContext):
     if MigrationType.WORKFLOWS in migration_context.options.type:
         migrate_activity_ids()
         fix_plan_times()
+        fix_activity_times()
 
     migrate_remote_entity_ids()
     fix_dataset_date_modified()
+    fix_dataset_image_ids()
+    fix_removed_plans()
 
     # NOTE: Rebuild all workflow catalogs since ids and times have changed
     communication.echo("Rebuilding workflow metadata")
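
Note: the `MigrationType` check above lets callers run only a subset of the fixes. A minimal standalone sketch of that gating, assuming `MigrationType` behaves like a standard `enum.Flag` (the members below are stand-ins, not renku's actual definitions):

from enum import Flag, auto

class MigrationType(Flag):  # hypothetical stand-in for renku's MigrationType
    CORE = auto()
    WORKFLOWS = auto()
    DATASETS = auto()

requested = MigrationType.CORE | MigrationType.WORKFLOWS

# Membership tests on Flag values gate the optional workflow fixes.
if MigrationType.WORKFLOWS in requested:
    print("would run fix_plan_times() and fix_activity_times()")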
@@ -76,8 +81,8 @@ def migrate_old_metadata_namespaces():
             header = int.from_bytes(file.read(4), "little")
             file.seek(0)
             if header == zstd.MAGIC_NUMBER:
-                with decompressor.stream_reader(file) as zfile:
-                    data = json.load(zfile)
+                with decompressor.stream_reader(file) as compressed_file:
+                    data = json.load(compressed_file)
                 compressed = True
             else:
                 data = json.load(file)
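
For reference, the branch above sniffs the zstd frame magic number in the first four bytes before deciding how to parse. A self-contained sketch of the same detection, assuming the `decompressor` is a `zstandard.ZstdDecompressor` as used here:

import json

import zstandard as zstd

def load_possibly_compressed_json(path):
    """Read a JSON file that may or may not be zstd-compressed on disk."""
    with open(path, "rb") as file:
        header = int.from_bytes(file.read(4), "little")
        file.seek(0)  # rewind so the chosen reader sees the whole stream
        if header == zstd.MAGIC_NUMBER:
            with zstd.ZstdDecompressor().stream_reader(file) as compressed_file:
                return json.load(compressed_file)
        return json.load(file)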
@@ -99,7 +104,7 @@ def migrate_old_metadata_namespaces():
 
 
 def nested_update(data: Dict[str, Any], target_key: str, transforms: List[Tuple[str, str]]) -> None:
-    """Update a key's value based on tranformations (from, to) in a deeply nested dictionary."""
+    """Update a key's value based on transformations (from, to) in a deeply nested dictionary."""
     for k in list(data.keys()):
         value = data[k]
         if isinstance(value, str) and k == target_key:
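
The hunk only shows the first lines of `nested_update`. A hedged sketch of how the rest of the traversal plausibly looks — the recursive cases below are an assumption for illustration, not renku's verbatim code:

from typing import Any, Dict, List, Tuple

def nested_update(data: Dict[str, Any], target_key: str, transforms: List[Tuple[str, str]]) -> None:
    """Update a key's value based on transformations (from, to) in a deeply nested dictionary."""
    for k in list(data.keys()):
        value = data[k]
        if isinstance(value, str) and k == target_key:
            # Apply every (from, to) substitution to the matched string value.
            for original, replacement in transforms:
                value = value.replace(original, replacement)
            data[k] = value
        elif isinstance(value, dict):
            nested_update(value, target_key, transforms)
        elif isinstance(value, list):
            for entry in value:
                if isinstance(entry, dict):
                    nested_update(entry, target_key, transforms)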
@@ -232,19 +237,10 @@ def migrate_project_template_data(project_gateway: IProjectGateway):
     project_context.database.commit()
 
 
-@inject.autoparams("activity_gateway", "plan_gateway")
-def fix_plan_times(activity_gateway: IActivityGateway, plan_gateway: IPlanGateway):
+@inject.autoparams("plan_gateway")
+def fix_plan_times(plan_gateway: IPlanGateway):
     """Add timezone to plan invalidations."""
-    database = project_context.database
-
     plans: List[AbstractPlan] = plan_gateway.get_all_plans()
-    all_activities = activity_gateway.get_all_activities()
-    activity_map: Dict[str, Activity] = {}
-
-    for activity in all_activities:
-        plan_id = activity.association.plan.id
-        if plan_id not in activity_map or activity.started_at_time < activity_map[plan_id].started_at_time:
-            activity_map[plan_id] = activity
 
     for plan in plans:
         plan.unfreeze()
@@ -255,24 +251,41 @@ def fix_plan_times(activity_gateway: IActivityGateway, plan_gateway: IPlanGatewa
             plan.date_removed = None
 
         if plan.date_removed is not None:
-            if plan.date_removed < plan.date_created:
-                # NOTE: Fix invalidation times set before creation date on plans
-                plan.date_removed = plan.date_created
             if plan.date_removed.tzinfo is None:
                 # NOTE: There was a bug that caused date_removed to be set without timezone (as UTC time)
                 # so we patch in the timezone here
                 plan.date_removed = plan.date_removed.replace(microsecond=0).astimezone(timezone.utc)
-        if plan.id in activity_map and plan.date_created > activity_map[plan.id].started_at_time:
-            plan.date_created = activity_map[plan.id].started_at_time
+            if plan.date_removed < plan.date_created:
+                # NOTE: Fix invalidation times set before creation date on plans
+                plan.date_removed = plan.date_created
         plan.freeze()
 
     fix_plan_dates(plans=plans, plan_gateway=plan_gateway)
-    database.commit()
+    project_context.database.commit()
+
+
+@inject.autoparams("activity_gateway")
+def fix_activity_times(activity_gateway: IActivityGateway):
+    """Make sure activities have valid start/end/delete dates."""
+    fix_activity_dates(activities=activity_gateway.get_all_activities(include_deleted=True))
+    project_context.database.commit()
 
 
 @inject.autoparams("dataset_gateway")
 def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
     """Change date_created and date_modified to have correct semantics."""
+
+    def fix_creation_date(dataset):
+        """Check creation date to make sure that it's after project's creation date."""
+        if dataset.date_created and dataset.date_created < project_context.project.date_created:
+            try:
+                dataset.date_created = min([f.date_added for f in dataset.files])
+            except (ValueError, TypeError):
+                dataset.date_created = project_context.project.date_created
+            else:
+                if dataset.date_created < project_context.project.date_created:
+                    dataset.date_created = project_context.project.date_created
+
     tails = dataset_gateway.get_provenance_tails()
 
     for dataset_tail in tails:
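
The new `fix_creation_date` helper leans on `try/except/else`: `min()` raises `ValueError` on an empty list, comparing a `None` date raises `TypeError`, and only a successful `min()` reaches the `else` clamp. A toy illustration of that flow, using hypothetical stand-in classes rather than renku's models:

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional

@dataclass
class File:  # hypothetical stand-in for a dataset file
    date_added: Optional[datetime] = None

@dataclass
class Dataset:  # hypothetical stand-in for a dataset
    date_created: Optional[datetime] = None
    files: List[File] = field(default_factory=list)

project_created = datetime(2022, 1, 1)
dataset = Dataset(date_created=datetime(2020, 5, 5), files=[])

if dataset.date_created and dataset.date_created < project_created:
    try:
        dataset.date_created = min([f.date_added for f in dataset.files])  # ValueError: empty list
    except (ValueError, TypeError):
        dataset.date_created = project_created
    else:
        # Only reached when min() succeeded; clamp if still too early.
        if dataset.date_created < project_created:
            dataset.date_created = project_created

print(dataset.date_created)  # 2022-01-01 00:00:00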
@@ -281,6 +294,7 @@ def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
         previous_modification_date = local_now()
 
         while dataset.derived_from is not None:
+            fix_creation_date(dataset)
             modification_date = dataset.date_removed or dataset.date_created
 
             if modification_date is not None:
@@ -294,8 +308,9 @@ def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
                 found_datasets.append(dataset)
             dataset = dataset_gateway.get_by_id(dataset.derived_from.value)
 
+        fix_creation_date(dataset)
         # NOTE: first dataset in chain
-        modification_date = dataset.date_created or dataset.date_published
+        modification_date = dataset.date_published or dataset.date_created
         if modification_date is not None:
             dataset.unfreeze()
             dataset.date_modified = modification_date
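
Both hunks above apply the helper while walking the derivation chain from each provenance tail back to the first dataset. A minimal sketch of that traversal pattern, with `get_by_id` standing in for the gateway lookup used in the diff:

def walk_derivation_chain(dataset, get_by_id):
    """Yield a dataset and then each ancestor it derives from, newest first."""
    while dataset.derived_from is not None:
        yield dataset
        dataset = get_by_id(dataset.derived_from.value)
    yield dataset  # the first dataset in the chain has no derived_from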
@@ -308,3 +323,41 @@ def fix_dataset_date_modified(dataset_gateway: IDatasetGateway):
             child.freeze()
 
     project_context.database.commit()
+
+
+@inject.autoparams("dataset_gateway")
+def fix_dataset_image_ids(dataset_gateway: IDatasetGateway):
+    """Remove dashes from dataset image IDs."""
+    for dataset in dataset_gateway.get_provenance_tails():
+        while True:
+            if dataset.images:
+                for image in dataset.images:
+                    image.id = image.id.replace("-", "")
+
+                dataset._p_changed = True
+
+            if not dataset.derived_from:
+                break
+
+            dataset = dataset_gateway.get_by_id(dataset.derived_from.value)
+
+    project_context.database.commit()
+
+
+@inject.autoparams("plan_gateway")
+def fix_removed_plans(plan_gateway: IPlanGateway):
+    """Create a derivative if a removed plan doesn't have one."""
+    plans: List[AbstractPlan] = plan_gateway.get_all_plans()
+
+    for plan in plans:
+        if plan.date_removed and plan.derived_from is None:
+            derived_plan = plan.derive()
+            derived_plan.date_modified = plan.date_modified
+            derived_plan.delete(when=plan.date_removed)
+            plan_gateway.add(derived_plan)
+
+            plan.unfreeze()
+            plan.date_removed = None
+            plan.freeze()
+
+    project_context.database.commit()
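
`fix_removed_plans` re-models a removal: instead of the original plan carrying `date_removed`, a derived copy carries the deletion and the original becomes active again. A hedged sketch of that invariant with dataclasses; `Plan` here is a hypothetical stand-in, not renku's `AbstractPlan`:

from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional, Tuple

@dataclass
class Plan:  # hypothetical stand-in for renku's AbstractPlan
    id: str
    date_removed: Optional[datetime] = None
    derived_from: Optional[str] = None

def fix_removed(plan: Plan) -> Tuple[Plan, Optional[Plan]]:
    """Move a removal onto a new derivative so the original stays active."""
    if plan.date_removed and plan.derived_from is None:
        derived = replace(plan, id=plan.id + "/derived", derived_from=plan.id)
        plan = replace(plan, date_removed=None)
        return plan, derived
    return plan, None

original = Plan(id="plan-1", date_removed=datetime(2023, 1, 1))
active, tombstone = fix_removed(original)
print(active.date_removed, tombstone.derived_from)  # None plan-1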