13
13
import re
14
14
from time import strftime
15
15
from traceback import format_exception
16
+ import typing as ty
17
+ import inspect
18
+ import warnings
16
19
17
20
18
- from .specs import Runtime , File , Directory , attr_fields , Result
21
+ from .specs import Runtime , File , Directory , attr_fields , Result , LazyField
19
22
from .helpers_file import hash_file , hash_dir , copyfile , is_existing_file
20
23
21
24
@@ -234,8 +237,11 @@ def make_klass(spec):
234
237
if len (item ) == 2 :
235
238
if isinstance (item [1 ], attr ._make ._CountingAttr ):
236
239
newfields [item [0 ]] = item [1 ]
240
+ newfields [item [0 ]].validator (custom_validator )
237
241
else :
238
- newfields [item [0 ]] = attr .ib (type = item [1 ])
242
+ newfields [item [0 ]] = attr .ib (
243
+ type = item [1 ], validator = custom_validator
244
+ )
239
245
else :
240
246
if (
241
247
any ([isinstance (ii , attr ._make ._CountingAttr ) for ii in item ])
@@ -251,17 +257,201 @@ def make_klass(spec):
251
257
name , tp = item [:2 ]
252
258
if isinstance (item [- 1 ], dict ) and "help_string" in item [- 1 ]:
253
259
mdata = item [- 1 ]
254
- newfields [name ] = attr .ib (type = tp , metadata = mdata )
260
+ newfields [name ] = attr .ib (
261
+ type = tp , metadata = mdata , validator = custom_validator
262
+ )
255
263
else :
256
264
dflt = item [- 1 ]
257
- newfields [name ] = attr .ib (type = tp , default = dflt )
265
+ newfields [name ] = attr .ib (
266
+ type = tp , default = dflt , validator = custom_validator
267
+ )
258
268
elif len (item ) == 4 :
259
269
name , tp , dflt , mdata = item
260
- newfields [name ] = attr .ib (type = tp , default = dflt , metadata = mdata )
270
+ newfields [name ] = attr .ib (
271
+ type = tp ,
272
+ default = dflt ,
273
+ metadata = mdata ,
274
+ validator = custom_validator ,
275
+ )
261
276
fields = newfields
262
277
return attr .make_class (spec .name , fields , bases = spec .bases , kw_only = True )
263
278
264
279
280
+ def custom_validator (instance , attribute , value ):
281
+ """simple custom validation
282
+ take into account ty.Union, ty.List, ty.Dict (but only one level depth)
283
+ adding an additional validator, if allowe_values provided
284
+ """
285
+ validators = []
286
+ tp_attr = attribute .type
287
+ # a flag that could be changed to False, if the type is not recognized
288
+ check_type = True
289
+ if (
290
+ value is attr .NOTHING
291
+ or value is None
292
+ or attribute .name .startswith ("_" ) # e.g. _func
293
+ or isinstance (value , LazyField )
294
+ or tp_attr in [ty .Any , inspect ._empty ]
295
+ ):
296
+ check_type = False # no checking of the type
297
+ elif isinstance (tp_attr , type ) or tp_attr in [File , Directory ]:
298
+ tp = _single_type_update (tp_attr , name = attribute .name )
299
+ cont_type = None
300
+ else : # more complex types
301
+ cont_type , tp_attr_list = _check_special_type (tp_attr , name = attribute .name )
302
+ if cont_type is ty .Union :
303
+ tp , check_type = _types_updates (tp_attr_list , name = attribute .name )
304
+ elif cont_type is list :
305
+ tp , check_type = _types_updates (tp_attr_list , name = attribute .name )
306
+ elif cont_type is dict :
307
+ # assuming that it should have length of 2 for keys and values
308
+ if len (tp_attr_list ) != 2 :
309
+ check_type = False
310
+ else :
311
+ tp_attr_key , tp_attr_val = tp_attr_list
312
+ # updating types separately for keys and values
313
+ tp_k , check_k = _types_updates ([tp_attr_key ], name = attribute .name )
314
+ tp_v , check_v = _types_updates ([tp_attr_val ], name = attribute .name )
315
+ # assuming that I have to be able to check keys and values
316
+ if not (check_k and check_v ):
317
+ check_type = False
318
+ else :
319
+ tp = {"key" : tp_k , "val" : tp_v }
320
+ else :
321
+ warnings .warn (
322
+ f"no type check for { attribute .name } field, no type check implemented for value { value } and type { tp_attr } "
323
+ )
324
+ check_type = False
325
+
326
+ if check_type :
327
+ validators .append (_type_validator (instance , attribute , value , tp , cont_type ))
328
+
329
+ # checking additional requirements for values (e.g. allowed_values)
330
+ meta_attr = attribute .metadata
331
+ if "allowed_values" in meta_attr :
332
+ validators .append (_allowed_values_validator (isinstance , attribute , value ))
333
+ return validators
334
+
335
+
336
+ def _type_validator (instance , attribute , value , tp , cont_type ):
337
+ """ creating a customized type validator,
338
+ uses validator.deep_iterable/mapping if the field is a container
339
+ (i.e. ty.List or ty.Dict),
340
+ it also tries to guess when the value is a list due to the splitter
341
+ and validates the elements
342
+ """
343
+ if cont_type is None or cont_type is ty .Union :
344
+ # if tp is not (list,), we are assuming that the value is a list
345
+ # due to the splitter, so checking the member types
346
+ if isinstance (value , list ) and tp != (list ,):
347
+ return attr .validators .deep_iterable (
348
+ member_validator = attr .validators .instance_of (
349
+ tp + (attr ._make ._Nothing ,)
350
+ )
351
+ )(instance , attribute , value )
352
+ else :
353
+ return attr .validators .instance_of (tp + (attr ._make ._Nothing ,))(
354
+ instance , attribute , value
355
+ )
356
+ elif cont_type is list :
357
+ return attr .validators .deep_iterable (
358
+ member_validator = attr .validators .instance_of (tp + (attr ._make ._Nothing ,))
359
+ )(instance , attribute , value )
360
+ elif cont_type is dict :
361
+ return attr .validators .deep_mapping (
362
+ key_validator = attr .validators .instance_of (tp ["key" ]),
363
+ value_validator = attr .validators .instance_of (
364
+ tp ["val" ] + (attr ._make ._Nothing ,)
365
+ ),
366
+ )(instance , attribute , value )
367
+ else :
368
+ raise Exception (
369
+ f"container type of { attribute .name } should be None, list, dict or ty.Union, and not { cont_type } "
370
+ )
371
+
372
+
373
+ def _types_updates (tp_list , name ):
374
+ """updating the type's tuple with possible additional types"""
375
+ tp_upd_list = []
376
+ check = True
377
+ for tp_el in tp_list :
378
+ tp_upd = _single_type_update (tp_el , name , simplify = True )
379
+ if tp_upd is None :
380
+ check = False
381
+ break
382
+ else :
383
+ tp_upd_list += list (tp_upd )
384
+ tp_upd = tuple (set (tp_upd_list ))
385
+ return tp_upd , check
386
+
387
+
388
+ def _single_type_update (tp , name , simplify = False ):
389
+ """ updating a single type with other related types - e.g. adding bytes for str
390
+ if simplify is True, than changing typing.List to list etc.
391
+ (assuming that I validate only one depth, so have to simplify at some point)
392
+ """
393
+ if isinstance (tp , type ) or tp in [File , Directory ]:
394
+ if tp is str :
395
+ return (str , bytes )
396
+ elif tp in [File , Directory , os .PathLike ]:
397
+ return (os .PathLike , str )
398
+ elif tp is float :
399
+ return (float , int )
400
+ else :
401
+ return (tp ,)
402
+ elif simplify is True :
403
+ warnings .warn (f"simplify validator for { name } field, checking only one depth" )
404
+ cont_tp , types_list = _check_special_type (tp , name = name )
405
+ if cont_tp is list :
406
+ return (list ,)
407
+ elif cont_tp is dict :
408
+ return (dict ,)
409
+ elif cont_tp is ty .Union :
410
+ return types_list
411
+ else :
412
+ warnings .warn (
413
+ f"no type check for { name } field, type check not implemented for type of { tp } "
414
+ )
415
+ return None
416
+ else :
417
+ warnings .warn (
418
+ f"no type check for { name } field, type check not implemented for type - { tp } , consider using simplify=True"
419
+ )
420
+ return None
421
+
422
+
423
+ def _check_special_type (tp , name ):
424
+ """checking if the type is a container: ty.List, ty.Dict or ty.Union """
425
+ if sys .version_info .minor >= 8 :
426
+ return ty .get_origin (tp ), ty .get_args (tp )
427
+ else :
428
+ if isinstance (tp , type ): # simple type
429
+ return None , ()
430
+ else :
431
+ if tp ._name == "List" :
432
+ return list , tp .__args__
433
+ elif tp ._name == "Dict" :
434
+ return dict , tp .__args__
435
+ elif tp .__origin__ is ty .Union :
436
+ return ty .Union , tp .__args__
437
+ else :
438
+ warnings .warn (
439
+ f"not type check for { name } field, type check not implemented for type { tp } "
440
+ )
441
+ return None , ()
442
+
443
+
444
+ def _allowed_values_validator (instance , attribute , value ):
445
+ """ checking if the values is in allowed_values"""
446
+ allowed = attribute .metadata ["allowed_values" ]
447
+ if value is attr .NOTHING :
448
+ pass
449
+ elif value not in allowed :
450
+ raise ValueError (
451
+ f"value of { attribute .name } has to be from { allowed } , but { value } provided"
452
+ )
453
+
454
+
265
455
async def read_stream_and_display (stream , display ):
266
456
"""
267
457
Read from stream line by line until EOF, display, and capture the lines.
0 commit comments