Skip to content

Commit d652fe3

Browse files
committed
Core: new handling of termination requests [WIP]
TBD: disable passing IPFIX and Session messages to plugin instances in context.c
1 parent cbc4418 commit d652fe3

File tree

14 files changed

+1405
-91
lines changed

14 files changed

+1405
-91
lines changed

src/core/configurator/configurator.cpp

Lines changed: 302 additions & 36 deletions
Large diffs are not rendered by default.

src/core/configurator/configurator.hpp

Lines changed: 91 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
* \file src/core/configurator/configurator.hpp
33
* \author Lukas Hutak <[email protected]>
44
* \brief Main pipeline configurator (header file)
5-
* \date 2018
5+
* \date 2018-2020
66
*/
77

8-
/* Copyright (C) 2018 CESNET, z.s.p.o.
8+
/* Copyright (C) 2018-2020 CESNET, z.s.p.o.
99
*
1010
* Redistribution and use in source and binary forms, with or without
1111
* modification, are permitted provided that the following conditions
@@ -52,33 +52,15 @@
5252
#include "instance_outmgr.hpp"
5353
#include "instance_output.hpp"
5454
#include "plugin_mgr.hpp"
55+
#include "controller.hpp"
5556

5657
extern "C" {
5758
#include <ipfixcol2.h>
5859
#include "../context.h"
5960
}
6061

61-
/** Main configurator of the internal pipeline */
62+
// Main configurator of the internal pipeline
6263
class ipx_configurator {
63-
private:
64-
/** Size of ring buffers */
65-
uint32_t ring_size;
66-
/** Directory with definitions of Information Elements */
67-
std::string iemgr_dir;
68-
69-
/** Manager of Information Elements */
70-
fds_iemgr_t *iemgr;
71-
/** Vector of running instances of input plugins */
72-
std::vector<std::unique_ptr<ipx_instance_input> > running_inputs;
73-
/** Vector of running instances of intermediate plugins */
74-
std::vector<std::unique_ptr<ipx_instance_intermediate> > running_inter;
75-
/** Vector of running instances of output plugins */
76-
std::vector<std::unique_ptr<ipx_instance_output> > running_outputs;
77-
78-
void model_check(const ipx_config_model &model);
79-
fds_iemgr_t *iemgr_load(const std::string dir);
80-
enum ipx_verb_level verbosity_str2level(const std::string &verb);
81-
8264
public:
8365
/** Minimal size of ring buffers between instances of plugins */
8466
static constexpr uint32_t RING_MIN_SIZE = 128;
@@ -94,37 +76,100 @@ class ipx_configurator {
9476
ipx_configurator(const ipx_configurator &) = delete;
9577
ipx_configurator& operator=(const ipx_configurator &) = delete;
9678

97-
/** Plugin manager */
79+
/** Plugin manager (can be modified by a user i.e. add paths, change behaviour, etc) */
9880
ipx_plugin_mgr plugins;
9981

10082
/**
101-
* \brief Start the configuration with a new model
102-
*
103-
* Check if the model is valid (consists of at least one input and one output instance,
104-
* etc.) and load definitions of Information Elements (i.e. iemgr_set_dir() must be defined).
105-
* Then try to load all plugins, initialized them and, finally, start their execution threads.
106-
*
107-
* \param[in] model Configuration model (description of input/intermediate/output instances)
108-
* \throw runtime_error in case of any error, e.g. failed to find plugins, failed to load
109-
* definitions of Information Elements, failed to initialize an instance, etc.
83+
* @brief Define a path to the directory of Information Elements definitions
84+
* @param[in] path Path
11085
*/
111-
void start(const ipx_config_model &model);
112-
113-
/**
114-
* \brief Stop and destroy all instances of all plugins
115-
*/
116-
void stop();
86+
void
87+
iemgr_set_dir(const std::string &path);
88+
/**
89+
* @brief Define a size of ring buffers
90+
* @param[in] size Size
91+
*/
92+
void
93+
set_buffer_size(uint32_t size);
11794

118-
/**
119-
* \brief Define a path to the directory of Information Elements definitions
120-
* \param[in] path Path
121-
*/
122-
void iemgr_set_dir(const std::string &path);
12395
/**
124-
* \brief Define a size of ring buffers
125-
* \param[in] size Size
96+
* @brief Run the collector based on a configuration from the controller
97+
*
98+
* @note
99+
* Return code depends on the way how the collector was terminated. For example, if the
100+
* termination was caused by invalid configuration of a plugin, memory allocation during
101+
* processing, etc. return code corresponds to EXIT_FAILURE value.
102+
* However, if the collector was terminated by a plugin, which informed the configurator
103+
* that there are no more flow data for processing, return value is EXIT_SUCCESS.
104+
*
105+
* @param[in] ctrl Configuration controller
106+
* @return Return code of the application
126107
*/
127-
void set_buffer_size(uint32_t size);
108+
int
109+
run(ipx_controller *ctrl);
110+
111+
private:
112+
/// State type
113+
enum class STATUS {
114+
RUNNING, ///< No configuration change in progress
115+
STOP_SLOW, ///< Stop stop in progress
116+
STOP_FAST, ///< Fast stop in progress
117+
} m_state; ///< Configuration state
118+
119+
/** Size of ring buffers */
120+
uint32_t m_ring_size;
121+
/** Directory with definitions of Information Elements */
122+
std::string m_iemgr_dir;
123+
124+
/** Manager of Information Elements */
125+
fds_iemgr_t *m_iemgr;
126+
/** Vector of running instances of input plugins */
127+
std::vector<std::unique_ptr<ipx_instance_input> > m_running_inputs;
128+
/** Vector of running instances of intermediate plugins */
129+
std::vector<std::unique_ptr<ipx_instance_intermediate> > m_running_inter;
130+
/** Vector of running instances of output plugins */
131+
std::vector<std::unique_ptr<ipx_instance_output> > m_running_outputs;
132+
133+
// Internal functions
134+
void
135+
model_check(const ipx_config_model &model);
136+
fds_iemgr_t *
137+
iemgr_load(const std::string dir);
138+
enum ipx_verb_level
139+
verbosity_str2level(const std::string &verb);
140+
141+
void
142+
startup(const ipx_config_model &model);
143+
void
144+
cleanup();
145+
146+
bool
147+
termination_handle(const struct ipx_cpipe_req &req, ipx_controller *ctrl);
148+
void
149+
termination_send_msg();
150+
151+
void
152+
termination_stop_all();
153+
void
154+
termination_stop_partly(const ipx_ctx_t *ctx);
128155
};
129156

130157
#endif //IPFIXCOL_CONFIGURATOR_H
158+
159+
///**
160+
// * \brief Start the configuration with a new model
161+
// *
162+
// * Check if the model is valid (consists of at least one input and one output instance,
163+
// * etc.) and load definitions of Information Elements (i.e. iemgr_set_dir() must be defined).
164+
// * Then try to load all plugins, initialized them and, finally, start their execution threads.
165+
// *
166+
// * \param[in] model Configuration model (description of input/intermediate/output instances)
167+
// * \throw runtime_error in case of any error, e.g. failed to find plugins, failed to load
168+
// * definitions of Information Elements, failed to initialize an instance, etc.
169+
// */
170+
//void start(const ipx_config_model &model);
171+
//
172+
///**
173+
// * \brief Stop and destroy all instances of all plugins
174+
// */
175+
//void stop();
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/**
2+
* @file src/core/configurator/controller.hpp
3+
* @author Lukas Hutak <[email protected]>
4+
* @brief Abstract class for configuration controllers (header file)
5+
* @date 2020
6+
*
7+
* Copyright(c) 2020 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#ifndef IPFIXCOL2_CONTROLLER_BASE
12+
#define IPFIXCOL2_CONTROLLER_BASE
13+
14+
#include <string>
15+
#include <stdexcept>
16+
17+
#include "model.hpp"
18+
19+
extern "C" {
20+
#include "../verbose.h"
21+
#include "cpipe.h"
22+
}
23+
24+
/**
25+
* @brief Abstract class for configuration controllers
26+
*
27+
* The purpose of this class is to provide common API for configuration and notification
28+
* about changes in the configuration of the collector. A controller implementation, which
29+
* has to provide at least a configuration model getter, should be implemented as a wrapper over
30+
* configuration stored in a file, database, SysRepo, etc.
31+
*
32+
* @note
33+
* By default, the pipeline configurator, which is using this controller, automatically
34+
* registers termination signal handlers for SIGINT and SIGTERM. If necessary, the controller
35+
* is supposed to register additional handlers in start_before().
36+
*/
37+
class ipx_controller {
38+
public:
39+
/// Forward declaration of a controller error
40+
class error;
41+
42+
/// Configuration status code
43+
enum class OP_STATUS {
44+
SUCCESS, ///< Configuration operation has been successfully applied
45+
FAILED ///< Configuration operation has failed (no changes)
46+
};
47+
48+
/**
49+
* @brief Get configuration model of the collector
50+
*
51+
* The function is called to get the current configuration model during startup
52+
* or reconfiguration procedure (if supported by the controller).
53+
* @note It is guaranteed that this function is called for the first time after start_before()
54+
* function.
55+
* @return Configuration model
56+
* @throw ipx_controller::error in case of failure (e.g. unable to get the model)
57+
*/
58+
virtual ipx_config_model
59+
model_get() = 0;
60+
61+
/**
62+
* @brief Function called before start of the collector
63+
*
64+
* The controller might initialize its internal structures (for example, establish connection
65+
* to a database, configuration repository, etc.) and register additional signal handlers.
66+
* @throw ipx_controller::error in case of failure
67+
*/
68+
virtual void
69+
start_before()
70+
{
71+
IPX_DEBUG(m_name, "Configuration process has started...", '\0');
72+
};
73+
74+
/**
75+
* @brief Function called after start of the collector
76+
* @note
77+
* If startup fails, termination functions i.e. terminate_before() and terminate_after()
78+
* are not called!
79+
* @param[in] status Operation status (success/failure)
80+
* @param[in] msg Human readable result (usually a description of what went wrong)
81+
* @note The function should not throw any exception!
82+
*/
83+
virtual void
84+
start_after(OP_STATUS status, std::string msg)
85+
{
86+
switch (status) {
87+
case OP_STATUS::SUCCESS:
88+
IPX_INFO(m_name, "Collector started successfully!", '\0');
89+
break;
90+
case OP_STATUS::FAILED:
91+
IPX_ERROR(m_name, "Collector failed to start: %s", msg.c_str());
92+
break;
93+
}
94+
};
95+
96+
/**
97+
* @brief Function is called when a termination request is received
98+
*
99+
* The termination process itself cannot be stopped as there might be situations that
100+
* cannot be solved by the controller, such as a memory allocation error in a instance
101+
* of pipeline plugin etc.
102+
*
103+
* @note
104+
* This function might be called multiple times before termination is complete. It can
105+
* happen, for example, if another termination request is received before the termination
106+
* process is complete.
107+
* @note
108+
* The function is called only if the collector has been previously successfully initialized.
109+
* @note The function should not throw any exception!
110+
* @param[in] msg Contains information about the reason/source of termination
111+
*/
112+
virtual void
113+
terminate_on_request(const struct ipx_cpipe_req &req, std::string msg)
114+
{
115+
(void) req;
116+
IPX_INFO(m_name, "Received a termination request (%s)!", msg.c_str());
117+
};
118+
119+
/**
120+
* @brief Function called after termination
121+
*
122+
* All plugin instances have been already stopped and their context has been destroyed.
123+
* The controller might destroy its internal structures here.
124+
* @note The function should not throw any exception!
125+
*/
126+
virtual void
127+
terminate_after()
128+
{
129+
IPX_DEBUG(m_name, "Termination process completed!", '\0');
130+
};
131+
132+
private:
133+
const char *m_name = "Configurator";
134+
};
135+
136+
/**
137+
* @brief Controller custom error
138+
*/
139+
class ipx_controller::error : public std::runtime_error {
140+
public:
141+
/**
142+
* \brief Manager error constructor (string constructor)
143+
* \param[in] msg An error message
144+
*/
145+
explicit error(const std::string &msg) : std::runtime_error(msg) {};
146+
/**
147+
* \brief Manager error constructor (char constructor)
148+
* \param[in] msg An error message
149+
*/
150+
explicit error(const char *msg) : std::runtime_error(msg) {};
151+
};
152+
153+
#endif // IPFIXCOL2_CONTROLLER_BASE

0 commit comments

Comments
 (0)