Skip to content

Commit 5cb732a

Browse files
committed
feat(canvas): add name param and auto_stop to update_processor
- update_processor now accepts name param for renaming processors - add auto_stop param (default False) to handle running processors - revert_flow_ver now refreshes revision internally to prevent stale errors
1 parent 4c8c1d6 commit 5cb732a

File tree

4 files changed

+92
-17
lines changed

4 files changed

+92
-17
lines changed

nipyapi/canvas.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -735,36 +735,80 @@ def update_process_group(pg, update, refresh=True):
735735
)
736736

737737

738-
def update_processor(processor, update, refresh=True):
738+
def update_processor(processor, update=None, name=None, refresh=True, auto_stop=False):
739739
"""
740-
Updates configuration parameters for a given Processor.
740+
Updates a Processor's configuration and/or name.
741741
742-
An example update would be:
743-
nifi.ProcessorConfigDTO(scheduling_period='3s')
742+
For configuration changes, pass a ProcessorConfigDTO:
743+
nifi.ProcessorConfigDTO(scheduling_period='3s')
744+
745+
For renaming, pass the new name. Both can be provided together.
746+
747+
Processors must be stopped for certain updates (including renaming).
748+
If auto_stop is True (default), the processor will be stopped before
749+
updating and restarted afterward if it was originally running.
744750
745751
Args:
746752
processor (ProcessorEntity): The Processor to target for update
747-
update (ProcessorConfigDTO): The new configuration parameters
753+
update (ProcessorConfigDTO, optional): Configuration parameters to update
754+
name (str, optional): New name for the processor
748755
refresh (bool): Whether to refresh the Processor object state
749-
before applying the update
756+
before applying the update. Default True.
757+
auto_stop (bool): If True, automatically stop the processor before
758+
updating and restart afterward if it was running. Default False.
750759
751760
Returns:
752761
:class:`~nipyapi.nifi.models.ProcessorEntity`: The updated ProcessorEntity
753762
763+
Raises:
764+
ValueError: If neither update nor name is provided, or if update is not
765+
a ProcessorConfigDTO, or if processor is running and auto_stop=False.
754766
"""
755-
if not isinstance(update, nipyapi.nifi.ProcessorConfigDTO):
767+
if update is None and name is None:
768+
raise ValueError("Must provide 'update' (ProcessorConfigDTO) and/or 'name'")
769+
if update is not None and not isinstance(update, nipyapi.nifi.ProcessorConfigDTO):
756770
raise ValueError("update param is not an instance of nifi.ProcessorConfigDTO")
771+
757772
with nipyapi.utils.rest_exceptions():
758773
if refresh:
759774
processor = get_processor(processor.id, "id")
760-
return nipyapi.nifi.ProcessorsApi().update_processor(
775+
776+
was_running = processor.component.state == "RUNNING"
777+
778+
if was_running and not auto_stop:
779+
raise ValueError(
780+
f"Processor '{processor.component.name}' is running. "
781+
"Stop it first or set auto_stop=True."
782+
)
783+
784+
# Stop if running
785+
if was_running:
786+
schedule_processor(processor, scheduled=False, refresh=True)
787+
processor = get_processor(processor.id, "id")
788+
789+
# Build the update DTO with whatever fields are provided
790+
dto_kwargs = {"id": processor.component.id}
791+
if name is not None:
792+
dto_kwargs["name"] = name
793+
if update is not None:
794+
dto_kwargs["config"] = update
795+
796+
result = nipyapi.nifi.ProcessorsApi().update_processor(
761797
id=processor.id,
762798
body=nipyapi.nifi.ProcessorEntity(
763-
component=nipyapi.nifi.ProcessorDTO(config=update, id=processor.id),
799+
id=processor.id,
764800
revision=processor.revision,
801+
component=nipyapi.nifi.ProcessorDTO(**dto_kwargs),
765802
),
766803
)
767804

805+
# Restart if it was running
806+
if was_running:
807+
schedule_processor(result, scheduled=True, refresh=True)
808+
result = get_processor(result.id, "id")
809+
810+
return result
811+
768812

769813
def get_variable_registry(process_group, ancestors=True):
770814
"""

nipyapi/versioning.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1059,9 +1059,15 @@ def revert_flow_ver(process_group, wait=False):
10591059
assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
10601060

10611061
with nipyapi.utils.rest_exceptions():
1062+
# Refresh version control info to get current revision
1063+
vci = nipyapi.nifi.VersionsApi().get_version_information(process_group.id)
1064+
# Also get fresh process group revision
1065+
fresh_pg = nipyapi.canvas.get_process_group(process_group.id, "id")
1066+
vci.process_group_revision = fresh_pg.revision
1067+
10621068
revert_request = nipyapi.nifi.VersionsApi().initiate_revert_flow_version(
10631069
id=process_group.id,
1064-
body=nipyapi.nifi.VersionsApi().get_version_information(process_group.id),
1070+
body=vci,
10651071
)
10661072

10671073
if not wait:

tests/test_canvas.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -287,14 +287,39 @@ def test_delete_processor(fix_proc):
287287

288288

289289
def test_update_processor(fix_proc):
290-
# TODO: Add way more tests to this
290+
"""Test update_processor with config, name, and both."""
291291
f_p1 = fix_proc.generate()
292-
update = nifi.ProcessorConfigDTO(
293-
scheduling_period='3s'
294-
)
295-
r1 = canvas.update_processor(f_p1, update)
292+
original_name = f_p1.component.name
293+
294+
# Test config update (processor is stopped, no auto_stop needed)
295+
update = nifi.ProcessorConfigDTO(scheduling_period='3s')
296+
r1 = canvas.update_processor(f_p1, update=update)
297+
assert r1 is not None
298+
299+
# Test invalid update type
296300
with pytest.raises(ValueError, match='update param is not an instance'):
297-
_ = canvas.update_processor(f_p1, 'FakeNews')
301+
canvas.update_processor(f_p1, update='FakeNews')
302+
303+
# Test rename (processor is stopped, no auto_stop needed)
304+
new_name = original_name + '_RENAMED'
305+
r2 = canvas.update_processor(r1, name=new_name)
306+
assert r2.component.name == new_name
307+
308+
# Test rename back
309+
r3 = canvas.update_processor(r2, name=original_name)
310+
assert r3.component.name == original_name
311+
312+
# Test both config and name together
313+
update2 = nifi.ProcessorConfigDTO(scheduling_period='5s')
314+
r4 = canvas.update_processor(r3, update=update2, name=original_name + '_BOTH')
315+
assert r4.component.name == original_name + '_BOTH'
316+
317+
# Restore name
318+
canvas.update_processor(r4, name=original_name)
319+
320+
# Test error when neither update nor name provided
321+
with pytest.raises(ValueError, match="Must provide"):
322+
canvas.update_processor(f_p1)
298323

299324

300325
def test_purge_connection():

tests/test_versioning.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ def test_revert_flow_ver_wait_false(fix_deployed_git_flow):
617617

618618
def test_revert_flow_ver_already_up_to_date(fix_deployed_git_flow):
619619
"""Test revert on flow that's already UP_TO_DATE."""
620-
# Ensure flow is UP_TO_DATE
620+
# Ensure flow is UP_TO_DATE (function refreshes revision internally)
621621
vci = versioning.get_version_info(fix_deployed_git_flow.pg)
622622
assert vci.version_control_information.state == 'UP_TO_DATE'
623623

0 commit comments

Comments
 (0)