diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2fc652a..febc0de 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## [Version 1.4.1](https://github.com/dataiku/dss-plugin-pi-server/releases/tag/v1.4.1) - Feature release - 2025-11-26
+
+- Add an AF hierarchy downloader
+
 ## [Version 1.4.0](https://github.com/dataiku/dss-plugin-pi-server/releases/tag/v1.4.0) - Feature release - 2025-09-22
 
 - Add write recipe
diff --git a/plugin.json b/plugin.json
index 517456c..27db1a9 100644
--- a/plugin.json
+++ b/plugin.json
@@ -1,6 +1,6 @@
 {
     "id": "pi-system",
-    "version": "1.4.0",
+    "version": "1.4.1",
     "meta": {
         "label": "PI System",
         "description": "Retrieve data from your OSIsoft PI System servers",
diff --git a/python-connectors/pi-system_hierarchy/connector.json b/python-connectors/pi-system_hierarchy/connector.json
index 4185155..f66a4ed 100644
--- a/python-connectors/pi-system_hierarchy/connector.json
+++ b/python-connectors/pi-system_hierarchy/connector.json
@@ -21,7 +21,7 @@
             "label": " ",
             "type": "BOOLEAN",
             "description": "Show advanced parameters",
-            "defaultValue": false
+            "defaultValue": true
         },
         {
             "name": "server_url",
@@ -60,15 +60,15 @@
             "label": " ",
             "type": "BOOLEAN",
             "description": "Use batch mode",
-            "visibilityCondition": "model.show_advanced_parameters==true && model.must_retrieve_metrics==true",
-            "defaultValue": false
+            "visibilityCondition": "model.show_advanced_parameters==true",
+            "defaultValue": true
         },
         {
             "name": "batch_size",
             "label": " ",
             "type": "INT",
             "description": "Batch size",
-            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true && model.must_retrieve_metrics==true",
+            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true",
             "minI": 1,
             "defaultValue": 500
         },
diff --git a/python-connectors/pi-system_hierarchy/connector.py b/python-connectors/pi-system_hierarchy/connector.py
index 030d73b..fac8d13 100644
--- a/python-connectors/pi-system_hierarchy/connector.py
+++ b/python-connectors/pi-system_hierarchy/connector.py
@@ -1,3 +1,4 @@
+import datetime
 from dataiku.connector import Connector
 from osisoft_client import OSIsoftClient
 from safe_logger import SafeLogger
@@ -31,6 +32,8 @@ def __init__(self, config, plugin_config):
             is_debug_mode=is_debug_mode,
             network_timer=self.network_timer
         )
+        self.use_batch_mode = config.get("use_batch_mode", False)
+        self.batch_size = config.get("batch_size", 500)
 
     def get_read_schema(self):
         return None
@@ -38,15 +41,27 @@ def get_read_schema(self):
 
     def generate_rows(self, dataset_schema=None, dataset_partitioning=None, partition_id=None, records_limit = -1):
         limit = RecordsLimit(records_limit)
+        start_time = datetime.datetime.now()
         headers = self.client.get_requests_headers()
         json_response = self.client.get(url=self.database_endpoint, headers=headers, params={}, error_source="traverse")
-        next_url = self.client.extract_link_with_key(json_response, "Elements")
+        server_name = json_response.get("ExtendedProperties", {}).get("DefaultPIServer", {}).get("Value", "Unknown server name")
 
-        for item in self.recurse_next_item(next_url):
-            if limit.is_reached():
-                break
-            yield item
+        if self.use_batch_mode:
+            for item in self.batch_next_item(json_response, parent=server_name, type="Database"):
+                if limit.is_reached():
+                    break
+                yield item
+        else:
+            next_url = self.client.extract_link_with_key(json_response, "Elements")
+            for item in self.recurse_next_item(next_url):
+                if limit.is_reached():
+                    break
+                yield item
+        end_time = datetime.datetime.now()
+        duration = end_time - start_time
+        logger.info("generate_rows overall duration = {}s".format(duration.total_seconds()))
+        logger.info("Network timer:{}".format(self.network_timer.get_report()))
 
     def recurse_next_item(self, next_url, parent=None, type=None):
         logger.info("recurse_next_item")
@@ -82,6 +97,151 @@ def recurse_next_item(self, next_url, parent=None, type=None):
                 "Id": item.get("Id")
             }
 
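+    # Iterative, queue-based traversal of the AF hierarchy: pending
+    # "Elements"/"Attributes" links are accumulated and resolved through the
+    # PI Web API batch endpoint, up to batch_size requests per round trip,
+    # instead of one HTTP request per node.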
+    def batch_next_item(self, next_item, parent=None, type=None):
+        todo_list = []
+        todo_list.append(
+            {
+                "url": self.client.extract_link_with_key(next_item, "Elements"),
+                "parent": "\\\\" + parent + "\\" + next_item.get("Name"),
+                "type": "Database"
+            }
+        )
+        batch_requests_parameters = []
+        parent_of_batched_items = []
+        types_of_batched_items = []
+        while todo_list:
+            item = todo_list.pop()
+            request_kwargs = {
+                "url": item.get("url"),
+                "headers": self.client.get_requests_headers()
+            }
+            batch_requests_parameters.append(request_kwargs)
+            parent_of_batched_items.append(item.get("parent"))
+            types_of_batched_items.append(item.get("type"))
+            if not todo_list or len(batch_requests_parameters) >= self.batch_size:
+                json_responses = self.client._batch_requests(batch_requests_parameters)
+                batch_requests_parameters = []
+                for parent_of_batched_item, type_of_batched_item, json_response in zip(parent_of_batched_items, types_of_batched_items, json_responses):
+                    response_content = json_response.get("Content", {})
+                    links = response_content.get("Links", {})
+                    next_link = links.get("Next", {})
+                    if next_link:
+                        # Paginated response: queue the next page, keeping the
+                        # parent and type of the request that produced it
+                        todo_list.append(
+                            {
+                                "url": next_link,
+                                "parent": parent_of_batched_item,
+                                "type": type_of_batched_item
+                            }
+                        )
+                    retrieved_items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+                    for retrieved_item in retrieved_items:
+                        elements_url = self.client.extract_link_with_key(retrieved_item, "Elements")
+                        attributes_url = self.client.extract_link_with_key(retrieved_item, "Attributes")
+                        if elements_url:
+                            todo_list.append(
+                                {
+                                    "url": elements_url,
+                                    "type": "Element",
+                                    "parent": parent_of_batched_item + "\\" + retrieved_item.get("Name")
+                                }
+                            )
+                        if attributes_url:
+                            todo_list.append(
+                                {
+                                    "url": attributes_url,
+                                    "type": "Attribute",
+                                    "parent": parent_of_batched_item + "\\" + retrieved_item.get("Name")
+                                }
+                            )
+                        yield {
+                            "ItemType": type_of_batched_item,
+                            "Name": retrieved_item.get("Name"),
+                            "Type": retrieved_item.get("Type"),
+                            "Description": retrieved_item.get("Description"),
+                            "Path": retrieved_item.get("Path"),
+                            "LinkPath": "{}\\{}".format(parent_of_batched_item, retrieved_item.get("Name")),
+                            "Parent": parent_of_batched_item,
+                            "DefaultUnitsName": retrieved_item.get("DefaultUnitsName"),
+                            "TemplateName": retrieved_item.get("TemplateName"),
+                            "CategoryNames": retrieved_item.get("CategoryNames"),
+                            "ExtendedProperties": retrieved_item.get("ExtendedProperties"),
+                            "Step": retrieved_item.get("Step"),
+                            "WebId": retrieved_item.get("WebId"),
+                            "Id": retrieved_item.get("Id")
+                        }
+                parent_of_batched_items = []
+                types_of_batched_items = []
+
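+    # Alternative batched traversal, not called by generate_rows above and
+    # apparently kept for reference: each hierarchy level is fetched with one
+    # batch request, then the method recurses once on all children of the level.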
+    def batch_recurse_next_item(self, next_items, parents=None, type=None):
+        # logger.info("batch_recurse_next_item")
+        if not isinstance(next_items, list):
+            next_items = [next_items]
+        if not isinstance(parents, list):
+            parents = [parents]
+        batch_requests_parameters = []
+        types = []
+        items_parents_names = []
+        for next_item in next_items:
+            next_item_name = next_item.get("Path")
+            next_elements_url = self.client.extract_link_with_key(next_item, "Elements")
+            if next_elements_url:
+                request_kwargs = {
+                    "url": next_elements_url,
+                    "headers": self.client.get_requests_headers()
+                }
+                batch_requests_parameters.append(request_kwargs)
+                types.append("Element")
+                items_parents_names.append(next_item_name)
+            next_attributes_url = self.client.extract_link_with_key(next_item, "Attributes")
+            if next_attributes_url:
+                request_kwargs = {
+                    "url": next_attributes_url,
+                    "headers": self.client.get_requests_headers()
+                }
+                batch_requests_parameters.append(request_kwargs)
+                types.append("Attribute")
+                items_parents_names.append(next_item_name)
+        if batch_requests_parameters:
+            json_responses = self.client._batch_requests(batch_requests_parameters)
+            # approach 1: recurse on each response in the batch individually
+            # for json_response in json_responses:
+            #     response_content = json_response.get("Content", {})
+            #     if OSIsoftConstants.DKU_ERROR_KEY in response_content:
+            #         # Do something ?
+            #         pass
+            #     items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+            #     batched_items = self.batch_recurse_next_item(items)
+            #     for item in batched_items:
+            #         yield item
+            # approach 2: process all responses and batch all of them in one go
+            next_batch_items = []
+            next_batch_parents = []
+            for json_response, batched_item_parent in zip(json_responses, items_parents_names):
+                response_content = json_response.get("Content", {})
+                links = response_content.get("Links", {})
+                next_link = links.get("Next", {})
+                # TODO: paginated responses ("Next" link) are not followed here yet
+                items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+                next_batch_items.extend(items)
+                # one parent entry per child item, so the final zip stays aligned
+                next_batch_parents.extend([batched_item_parent] * len(items))
+            batched_items = self.batch_recurse_next_item(next_batch_items, parents=next_batch_parents)
+            for item in batched_items:
+                yield item
+
+        for item, parent in zip(next_items, parents):
+            yield {
+                "ItemType": type,
+                "Name": item.get("Name"),
+                "Type": item.get("Type"),
+                "Description": item.get("Description"),
+                "Path": item.get("Path"),
+                "Parent": parent,
+                "DefaultUnitsName": item.get("DefaultUnitsName"),
+                "TemplateName": item.get("TemplateName"),
+                "CategoryNames": item.get("CategoryNames"),
+                "ExtendedProperties": item.get("ExtendedProperties"),
+                "Step": item.get("Step"),
+                "WebId": item.get("WebId"),
+                "Id": item.get("Id")
+            }
+
     def get_writer(self, dataset_schema=None, dataset_partitioning=None, partition_id=None, write_mode="OVERWRITE"):
         raise NotImplementedError
diff --git a/python-lib/osisoft_constants.py b/python-lib/osisoft_constants.py
index 20c8efa..ce9b74a 100644
--- a/python-lib/osisoft_constants.py
+++ b/python-lib/osisoft_constants.py
@@ -405,7 +405,7 @@ class OSIsoftConstants(object):
         "Security": "{base_url}/eventframes/{webid}/security",
         "SecurityEntries": "{base_url}/eventframes/{webid}/securityentries"
     }
-    PLUGIN_VERSION = "1.4.0-beta.1"
+    PLUGIN_VERSION = "1.4.1-beta.1"
     VALUE_COLUMN_SUFFIX = "_val"
     WEB_API_PATH = "piwebapi"
     WRITE_HEADERS = {'X-Requested-With': 'XmlHttpRequest'}