Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dev_nifi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ env:
IMAGE_NAME: nifi
# TODO (@NickLarsenNZ): Use a versioned image with stackable0.0.0-dev or stackableXX.X.X so that
# the demo is reproducible for the release and it will be automatically replaced for the release branch.
IMAGE_VERSION: 1.28.1-postgresql
IMAGE_VERSION: 2.2.0-postgresql
REGISTRY_PATH: stackable
DOCKERFILE_PATH: "demos/signal-processing/Dockerfile-nifi"

Expand Down
1 change: 1 addition & 0 deletions demos/IngestEarthquakesToKafka.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions demos/IngestWaterLevelsToKafka.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions demos/LakehouseKafkaIngest.json

Large diffs are not rendered by default.

Large diffs are not rendered by default.

7,893 changes: 0 additions & 7,893 deletions demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ spec:
containers:
- name: create-nifi-ingestion-job
image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
command: ["bash", "-c", "curl -O https://raw.githubusercontent.com/stackabletech/demos/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.xml && python -u /tmp/script/script.py"]
command:
- bash
- -euo
- pipefail
- -c
- python -u /tmp/script/script.py
volumeMounts:
- name: script
mountPath: /tmp/script
Expand All @@ -41,37 +46,104 @@ metadata:
name: create-nifi-ingestion-job-script
data:
script.py: |
from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller
from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller, update_controller
from nipyapi.security import service_login
from nipyapi.templates import get_template, upload_template, deploy_template
import nipyapi
import os
import urllib3

# As of 2022-08-29 we cant use "https://nifi:8443" here because <h2>The request contained an invalid host header [<code>nifi:8443</code>] in the request [<code>/nifi-api</code>]. Check for request manipulation or third-party intercept.</h2>
ENDPOINT = f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443" # For local testing / developing replace it, afterwards change back to f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443"
USERNAME = "admin"
PASSWORD = open("/nifi-admin-credentials-secret/admin").read()
TEMPLATE_NAME = "LakehouseKafkaIngest"
TEMPLATE_FILE = f"{TEMPLATE_NAME}.xml"
# Read the NiFi admin password from the mounted credentials Secret.
# Fixes the duplicated-assignment typo (`PASSWORD = PASSWORD = ...`);
# the double target was harmless but clearly unintended.
PASSWORD = open("/nifi-admin-credentials-secret/admin").read()

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

nipyapi.config.nifi_config.host = f"{ENDPOINT}/nifi-api"
nipyapi.config.nifi_config.verify_ssl = False

print("Logging in")
print(f"Logging in as {USERNAME}")
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")

pg_id = get_root_pg_id()
# Coordinates of the Git repository that holds the versioned NiFi flow.
organization = "stackabletech"
repository = "demos"
# Git branch and version reference used when importing the flow.
branch = "release-25.3"
version = "release-25.3"
# Path inside the repository that acts as the registry "bucket".
directory = "demos/data-lakehouse-iceberg-trino-spark"
# Name of the flow definition file (without extension) in that directory.
flow_name = "LakehouseKafkaIngest"

# Look for an already-registered GitHub flow registry client so repeated
# runs of this job do not register a duplicate.
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries

github_client = next(
    (client for client in flow_registry_clients
     if client.component.name == "GitHubFlowRegistryClient"),
    None,
)
if github_client is not None:
    print("Found existing GitHub flow registry client")

upload_template(pg_id, TEMPLATE_FILE)
if not github_client:
    # No pre-existing client found: register a new GitHub-backed flow
    # registry client pointing at the demo repository.
    print("Creating new GitHub flow registry client")
    client_component = {
        "name": "GitHubFlowRegistryClient",
        "type": "org.apache.nifi.github.GitHubFlowRegistryClient",
        "properties": {
            "Repository Owner": organization,
            "Repository Name": repository,
        },
        # Bundle coordinates of the NAR that provides the client implementation.
        "bundle": {
            "group": "org.apache.nifi",
            "artifact": "nifi-github-nar",
            "version": "2.2.0",
        },
    }
    github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
        body={"revision": {"version": 0}, "component": client_component}
    )

# Resolve the id of the NiFi root process group; the flow is deployed there.
pg_id = get_root_pg_id()

template_id = get_template(TEMPLATE_NAME).id
deploy_template(pg_id, template_id, 200, 0)
try:
    # Create a process group from the versioned flow stored in the Git repo,
    # placed at a fixed canvas position under the root process group.
    nipyapi.nifi.ProcessGroupsApi().create_process_group(
        id=pg_id,
        body={
            "revision": {"version": 0},
            "component": {
                "position": {"x": 300, "y": 10},
                "versionControlInformation": {
                    "registryId": github_client.component.id,
                    "flowId": flow_name,
                    "bucketId": directory,
                    "branch": branch,
                    "version": version,
                },
            },
        },
    )
except ValueError as e:
    # nipyapi cannot deserialize non-integer flow versions yet, so it raises
    # while parsing the response even though the process group was created.
    # Swallow only that specific parse error.
    if "invalid literal for int() with base 10" in str(e):
        print("Ignoring ValueError")
    else:
        # Bare `raise` (instead of `raise e`) re-raises the active exception
        # with its original traceback intact.
        raise

for controller in list_all_controllers():
schedule_controller(controller, scheduled=True)
# Enabling `Kafka3ConnectionService` fails if it is attempted before
# `StandardRestrictedSSLContextService` is ENABLED, because it depends on it.
# Work around the ordering by making two passes over the controllers:
# whatever fails on the first pass for lack of a dependency should succeed
# on the second, once the dependency has been enabled.
max_retries = 2
for attempt in range(max_retries):
    for controller in list_all_controllers(pg_id):
        if controller.component.state != "ENABLED":
            try:
                schedule_controller(controller, scheduled=True)
                print(f"Scheduled controller: {controller.component.name}")
            except Exception as e:
                print(f"Failed to schedule controller {controller.component.name}: {e}")

# Finally start every processor in the deployed process group.
schedule_process_group(pg_id, scheduled=True)

Large diffs are not rendered by default.

Loading