66import json
77import logging
88import os
9+ import subprocess
910import sys
11+ from collections .abc import Callable
1012from dataclasses import asdict , dataclass , replace
1113from pathlib import Path
1214from typing import Any , Dict , Iterable , List , Optional , Set , Tuple
@@ -42,20 +44,20 @@ def _read_cached_index(path: Path) -> Tuple[datetime.datetime, Any]:
4244 return cache_date , data
4345
4446
45- def _write_index_to_cache (data : Any , path : Path ):
47+ def _write_index_to_cache (data : Any , path : Path ) -> None :
4648 j = json .loads (data )
4749 j ["cache_date" ] = datetime .datetime .now ().isoformat ()
4850 with open (path , "w" ) as f :
4951 json .dump (j , f , indent = 2 )
5052
5153
52- def _fetch_remote_index ():
54+ def _fetch_remote_index () -> Any :
5355 r = Request (INDEX_URL )
5456 with urlopen (r ) as resp :
5557 return resp .read ()
5658
5759
58- def get_extension_index (cache_dir : Path ) -> Set [ Ext ] :
60+ def get_extension_index (cache_dir : Path ) -> Any :
5961 index_file = cache_dir / "index.json"
6062 os .makedirs (cache_dir , exist_ok = True )
6163
@@ -154,7 +156,7 @@ def _filter_invalid(o: Dict[str, Any]) -> bool:
154156
155157def _filter_compatible (o : Dict [str , Any ], cli_version : Version ) -> bool :
156158 minCliVersion = parse (o ["metadata" ]["azext.minCliCoreVersion" ])
157- return cli_version >= minCliVersion
159+ return bool ( cli_version >= minCliVersion )
158160
159161
160162def _transform_dict_to_obj (o : Dict [str , Any ]) -> Ext :
@@ -211,6 +213,93 @@ def _filter_updated(e: Tuple[Ext, Ext]) -> bool:
211213 return prev != new
212214
213215
216+ @dataclass (frozen = True )
217+ class AttrPos :
218+ file : str
219+ line : int
220+ column : int
221+
222+
223+ def nix_get_value (attr_path : str ) -> Optional [str ]:
224+ try :
225+ output = (
226+ subprocess .run (
227+ [
228+ "nix-instantiate" ,
229+ "--eval" ,
230+ "--strict" ,
231+ "--json" ,
232+ "-E" ,
233+ f"with import ./. {{ }}; { attr_path } " ,
234+ ],
235+ stdout = subprocess .PIPE ,
236+ text = True ,
237+ check = True ,
238+ )
239+ .stdout .rstrip ()
240+ .strip ('"' )
241+ )
242+ except subprocess .CalledProcessError as e :
243+ logger .error ("failed to nix-instantiate: %s" , e )
244+ return None
245+ return output
246+
247+
248+ def nix_unsafe_get_attr_pos (attr : str , attr_path : str ) -> Optional [AttrPos ]:
249+ try :
250+ output = subprocess .run (
251+ [
252+ "nix-instantiate" ,
253+ "--eval" ,
254+ "--strict" ,
255+ "--json" ,
256+ "-E" ,
257+ f'with import ./. {{ }}; (builtins.unsafeGetAttrPos "{ attr } " { attr_path } )' ,
258+ ],
259+ stdout = subprocess .PIPE ,
260+ text = True ,
261+ check = True ,
262+ ).stdout .rstrip ()
263+ except subprocess .CalledProcessError as e :
264+ logger .error ("failed to unsafeGetAttrPos: %s" , e )
265+ return None
266+ if output == "null" :
267+ logger .error ("failed to unsafeGetAttrPos: nix-instantiate returned 'null'" )
268+ return None
269+ pos = json .loads (output )
270+ return AttrPos (pos ["file" ], pos ["line" ] - 1 , pos ["column" ])
271+
272+
273+ def edit_file (file : str , rewrite : Callable [[str ], str ]) -> None :
274+ with open (file , "r" ) as f :
275+ lines = f .readlines ()
276+ lines = [rewrite (line ) for line in lines ]
277+ with open (file , "w" ) as f :
278+ f .writelines (lines )
279+
280+
281+ def edit_file_at_pos (pos : AttrPos , rewrite : Callable [[str ], str ]) -> None :
282+ with open (pos .file , "r" ) as f :
283+ lines = f .readlines ()
284+ lines [pos .line ] = rewrite (lines [pos .line ])
285+ with open (pos .file , "w" ) as f :
286+ f .writelines (lines )
287+
288+
289+ def read_value_at_pos (pos : AttrPos ) -> str :
290+ with open (pos .file , "r" ) as f :
291+ lines = f .readlines ()
292+ return value_from_nix_line (lines [pos .line ])
293+
294+
295+ def value_from_nix_line (line : str ) -> str :
296+ return line .split ("=" )[1 ].strip ().strip (";" ).strip ('"' )
297+
298+
299+ def replace_value_in_nix_line (new : str ) -> Callable [[str ], str ]:
300+ return lambda line : line .replace (value_from_nix_line (line ), new )
301+
302+
214303def main () -> None :
215304 sh = logging .StreamHandler (sys .stderr )
216305 sh .setFormatter (
@@ -247,6 +336,7 @@ def main() -> None:
247336 help = "whether to commit changes to git" ,
248337 )
249338 args = parser .parse_args ()
339+ cli_version = parse (args .cli_version )
250340
251341 repo = git .Repo (Path ("." ).resolve (), search_parent_directories = True )
252342 # Workaround for https://github.com/gitpython-developers/GitPython/issues/1923
@@ -258,7 +348,57 @@ def main() -> None:
258348 assert index ["formatVersion" ] == "1" # only support formatVersion 1
259349 extensions_remote = index ["extensions" ]
260350
261- cli_version = parse (args .cli_version )
351+ if args .extension :
352+ logger .info (f"updating extension: { args .extension } " )
353+
354+ ext = Optional [Ext ]
355+ for _ext_name , extension in extensions_remote .items ():
356+ extension = processExtension (
357+ extension , cli_version , args .extension , requirements = True
358+ )
359+ if extension :
360+ ext = extension
361+ break
362+ if not ext :
363+ logger .error (f"Extension { args .extension } not found in index" )
364+ exit (1 )
365+
366+ version_pos = nix_unsafe_get_attr_pos (
367+ "version" , f"azure-cli-extensions.{ ext .pname } "
368+ )
369+ if not version_pos :
370+ logger .error (
371+ f"no position for attribute 'version' found on attribute path { ext .pname } "
372+ )
373+ exit (1 )
374+ version = read_value_at_pos (version_pos )
375+ current_version = parse (version )
376+
377+ if ext .version == current_version :
378+ logger .info (
379+ f"no update needed for { ext .pname } , latest version is { ext .version } "
380+ )
381+ return
382+ logger .info ("updated extensions:" )
383+ logger .info (f" { ext .pname } { current_version } -> { ext .version } " )
384+ edit_file_at_pos (version_pos , replace_value_in_nix_line (str (ext .version )))
385+
386+ current_hash = nix_get_value (f"azure-cli-extensions.{ ext .pname } .src.outputHash" )
387+ if not current_hash :
388+ logger .error (
389+ f"no attribute 'src.outputHash' found on attribute path { ext .pname } "
390+ )
391+ exit (1 )
392+ edit_file (version_pos .file , lambda line : line .replace (current_hash , ext .hash ))
393+
394+ if args .commit :
395+ commit_msg = (
396+ f"azure-cli-extensions.{ ext .pname } : { current_version } -> { ext .version } "
397+ )
398+ _commit (repo , commit_msg , [Path (version_pos .file )], actor )
399+ return
400+
401+ logger .info ("updating generated extension set" )
262402
263403 extensions_remote_filtered = set ()
264404 for _ext_name , extension in extensions_remote .items ():
0 commit comments