Skip to content

Commit dbabc28

Browse files
committed
feat(cli): add CLI with CI convenience functions for NiFi automation
- Add nipyapi CLI with 'ci' subcommand for NiFi CI/CD operations - Profile auto-resolution: switch() without args auto-detects env vars or profile file - CI functions: deploy_flow, start_flow, stop_flow, get_status, cleanup, configure_params - New functions: configure_inherited_params, purge_flowfiles, upload_asset - get_status defaults to root process group when no PG specified - Log control via NIFI_LOG_LEVEL and NIFI_LOG_ON_ERROR env vars - Suppress SSL warnings to keep CI output clean - Convert snake_case to kebab-case for GitHub Actions output format - resolve_git_ref to resolve tags/branches to SHAs
1 parent 4507a52 commit dbabc28

24 files changed

+4754
-16
lines changed

docs/cli_quickstart.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# nipyapi CLI Quickstart
2+
3+
## Prerequisites
4+
5+
```bash
6+
brew install uv
7+
```
8+
9+
## Install
10+
11+
```bash
12+
uv pip install "nipyapi[cli] @ git+https://github.com/Chaffelson/nipyapi.git@feature/cli"
13+
```
14+
15+
## Configure
16+
17+
```bash
18+
export NIFI_API_ENDPOINT="https://your-nifi-host/nifi-api"
19+
export NIFI_BEARER_TOKEN="your-jwt-token"
20+
```
21+
22+
## Explore
23+
24+
```bash
25+
# Check connection
26+
nipyapi system get_nifi_version_info
27+
28+
# List registry clients
29+
nipyapi versioning list_registry_clients
30+
31+
# Get a specific registry client by name
32+
nipyapi versioning get_registry_client "ConnectorFlowRegistryClient"
33+
```
34+
35+
## Deploy a Flow
36+
37+
```bash
38+
# Get the registry client ID
39+
REGISTRY_ID=$(nipyapi versioning get_registry_client "ConnectorFlowRegistryClient" | jq -r '.id')
40+
41+
# Deploy a flow from the registry
42+
nipyapi ci deploy_flow \
43+
--registry_client_id "$REGISTRY_ID" \
44+
--bucket "connectors" \
45+
--flow "postgresql"
46+
```

