Skip to content

Commit 5a0f263

Browse files
committed
feat(cli): add CLI with CI convenience functions
- Add nipyapi CLI using Google Fire (optional install: nipyapi[cli]) - Add nipyapi.ci module with 9 CI convenience functions - CLI features: JSON output, log capture, error wrapping, CI auto-detect - Add docs/cli_quickstart.md for quick testing
1 parent 4507a52 commit 5a0f263

File tree

13 files changed

+1220
-0
lines changed

13 files changed

+1220
-0
lines changed

docs/cli_quickstart.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# nipyapi CLI Quickstart
2+
3+
## Prerequisites
4+
5+
```bash
6+
brew install uv
7+
```
8+
9+
## Install
10+
11+
```bash
12+
uv pip install "nipyapi[cli] @ git+https://github.com/Chaffelson/nipyapi.git@feature/cli"
13+
```
14+
15+
## Configure
16+
17+
```bash
18+
export NIFI_API_ENDPOINT="https://your-nifi-host/nifi-api"
19+
export NIFI_BEARER_TOKEN="your-jwt-token"
20+
```
21+
22+
## Explore
23+
24+
```bash
25+
# Check connection
26+
nipyapi system get_nifi_version_info
27+
28+
# List registry clients
29+
nipyapi versioning list_registry_clients
30+
31+
# Get a specific registry client by name
32+
nipyapi versioning get_registry_client "ConnectorFlowRegistryClient"
33+
```
34+
35+
## Deploy a Flow
36+
37+
```bash
38+
# Get the registry client ID
39+
REGISTRY_ID=$(nipyapi versioning get_registry_client "ConnectorFlowRegistryClient" | jq -r '.id')
40+
41+
# Deploy a flow from the registry
42+
nipyapi ci deploy_flow \
43+
--registry_client_id "$REGISTRY_ID" \
44+
--bucket "connectors" \
45+
--flow "postgresql"
46+
```

nipyapi/ci/__init__.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""
2+
CI convenience functions for NiFi flow management.
3+
4+
These functions wrap nipyapi operations with:
5+
- Environment variable support for CI/CD platforms
6+
- Sensible defaults
7+
- Simplified interfaces
8+
9+
They return plain dicts and raise exceptions on error.
10+
The CLI handles output formatting, logging capture, and exit codes.
11+
12+
Example:
13+
nipyapi ci ensure_registry --repo owner/repo
14+
nipyapi ci deploy_flow --bucket flows --flow my-flow
15+
nipyapi ci start_flow
16+
"""
17+
18+
from .change_version import change_version
19+
from .cleanup import cleanup
20+
from .configure_params import configure_params
21+
from .deploy_flow import deploy_flow
22+
from .ensure_registry import ensure_registry
23+
from .get_status import get_status
24+
from .revert_flow import revert_flow
25+
from .start_flow import start_flow
26+
from .stop_flow import stop_flow
27+
28+
__all__ = [
29+
"ensure_registry",
30+
"deploy_flow",
31+
"start_flow",
32+
"stop_flow",
33+
"get_status",
34+
"configure_params",
35+
"change_version",
36+
"revert_flow",
37+
"cleanup",
38+
]

nipyapi/ci/change_version.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""
2+
change_version - change the version of a deployed flow.
3+
"""
4+
5+
import logging
6+
import os
7+
from typing import Optional
8+
9+
import nipyapi
10+
11+
log = logging.getLogger(__name__)
12+
13+
14+
def change_version(
15+
process_group_id: Optional[str] = None,
16+
target_version: Optional[str] = None,
17+
branch: Optional[str] = None,
18+
) -> dict:
19+
"""
20+
Change the version of a deployed flow.
21+
22+
Args:
23+
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
24+
target_version: Version to change to (commit SHA, tag, branch).
25+
Env: NIFI_TARGET_VERSION. If None, changes to latest.
26+
branch: Branch to use. Env: NIFI_FLOW_BRANCH
27+
28+
Returns:
29+
dict with previous_version, new_version, version_state
30+
31+
Raises:
32+
ValueError: Missing required parameters or not under version control
33+
"""
34+
process_group_id = process_group_id or os.environ.get("NIFI_PROCESS_GROUP_ID")
35+
target_version = target_version or os.environ.get("NIFI_TARGET_VERSION") or None
36+
branch = branch or os.environ.get("NIFI_FLOW_BRANCH") or None
37+
38+
if not process_group_id:
39+
raise ValueError("process_group_id is required (or set NIFI_PROCESS_GROUP_ID)")
40+
41+
nipyapi.profiles.switch("env")
42+
43+
log.info("Changing version for: %s", process_group_id)
44+
45+
# Get process group
46+
process_group = nipyapi.canvas.get_process_group(process_group_id, "id")
47+
if not process_group:
48+
raise ValueError(f"Process group not found: {process_group_id}")
49+
50+
# Get current version info
51+
version_info = nipyapi.versioning.get_version_info(process_group)
52+
if not version_info or not version_info.version_control_information:
53+
raise ValueError(
54+
f"Process group '{process_group.component.name}' is not under version control"
55+
)
56+
57+
current_vci = version_info.version_control_information
58+
previous_version = current_vci.version
59+
60+
log.debug("Current version: %s (%s)", previous_version, current_vci.state)
61+
62+
if target_version:
63+
log.info("Target version: %s", target_version)
64+
else:
65+
log.info("Target version: latest")
66+
67+
# Change version
68+
nipyapi.versioning.update_git_flow_ver(
69+
process_group=process_group,
70+
target_version=target_version,
71+
branch=branch,
72+
)
73+
74+
# Get updated version info
75+
updated_pg = nipyapi.canvas.get_process_group(process_group_id, "id")
76+
updated_vci = nipyapi.versioning.get_version_info(updated_pg)
77+
new_version = updated_vci.version_control_information.version
78+
new_state = updated_vci.version_control_information.state
79+
80+
log.info("Changed from %s to %s (%s)", previous_version[:12], new_version[:12], new_state)
81+
82+
return {
83+
"previous_version": previous_version,
84+
"new_version": new_version,
85+
"version_state": new_state,
86+
}

