|
5 | 5 | from azureml.data.datapath import DataPath |
6 | 6 | import os |
7 | 7 | import glob |
8 | | -from azureml.data.dataset_error_handling import DatasetValidationError, DatasetExecutionError |
9 | | -from azureml.data.dataset_type_definitions import PromoteHeadersBehavior |
10 | 8 | import arcus.azureml.environment.environment as env |
11 | 9 | from arcus.azureml.experimenting import trainer |
12 | 10 | from arcus.azureml.experimenting import aml_trainer |
13 | | -from azureml.dataprep.api.functions import get_portable_path |
14 | | -from azureml.dataprep import col, get_stream_properties, SummaryColumnsValue, SummaryFunction |
| 11 | +from azureml.data.dataset_error_handling import DatasetValidationError, DatasetExecutionError |
| 12 | +from azureml.data.dataset_type_definitions import PromoteHeadersBehavior |
15 | 13 |
|
16 | 14 | class AzureMLEnvironment(env.WorkEnvironment): |
17 | 15 | is_connected: bool = False |
18 | 16 | __config_file: str = '.azureml/config.json' |
19 | 17 | __workspace: Workspace = None |
20 | 18 | __datastore_path: str = 'data' |
21 | 19 |
|
22 | | - def __init__(self, config_file: str = None, datastore_path: str = None, subscription_id: str = None, |
| 20 | + def __init__(self, config_file: str = None, datastore_path: str = None, subscription_id: str = None, connect_workspace: bool = True, |
23 | 21 | resource_group: str = None, workspace_name: str = None, write_config: bool = False, from_context: bool = False): |
24 | 22 | ''' |
25 | 23 | This allows a user to specify to work connected or disconnected. |
@@ -48,58 +46,118 @@ def __init__(self, config_file: str = None, datastore_path: str = None, subscrip |
48 | 46 | if write_config: |
49 | 47 | self.__workspace.write_config(self.__config_file) |
50 | 48 |
|
51 | | - else: |
| 49 | + elif connect_workspace: |
52 | 50 | # A config file is passed, so we'll validate the existence and connect |
53 | 51 | if not os.path.exists(self.__config_file): |
54 | 52 | raise FileNotFoundError('The config file ' + self.__config_file + ' does not exist. Please verify and try again') |
55 | 53 | # There is a config file, so we'll connect |
56 | 54 | self.__connect_from_config_file(self.__config_file) |
57 | 55 |
|
58 | | - self.is_connected = True |
| 56 | + self.is_connected = connect_workspace |
59 | 57 |
|
60 | | - def load_tabular_dataset(self, dataset_name: str) -> pd.DataFrame: |
| 58 | + @classmethod |
| 59 | + def CreateFromContext(cls, datastore_path: str = None): |
| 60 | + ''' |
| 61 | + Creates a WorkEnvironment and returns the correct implementation, based on the configuration |
| 62 | + Args: |
| 63 | + datastore_path (str): the name of a DataStore in AzureML that contains Datasets |
| 64 | + Returns: |
| 65 | + AzureMLEnvironment: an instance of WorkEnvironment allowing the user to work connected. |
| 66 | + ''' |
| 67 | + return cls(datastore_path = datastore_path, from_context=True) |
| 68 | + |
| 69 | + @classmethod |
| 70 | + def Create(cls, subscription_id: str = None, resource_group: str = None, workspace_name: str = None, |
| 71 | + write_config: bool = False, config_file: str = None, datastore_path: str = None): |
| 72 | + ''' |
| 73 | + Creates a WorkEnvironment and returns the correct implementation, based on the configuration |
| 74 | + Args: |
| 75 | + subscription_id (str): The subscription id where the AzureML service resides |
| 76 | + resource_group (str): The resource group that contains the AzureML workspace |
| 77 | + workspace_name (str): Name of the AzureML workspace |
| 78 | + write_config (bool): If True, the WorkSpace configuration will be persisted in the given (or default) config file |
| 79 | + config_file (str): The name of the config file (defaulting to .azureml/config.json) that contains the Workspace parameters |
| 80 | + datastore_path (str): the name of a DataStore in AzureML that contains Datasets |
| 81 | + Returns: |
| 82 | + AzureMLEnvironment: an instance of WorkEnvironment allowing the user to work connected. |
| 83 | + ''' |
| 84 | + return cls(config_file = config_file, datastore_path = datastore_path, |
| 85 | + subscription_id=subscription_id, resource_group=resource_group, |
| 86 | + workspace_name= workspace_name, write_config = write_config) |
| 87 | + |
| 88 | + def load_tabular_dataset(self, dataset_name: str, cloud_storage: bool = True) -> pd.DataFrame: |
61 | 89 | ''' |
62 | | - Loads a tabular dataset by a given name. the implementation will load the Dataset by name from the AzureML Workspace |
| 90 | + Loads a tabular dataset by a given name. |
| 91 | + The implementation will load the Dataset by name from the AzureML Workspace |
| 92 | + When configured locally, the data frame will be loaded from a file in the datastore_path with name {dataset_name}.csv |
63 | 93 | Args: |
64 | 94 | dataset_name (str): The name of the dataset to load |
| 95 | + cloud_storage (bool): When changed to False, the dataset will be loaded from the local folder |
65 | 96 | Returns: |
66 | 97 | pd.DataFrame: The dataset, loaded as a DataFrame |
67 | 98 | ''' |
68 | 99 | # Connecting data set |
69 | | - _dataset = Dataset.get_by_name(self.__workspace, name=dataset_name) |
70 | | - return _dataset.to_pandas_dataframe() |
| 100 | + if cloud_storage: |
| 101 | + _dataset = Dataset.get_by_name(self.__workspace, name=dataset_name) |
| 102 | + return _dataset.to_pandas_dataframe() |
| 103 | + else: |
| 104 | + _file_name = os.path.join(self.__datastore_path, dataset_name + '.csv') |
| 105 | + return pd.read_csv(_file_name) |
71 | 106 |
|
72 | | - def load_tabular_partition(self, partition_name: str, datastore_name: str = None, columns: np.array = None, first_row_header: bool = False) -> pd.DataFrame: |
| 107 | + |
| 108 | + def load_tabular_partition(self, partition_name: str, datastore_name: str = None, columns: np.array = None, first_row_header: bool = False, cloud_storage: bool = True) -> pd.DataFrame: |
73 | 109 | ''' |
74 | | - Loads a partition from a tabular dataset. the implementation will connect to the DataStore and get all delimited files matching the partition_name |
| 110 | + Loads a partition from a tabular dataset. |
| 111 | + The implementation will connect to the DataStore and get all delimited files matching the partition_name |
| 112 | + When configured locally, the implementation will append all files in the datastore_path with name {partition_name}.csv |
75 | 113 | Args: |
76 | 114 | partition_name (str): The name of the partition as a wildcard filter. Example: B* will take all files starting with B, ending with csv |
77 | 115 | columns: (np.array): The column names to assign to the dataframe |
78 | | - datastore_path (str): The name of a DataStore in AzureML that contains Datasets |
| 116 | + datastore_name (str): The name of a DataStore (or the local folder, when cloud_storage is False) that contains Datasets |
| 117 | + cloud_storage (bool): When changed to False, the dataset will be loaded from the local folder |
79 | 118 | Returns: |
80 | 119 | pd.DataFrame: The dataset, loaded as a DataFrame |
81 | 120 | ''' |
82 | 121 | if not datastore_name: |
83 | 122 | # No datastore name is given, so we'll take the default one |
84 | 123 | datastore_name = self.__datastore_path |
85 | 124 |
|
86 | | - # Connecting data store |
87 | | - datastore = Datastore(self.__workspace, name=datastore_name) |
88 | | - try: |
89 | | - _header = PromoteHeadersBehavior.ALL_FILES_HAVE_SAME_HEADERS if first_row_header else False |
90 | | - _aml_dataset = Dataset.Tabular.from_delimited_files(header=_header, |
91 | | - path=DataPath(datastore, '/' + partition_name + '.csv')) #, set_column_types=columns |
92 | | - _df = _aml_dataset.to_pandas_dataframe() |
93 | | - except DatasetValidationError as dsvalex: |
94 | | - if 'provided path is not valid' in str(dsvalex): |
| 125 | + if cloud_storage: |
| 126 | + # Connecting data store |
| 127 | + datastore = Datastore(self.__workspace, name=datastore_name) |
| 128 | + try: |
| 129 | + _header = PromoteHeadersBehavior.ALL_FILES_HAVE_SAME_HEADERS if first_row_header else False |
| 130 | + _aml_dataset = Dataset.Tabular.from_delimited_files(header=_header, |
| 131 | + path=DataPath(datastore, '/' + partition_name + '.csv')) #, set_column_types=columns |
| 132 | + _df = _aml_dataset.to_pandas_dataframe() |
| 133 | + except DatasetValidationError as dsvalex: |
| 134 | + if 'provided path is not valid' in str(dsvalex): |
| 135 | + return None |
| 136 | + else: |
| 137 | + raise |
| 138 | + else: |
| 139 | + # Reading data from sub files in a folder |
| 140 | + _folder_path = datastore_name |
| 141 | + _partition_files = glob.glob(_folder_path + '/' + partition_name + '.csv') |
| 142 | + _record_found = False |
| 143 | + _df = None |
| 144 | + for filename in _partition_files: |
| 145 | + _header = 0 if first_row_header else None |
| 146 | + df = pd.read_csv(filename, index_col=None, header=_header) |
| 147 | + if not _record_found: |
| 148 | + _df = df |
| 149 | + _record_found = True |
| 150 | + else: |
| 151 | + _df = _df.append(df) |
| 152 | + |
| 153 | + if not _record_found: |
95 | 154 | return None |
96 | | - else: |
97 | | - raise |
| 155 | + |
98 | 156 | if columns != None: |
99 | 157 | _df.columns = columns |
100 | 158 | return _df |
101 | 159 |
|
102 | | - def start_experiment(self, name: str) -> trainer.Trainer: |
| 160 | + def start_experiment(self, name: str) -> aml_trainer.AzureMLTrainer: |
103 | 161 | ''' |
104 | 162 | Creates a new experiment (or connects to an existing one), using the given name |
105 | 163 |
|
@@ -128,6 +186,9 @@ def __print_connection_info(self): |
128 | 186 | print('>> Resource group:', self.__workspace.resource_group) |
129 | 187 |
|
130 | 188 | def capture_filedataset_layout(self, dataset_name: str, output_path: str): |
| 189 | + from azureml.dataprep.api.functions import get_portable_path |
| 190 | + from azureml.dataprep import col, get_stream_properties, SummaryColumnsValue, SummaryFunction |
| 191 | + |
131 | 192 | dataset = self.__workspace.datasets[dataset_name] |
132 | 193 | files_column = 'Path' |
133 | 194 | PORTABLE_PATH = 'PortablePath' |
|
0 commit comments