1+ from typing import List
2+ import pandas as pd
3+
4+ from pathlib import Path
5+
6+ from electricity_network_file_parser .dataclasses import PropertyDescription
7+
class FileParser:
    """Parse a bracket-sectioned text file into one DataFrame per section.

    The file is read once in ``__init__`` and split into named sections by
    ``create_entity_dict``. ``parse_file`` then converts every section listed
    in ``parse_entities_dict`` into a ``pandas.DataFrame`` stored in
    ``data_frames``, which ``write_all_data_frames`` can export to Excel.
    """

    def __init__(self, file_path, encoding=None):
        """Read ``file_path`` and index its sections.

        Args:
            file_path: Path of the file to parse (anything ``Path`` accepts).
            encoding: Text encoding handed to ``open``. ``None`` (default)
                keeps the platform default, as before this parameter existed.
        """
        self.file_path = Path(file_path)
        # Section name -> parsed rows; filled by parse_file().
        self.data_frames: dict[str, pd.DataFrame] = {}

        with open(self.file_path, mode="r", encoding=encoding) as file:
            lines = file.readlines()

        # Section name -> raw lines between the section's bracket markers.
        self.entity_dict = self.create_entity_dict(lines)
        # Section name -> property types to parse for that section. Empty
        # here; presumably filled in by subclasses or callers before
        # parse_file() is called -- TODO confirm.
        self.parse_entities_dict = {}

    def is_integer(self, s: str):
        """Return True if ``s`` is digits only, after one optional sign."""
        to_check = s[1:] if s.startswith(("-", "+")) else s
        return to_check.isdigit()

    def parse_value(self, value):
        """Convert one raw attribute token to a Python value.

        ``'True'``/``'False'`` become booleans, signed digit strings become
        ``int``, single-quoted tokens become ``str`` without the quotes, and
        everything else is parsed as a float with ``,`` accepted as the
        decimal separator.

        Raises:
            ValueError: If the token matches none of the forms above.
        """
        if value == "True":
            return True
        if value == "False":
            return False
        if self.is_integer(value):
            return int(value)
        if value.startswith("'") and value.endswith("'"):
            return value[1:-1]
        return float(value.replace(",", "."))

    def extend_dictionary(self, dict_to_extend: dict, dict_extension: dict):
        """Copy every item of ``dict_extension`` into ``dict_to_extend``."""
        dict_to_extend.update(dict_extension)

    def _parse_attributes(self, attribute_text: str) -> dict:
        """Tokenize ``name:value`` pairs separated by spaces into a dict.

        A value ends at the first space outside single quotes, so quoted
        strings may contain spaces. Names are stripped of surrounding
        whitespace and values are converted with ``parse_value``.
        """
        attributes = {}
        col_name = ""
        value = ""
        reading_value = False
        reading_string = False
        for char in attribute_text:
            if not reading_value:
                # The first ':' after a name switches to value mode; later
                # colons belong to the value itself.
                if char == ":":
                    reading_value = True
                else:
                    col_name += char
            elif char == " " and not reading_string and value != "":
                attributes[col_name.strip()] = self.parse_value(value)
                col_name = ""
                value = ""
                reading_value = False
            else:
                if char == "'":
                    reading_string = not reading_string
                value += char

        # Flush the last pair only if a value was actually read; otherwise
        # trailing whitespace or an attribute-less line would make
        # parse_value('') raise ValueError.
        if value != "":
            attributes[col_name.strip()] = self.parse_value(value)
        return attributes

    def parse_property_line(self, property_line: str):
        """Parse one property line into a ``PropertyDescription``.

        The property type is the text between the first character of the
        line (skipped; presumably a marker such as ``#`` -- verify against
        the file format) and the first space. The remainder is parsed as
        ``name:value`` pairs.
        """
        header, _, attribute_text = property_line.partition(" ")
        property_name = header[1:]
        property_dict = self._parse_attributes(attribute_text)
        return PropertyDescription(property_name, property_dict)

    def parse_entities(self, lines: List[str], property_attributes_to_parse: List[str]):
        """Build a DataFrame with one row per entity found in ``lines``.

        Attributes of all requested property lines belonging to one entity
        are merged into a single row. An entity is closed when as many
        property lines have been parsed as there are requested types, or when
        a second ``General`` line shows up before that.

        Args:
            lines: Raw lines of one section.
            property_attributes_to_parse: Property types whose attributes
                should be collected.

        Returns:
            A DataFrame of the collected rows (empty if nothing was parsed).
        """
        expected_count = len(property_attributes_to_parse)
        parsed_property_types = []
        data_instance = {}
        data_instances = []
        for line in lines:
            line_stripped = line.strip()
            property_name = ""
            if " " in line_stripped:
                property_name = line_stripped[1:line_stripped.index(" ")]

            started_new_entity = (
                property_name == "General" and property_name in parsed_property_types
            )
            # Require at least one requested type: with an empty request list
            # 0 == 0 would otherwise emit one empty row per input line.
            all_property_types_parsed = (
                expected_count > 0 and len(parsed_property_types) == expected_count
            )
            if started_new_entity or all_property_types_parsed:
                if not all_property_types_parsed:
                    # The line printed is the one that revealed the gap, i.e.
                    # the first line of the following entity.
                    print(f"Not all property types are present for entity {line_stripped}")
                data_instances.append(data_instance)
                data_instance = {}
                parsed_property_types = []

            if property_name in property_attributes_to_parse:
                description = self.parse_property_line(line_stripped)
                self.extend_dictionary(data_instance, description.property_attributes)
                parsed_property_types.append(description.property_type)

        if data_instance:
            data_instances.append(data_instance)

        return pd.DataFrame(data_instances)

    def create_entity_dict(self, lines):
        """Split ``lines`` into named sections delimited by bracket lines.

        Lines that (after stripping) start with ``[`` and end with ``]`` are
        markers. They are paired in order of appearance: the first of each
        pair names the section, the second closes it. A last section without
        a closing marker runs to the end of ``lines``.

        Returns:
            Dict mapping section name to the lines between its markers. A
            repeated name overwrites the earlier section.
        """
        marker_indices = []
        for index, line in enumerate(lines):
            stripped = line.strip()
            if stripped.startswith("[") and stripped.endswith("]"):
                marker_indices.append(index)

        entity_dict = {}
        for position in range(0, len(marker_indices), 2):
            start = marker_indices[position]
            if position + 1 < len(marker_indices):
                end = marker_indices[position + 1]
            else:
                # Unterminated last section: take everything that is left
                # instead of failing with IndexError.
                end = len(lines)
            entity_name = lines[start].strip()[1:-1]
            entity_dict[entity_name] = lines[start + 1:end]
        return entity_dict

    def group_data_frame_by_columns(self, df: pd.DataFrame, columns_to_group_by: List[str]) -> pd.DataFrame:
        """Count the rows of ``df`` per combination of the given columns.

        Missing values in the grouping columns are replaced by ``-1`` so they
        form their own group instead of being dropped by ``groupby``.

        Note:
            The replacement is written back into ``df`` itself, so the
            caller's frame is modified.

        Returns:
            A DataFrame with the grouping columns plus a ``count`` column.
        """
        for col in columns_to_group_by:
            # Element-wise on the column: avoids row-wise apply, which is
            # slow, upcasts mixed numeric rows and fails on empty frames.
            df[col] = df[col].apply(lambda cell: -1 if pd.isna(cell) else cell)
        counts = df.groupby(columns_to_group_by).size()
        return counts.reset_index().rename(columns={0: "count"})

    def get_records_containing_field_values(self, df: pd.DataFrame, fields: dict) -> pd.DataFrame:
        """Return the rows of ``df`` whose columns equal the given values.

        Args:
            df: Frame to filter.
            fields: Column name -> value. Both are interpolated verbatim into
                a ``DataFrame.query`` expression, so string values must be
                passed already quoted (e.g. ``"'abc'"``) and column names
                must be valid identifiers.

        Returns:
            The matching rows; a copy of ``df`` when ``fields`` is empty.
        """
        if not fields:
            # An empty expression is rejected by DataFrame.query.
            return df.copy()
        query = " and ".join(f"{key} == {value}" for key, value in fields.items())
        return df.query(query)

    def parse_cable_types(self, cables_df: pd.DataFrame) -> pd.DataFrame:
        """Derive a cable-type table from the parsed cables.

        This implementation does nothing and returns ``None``; presumably
        subclasses override it -- TODO confirm.
        """
        return None

    def parse_file(self):
        """Parse every configured section into ``self.data_frames``.

        For each section named in ``parse_entities_dict`` that exists in the
        file, the parsed rows are stored under the section name, appended to
        any frame already stored there. If a ``"CABLE"`` frame exists, the
        result of ``parse_cable_types`` is stored under ``"CABLETYPE"``.
        """
        for key, value in self.parse_entities_dict.items():
            if key not in self.entity_dict:
                continue
            parsed = self.parse_entities(self.entity_dict[key], value)
            if key in self.data_frames:
                self.data_frames[key] = pd.concat([self.data_frames[key], parsed])
            else:
                self.data_frames[key] = parsed

        if "CABLE" in self.data_frames:
            cable_types = self.parse_cable_types(self.data_frames["CABLE"])
            # Do not store None: write_all_data_frames() would fail on it.
            if cable_types is not None:
                self.data_frames["CABLETYPE"] = cable_types

    def write_all_data_frames(self, file_name: str = "data.xlsx"):
        """Write each frame in ``self.data_frames`` to its own Excel sheet.

        Args:
            file_name: Target workbook path; sheets are named after the
                ``data_frames`` keys and written without the index.
        """
        with pd.ExcelWriter(file_name) as writer:
            for name, dataframe in self.data_frames.items():
                dataframe.to_excel(writer, sheet_name=name, index=False)
0 commit comments