nipyapi/ci/cleanup.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# pylint: disable=broad-exception-caught,duplicate-code
2+
"""
3+
cleanup - stop and delete a deployed process group.
4+
"""
5+
6+
import logging
7+
import os
8+
from typing import Optional
9+
10+
import nipyapi
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
def cleanup(
16+
process_group_id: Optional[str] = None,
17+
force: bool = True,
18+
delete_parameter_context: bool = True,
19+
) -> dict:
20+
"""
21+
Stop and delete a process group, optionally including its parameter context.
22+
23+
Args:
24+
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
25+
force: Force deletion even with running components (default: True)
26+
delete_parameter_context: Delete parameter context too (default: True)
27+
28+
Returns:
29+
dict with deleted, deleted_name
30+
31+
Raises:
32+
ValueError: Missing required parameters
33+
"""
34+
process_group_id = process_group_id or os.environ.get("NIFI_PROCESS_GROUP_ID")
35+
36+
if not process_group_id:
37+
raise ValueError("process_group_id is required (or set NIFI_PROCESS_GROUP_ID)")
38+
39+
nipyapi.profiles.switch("env")
40+
41+
log.info("Cleaning up process group: %s", process_group_id)
42+
43+
# Get process group
44+
try:
45+
process_group = nipyapi.canvas.get_process_group(process_group_id, "id")
46+
except nipyapi.nifi.rest.ApiException as e:
47+
if e.status == 404:
48+
log.info("Process group not found - may already be deleted")
49+
return {
50+
"deleted": "false",
51+
"deleted_name": "",
52+
"message": "Process group not found",
53+
}
54+
raise
55+
56+
if not process_group:
57+
return {
58+
"deleted": "false",
59+
"deleted_name": "",
60+
"message": "Process group not found",
61+
}
62+
63+
pg_name = process_group.component.name
64+
log.debug("Found process group: %s", pg_name)
65+
66+
# Get parameter context reference before deletion
67+
param_ctx_id = None
68+
param_ctx_name = None
69+
if delete_parameter_context and process_group.component.parameter_context:
70+
param_ctx_id = process_group.component.parameter_context.id
71+
param_ctx_name = process_group.component.parameter_context.component.name
72+
log.debug("Found parameter context: %s", param_ctx_name)
73+
74+
# Stop first
75+
log.debug("Stopping process group...")
76+
nipyapi.canvas.schedule_process_group(process_group_id, scheduled=False)
77+
78+
# Delete process group
79+
log.debug("Deleting process group...")
80+
nipyapi.canvas.delete_process_group(process_group, force=force)
81+
log.info("Deleted process group: %s", pg_name)
82+
83+
# Delete parameter context if requested
84+
if delete_parameter_context and param_ctx_id:
85+
try:
86+
log.debug("Deleting parameter context: %s", param_ctx_name)
87+
ctx = nipyapi.parameters.get_parameter_context(param_ctx_id, identifier_type="id")
88+
nipyapi.parameters.delete_parameter_context(ctx)
89+
log.info("Deleted parameter context: %s", param_ctx_name)
90+
except Exception as e:
91+
log.warning("Could not delete parameter context: %s", e)
92+
93+
return {
94+
"deleted": "true",
95+
"deleted_name": pg_name,
96+
}