nipyapi/ci/__init__.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""
2+
CI convenience functions for NiFi flow management.
3+
4+
These functions wrap nipyapi operations with:
5+
- Environment variable support for CI/CD platforms
6+
- Sensible defaults
7+
- Simplified interfaces
8+
9+
They return plain dicts and raise exceptions on error.
10+
The CLI handles output formatting, logging capture, and exit codes.
11+
12+
Example:
13+
nipyapi ci ensure_registry --repo owner/repo
14+
nipyapi ci deploy_flow --bucket flows --flow my-flow
15+
nipyapi ci start_flow
16+
17+
Profile resolution:
18+
CI functions auto-detect configuration source:
19+
1. If NIFI_API_ENDPOINT env var is set, use environment variables
20+
2. Otherwise, use first profile from ~/.nipyapi/profiles.yml if it exists
21+
3. Fall back to development examples if neither available
22+
"""
23+
24+
from .change_version import change_version
25+
from .cleanup import cleanup
26+
from .configure_inherited_params import configure_inherited_params
27+
from .configure_params import configure_params
28+
from .deploy_flow import deploy_flow
29+
from .ensure_registry import ensure_registry
30+
from .get_status import get_status
31+
from .get_versions import get_versions
32+
from .purge_flowfiles import purge_flowfiles
33+
from .resolve_git_ref import resolve_git_ref
34+
from .revert_flow import revert_flow
35+
from .start_flow import start_flow
36+
from .stop_flow import stop_flow
37+
from .upload_asset import upload_asset
38+
39+
__all__ = [
40+
"ensure_registry",
41+
"deploy_flow",
42+
"start_flow",
43+
"stop_flow",
44+
"get_status",
45+
"get_versions",
46+
"configure_params",
47+
"configure_inherited_params",
48+
"change_version",
49+
"revert_flow",
50+
"cleanup",
51+
"purge_flowfiles",
52+
"resolve_git_ref",
53+
"upload_asset",
54+
]

nipyapi/ci/change_version.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
"""
2+
change_version - change the version of a deployed flow.
3+
"""
4+
5+
import logging
6+
import os
7+
from typing import Optional
8+
9+
import nipyapi
10+
11+
from .resolve_git_ref import resolve_git_ref
12+
13+
log = logging.getLogger(__name__)
14+
15+
16+
def change_version( # pylint: disable=too-many-arguments,too-many-positional-arguments
17+
process_group_id: Optional[str] = None,
18+
target_version: Optional[str] = None,
19+
branch: Optional[str] = None,
20+
token: Optional[str] = None,
21+
repo: Optional[str] = None,
22+
provider: Optional[str] = None,
23+
) -> dict:
24+
"""
25+
Change the version of a deployed flow.
26+
27+
Args:
28+
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
29+
target_version: Version to change to (commit SHA, tag, or branch name).
30+
Env: NIFI_TARGET_VERSION. If None, changes to latest.
31+
branch: Branch to use. Env: NIFI_FLOW_BRANCH
32+
token: Git token for resolving tags. Env: GH_REGISTRY_TOKEN or GL_REGISTRY_TOKEN
33+
repo: Repository in owner/repo format. Env: NIFI_REGISTRY_REPO
34+
provider: Git provider (github/gitlab). Env: NIFI_REGISTRY_PROVIDER
35+
36+
Returns:
37+
dict with previous_version, new_version, version_state
38+
39+
Raises:
40+
ValueError: Missing required parameters or not under version control
41+
"""
42+
process_group_id = process_group_id or os.environ.get("NIFI_PROCESS_GROUP_ID")
43+
target_version = target_version or os.environ.get("NIFI_TARGET_VERSION") or None
44+
branch = branch or os.environ.get("NIFI_FLOW_BRANCH") or None
45+
provider = provider or os.environ.get("NIFI_REGISTRY_PROVIDER", "github")
46+
repo = repo or os.environ.get("NIFI_REGISTRY_REPO")
47+
48+
# Get token based on provider
49+
if not token:
50+
if provider == "gitlab":
51+
token = os.environ.get("GL_REGISTRY_TOKEN")
52+
else:
53+
token = os.environ.get("GH_REGISTRY_TOKEN")
54+
55+
if not process_group_id:
56+
raise ValueError("process_group_id is required (or set NIFI_PROCESS_GROUP_ID)")
57+
58+
nipyapi.profiles.switch()
59+
60+
log.info("Changing version for: %s", process_group_id)
61+
62+
# Get process group
63+
process_group = nipyapi.canvas.get_process_group(process_group_id, "id")
64+
if not process_group:
65+
raise ValueError(f"Process group not found: {process_group_id}")
66+
67+
# Get current version info
68+
version_info = nipyapi.versioning.get_version_info(process_group)
69+
if not version_info or not version_info.version_control_information:
70+
raise ValueError(
71+
f"Process group '{process_group.component.name}' is not under version control"
72+
)
73+
74+
current_vci = version_info.version_control_information
75+
previous_version = current_vci.version
76+
77+
log.debug("Current version: %s (%s)", previous_version, current_vci.state)
78+
79+
# Resolve target version (tag/branch) to SHA if needed
80+
resolved_version = resolve_git_ref(target_version, repo, token, provider)
81+
82+
if resolved_version:
83+
log.info("Target version: %s (resolved to %s)", target_version, resolved_version[:12])
84+
else:
85+
log.info("Target version: latest")
86+
87+
# Change version
88+
nipyapi.versioning.update_git_flow_ver(
89+
process_group=process_group,
90+
target_version=resolved_version,
91+
branch=branch,
92+
)
93+
94+
# Get updated version info
95+
updated_pg = nipyapi.canvas.get_process_group(process_group_id, "id")
96+
updated_vci = nipyapi.versioning.get_version_info(updated_pg)
97+
new_version = updated_vci.version_control_information.version
98+
new_state = updated_vci.version_control_information.state
99+
100+
log.info("Changed from %s to %s (%s)", previous_version[:12], new_version[:12], new_state)
101+
102+
return {
103+
"previous_version": previous_version,
104+
"new_version": new_version,
105+
"version_state": new_state,
106+
}

nipyapi/ci/cleanup.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# pylint: disable=broad-exception-caught,duplicate-code
2+
"""
3+
cleanup - stop and delete a deployed process group.
4+
"""
5+
6+
import logging
7+
import os
8+
from typing import Optional
9+
10+
import nipyapi
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
def _env_bool(name, default=False):
16+
"""Parse a boolean from environment variable."""
17+
value = os.environ.get(name, "").lower()
18+
if value in ("true", "1", "yes"):
19+
return True
20+
if value in ("false", "0", "no"):
21+
return False
22+
return default
23+
24+
25+
# pylint: disable=too-many-arguments,too-many-positional-arguments
26+
# pylint: disable=too-many-branches,too-many-statements
27+
def cleanup(
28+
process_group_id: Optional[str] = None,
29+
stop_only: Optional[bool] = None,
30+
force: Optional[bool] = None,
31+
delete_parameter_context: Optional[bool] = None,
32+
disable_controllers: Optional[bool] = None,
33+
) -> dict:
34+
"""
35+
Stop and optionally delete a process group.
36+
37+
By default, this function stops the process group and disables its controllers,
38+
but does NOT delete it or its parameter context. Use explicit flags or
39+
environment variables for destructive operations.
40+
41+
Args:
42+
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
43+
stop_only: Only stop processors, don't delete anything.
44+
Env: NIFI_STOP_ONLY (default: false)
45+
force: Force deletion even with queued FlowFiles.
46+
Env: NIFI_FORCE_DELETE (default: false)
47+
delete_parameter_context: Also delete the parameter context.
48+
Env: NIFI_DELETE_PARAMETER_CONTEXT (default: false)
49+
WARNING: Only use if you're certain no other process groups share
50+
this context (e.g., Openflow connectors share contexts).
51+
disable_controllers: Disable controller services after stopping.
52+
Env: NIFI_DISABLE_CONTROLLERS (default: true)
53+
54+
Returns:
55+
dict with stopped/deleted status and names
56+
57+
Raises:
58+
ValueError: Missing required parameters
59+
60+
Examples:
61+
# Just stop the flow (safest)
62+
nipyapi ci cleanup --process_group_id PG_ID --stop_only
63+
64+
# Stop and delete process group only (safe for shared contexts)
65+
nipyapi ci cleanup --process_group_id PG_ID
66+
67+
# Full cleanup including parameter context (CI/CD pipelines)
68+
nipyapi ci cleanup --process_group_id PG_ID --delete_parameter_context --force
69+
70+
# Via environment variables (for CI/CD)
71+
NIFI_DELETE_PARAMETER_CONTEXT=true NIFI_FORCE_DELETE=true nipyapi ci cleanup
72+
"""
73+
# Resolve from env vars with safe defaults
74+
process_group_id = process_group_id or os.environ.get("NIFI_PROCESS_GROUP_ID")
75+
if stop_only is None:
76+
stop_only = _env_bool("NIFI_STOP_ONLY", default=False)
77+
if force is None:
78+
force = _env_bool("NIFI_FORCE_DELETE", default=False)
79+
if delete_parameter_context is None:
80+
delete_parameter_context = _env_bool("NIFI_DELETE_PARAMETER_CONTEXT", default=False)
81+
if disable_controllers is None:
82+
disable_controllers = _env_bool("NIFI_DISABLE_CONTROLLERS", default=True)
83+
84+
if not process_group_id:
85+
raise ValueError("process_group_id is required (or set NIFI_PROCESS_GROUP_ID)")
86+
87+
nipyapi.profiles.switch()
88+
89+
log.info("Cleaning up process group: %s", process_group_id)
90+
91+
# Get process group
92+
try:
93+
process_group = nipyapi.canvas.get_process_group(process_group_id, "id")
94+
except nipyapi.nifi.rest.ApiException as e:
95+
if e.status == 404:
96+
log.info("Process group not found - may already be deleted")
97+
return {
98+
"stopped": "false",
99+
"deleted": "false",
100+
"process_group_name": "",
101+
"message": "Process group not found",
102+
}
103+
raise
104+
105+
if not process_group:
106+
return {
107+
"stopped": "false",
108+
"deleted": "false",
109+
"process_group_name": "",
110+
"message": "Process group not found",
111+
}
112+
113+
pg_name = process_group.component.name
114+
log.debug("Found process group: %s", pg_name)
115+
116+
# Stop processors
117+
log.debug("Stopping processors...")
118+
nipyapi.canvas.schedule_process_group(process_group_id, scheduled=False)
119+
log.info("Stopped processors in: %s", pg_name)
120+
121+
# Disable controllers if requested
122+
if disable_controllers:
123+
log.debug("Disabling controller services...")
124+
try:
125+
nipyapi.canvas.schedule_all_controllers(process_group_id, scheduled=False)
126+
log.info("Disabled controller services in: %s", pg_name)
127+
except Exception as e:
128+
log.warning("Could not disable all controllers: %s", e)
129+
130+
# If stop_only, we're done
131+
if stop_only:
132+
return {
133+
"stopped": "true",
134+
"deleted": "false",
135+
"process_group_name": pg_name,
136+
"message": "Stopped only (no deletion)",
137+
}
138+
139+
# Get parameter context reference before deletion
140+
param_ctx_id = None
141+
param_ctx_name = None
142+
if delete_parameter_context and process_group.component.parameter_context:
143+
param_ctx_id = process_group.component.parameter_context.id
144+
param_ctx_name = process_group.component.parameter_context.component.name
145+
log.debug("Will delete parameter context: %s", param_ctx_name)
146+
147+
# Delete process group
148+
log.debug("Deleting process group...")
149+
nipyapi.canvas.delete_process_group(process_group, force=force)
150+
log.info("Deleted process group: %s", pg_name)
151+
152+
# Delete parameter context if explicitly requested
153+
param_ctx_deleted = False
154+
if delete_parameter_context and param_ctx_id:
155+
try:
156+
log.debug("Deleting parameter context: %s", param_ctx_name)
157+
ctx = nipyapi.parameters.get_parameter_context(param_ctx_id, identifier_type="id")
158+
nipyapi.parameters.delete_parameter_context(ctx)
159+
log.info("Deleted parameter context: %s", param_ctx_name)
160+
param_ctx_deleted = True
161+
except Exception as e:
162+
log.warning("Could not delete parameter context: %s", e)
163+
164+
return {
165+
"stopped": "true",
166+
"deleted": "true",
167+
"process_group_name": pg_name,
168+
"parameter_context_deleted": str(param_ctx_deleted).lower(),
169+
}

0 commit comments

Comments
 (0)