diff --git a/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml b/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml
index baea7957..59968ee2 100644
--- a/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml
+++ b/demos/data-lakehouse-iceberg-trino-spark/create-nifi-ingestion-job.yaml
@@ -50,6 +50,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
+ import requests
import urllib3
# As of 2022-08-29 we cant use "https://nifi:8443" here because
The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.
@@ -66,70 +67,36 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")
- organization = "stackabletech"
- repository = "demos"
- branch = "main"
- version = "main"
- directory = "demos/data-lakehouse-iceberg-trino-spark"
- flow_name = "LakehouseKafkaIngest"
+ response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.json")
- # Check if the GitHub flow registry client already exists
- flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
+ filename = "/tmp/LakehouseKafkaIngest.json"
+ with open(filename, "wb") as f:
+ f.write(response.content)
- github_client = None
- for client in flow_registry_clients:
- if client.component.name == "GitHubFlowRegistryClient":
- github_client = client
- print("Found existing GitHub flow registry client")
- break
+ pg_id = get_root_pg_id()
- if not github_client:
- print("Creating new GitHub flow registry client")
- github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
- body={
- "revision": {"version": 0},
- "component": {
- "name": "GitHubFlowRegistryClient",
- "type": "org.apache.nifi.github.GitHubFlowRegistryClient",
- "properties": {
- "Repository Owner": organization,
- "Repository Name": repository,
- },
- "bundle": {
- "group": "org.apache.nifi",
- "artifact": "nifi-github-nar",
- "version": "2.2.0",
- },
- },
- }
- )
+ if not nipyapi.config.nifi_config.api_client:
+ nipyapi.config.nifi_config.api_client = ApiClient()
- pg_id = get_root_pg_id()
+ header_params = {}
+ header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
+ header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
- try:
- # Create process group from the file in the Git repo
- nipyapi.nifi.ProcessGroupsApi().create_process_group(
- id=pg_id,
- body={
- "revision": {"version": 0},
- "component": {
- "position": {"x": 300, "y": 10},
- "versionControlInformation": {
- "registryId": github_client.component.id,
- "flowId": flow_name,
- "bucketId": directory,
- "branch": branch,
- "version": version,
- },
- },
- },
- )
- except ValueError as e:
- # Ignore, because nipyapi can't handle non-int versions yet
- if "invalid literal for int() with base 10" in str(e):
- print("Ignoring ValueError")
- else:
- raise e
+ nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
+ path_params={'pg_id': pg_id},
+ header_params=header_params,
+ _return_http_data_only=True,
+ post_params=[
+ ('id', pg_id),
+ ('groupName', 'LakehouseKafkaIngest'),
+ ('positionX', 100),
+ ('positionY', 10),
+ ('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
+ ],
+ files={
+ 'file': filename
+ },
+ auth_settings=['tokenAuth'])
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
# To work around this, we try to schedule the controllers multiple times
diff --git a/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml b/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml
index 82005839..52d9b4db 100644
--- a/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml
+++ b/demos/nifi-kafka-druid-earthquake-data/create-nifi-ingestion-job.yaml
@@ -59,6 +59,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
+ import requests
import urllib3
# As of 2022-08-29 we cant use "https://nifi:8443" here because The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.
@@ -75,70 +76,35 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")
- organization = "stackabletech"
- repository = "demos"
- branch = "main"
- version = "main"
- directory = "demos/nifi-kafka-druid-earthquake-data"
- flow_name = "IngestEarthquakesToKafka"
+ response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-earthquake-data/IngestEarthquakesToKafka.json")
+ filename = "/tmp/IngestEarthquakesToKafka.json"
+ with open(filename, "wb") as f:
+ f.write(response.content)
- # Check if the GitHub flow registry client already exists
- flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
-
- github_client = None
- for client in flow_registry_clients:
- if client.component.name == "GitHubFlowRegistryClient":
- github_client = client
- print("Found existing GitHub flow registry client")
- break
+ pg_id = get_root_pg_id()
- if not github_client:
- print("Creating new GitHub flow registry client")
- github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
- body={
- "revision": {"version": 0},
- "component": {
- "name": "GitHubFlowRegistryClient",
- "type": "org.apache.nifi.github.GitHubFlowRegistryClient",
- "properties": {
- "Repository Owner": organization,
- "Repository Name": repository,
- },
- "bundle": {
- "group": "org.apache.nifi",
- "artifact": "nifi-github-nar",
- "version": "2.2.0",
- },
- },
- }
- )
+ if not nipyapi.config.nifi_config.api_client:
+ nipyapi.config.nifi_config.api_client = ApiClient()
- pg_id = get_root_pg_id()
+ header_params = {}
+ header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
+ header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
- try:
- # Create process group from the file in the Git repo
- nipyapi.nifi.ProcessGroupsApi().create_process_group(
- id=pg_id,
- body={
- "revision": {"version": 0},
- "component": {
- "position": {"x": 300, "y": 10},
- "versionControlInformation": {
- "registryId": github_client.component.id,
- "flowId": flow_name,
- "bucketId": directory,
- "branch": branch,
- "version": version,
- },
- },
- },
- )
- except ValueError as e:
- # Ignore, because nipyapi can't handle non-int versions yet
- if "invalid literal for int() with base 10" in str(e):
- print("Ignoring ValueError")
- else:
- raise e
+ nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
+ path_params={'pg_id': pg_id},
+ header_params=header_params,
+ _return_http_data_only=True,
+ post_params=[
+ ('id', pg_id),
+ ('groupName', 'IngestEarthquakesToKafka'),
+ ('positionX', 100),
+ ('positionY', 10),
+ ('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
+ ],
+ files={
+ 'file': filename
+ },
+ auth_settings=['tokenAuth'])
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
# To work around this, we try to schedule the controllers multiple times
diff --git a/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml b/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml
index ac5961f1..51d20195 100644
--- a/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml
+++ b/demos/nifi-kafka-druid-water-level-data/create-nifi-ingestion-job.yaml
@@ -59,6 +59,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
+ import requests
import urllib3
# As of 2022-08-29 we cant use "https://nifi:8443" here because The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.
@@ -75,70 +76,36 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")
- organization = "stackabletech"
- repository = "demos"
- branch = "main"
- version = "main"
- directory = "demos/nifi-kafka-druid-water-level-data"
- flow_name = "IngestWaterLevelsToKafka"
+ response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/nifi-kafka-druid-water-level-data/IngestWaterLevelsToKafka.json")
- # Check if the GitHub flow registry client already exists
- flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries
+ filename = "/tmp/IngestWaterLevelsToKafka.json"
+ with open(filename, "wb") as f:
+ f.write(response.content)
- github_client = None
- for client in flow_registry_clients:
- if client.component.name == "GitHubFlowRegistryClient":
- github_client = client
- print("Found existing GitHub flow registry client")
- break
+ pg_id = get_root_pg_id()
- if not github_client:
- print("Creating new GitHub flow registry client")
- github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
- body={
- "revision": {"version": 0},
- "component": {
- "name": "GitHubFlowRegistryClient",
- "type": "org.apache.nifi.github.GitHubFlowRegistryClient",
- "properties": {
- "Repository Owner": organization,
- "Repository Name": repository,
- },
- "bundle": {
- "group": "org.apache.nifi",
- "artifact": "nifi-github-nar",
- "version": "2.2.0",
- },
- },
- }
- )
+ if not nipyapi.config.nifi_config.api_client:
+ nipyapi.config.nifi_config.api_client = ApiClient()
- pg_id = get_root_pg_id()
+ header_params = {}
+ header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
+ header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
- try:
- # Create process group from the file in the Git repo
- nipyapi.nifi.ProcessGroupsApi().create_process_group(
- id=pg_id,
- body={
- "revision": {"version": 0},
- "component": {
- "position": {"x": 300, "y": 10},
- "versionControlInformation": {
- "registryId": github_client.component.id,
- "flowId": flow_name,
- "bucketId": directory,
- "branch": branch,
- "version": version,
- },
- },
- },
- )
- except ValueError as e:
- # Ignore, because nipyapi can't handle non-int versions yet
- if "invalid literal for int() with base 10" in str(e):
- print("Ignoring ValueError")
- else:
- raise e
+ nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
+ path_params={'pg_id': pg_id},
+ header_params=header_params,
+ _return_http_data_only=True,
+ post_params=[
+ ('id', pg_id),
+ ('groupName', 'IngestWaterLevelsToKafka'),
+ ('positionX', 100),
+ ('positionY', 10),
+ ('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
+ ],
+ files={
+ 'file': filename
+ },
+ auth_settings=['tokenAuth'])
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
# To work around this, we try to schedule the controllers multiple times
diff --git a/demos/signal-processing/create-nifi-ingestion-job.yaml b/demos/signal-processing/create-nifi-ingestion-job.yaml
index 1a858678..41c5699d 100644
--- a/demos/signal-processing/create-nifi-ingestion-job.yaml
+++ b/demos/signal-processing/create-nifi-ingestion-job.yaml
@@ -70,6 +70,7 @@ data:
from nipyapi.security import service_login
import nipyapi
import os
+ import requests
import urllib3
# As of 2022-08-29 we cant use "https://nifi:8443" here because The request contained an invalid host header [nifi:8443] in the request [/nifi-api]. Check for request manipulation or third-party intercept.
@@ -86,60 +87,36 @@ data:
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")
- organization = "stackabletech"
- repository = "demos"
- branch = "main"
- version = "main"
- directory = "demos/signal-processing"
- flow_name = "DownloadAndWriteToDB"
+ response = requests.get("https://raw.githubusercontent.com/stackabletech/demos/refs/heads/main/demos/signal-processing/DownloadAndWriteToDB.json")
- # Register the flow registry client
- response = nipyapi.nifi.ControllerApi().create_flow_registry_client(
- body={
- "revision": {"version": 0},
- "component": {
- "name": "GitHubFlowRegistryClient",
- "type": "org.apache.nifi.github.GitHubFlowRegistryClient",
- "properties": {
- "Repository Owner": organization,
- "Repository Name": repository,
- },
- "bundle": {
- "group": "org.apache.nifi",
- "artifact": "nifi-github-nar",
- "version": "2.2.0",
- },
- },
- }
- )
+ filename = "/tmp/DownloadAndWriteToDB.json"
+ with open(filename, "wb") as f:
+ f.write(response.content)
pg_id = get_root_pg_id()
- print(f"pgid={pg_id}")
- try:
- # Create process group from the file in the Git repo
- nipyapi.nifi.ProcessGroupsApi().create_process_group(
- id=pg_id,
- body={
- "revision": {"version": 0},
- "component": {
- "position": {"x": 300, "y": 10},
- "versionControlInformation": {
- "registryId": response.component.id,
- "flowId": flow_name,
- "bucketId": directory,
- "branch": branch,
- "version": version,
- },
- },
- },
- )
- except ValueError as e:
- # Ignore, because nipyapi can't handle non-int versions yet
- if "invalid literal for int() with base 10" in str(e):
- print("Ignoring ValueError")
- else:
- raise e
+ if not nipyapi.config.nifi_config.api_client:
+ nipyapi.config.nifi_config.api_client = ApiClient()
+
+ header_params = {}
+ header_params['Accept'] = nipyapi.config.nifi_config.api_client.select_header_accept(['application/json'])
+ header_params['Content-Type'] = nipyapi.config.nifi_config.api_client.select_header_content_type(['multipart/form-data'])
+
+ nipyapi.config.nifi_config.api_client.call_api('/process-groups/{pg_id}/process-groups/upload', 'POST',
+ path_params={'pg_id': pg_id},
+ header_params=header_params,
+ _return_http_data_only=True,
+ post_params=[
+ ('id', pg_id),
+ ('groupName', 'DownloadAndWriteToDB'),
+ ('positionX', 100),
+ ('positionY', 10),
+ ('clientId', nipyapi.nifi.FlowApi().generate_client_id()),
+ ],
+ files={
+ 'file': filename
+ },
+ auth_settings=['tokenAuth'])
# Update the controller services with the correct password
for controller in list_all_controllers(pg_id):