nipyapi/ci/configure_params.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""
2+
configure_params - set parameter values on a process group.
3+
"""
4+
5+
import json
6+
import logging
7+
import os
8+
from typing import Any, Dict, Optional, Union
9+
10+
import nipyapi
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
def configure_params(
16+
process_group_id: Optional[str] = None,
17+
parameters: Optional[Union[str, Dict[str, Any]]] = None,
18+
) -> dict:
19+
"""
20+
Configure parameters on a process group's parameter context.
21+
22+
Args:
23+
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
24+
parameters: JSON string or dict of parameter name -> value pairs.
25+
Env: NIFI_PARAMETERS
26+
27+
Returns:
28+
dict with parameters_updated, parameters_count, context_name
29+
30+
Raises:
31+
ValueError: Missing required parameters or invalid JSON
32+
"""
33+
process_group_id = process_group_id or os.environ.get("NIFI_PROCESS_GROUP_ID")
34+
parameters = parameters or os.environ.get("NIFI_PARAMETERS")
35+
36+
if not process_group_id:
37+
raise ValueError("process_group_id is required (or set NIFI_PROCESS_GROUP_ID)")
38+
if not parameters:
39+
raise ValueError("parameters is required (or set NIFI_PARAMETERS)")
40+
41+
# Parse JSON string if provided
42+
if isinstance(parameters, str):
43+
try:
44+
parameters = json.loads(parameters)
45+
except json.JSONDecodeError as e:
46+
raise ValueError(f"Invalid JSON in parameters: {e}") from e
47+
48+
if not isinstance(parameters, dict):
49+
raise ValueError("parameters must be a JSON object with key-value pairs")
50+
51+
nipyapi.profiles.switch("env")
52+
53+
log.info("Configuring %d parameter(s) on %s", len(parameters), process_group_id)
54+
55+
# Get process group
56+
pg = nipyapi.canvas.get_process_group(process_group_id, identifier_type="id")
57+
if not pg:
58+
raise ValueError(f"Process group not found: {process_group_id}")
59+
60+
log.debug("Found process group: %s", pg.component.name)
61+
62+
# Check for parameter context
63+
if not pg.component.parameter_context:
64+
raise ValueError(
65+
f"Process group '{pg.component.name}' has no parameter context. "
66+
"Attach a parameter context before configuring parameters."
67+
)
68+
69+
ctx_ref = pg.component.parameter_context
70+
ctx_name = ctx_ref.component.name
71+
log.debug("Parameter context: %s", ctx_name)
72+
73+
# Get full context
74+
ctx = nipyapi.parameters.get_parameter_context(ctx_ref.id, identifier_type="id")
75+
76+
# Prepare parameters
77+
prepared_params = []
78+
for param_name, param_value in parameters.items():
79+
param = nipyapi.parameters.prepare_parameter(
80+
name=param_name,
81+
value=str(param_value),
82+
sensitive=False,
83+
)
84+
prepared_params.append(param)
85+
log.debug("Setting %s = %s", param_name, param_value)
86+
87+
# Apply parameters
88+
ctx.component.parameters = prepared_params
89+
nipyapi.parameters.update_parameter_context(ctx)
90+
91+
updated_params = list(parameters.keys())
92+
log.info("Updated %d parameter(s): %s", len(updated_params), ", ".join(updated_params))
93+
94+
return {
95+
"parameters_updated": ",".join(updated_params),
96+
"parameters_count": str(len(updated_params)),
97+
"context_name": ctx_name,
98+
}

0 commit comments

Comments
 (0)