Skip to content

Commit 7826bd1

Browse files
committed
NIFI-14157: Allow InvokeScriptedProcessor scripts to implement OnPrimaryNodeStateChange
1 parent 4795805 commit 7826bd1

File tree

4 files changed

+99
-1
lines changed

4 files changed

+99
-1
lines changed

nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
3131
import org.apache.nifi.annotation.lifecycle.OnScheduled;
3232
import org.apache.nifi.annotation.lifecycle.OnStopped;
33+
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
34+
import org.apache.nifi.annotation.notification.PrimaryNodeState;
3335
import org.apache.nifi.components.PropertyDescriptor;
3436
import org.apache.nifi.components.RequiredPermission;
3537
import org.apache.nifi.components.ValidationContext;
@@ -212,7 +214,6 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
212214
public void setup(final ProcessContext context) {
213215
scriptingComponentHelper.setupVariables(context);
214216
setup();
215-
216217
invokeScriptedProcessorMethod("onScheduled", context);
217218
}
218219

@@ -232,6 +233,12 @@ public void setup() {
232233
}
233234
}
234235

236+
@OnPrimaryNodeStateChange
237+
public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
238+
239+
invokeScriptedProcessorMethod("onPrimaryNodeStateChange", newState);
240+
}
241+
235242
/**
236243
* Handles changes to this processor's properties. If changes are made to
237244
* script- or engine-related properties, the script will be reloaded.

nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,31 @@ public void testReadRecordsWithRecordPath() throws Exception {
241241
ff.assertContentEquals("48\n47\n14\n");
242242
}
243243

244+
/**
245+
* Tests a script that has a Groovy Processor that implements its own onPrimaryNodeStateChange
246+
*
247+
* @throws Exception Any error encountered while testing
248+
*/
249+
@Test
250+
public void testOnPrimaryNodeStateChange() {
251+
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
252+
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_OnPrimaryStateChange.groovy");
253+
runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy");
254+
InvokeScriptedProcessor invokeScriptedProcessor = ((InvokeScriptedProcessor) scriptingComponent);
255+
invokeScriptedProcessor.setup(runner.getProcessContext());
256+
runner.setIsConfiguredForClustering(true);
257+
runner.run(1, false, true);
258+
runner.setPrimaryNode(true);
259+
runner.clearTransferState();
260+
runner.run(1, true, false);
261+
runner.assertAllFlowFilesTransferred("success");
262+
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship("success");
263+
assertNotNull(flowFiles);
264+
assertEquals(1, flowFiles.size());
265+
MockFlowFile flowFile = flowFiles.get(0);
266+
flowFile.assertAttributeEquals("isPrimaryNode", "true");
267+
}
268+
244269
private static class OverrideInvokeScriptedProcessor extends InvokeScriptedProcessor {
245270

246271
private int numTimesModifiedCalled = 0;
@@ -258,4 +283,8 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
258283
numTimesModifiedCalled++;
259284
}
260285
}
286+
287+
private static class OnPrimaryNodeStateChangeMethodWasCalledException extends RuntimeException {
288+
289+
}
261290
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange
20+
import org.apache.nifi.annotation.notification.PrimaryNodeState
21+
import org.apache.nifi.processor.AbstractProcessor
22+
import org.apache.nifi.processor.ProcessContext
23+
import org.apache.nifi.processor.ProcessSession
24+
import org.apache.nifi.processor.Relationship
25+
26+
27+
class MyRecordProcessor extends AbstractProcessor {
28+
29+
def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build()
30+
def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build()
31+
32+
static boolean primaryNode = false
33+
34+
@OnPrimaryNodeStateChange
35+
void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
36+
primaryNode = true
37+
}
38+
39+
@Override
40+
Set<Relationship> getRelationships() {
41+
[REL_SUCCESS, REL_FAILURE] as Set<Relationship>
42+
}
43+
44+
@Override
45+
void onTrigger(ProcessContext context, ProcessSession session) {
46+
def flowFile = session.create()
47+
session.putAttribute(flowFile, 'isPrimaryNode', primaryNode.toString())
48+
session.transfer(flowFile, REL_SUCCESS)
49+
}
50+
}
51+
52+
processor = new MyRecordProcessor()

nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.nifi.annotation.lifecycle.OnShutdown;
2727
import org.apache.nifi.annotation.lifecycle.OnStopped;
2828
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
29+
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
30+
import org.apache.nifi.annotation.notification.PrimaryNodeState;
2931
import org.apache.nifi.components.DescribedValue;
3032
import org.apache.nifi.components.PropertyDescriptor;
3133
import org.apache.nifi.components.ValidationContext;
@@ -979,6 +981,14 @@ public void setIsConfiguredForClustering(final boolean isConfiguredForClustering
979981

980982
@Override
981983
public void setPrimaryNode(boolean primaryNode) {
984+
if (context.isPrimary() != primaryNode) {
985+
try {
986+
ReflectionUtils.invokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, processor,
987+
primaryNode ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED);
988+
} catch (final Exception e) {
989+
Assertions.fail("Could not invoke methods annotated with @OnPrimaryNodeStateChange annotation due to: " + e);
990+
}
991+
}
982992
context.setPrimaryNode(primaryNode);
983993
}
984994

0 commit comments

Comments
 (0)