4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## [Version 1.4.1](https://github.com/dataiku/dss-plugin-pi-server/releases/tag/v1.4.1) - Feature release - 2025-11-26
+
+- Add an AF hierarchy downloader
+
 ## [Version 1.4.0](https://github.com/dataiku/dss-plugin-pi-server/releases/tag/v1.4.0) - Feature release - 2025-09-22
 
 - Add write recipe
2 changes: 1 addition & 1 deletion plugin.json
@@ -1,6 +1,6 @@
 {
     "id": "pi-system",
-    "version": "1.4.0",
+    "version": "1.4.1",
     "meta": {
         "label": "PI System",
         "description": "Retrieve data from your OSIsoft PI System servers",
8 changes: 4 additions & 4 deletions python-connectors/pi-system_hierarchy/connector.json
@@ -21,7 +21,7 @@
             "label": " ",
             "type": "BOOLEAN",
             "description": "Show advanced parameters",
-            "defaultValue": false
+            "defaultValue": true
         },
         {
             "name": "server_url",
@@ -60,15 +60,15 @@
             "label": " ",
             "type": "BOOLEAN",
             "description": "Use batch mode",
-            "visibilityCondition": "model.show_advanced_parameters==true && model.must_retrieve_metrics==true",
-            "defaultValue": false
+            "visibilityCondition": "model.show_advanced_parameters==true",
+            "defaultValue": true
         },
         {
             "name": "batch_size",
             "label": " ",
             "type": "INT",
             "description": "Batch size",
-            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true && model.must_retrieve_metrics==true",
+            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true",
             "minI": 1,
             "defaultValue": 500
         },
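Note on wiring: the values of `use_batch_mode` and `batch_size` reach Python through the `config` dict that Dataiku passes to the connector's constructor, and `connector.py` (next file) reads them back with `config.get(...)` fallbacks. The Python fallback for `use_batch_mode` is `False` even though the UI default above is now `true`; the fallback only applies when the parameter was never saved. A minimal sketch of that handshake, with a hand-built `config` dict that is illustrative only, not part of the plugin:

# Illustrative only: how the parameters declared in connector.json surface
# in Python. In DSS the config dict is assembled from the saved dataset
# settings; the keys must match the "name" fields declared above.
config = {
    "show_advanced_parameters": True,
    "use_batch_mode": True,   # BOOLEAN parameter declared above
    "batch_size": 500         # INT parameter, visible only when batch mode is on
}

# Mirrors the reads added to the connector's __init__ below:
use_batch_mode = config.get("use_batch_mode", False)  # falls back to False if never saved
batch_size = config.get("batch_size", 500)
print(use_batch_mode, batch_size)  # True 500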
170 changes: 165 additions & 5 deletions python-connectors/pi-system_hierarchy/connector.py
@@ -1,3 +1,4 @@
+import datetime
 from dataiku.connector import Connector
 from osisoft_client import OSIsoftClient
 from safe_logger import SafeLogger
@@ -31,22 +32,36 @@ def __init__(self, config, plugin_config):
             is_debug_mode=is_debug_mode,
             network_timer=self.network_timer
         )
+        self.use_batch_mode = config.get("use_batch_mode", False)
+        self.batch_size = config.get("batch_size", 500)
 
     def get_read_schema(self):
         return None
 
     def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
                       partition_id=None, records_limit = -1):
         limit = RecordsLimit(records_limit)
+        start_time = datetime.datetime.now()
 
         headers = self.client.get_requests_headers()
         json_response = self.client.get(url=self.database_endpoint, headers=headers, params={}, error_source="traverse")
-        next_url = self.client.extract_link_with_key(json_response, "Elements")
+        server_name = json_response.get("ExtendedProperties", {}).get("DefaultPIServer", {}).get("Value", "Unknown server name")
 
-        for item in self.recurse_next_item(next_url):
-            if limit.is_reached():
-                break
-            yield item
+        if self.use_batch_mode:
+            for item in self.batch_next_item(json_response, parent=server_name, type="Database"):
+                if limit.is_reached():
+                    break
+                yield item
+        else:
+            next_url = self.client.extract_link_with_key(json_response, "Elements")
+            for item in self.recurse_next_item(next_url):
+                if limit.is_reached():
+                    break
+                yield item
+        end_time = datetime.datetime.now()
+        duration = end_time - start_time
+        logger.info("generate_rows overall duration = {}s".format(duration.total_seconds()))
+        logger.info("Network timer: {}".format(self.network_timer.get_report()))
 
     def recurse_next_item(self, next_url, parent=None, type=None):
         logger.info("recurse_next_item")
@@ -82,6 +97,151 @@ def recurse_next_item(self, next_url, parent=None, type=None):
                 "Id": item.get("Id")
             }
 
+    def batch_next_item(self, next_item, parent=None, type=None):
+        todo_list = []
+        todo_list.append(
+            {
+                "url": self.client.extract_link_with_key(next_item, "Elements"),
+                "parent": "\\\\" + parent + "\\" + next_item.get("Name"),
+                "type": "Database"
+            }
+        )
+        batch_requests_parameters = []
+        batched_items = []
+        while todo_list:
+            item = todo_list.pop()
+            request_kwargs = {
+                "url": item.get("url"),
+                "headers": self.client.get_requests_headers()
+            }
+            batch_requests_parameters.append(request_kwargs)
+            batched_items.append(item)
+            if not todo_list or len(batch_requests_parameters) > self.batch_size:
+                json_responses = self.client._batch_requests(batch_requests_parameters)
+                batch_requests_parameters = []
+                for batched_item, json_response in zip(batched_items, json_responses):
+                    parent_of_batched_item = batched_item.get("parent")
+                    response_content = json_response.get("Content", {})
+                    links = response_content.get("Links", {})
+                    next_link = links.get("Next", {})
+                    if next_link:
+                        # Paginated response: requeue the next page under the same parent
+                        todo_list.append(
+                            {
+                                "url": next_link,
+                                "parent": parent_of_batched_item,
+                                "type": batched_item.get("type")
+                            }
+                        )
+                    retrieved_items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+                    for retrieved_item in retrieved_items:
+                        elements_url = self.client.extract_link_with_key(retrieved_item, "Elements")
+                        attributes_url = self.client.extract_link_with_key(retrieved_item, "Attributes")
+                        if elements_url:
+                            todo_list.append(
+                                {
+                                    "url": elements_url,
+                                    "type": "Element",
+                                    "parent": parent_of_batched_item + "\\" + retrieved_item.get("Name")
+                                }
+                            )
+                        if attributes_url:
+                            todo_list.append(
+                                {
+                                    "url": attributes_url,
+                                    "type": "Attribute",
+                                    "parent": parent_of_batched_item + "\\" + retrieved_item.get("Name")
+                                }
+                            )
+                        yield {
+                            "ItemType": batched_item.get("type"),
+                            "Name": retrieved_item.get("Name"),
+                            "Type": retrieved_item.get("Type"),
+                            "Description": retrieved_item.get("Description"),
+                            "Path": retrieved_item.get("Path"),
+                            "LinkPath": "{}\\{}".format(parent_of_batched_item, retrieved_item.get("Name")),
+                            "Parent": parent_of_batched_item,
+                            "DefaultUnitsName": retrieved_item.get("DefaultUnitsName"),
+                            "TemplateName": retrieved_item.get("TemplateName"),
+                            "CategoryNames": retrieved_item.get("CategoryNames"),
+                            "ExtendedProperties": retrieved_item.get("ExtendedProperties"),
+                            "Step": retrieved_item.get("Step"),
+                            "WebId": retrieved_item.get("WebId"),
+                            "Id": retrieved_item.get("Id")
+                        }
+                batched_items = []
+
+    def batch_recurse_next_item(self, next_items, parents=None, type=None):
+        # Alternative batched traversal, kept for reference; generate_rows does not call it.
+        # logger.info("batch_recurse_next_item")
+        if not isinstance(next_items, list):
+            next_items = [next_items]
+        if not isinstance(parents, list):
+            parents = [parents]
+        batch_requests_parameters = []
+        types = []
+        items_parents_names = []
+        for next_item in next_items:
+            next_item_name = next_item.get("Path")
+            next_elements_url = self.client.extract_link_with_key(next_item, "Elements")
+            if next_elements_url:
+                request_kwargs = {
+                    "url": next_elements_url,
+                    "headers": self.client.get_requests_headers()
+                }
+                batch_requests_parameters.append(request_kwargs)
+                types.append("Element")
+                items_parents_names.append(next_item_name)
+            next_attributes_url = self.client.extract_link_with_key(next_item, "Attributes")
+            if next_attributes_url:
+                request_kwargs = {
+                    "url": next_attributes_url,
+                    "headers": self.client.get_requests_headers()
+                }
+                batch_requests_parameters.append(request_kwargs)
+                types.append("Attribute")
+                items_parents_names.append(next_item_name)
+        if batch_requests_parameters:
+            json_responses = self.client._batch_requests(batch_requests_parameters)
+            # Approach 1 (not used): recurse on each response in the batch;
+            # instead we process all responses and batch them in one go below.
+            # for json_response in json_responses:
+            #     response_content = json_response.get("Content", {})
+            #     if OSIsoftConstants.DKU_ERROR_KEY in response_content:
+            #         pass  # Do something?
+            #     items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+            #     batched_items = self.batch_recurse_next_item(items)
+            #     for item in batched_items:
+            #         yield item
+            # Approach 2: collect every response's items and batch them together.
+            next_batch_items = []
+            for json_response in json_responses:
+                response_content = json_response.get("Content", {})
+                links = response_content.get("Links", {})
+                next_link = links.get("Next", {})
+                # TODO: follow next_link when the response is paginated
+                items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+                next_batch_items.extend(items)
+            batched_items = self.batch_recurse_next_item(next_batch_items, parents=items_parents_names)
+            for item in batched_items:
+                yield item
+
+        for item, parent in zip(next_items, parents):
+            yield {
+                "ItemType": type,
+                "Name": item.get("Name"),
+                "Type": item.get("Type"),
+                "Description": item.get("Description"),
+                "Path": item.get("Path"),
+                "Parent": parent,
+                "DefaultUnitsName": item.get("DefaultUnitsName"),
+                "TemplateName": item.get("TemplateName"),
+                "CategoryNames": item.get("CategoryNames"),
+                "ExtendedProperties": item.get("ExtendedProperties"),
+                "Step": item.get("Step"),
+                "WebId": item.get("WebId"),
+                "Id": item.get("Id")
+            }
+
     def get_writer(self, dataset_schema=None, dataset_partitioning=None,
                    partition_id=None, write_mode="OVERWRITE"):
         raise NotImplementedError
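In batch mode, `batch_next_item` swaps the one-request-per-node recursion of `recurse_next_item` for a worklist: element and attribute URLs are buffered, then flushed through `OSIsoftClient._batch_requests` whenever the frontier drains or the buffer passes `batch_size`, so visiting N nodes costs on the order of N/batch_size HTTP round trips instead of N. A minimal, self-contained sketch of that pattern follows; `fetch_batch`, `"Items"` and `"ChildrenUrl"` are stand-ins invented for the example (the plugin itself uses `_batch_requests`, `OSIsoftConstants.API_ITEM_KEY` and the WebAPI `Links`/`Next` pagination field):

# Sketch of the worklist batching pattern, not the plugin's actual code.
def fetch_batch(urls):
    # Stand-in for OSIsoftClient._batch_requests: one HTTP round trip that
    # returns one response per requested URL (empty here so the sketch runs).
    return [{"Items": [], "Links": {}} for _ in urls]

def traverse(root_url, batch_size=500):
    todo = [root_url]   # frontier of URLs still to visit
    buffered = []       # URLs waiting to go out in the next batch
    while todo:
        buffered.append(todo.pop())
        # Flush when the frontier is empty (nothing waits forever) or when
        # the buffer outgrows batch_size (batches stay bounded).
        if not todo or len(buffered) > batch_size:
            for response in fetch_batch(buffered):
                next_page = response.get("Links", {}).get("Next")
                if next_page:
                    todo.append(next_page)  # paginated: requeue the next page
                for child in response.get("Items", []):
                    child_url = child.get("ChildrenUrl")
                    if child_url:
                        todo.append(child_url)  # descend one level deeper
                    yield child
            buffered = []

for row in traverse("https://piserver/piwebapi/assetdatabases/db1/elements"):
    print(row)

Flushing on an empty frontier matters: children and pagination links discovered during a flush repopulate `todo`, so the loop alternates between draining the buffer and refilling it until the whole hierarchy has been walked.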
2 changes: 1 addition & 1 deletion python-lib/osisoft_constants.py
@@ -405,7 +405,7 @@ class OSIsoftConstants(object):
         "Security": "{base_url}/eventframes/{webid}/security",
         "SecurityEntries": "{base_url}/eventframes/{webid}/securityentries"
     }
-    PLUGIN_VERSION = "1.4.0-beta.1"
+    PLUGIN_VERSION = "1.4.1-beta.1"
     VALUE_COLUMN_SUFFIX = "_val"
     WEB_API_PATH = "piwebapi"
     WRITE_HEADERS = {'X-Requested-With': 'XmlHttpRequest'}