Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dev_nifi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ env:
IMAGE_NAME: nifi
# TODO (@NickLarsenNZ): Use a versioned image with stackable0.0.0-dev or stackableXX.X.X so that
# the demo is reproducible for the release and it will be automatically replaced for the release branch.
IMAGE_VERSION: 1.28.1-postgresql
IMAGE_VERSION: 2.2.0-postgresql
REGISTRY_PATH: stackable
DOCKERFILE_PATH: "demos/signal-processing/Dockerfile-nifi"

Expand Down

Large diffs are not rendered by default.

7,893 changes: 0 additions & 7,893 deletions demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ spec:
containers:
- name: create-nifi-ingestion-job
image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
command: ["bash", "-c", "curl -O https://raw.githubusercontent.com/stackabletech/demos/main/demos/data-lakehouse-iceberg-trino-spark/LakehouseKafkaIngest.xml && python -u /tmp/script/script.py"]
command:
- bash
- -euo
- pipefail
- -c
- python -u /tmp/script/script.py
volumeMounts:
- name: script
mountPath: /tmp/script
Expand All @@ -41,9 +46,8 @@ metadata:
name: create-nifi-ingestion-job-script
data:
script.py: |
from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller
from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller, update_controller
from nipyapi.security import service_login
from nipyapi.templates import get_template, upload_template, deploy_template
import nipyapi
import os
import urllib3
Expand All @@ -52,26 +56,94 @@ data:
ENDPOINT = f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443" # For local testing / developing replace it, afterwards change back to f"https://nifi-node-default-0.nifi-node-default.{os.environ['NAMESPACE']}.svc.cluster.local:8443"
USERNAME = "admin"
PASSWORD = open("/nifi-admin-credentials-secret/admin").read()
TEMPLATE_NAME = "LakehouseKafkaIngest"
TEMPLATE_FILE = f"{TEMPLATE_NAME}.xml"

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

nipyapi.config.nifi_config.host = f"{ENDPOINT}/nifi-api"
nipyapi.config.nifi_config.verify_ssl = False

print("Logging in")
print(f"Logging in as {USERNAME}")
service_login(username=USERNAME, password=PASSWORD)
print("Logged in")

pg_id = get_root_pg_id()
organization = "stackabletech"
repository = "demos"
branch = "main"
version = "main"
directory = "demos/data-lakehouse-iceberg-trino-spark"
flow_name = "LakehouseKafkaIngest"

# Check if the GitHub flow registry client already exists
flow_registry_clients = nipyapi.nifi.ControllerApi().get_flow_registry_clients().registries

github_client = None
for client in flow_registry_clients:
if client.component.name == "GitHubFlowRegistryClient":
github_client = client
print("Found existing GitHub flow registry client")
break

upload_template(pg_id, TEMPLATE_FILE)
if not github_client:
print("Creating new GitHub flow registry client")
github_client = nipyapi.nifi.ControllerApi().create_flow_registry_client(
body={
"revision": {"version": 0},
"component": {
"name": "GitHubFlowRegistryClient",
"type": "org.apache.nifi.github.GitHubFlowRegistryClient",
"properties": {
"Repository Owner": organization,
"Repository Name": repository,
},
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-github-nar",
"version": "2.2.0",
},
},
}
)

pg_id = get_root_pg_id()

template_id = get_template(TEMPLATE_NAME).id
deploy_template(pg_id, template_id, 200, 0)
try:
# Create process group from the file in the Git repo
nipyapi.nifi.ProcessGroupsApi().create_process_group(
id=pg_id,
body={
"revision": {"version": 0},
"component": {
"position": {"x": 300, "y": 10},
"versionControlInformation": {
"registryId": github_client.component.id,
"flowId": flow_name,
"bucketId": directory,
"branch": branch,
"version": version,
},
},
},
)
except ValueError as e:
# Ignore, because nipyapi can't handle non-int versions yet
if "invalid literal for int() with base 10" in str(e):
print("Ignoring ValueError")
else:
raise e

for controller in list_all_controllers():
schedule_controller(controller, scheduled=True)
# Scheduling the `Kafka3ConnectionService` fails, if it is started before `StandardRestrictedSSLContextService`, since it depends on it
# To work around this, we try to schedule the controllers multiple times
# If `Kafka3ConnectionService` is started before `StandardRestrictedSSLContextService`, scheduling it will fail in the first iteration
# But it should succeed in the second attempt, since by then `StandardRestrictedSSLContextService` is started
max_retries = 2
for _ in range(max_retries):
controllers = list_all_controllers(pg_id)
for controller in controllers:
if controller.component.state != "ENABLED":
try:
schedule_controller(controller, scheduled=True)
print(f"Scheduled controller: {controller.component.name}")
except Exception as e:
print(f"Failed to schedule controller {controller.component.name}: {e}")

schedule_process_group(pg_id, scheduled=True)

Large diffs are not rendered by default.

Loading