1
1
import dataclasses
2
+ import logging
2
3
import re
3
4
from dataclasses import dataclass
4
5
from pathlib import Path
5
- from typing import Any , Dict , Iterable , List , Optional , Tuple
6
+ from typing import Any , Dict , Iterable , List , Optional , Protocol , Tuple , Type , TypeVar
6
7
7
8
from pip ._vendor import tomli_w
9
+ from pip ._vendor .packaging .version import InvalidVersion , Version
8
10
from pip ._vendor .typing_extensions import Self
9
11
10
12
from pip ._internal .models .direct_url import ArchiveInfo , DirInfo , VcsInfo
11
13
from pip ._internal .models .link import Link
12
14
from pip ._internal .req .req_install import InstallRequirement
13
15
from pip ._internal .utils .urls import url_to_path
14
16
17
+ T = TypeVar ("T" )
18
+
19
+
20
+ class PylockDataClass (Protocol ):
21
+ @classmethod
22
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
23
+ pass
24
+
25
+
26
+ PylockDataClassT = TypeVar ("PylockDataClassT" , bound = PylockDataClass )
27
+
15
28
PYLOCK_FILE_NAME_RE = re .compile (r"^pylock\.([^.]+)\.toml$" )
16
29
17
30
@@ -23,52 +36,199 @@ def _toml_dict_factory(data: List[Tuple[str, Any]]) -> Dict[str, Any]:
23
36
return {key .replace ("_" , "-" ): value for key , value in data if value is not None }
24
37
25
38
39
+ def _get (
40
+ d : Dict [str , Any ], expected_type : Type [T ], key : str , default : Optional [T ] = None
41
+ ) -> Optional [T ]:
42
+ """Get value from dictionary and verify expected type."""
43
+ if key not in d :
44
+ return default
45
+ value = d [key ]
46
+ if not isinstance (value , expected_type ):
47
+ raise PylockValidationError (
48
+ f"{ value !r} has unexpected type for { key } (expected { expected_type } )"
49
+ )
50
+ return value
51
+
52
+
53
+ def _get_required (d : Dict [str , Any ], expected_type : Type [T ], key : str ) -> T :
54
+ """Get required value from dictionary and verify expected type."""
55
+ value = _get (d , expected_type , key )
56
+ if value is None :
57
+ raise PylockRequiredKeyError (key )
58
+ return value
59
+
60
+
61
+ def _get_object (
62
+ d : Dict [str , Any ], expected_type : Type [PylockDataClassT ], key : str
63
+ ) -> Optional [PylockDataClassT ]:
64
+ """Get dictionary value from dictionary and convert to dataclass."""
65
+ if key not in d :
66
+ return None
67
+ value = d [key ]
68
+ if not isinstance (value , dict ):
69
+ raise PylockValidationError (f"{ key !r} is not a dictionary" )
70
+ return expected_type .from_dict (value )
71
+
72
+
73
+ def _get_list_of_objects (
74
+ d : Dict [str , Any ], expected_type : Type [PylockDataClassT ], key : str
75
+ ) -> Optional [List [PylockDataClassT ]]:
76
+ """Get list value from dictionary and convert items to dataclass."""
77
+ if key not in d :
78
+ return None
79
+ value = d [key ]
80
+ if not isinstance (value , list ):
81
+ raise PylockValidationError (f"{ key !r} is not a list" )
82
+ result = []
83
+ for i , item in enumerate (value ):
84
+ if not isinstance (item , dict ):
85
+ raise PylockValidationError (
86
+ f"Item { i } in table { key !r} is not a dictionary"
87
+ )
88
+ result .append (expected_type .from_dict (item ))
89
+ return result
90
+
91
+
92
+ def _get_required_list_of_objects (
93
+ d : Dict [str , Any ], expected_type : Type [PylockDataClassT ], key : str
94
+ ) -> List [PylockDataClassT ]:
95
+ """Get required list value from dictionary and convert items to dataclass."""
96
+ result = _get_list_of_objects (d , expected_type , key )
97
+ if result is None :
98
+ raise PylockRequiredKeyError (key )
99
+ return result
100
+
101
+
102
+ def _validate_exactly_one_of (o : object , attrs : List [str ]) -> None :
103
+ """Validate that exactly one of the attributes is truthy."""
104
+ count = 0
105
+ for attr in attrs :
106
+ if getattr (o , attr ):
107
+ count += 1
108
+ if count != 1 :
109
+ raise PylockValidationError (f"Exactly one of { ', ' .join (attrs )} must be set" )
110
+
111
+
112
+ class PylockValidationError (Exception ):
113
+ pass
114
+
115
+
116
+ class PylockRequiredKeyError (PylockValidationError ):
117
+ def __init__ (self , key : str ) -> None :
118
+ super ().__init__ (f"Missing required key { key !r} " )
119
+ self .key = key
120
+
121
+
122
+ class PylockUnsupportedVersionError (PylockValidationError ):
123
+ pass
124
+
125
+
26
126
@dataclass
27
127
class PackageVcs :
28
128
type : str
29
129
url : Optional [str ]
30
- # (not supported) path: Optional[str]
130
+ path : Optional [str ]
31
131
requested_revision : Optional [str ]
32
132
commit_id : str
33
133
subdirectory : Optional [str ]
34
134
135
+ def __post_init__ (self ) -> None :
136
+ # TODO validate supported vcs type
137
+ _validate_exactly_one_of (self , ["url" , "path" ])
138
+
139
+ @classmethod
140
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
141
+ return cls (
142
+ type = _get_required (d , str , "type" ),
143
+ url = _get (d , str , "url" ),
144
+ path = _get (d , str , "path" ),
145
+ requested_revision = _get (d , str , "requested-revision" ),
146
+ commit_id = _get_required (d , str , "commit-id" ),
147
+ subdirectory = _get (d , str , "subdirectory" ),
148
+ )
149
+
35
150
36
151
@dataclass
37
152
class PackageDirectory :
38
153
path : str
39
154
editable : Optional [bool ]
40
155
subdirectory : Optional [str ]
41
156
157
+ @classmethod
158
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
159
+ return cls (
160
+ path = _get_required (d , str , "path" ),
161
+ editable = _get (d , bool , "editable" ),
162
+ subdirectory = _get (d , str , "subdirectory" ),
163
+ )
164
+
42
165
43
166
@dataclass
44
167
class PackageArchive :
45
168
url : Optional [str ]
46
- # (not supported) path: Optional[str]
169
+ path : Optional [str ]
47
170
# (not supported) size: Optional[int]
48
171
# (not supported) upload_time: Optional[datetime]
49
172
hashes : Dict [str , str ]
50
173
subdirectory : Optional [str ]
51
174
175
+ def __post_init__ (self ) -> None :
176
+ _validate_exactly_one_of (self , ["url" , "path" ])
177
+
178
+ @classmethod
179
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
180
+ return cls (
181
+ url = _get (d , str , "url" ),
182
+ path = _get (d , str , "path" ),
183
+ hashes = _get_required (d , dict , "hashes" ),
184
+ subdirectory = _get (d , str , "subdirectory" ),
185
+ )
186
+
52
187
53
188
@dataclass
54
189
class PackageSdist :
55
190
name : str
56
191
# (not supported) upload_time: Optional[datetime]
57
192
url : Optional [str ]
58
- # (not supported) path: Optional[str]
193
+ path : Optional [str ]
59
194
# (not supported) size: Optional[int]
60
195
hashes : Dict [str , str ]
61
196
197
+ def __post_init__ (self ) -> None :
198
+ _validate_exactly_one_of (self , ["url" , "path" ])
199
+
200
+ @classmethod
201
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
202
+ return cls (
203
+ name = _get_required (d , str , "name" ),
204
+ url = _get (d , str , "url" ),
205
+ path = _get (d , str , "path" ),
206
+ hashes = _get_required (d , dict , "hashes" ),
207
+ )
208
+
62
209
63
210
@dataclass
64
211
class PackageWheel :
65
212
name : str
66
213
# (not supported) upload_time: Optional[datetime]
67
214
url : Optional [str ]
68
- # (not supported) path: Optional[str]
215
+ path : Optional [str ]
69
216
# (not supported) size: Optional[int]
70
217
hashes : Dict [str , str ]
71
218
219
+ def __post_init__ (self ) -> None :
220
+ _validate_exactly_one_of (self , ["url" , "path" ])
221
+
222
+ @classmethod
223
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
224
+ wheel = cls (
225
+ name = _get_required (d , str , "name" ),
226
+ url = _get (d , str , "url" ),
227
+ path = _get (d , str , "path" ),
228
+ hashes = _get_required (d , dict , "hashes" ),
229
+ )
230
+ return wheel
231
+
72
232
73
233
@dataclass
74
234
class Package :
@@ -86,24 +246,48 @@ class Package:
86
246
# (not supported) attestation_identities: Optional[List[Dict[str, Any]]]
87
247
# (not supported) tool: Optional[Dict[str, Any]]
88
248
249
+ def __post_init__ (self ) -> None :
250
+ _validate_exactly_one_of (
251
+ self , ["vcs" , "directory" , "archive" , "sdist" , "wheels" ]
252
+ )
253
+
254
+ @classmethod
255
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
256
+ package = cls (
257
+ name = _get_required (d , str , "name" ),
258
+ version = _get (d , str , "version" ),
259
+ vcs = _get_object (d , PackageVcs , "vcs" ),
260
+ directory = _get_object (d , PackageDirectory , "directory" ),
261
+ archive = _get_object (d , PackageArchive , "archive" ),
262
+ sdist = _get_object (d , PackageSdist , "sdist" ),
263
+ wheels = _get_list_of_objects (d , PackageWheel , "wheels" ),
264
+ )
265
+ return package
266
+
89
267
@classmethod
90
268
def from_install_requirement (cls , ireq : InstallRequirement , base_dir : Path ) -> Self :
91
269
base_dir = base_dir .resolve ()
92
270
dist = ireq .get_dist ()
93
271
download_info = ireq .download_info
94
272
assert download_info
95
- package = cls (name = dist .canonical_name )
273
+ package_version = None
274
+ package_vcs = None
275
+ package_directory = None
276
+ package_archive = None
277
+ package_sdist = None
278
+ package_wheels = None
96
279
if ireq .is_direct :
97
280
if isinstance (download_info .info , VcsInfo ):
98
- package . vcs = PackageVcs (
281
+ package_vcs = PackageVcs (
99
282
type = download_info .info .vcs ,
100
283
url = download_info .url ,
284
+ path = None ,
101
285
requested_revision = download_info .info .requested_revision ,
102
286
commit_id = download_info .info .commit_id ,
103
287
subdirectory = download_info .subdirectory ,
104
288
)
105
289
elif isinstance (download_info .info , DirInfo ):
106
- package . directory = PackageDirectory (
290
+ package_directory = PackageDirectory (
107
291
path = (
108
292
Path (url_to_path (download_info .url ))
109
293
.resolve ()
@@ -120,38 +304,49 @@ def from_install_requirement(cls, ireq: InstallRequirement, base_dir: Path) -> S
120
304
elif isinstance (download_info .info , ArchiveInfo ):
121
305
if not download_info .info .hashes :
122
306
raise NotImplementedError ()
123
- package . archive = PackageArchive (
307
+ package_archive = PackageArchive (
124
308
url = download_info .url ,
309
+ path = None ,
125
310
hashes = download_info .info .hashes ,
126
311
subdirectory = download_info .subdirectory ,
127
312
)
128
313
else :
129
314
# should never happen
130
315
raise NotImplementedError ()
131
316
else :
132
- package . version = str (dist .version )
317
+ package_version = str (dist .version )
133
318
if isinstance (download_info .info , ArchiveInfo ):
134
319
if not download_info .info .hashes :
135
320
raise NotImplementedError ()
136
321
link = Link (download_info .url )
137
322
if link .is_wheel :
138
- package . wheels = [
323
+ package_wheels = [
139
324
PackageWheel (
140
325
name = link .filename ,
141
326
url = download_info .url ,
327
+ path = None ,
142
328
hashes = download_info .info .hashes ,
143
329
)
144
330
]
145
331
else :
146
- package . sdist = PackageSdist (
332
+ package_sdist = PackageSdist (
147
333
name = link .filename ,
148
334
url = download_info .url ,
335
+ path = None ,
149
336
hashes = download_info .info .hashes ,
150
337
)
151
338
else :
152
339
# should never happen
153
340
raise NotImplementedError ()
154
- return package
341
+ return cls (
342
+ name = dist .canonical_name ,
343
+ version = package_version ,
344
+ vcs = package_vcs ,
345
+ directory = package_directory ,
346
+ archive = package_archive ,
347
+ sdist = package_sdist ,
348
+ wheels = package_wheels ,
349
+ )
155
350
156
351
157
352
@dataclass
@@ -165,8 +360,38 @@ class Pylock:
165
360
packages : List [Package ] = dataclasses .field (default_factory = list )
166
361
# (not supported) tool: Optional[Dict[str, Any]]
167
362
363
+ def _validate_version (self ) -> None :
364
+ if not self .lock_version :
365
+ raise PylockRequiredKeyError ("lock-version" )
366
+ try :
367
+ lock_version = Version (self .lock_version )
368
+ except InvalidVersion :
369
+ raise PylockUnsupportedVersionError (
370
+ f"invalid pylock version { self .lock_version !r} "
371
+ )
372
+ if lock_version < Version ("1" ) or lock_version >= Version ("2" ):
373
+ raise PylockUnsupportedVersionError (
374
+ f"pylock version { lock_version } is not supported"
375
+ )
376
+ if lock_version > Version ("1.0" ):
377
+ logging .warning ("pylock minor version %s is not supported" , lock_version )
378
+
379
+ def __post_init__ (self ) -> None :
380
+ self ._validate_version ()
381
+
168
382
def as_toml (self ) -> str :
169
- return tomli_w .dumps (dataclasses .asdict (self , dict_factory = _toml_dict_factory ))
383
+ return tomli_w .dumps (self .to_dict ())
384
+
385
+ def to_dict (self ) -> Dict [str , Any ]:
386
+ return dataclasses .asdict (self , dict_factory = _toml_dict_factory )
387
+
388
+ @classmethod
389
+ def from_dict (cls , d : Dict [str , Any ]) -> Self :
390
+ return cls (
391
+ lock_version = _get_required (d , str , "lock-version" ),
392
+ created_by = _get_required (d , str , "created-by" ),
393
+ packages = _get_required_list_of_objects (d , Package , "packages" ),
394
+ )
170
395
171
396
@classmethod
172
397
def from_install_requirements (
0 commit comments