Skip to content

Commit 1b32c49

Browse files
lordgamezszaszm
authored andcommitted
MINIFICPP-2448 Add minifi.sh flowStatus command
Closes #1941 Signed-off-by: Marton Szasz <szaszm@apache.org>
1 parent 91d3c05 commit 1b32c49

29 files changed

+1412
-22
lines changed

OPS.md

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,25 @@
1919
This readme defines operational commands for managing instances.
2020

2121
## Table of Contents
22-
2322
- [Description](#description)
24-
- [Managing](#managing-minifi)
25-
- [Commands](#commands)
23+
- [Managing MiNiFi](#managing-minifi)
24+
- [SSL](#ssl)
25+
- [Commands](#commands)
26+
- [Specifying connecting information](#specifying-connecting-information)
27+
- [Start Command](#start-command)
28+
- [Stop command](#stop-command)
29+
- [List connections command](#list-connections-command)
30+
- [List components command](#list-components-command)
31+
- [Clear connection command](#clear-connection-command)
32+
- [GetSize command](#getsize-command)
33+
- [Update flow](#update-flow)
34+
- [Get full connection command](#get-full-connection-command)
35+
- [Get manifest command](#get-manifest-command)
36+
- [Flowstatus command](#flowstatus-command)
37+
- [Processor](#processor)
38+
- [Connection](#connection)
39+
- [Instance](#instance)
40+
- [System Diagnostics](#system-diagnostics)
2641

2742
## Description
2843

@@ -109,3 +124,57 @@ Provides a list of full connections, if any.
109124
./minifi-controller --manifest
110125

111126
Writes the agent manifest json to standard output
127+
128+
#### Flowstatus command
129+
./minificontroller --flowstatus "processor:TailFile:health,stats,bulletins"
130+
131+
The command returns the flow status for the specified query in JSON format.
132+
The query consists of the query type, the element identifier, and the query options. Each part is separated by the ':' colon character. Multiple query options are specified as a comma-separated list. In some query types the identifier is omitted, in this case only the query type and the query options are specified. Multiple queries can also be specified in a flowStatus command, in this case the queries are separated by the ';' semicolon character. For example: `./minificontroller --flowstatus "processor:TailFile:health,stats,bulletins;processor:LogAttribute:stats"`
133+
134+
Supported query types:
135+
136+
##### Processor
137+
138+
To query the processors, use the `processor` flag and specify the processor (by ID, name or "all") followed by one of the processor options. The processor options are below:
139+
140+
- health: The processor's run status, whether or not it has bulletins.
141+
- bulletins: A list of all the current bulletins (if there are any).
142+
- stats: The current stats of the processor.
143+
144+
An example query to get the health and stats of the "GenerateFlowFile" processor is below.
145+
146+
`./minificontroller --flowstatus "processor:GenerateFlowFile:health,stats"`
147+
148+
##### Connection
149+
150+
To query the connections, use the `connection` flag and specify the connection (by ID, name or "all") followed by one of the connection options. The connection options are below:
151+
152+
- health: The processor's run status, whether or not it has bulletins.
153+
154+
An example query to get the health and stats of the "Connection1" connection is below.
155+
156+
`./minificontroller --flowstatus "connection:Connection1:health"`
157+
158+
##### Instance
159+
160+
To query the status of the MiNiFi instance, use the `instance` flag followed by one of the instance options. The instance options are below.
161+
162+
- health: The instance reporting the aggregated state of the connections, and whether or not it has bulletins.
163+
- bulletins: A list of all the current bulletins (if there are any).
164+
- stats: The aggregated stats of all processors (bytes read, written, transferred, and flowfiles transferred).
165+
166+
An example query to get the all the statuses of the instance is below.
167+
168+
`./minificontroller --flowstatus "instance:health,stats,bulletins"`
169+
170+
##### System Diagnostics
171+
172+
To query the system diagnostics, use the `systemdiagnostics` flag followed by one of the system diagnostics options. The system diagnostics options are below.
173+
174+
- processorstats: The system processor stats. This includes the available processors and load average.
175+
- contentrepositoryusage: Disk usage stats on the partition or volume where the content repository is located.
176+
- flowfilerepositoryusage: Disk usage stats on the partition or volume where the flowfile repository is located.
177+
178+
An example query to get the processor stats, content repository usage and FlowFile repository usage from the system diagnostics is below.
179+
180+
`./minificontroller --flowStatus "systemdiagnostics:processorstats,contentrepositoryusage,flowfilerepositoryusage"`

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,19 @@ MiNiFi can then be stopped by issuing:
521521

522522
$ ./bin/minifi.sh stop
523523

524+
### Query flow status
525+
526+
To query the status of the flow, you can use the following command on Unix systems:
527+
528+
$ ./bin/minifi.sh flowStatus [host:optional] [port:optional] "<query>"
529+
530+
On Windows systems, you can use the following command:
531+
532+
$ ./bin/flowstatus-minifi.bat [host:optional] [port:optional] "<query>"
533+
534+
The query can look like the following: "processor:TailFile:health,stats,bulletins". For more information on the query syntax and options, please see the [flow status documentation](OPS.md#Flowstatus-command).
535+
Note: The command requires minifi controller to be enabled in the minifi.properties file.
536+
524537
### Running as a docker container
525538
You can use the officially released image pulled from the [apache/nifi-minifi-cpp](https://hub.docker.com/r/apache/nifi-minifi-cpp) repository on dockerhub or you can use your locally built image.
526539
The container can be run with a specific configuration by mounting the locally edited configuration files to your docker container.

bin/flowstatus-minifi.bat

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
@echo off &setlocal enabledelayedexpansion
2+
rem
3+
rem Licensed to the Apache Software Foundation (ASF) under one or more
4+
rem contributor license agreements. See the NOTICE file distributed with
5+
rem this work for additional information regarding copyright ownership.
6+
rem The ASF licenses this file to You under the Apache License, Version 2.0
7+
rem (the "License"); you may not use this file except in compliance with
8+
rem the License. You may obtain a copy of the License at
9+
rem
10+
rem http://www.apache.org/licenses/LICENSE-2.0
11+
rem
12+
rem Unless required by applicable law or agreed to in writing, software
13+
rem distributed under the License is distributed on an "AS IS" BASIS,
14+
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
rem See the License for the specific language governing permissions and
16+
rem limitations under the License.
17+
rem
18+
19+
set "bin_dir=%~dp0"
20+
21+
if not exist "%bin_dir%\minificontroller.exe" (
22+
echo MiNiFi Controller is not installed
23+
exit /b
24+
)
25+
26+
if "%~1"=="" (
27+
echo MiNiFi flowStatus operation requires a flow status query parameter like "processor:TailFile:health,stats,bulletins"
28+
goto usage
29+
)
30+
31+
if "%~2"=="" (
32+
"%bin_dir%\minificontroller.exe" --flowstatus "%~1"
33+
exit /b
34+
)
35+
36+
if "%~3"=="" (
37+
"%bin_dir%\minificontroller.exe" --port "%~1" --flowstatus "%~2"
38+
exit /b
39+
)
40+
41+
"%bin_dir%\minificontroller.exe" --host "%~1" --port "%~2" --flowstatus "%~3"
42+
exit /b
43+
44+
:usage
45+
echo Usage: flowStatus.bat [^<host^>] [^<port^>] ^<flowstatus^>
46+
exit /b

bin/minifi.sh

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,25 @@ status_service() {
183183
fi
184184
}
185185

186+
flowStatus() {
187+
if ! [ -f "${bin_dir}/minificontroller" ]; then
188+
echo "MiNiFi Controller is not installed"
189+
return
190+
fi
191+
if [ "$#" -lt 2 ]; then
192+
echo "MiNiFi flowStatus operation requires a flow status query parameter like \"processor:TailFile:health,stats,bulletins\""
193+
return
194+
fi
195+
196+
if [ "$#" -lt 3 ]; then
197+
exec "${bin_dir}/minificontroller" --flowstatus "$2"
198+
elif [ "$#" -lt 4 ]; then
199+
exec "${bin_dir}/minificontroller" --port "$2" --flowstatus "$3"
200+
else
201+
exec "${bin_dir}/minificontroller" --host "$2" --port "$3" --flowstatus "$4"
202+
fi
203+
}
204+
186205
case "$1" in
187206
start)
188207
start_service
@@ -207,7 +226,10 @@ case "$1" in
207226
uninstall "$@"
208227
echo "Service minifi uninstalled. Please remove the ${MINIFI_HOME} directory manually."
209228
;;
229+
flowStatus)
230+
flowStatus "$@"
231+
;;
210232
*)
211-
echo "Usage: minifi.sh {start|stop|run|restart|status|install|uninstall}"
233+
echo "Usage: minifi.sh {start|stop|run|restart|status|install|uninstall|flowStatus}"
212234
;;
213235
esac

controller/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,6 @@ if (NOT WIN32)
6363
endif()
6464

6565
install(TARGETS minifi-controller RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT bin)
66+
if(WIN32)
67+
install(FILES "${CMAKE_SOURCE_DIR}/bin/flowstatus-minifi.bat" DESTINATION bin COMPONENT bin)
68+
endif()

controller/Controller.cpp

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
namespace org::apache::nifi::minifi::controller {
3434

3535
bool sendSingleCommand(const utils::net::SocketData& socket_data, uint8_t op, const std::string& value) {
36-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
36+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
3737
if (connection_stream->initialize() < 0) {
3838
return false;
3939
}
@@ -56,7 +56,7 @@ bool clearConnection(const utils::net::SocketData& socket_data, const std::strin
5656
}
5757

5858
bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& file) {
59-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
59+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
6060
if (connection_stream->initialize() < 0) {
6161
return false;
6262
}
@@ -75,7 +75,7 @@ bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, co
7575
uint16_t connections = 0;
7676
connection_stream->read(connections);
7777
out << connections << " are full" << std::endl;
78-
for (int i = 0; i < connections; i++) {
78+
for (uint16_t i = 0; i < connections; i++) {
7979
std::string fullcomponent;
8080
connection_stream->read(fullcomponent);
8181
out << fullcomponent << " is full" << std::endl;
@@ -85,7 +85,7 @@ bool updateFlow(const utils::net::SocketData& socket_data, std::ostream &out, co
8585
}
8686

8787
bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream &out) {
88-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
88+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
8989
if (connection_stream->initialize() < 0) {
9090
return false;
9191
}
@@ -103,7 +103,7 @@ bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream
103103
uint16_t connections = 0;
104104
connection_stream->read(connections);
105105
out << connections << " are full" << std::endl;
106-
for (int i = 0; i < connections; i++) {
106+
for (uint16_t i = 0; i < connections; i++) {
107107
std::string fullcomponent;
108108
connection_stream->read(fullcomponent);
109109
out << fullcomponent << " is full" << std::endl;
@@ -113,7 +113,7 @@ bool getFullConnections(const utils::net::SocketData& socket_data, std::ostream
113113
}
114114

115115
bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &out, const std::string& connection) {
116-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
116+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
117117
if (connection_stream->initialize() < 0) {
118118
return false;
119119
}
@@ -137,7 +137,7 @@ bool getConnectionSize(const utils::net::SocketData& socket_data, std::ostream &
137137
}
138138

139139
bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
140-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
140+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
141141
if (connection_stream->initialize() < 0) {
142142
return false;
143143
}
@@ -154,7 +154,7 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out
154154
if (show_header)
155155
out << "Components:" << std::endl;
156156

157-
for (int i = 0; i < responses; i++) {
157+
for (uint16_t i = 0; i < responses; i++) {
158158
std::string name;
159159
connection_stream->read(name, false);
160160
std::string status;
@@ -165,7 +165,7 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out
165165
}
166166

167167
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header) {
168-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
168+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
169169
if (connection_stream->initialize() < 0) {
170170
return false;
171171
}
@@ -182,7 +182,7 @@ bool listConnections(const utils::net::SocketData& socket_data, std::ostream &ou
182182
if (show_header)
183183
out << "Connection Names:" << std::endl;
184184

185-
for (int i = 0; i < responses; i++) {
185+
for (uint16_t i = 0; i < responses; i++) {
186186
std::string name;
187187
connection_stream->read(name, false);
188188
out << name << std::endl;
@@ -191,7 +191,7 @@ bool listConnections(const utils::net::SocketData& socket_data, std::ostream &ou
191191
}
192192

193193
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out) {
194-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
194+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
195195
if (connection_stream->initialize() < 0) {
196196
return false;
197197
}
@@ -210,7 +210,7 @@ bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out)
210210
}
211211

212212
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
213-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
213+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
214214
if (connection_stream->initialize() < 0) {
215215
return false;
216216
}
@@ -229,7 +229,7 @@ bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out) {
229229
}
230230

231231
nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir) {
232-
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
232+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
233233
if (connection_stream->initialize() < 0) {
234234
return nonstd::make_unexpected("Could not connect to remote host " + socket_data.host + ":" + std::to_string(socket_data.port));
235235
}
@@ -267,4 +267,24 @@ nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData&
267267
return {};
268268
}
269269

270+
bool getFlowStatus(const utils::net::SocketData& socket_data, const std::string& status_query, std::ostream &out) {
271+
const auto connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
272+
if (connection_stream->initialize() < 0) {
273+
return false;
274+
}
275+
io::BufferStream buffer;
276+
auto op = static_cast<uint8_t>(c2::Operation::describe);
277+
buffer.write(&op, 1);
278+
buffer.write("flowstatus");
279+
buffer.write(status_query);
280+
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
281+
return false;
282+
}
283+
connection_stream->read(op);
284+
std::string manifest;
285+
connection_stream->read(manifest, true);
286+
out << manifest << std::endl;
287+
return true;
288+
}
289+
270290
} // namespace org::apache::nifi::minifi::controller

controller/Controller.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out
3636
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
3737
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out);
3838
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);
39+
bool getFlowStatus(const utils::net::SocketData& socket_data, const std::string& status_query, std::ostream &out);
3940
nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir);
4041

4142
} // namespace org::apache::nifi::minifi::controller

controller/MiNiFiController.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ int main(int argc, char **argv) {
151151
argument_parser.add_argument("--updateflow")
152152
.metavar("FLOW_CONFIG_PATH")
153153
.help("Updates the flow of the agent using the provided flow file");
154+
argument_parser.add_argument("--flowstatus")
155+
.metavar("FLOW_STATUS_QUERY")
156+
.help("Returns flow status for the provided query");
154157

155158
auto addFlagOption = [&](std::string_view name, const std::string& help) {
156159
argument_parser.add_argument(name)
@@ -275,6 +278,12 @@ int main(int argc, char **argv) {
275278
else
276279
std::cout << "Debug bundle written to " << std::filesystem::path(*debug_path) / "debug.tar.gz";
277280
}
281+
282+
if (const auto& status_query = argument_parser.present("--flowstatus")) {
283+
if (!minifi::controller::getFlowStatus(socket_data, *status_query, std::cout)) {
284+
std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
285+
}
286+
}
278287
} catch (const std::exception &exc) {
279288
// catch anything thrown within try block that derives from std::exception
280289
std::cerr << exc.what() << std::endl;

0 commit comments

Comments
 (0)