11import logging
2+ from dataclasses import asdict , dataclass
3+ from enum import Enum
24from os import getcwd
35from pathlib import Path
46from typing import Any , Optional
57
6- from tomlkit import dumps , parse
8+ from tomlkit import TOMLDocument , dumps , parse , table
79
8- from twyn .base .constants import DEFAULT_PROJECT_TOML_FILE
10+ from twyn .base .constants import (
11+ DEFAULT_PROJECT_TOML_FILE ,
12+ DEFAULT_SELECTOR_METHOD ,
13+ AvailableLoggingLevels ,
14+ )
915from twyn .core .exceptions import (
1016 AllowlistPackageAlreadyExistsError ,
1117 AllowlistPackageDoesNotExistError ,
18+ TOMLError ,
1219)
1320
1421logger = logging .getLogger ()
1522
1623
24+ @dataclass (frozen = True )
25+ class TwynConfiguration :
26+ """Fully resolved configuration for Twyn."""
27+
28+ dependency_file : Optional [str ]
29+ selector_method : str
30+ logging_level : AvailableLoggingLevels
31+ allowlist : set [str ]
32+
33+
34+ @dataclass (frozen = True )
35+ class ReadTwynConfiguration :
36+ """Configuration for twyn as set by the user. It may have None values."""
37+
38+ dependency_file : Optional [str ]
39+ selector_method : Optional [str ]
40+ logging_level : Optional [AvailableLoggingLevels ]
41+ allowlist : set [str ]
42+
43+
1744class ConfigHandler :
18- """Read certain values into a central ConfigHandler object ."""
45+ """Manage reading and writing configurations for Twyn ."""
1946
2047 def __init__ (self , file_path : Optional [str ] = None , enforce_file : bool = True ):
2148 self ._file_path = file_path or DEFAULT_PROJECT_TOML_FILE
2249 self ._enforce_file = enforce_file
23- self ._toml = self ._get_toml_as_dict ()
24- self ._twyn_data = self ._get_twyn_data ()
2550
26- self .dependency_file : Optional [str ] = self ._twyn_data .get ("dependency_file" )
27- self .selector_method : Optional [str ] = self ._twyn_data .get ("selector_method" )
28- self .logging_level : Optional [str ] = self ._twyn_data .get ("logging_level" )
29- self .allowlist : set [str ] = set (self ._twyn_data .get ("allowlist" , []))
51+ def resolve_config (
52+ self ,
53+ selector_method : Optional [str ] = None ,
54+ dependency_file : Optional [str ] = None ,
55+ verbosity : AvailableLoggingLevels = AvailableLoggingLevels .none ,
56+ ) -> TwynConfiguration :
57+ """Resolve the configuration for Twyn.
58+
59+ Given the cli flags it will return a fully resolved configuration for Twyn,
60+ giving precedence to cli flags vs values set in the config files.
61+
62+ It will also handle default values, when appropriate.
63+ """
64+ toml = self ._read_toml ()
65+ twyn_config_data = self ._get_twyn_data_from_toml (toml )
66+
67+ # Resolve the configuration so that it is ready to be used by Twyn,
68+ # handling defaults etc
69+ return TwynConfiguration (
70+ dependency_file = dependency_file or twyn_config_data .get ("dependency_file" ),
71+ selector_method = selector_method or twyn_config_data .get ("selector_method" , DEFAULT_SELECTOR_METHOD ),
72+ logging_level = _get_logging_level (verbosity , twyn_config_data .get ("logging_level" )),
73+ allowlist = set (twyn_config_data .get ("allowlist" , set ())),
74+ )
3075
3176 def add_package_to_allowlist (self , package_name : str ) -> None :
32- if package_name in self .allowlist :
77+ """Add a package to the allowlist configuration in the toml file."""
78+ toml = self ._read_toml ()
79+ config = self ._get_read_config (toml )
80+ if package_name in config .allowlist :
3381 raise AllowlistPackageAlreadyExistsError (package_name )
3482
35- self ._create_allowlist_in_toml_if_not_exists ()
36-
37- self ._toml ["tool" ]["twyn" ]["allowlist" ].append (package_name )
38- self ._write_toml ()
39-
40- logger .warning (f"Package '{ package_name } ' successfully added to allowlist" )
83+ new_config = ReadTwynConfiguration (
84+ dependency_file = config .dependency_file ,
85+ selector_method = config .selector_method ,
86+ logging_level = config .logging_level ,
87+ allowlist = config .allowlist | {package_name },
88+ )
89+ self ._write_config (toml , new_config )
90+ logger .info (f"Package '{ package_name } ' successfully added to allowlist" )
4191
4292 def remove_package_from_allowlist (self , package_name : str ) -> None :
43- if package_name not in self .allowlist :
93+ """Remove a package from the allowlist configuration in the toml file."""
94+ toml = self ._read_toml ()
95+ config = self ._get_read_config (toml )
96+ if package_name not in config .allowlist :
4497 raise AllowlistPackageDoesNotExistError (package_name )
4598
46- self ._toml ["tool" ]["twyn" ]["allowlist" ].remove (package_name )
47- self ._write_toml ()
48- logger .warning (f"Package '{ package_name } ' successfully removed from allowlist" )
99+ new_config = ReadTwynConfiguration (
100+ dependency_file = config .dependency_file ,
101+ selector_method = config .selector_method ,
102+ logging_level = config .logging_level ,
103+ allowlist = config .allowlist - {package_name },
104+ )
105+ self ._write_config (toml , new_config )
106+ logger .info (f"Package '{ package_name } ' successfully removed from allowlist" )
107+
108+ def _get_read_config (self , toml : TOMLDocument ) -> ReadTwynConfiguration :
109+ """Read the twyn configuration from a provided toml document."""
110+ twyn_config_data = self ._get_twyn_data_from_toml (toml )
111+ return ReadTwynConfiguration (
112+ dependency_file = twyn_config_data .get ("dependency_file" ),
113+ selector_method = twyn_config_data .get ("selector_method" ),
114+ logging_level = twyn_config_data .get ("logging_level" ),
115+ allowlist = set (twyn_config_data .get ("allowlist" , set ())),
116+ )
117+
118+ def _write_config (self , toml : TOMLDocument , config : ReadTwynConfiguration ) -> None :
119+ """Write the configuration to the toml file.
120+
121+ All null values are simply omitted from the toml file.
122+ """
123+ twyn_toml_data = asdict (config , dict_factory = lambda x : _serialize_config (x ))
124+ if "tool" not in toml :
125+ toml .add ("tool" , table ())
126+ if "twyn" not in toml ["tool" ]: # type: ignore[operator]
127+ toml ["tool" ]["twyn" ] = {} # type: ignore[index]
128+ toml ["tool" ]["twyn" ] = twyn_toml_data # type: ignore[index]
129+ self ._write_toml (toml )
130+
131+ def _read_toml (self ) -> TOMLDocument :
132+ try :
133+ fp = self ._get_toml_file_pointer ()
134+ except FileNotFoundError :
135+ if not self ._enforce_file and self ._file_path == DEFAULT_PROJECT_TOML_FILE :
136+ return TOMLDocument ()
137+ raise TOMLError (f"Error reading toml from { self ._file_path } " ) from None
138+
139+ with open (fp , "r" ) as f :
140+ content = parse (f .read ())
141+ return parse (dumps (content ))
49142
50- def _get_twyn_data (self ) -> dict [str , Any ]:
51- return self . _toml .get ("tool" , {}).get ("twyn" , {})
143+ def _get_twyn_data_from_toml (self , toml : TOMLDocument ) -> dict [str , Any ]:
144+ return toml .get ("tool" , {}).get ("twyn" , {})
52145
53146 def _get_toml_file_pointer (self ) -> Path :
54147 """Create a path for the toml file with the format <current working directory>/self.file_path."""
@@ -59,32 +152,35 @@ def _get_toml_file_pointer(self) -> Path:
59152
60153 return fp
61154
62- def _write_toml (self ) -> None :
155+ def _write_toml (self , toml : TOMLDocument ) -> None :
63156 with open (self ._get_toml_file_pointer (), "w" ) as f :
64- f .write (dumps (self ._toml ))
65-
66- def _get_toml_as_dict (self ) -> dict [str , Any ]:
67- """Read TOML into a dictionary."""
68- try :
69- fp = self ._get_toml_file_pointer ()
70- except FileNotFoundError :
71- if not self ._enforce_file and self ._file_path == DEFAULT_PROJECT_TOML_FILE :
72- return {}
73- raise
74-
75- with open (fp , "r" ) as f :
76- content = parse (f .read ())
77- return parse (dumps (content ))
78-
79- def _create_allowlist_in_toml_if_not_exists (self ) -> None :
80- try :
81- isinstance (self ._toml ["tool" ]["twyn" ]["allowlist" ], list )
82- except KeyError :
83- if "tool" not in self ._toml :
84- self ._toml ["tool" ] = {}
85-
86- if "twyn" not in self ._toml ["tool" ]:
87- self ._toml ["tool" ]["twyn" ] = {}
88-
89- if "allowlist" not in self ._toml ["tool" ]["twyn" ]:
90- self ._toml ["tool" ]["twyn" ]["allowlist" ] = []
157+ try :
158+ f .write (dumps (toml ))
159+ except Exception :
160+ logger .exception ("Error writing toml file" )
161+ raise TOMLError (f"Error writing toml to { self ._file_path } " ) from None
162+
163+
164+ def _get_logging_level (
165+ cli_verbosity : AvailableLoggingLevels ,
166+ config_logging_level : Optional [str ],
167+ ) -> AvailableLoggingLevels :
168+ """Return the appropriate logging level, considering that the one in config has less priority than the one passed directly."""
169+ if cli_verbosity is AvailableLoggingLevels .none :
170+ if config_logging_level :
171+ return AvailableLoggingLevels [config_logging_level .lower ()]
172+ else :
173+ # default logging level
174+ return AvailableLoggingLevels .warning
175+ return cli_verbosity
176+
177+
178+ def _serialize_config (x ):
179+ def _value_to_for_config (v ):
180+ if isinstance (v , Enum ):
181+ return v .name
182+ elif isinstance (v , set ):
183+ return list (v )
184+ return v
185+
186+ return {k : _value_to_for_config (v ) for (k , v ) in x if v is not None and v != set ()}
0 commit comments