1+ from dataclasses import dataclass , field
12from typing import Any
2- from ruamel .yaml import YAML
33
44from pipeline_migration .types import FilePath
55
6+ from ruamel .yaml import YAML , CommentedMap , CommentedSeq
7+ from ruamel .yaml .comments import CommentedBase
8+
9+
10+ __all__ = [
11+ "BlockSequenceIndentation" ,
12+ "create_yaml_obj" ,
13+ "dump_yaml" ,
14+ "is_true" ,
15+ "load_yaml" ,
16+ "YAMLStyle" ,
17+ ]
18+
619
720def is_true (value : str ) -> bool :
821 return value .strip ().lower () == "true"
922
1023
11- def create_yaml_obj ():
24+ @dataclass
25+ class BlockSequenceIndentation :
26+ indentations : dict [int , int ] = field (default_factory = dict )
27+
28+ @property
29+ def is_consistent (self ) -> bool :
30+ return len (self .levels ) == 1
31+
32+ @property
33+ def levels (self ) -> list [int ]:
34+ return list (self .indentations .keys ())
35+
36+
37+ @dataclass
38+ class _NodePath :
39+ node : CommentedBase
40+ field : str = ""
41+
42+
43+ @dataclass
44+ class YAMLStyle :
45+ indentation : BlockSequenceIndentation
46+
47+ preserve_quotes : bool = True
48+ width : int = 8192
49+
50+ @classmethod
51+ def _detect_block_sequence_indentation (cls , node : CommentedBase ) -> BlockSequenceIndentation :
52+
53+ parent_nodes : list [_NodePath ] = []
54+ block_seq_indentations = BlockSequenceIndentation ()
55+ indentations = block_seq_indentations .indentations
56+
57+ def _walk (node : CommentedBase ) -> None :
58+ if isinstance (node , CommentedMap ):
59+ parent_nodes .append (_NodePath (node = node ))
60+ for key , value in node .items ():
61+ parent_nodes [- 1 ].field = key
62+ _walk (value )
63+ parent_nodes .pop ()
64+ elif isinstance (node , CommentedSeq ):
65+ levels = node .lc .col - parent_nodes [- 1 ].node .lc .col
66+ if levels in indentations :
67+ indentations [levels ] += 1
68+ else :
69+ indentations [levels ] = 1
70+ for item in node :
71+ _walk (item )
72+
73+ _walk (node )
74+
75+ return block_seq_indentations
76+
77+ @classmethod
78+ def detect (cls , file_path : FilePath ) -> "YAMLStyle" :
79+ doc = load_yaml (file_path )
80+ indentation = cls ._detect_block_sequence_indentation (doc )
81+ return cls (indentation = indentation )
82+
83+
84+ def create_yaml_obj (style : YAMLStyle | None = None ):
1285 yaml = YAML ()
13- yaml .preserve_quotes = True
14- yaml .width = 8192
86+ if style is None :
87+ return yaml
88+
89+ yaml .preserve_quotes = style .preserve_quotes
90+ yaml .width = style .width
91+
92+ offset = 0
93+ if style .indentation .is_consistent :
94+ offset = style .indentation .levels [0 ]
95+ sequence = offset + 2
96+ yaml .indent (sequence = sequence , offset = offset )
97+
1598 return yaml
1699
17100
@@ -20,6 +103,6 @@ def load_yaml(yaml_file: FilePath) -> Any:
20103 return create_yaml_obj ().load (f )
21104
22105
23- def dump_yaml (yaml_file : FilePath , data : Any ) :
106+ def dump_yaml (yaml_file : FilePath , data : Any , style : YAMLStyle | None = None ) -> None :
24107 with open (yaml_file , "w" , encoding = "utf-8" ) as f :
25- create_yaml_obj ().dump (data , f )
108+ create_yaml_obj (style ).dump (data , f )
0 commit comments