diff --git a/src/plugins/output/json-kafka/CMakeLists.txt b/src/plugins/output/json-kafka/CMakeLists.txt index 900fe757..ccc6224f 100644 --- a/src/plugins/output/json-kafka/CMakeLists.txt +++ b/src/plugins/output/json-kafka/CMakeLists.txt @@ -7,16 +7,20 @@ add_library(json-kafka-output MODULE src/Storage.hpp src/Kafka.cpp src/Kafka.hpp + src/pattern-matching/HyperscanCppInterface.hh + src/pattern-matching/PatternMatching.hh ) find_package(LibRDKafka 0.9.3 REQUIRED) find_package(ZLIB REQUIRED) +find_package(HS MODULE REQUIRED) include_directories( ${LIBRDKAFKA_INCLUDE_DIRS} # librdkafka ) target_link_libraries(json-kafka-output ${LIBRDKAFKA_LIBRARIES} + ${HS_LIBRARIES} ) install( diff --git a/src/plugins/output/json-kafka/README.rst b/src/plugins/output/json-kafka/README.rst index 9edee258..9565acab 100644 --- a/src/plugins/output/json-kafka/README.rst +++ b/src/plugins/output/json-kafka/README.rst @@ -104,9 +104,12 @@ Don't forget to remove (or comment) outputs that you don't want to use! Send to Kafka 127.0.0.1 - ipfix false - unassigned + + * + ipfix + unassigned + @@ -188,11 +191,13 @@ at the same time if the outputs are not in collision with each other. :``name``: Identification name of the output. Used only for readability. :``brokers``: Initial list of brokers as a CSV list of broker "host" or "host:port". - :``topic``: - Kafka topic to produce to. - :``partition``: - Partition number to produce to. If the value is "unassigned", then the default random - distribution is used. [default: "unassigned"] + :``patternTopic``: + This section determines the output topic of ipfix message with a pattern. + In order to load balance or separate output topics of ipfix messages, specify the + `regex`, `topic`, and `partition` [default: "unassigned"]. This means, that + if `regex` is matched in the ipfix message, the message is produced to the + `topic`:`partition` in Kafka. 
It is possible to define more than one pattern, + if none of the `regex` patterns is matched, the first topic will be selected. :``brokerVersion``: Older broker versions (before 0.10.0) provide no way for a client to query for supported protocol features making it impossible for the client to know what features diff --git a/src/plugins/output/json-kafka/src/Config.cpp b/src/plugins/output/json-kafka/src/Config.cpp index 42a021b8..8bb4117a 100644 --- a/src/plugins/output/json-kafka/src/Config.cpp +++ b/src/plugins/output/json-kafka/src/Config.cpp @@ -72,14 +72,16 @@ enum params_xml_nodes { // Kafka output KAFKA_NAME, /**< Name of the output */ KAFKA_BROKERS, /**< List of brokers */ - KAFKA_TOPIC, /**< Topic */ - KAFKA_PARTION, /**< Producer partition */ KAFKA_BVERSION, /**< Broker fallback version */ KAFKA_BLOCKING, /**< Block when queue is full */ KAFKA_PERF_TUN, /**< Add performance tuning options */ KAFKA_PROPERTY, /**< Additional librdkafka property */ KAFKA_PROP_KEY, /**< Property key */ KAFKA_PROP_VALUE, /**< Property value */ + KAFKA_PATTERN, /**< Regex patterns to determine output topic */ + KAFKA_PATTERN_REGEX, /**< Pattern regex */ + KAFKA_PATTERN_TOPIC, /**< Pattern output topic */ + KAFKA_PATTERN_PARTITION, /**< pattern output topic partition */ }; /** Definition of the \ of \ node */ @@ -89,16 +91,23 @@ static const struct fds_xml_args args_kafka_prop[] = { FDS_OPTS_END }; +/** Definition of the \ of \ node */ +static const struct fds_xml_args args_kafka_pattern_topic[] = { + FDS_OPTS_ELEM(KAFKA_PATTERN_REGEX, "regex", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(KAFKA_PATTERN_TOPIC, "topic", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(KAFKA_PATTERN_PARTITION, "partition", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), + FDS_OPTS_END +}; + /** Definition of the \ node */ static const struct fds_xml_args args_kafka[] = { FDS_OPTS_ELEM(KAFKA_NAME, "name", FDS_OPTS_T_STRING, 0), FDS_OPTS_ELEM(KAFKA_BROKERS, "brokers", FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(KAFKA_TOPIC, "topic", 
FDS_OPTS_T_STRING, 0), - FDS_OPTS_ELEM(KAFKA_PARTION, "partition", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), FDS_OPTS_ELEM(KAFKA_BVERSION, "brokerVersion", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT), FDS_OPTS_ELEM(KAFKA_BLOCKING, "blocking", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), FDS_OPTS_ELEM(KAFKA_PERF_TUN, "performanceTuning", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT), FDS_OPTS_NESTED(KAFKA_PROPERTY, "property", args_kafka_prop, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), + FDS_OPTS_NESTED(KAFKA_PATTERN, "patternTopic", args_kafka_pattern_topic, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), FDS_OPTS_END }; @@ -203,6 +212,51 @@ Config::parse_kafka_property(struct cfg_kafka &kafka, fds_xml_ctx_t *property) kafka.properties.emplace(key, value); } +void +Config::parse_kafka_pattern(struct cfg_kafka &kafka, fds_xml_ctx_t *pattern) +{ + std::string regex, topic; + int32_t partition = RD_KAFKA_PARTITION_UA; + + // For partition parser + int32_t value; + char aux; + + const struct fds_xml_cont *content; + while (fds_xml_next(pattern, &content) != FDS_EOC) { + switch (content->id) { + case KAFKA_PATTERN_REGEX: + assert(content->type == FDS_OPTS_T_STRING); + regex = content->ptr_string; + break; + case KAFKA_PATTERN_TOPIC: + assert(content->type == FDS_OPTS_T_STRING); + topic = content->ptr_string; + break; + case KAFKA_PATTERN_PARTITION: + assert(content->type == FDS_OPTS_T_STRING); + if (strcasecmp(content->ptr_string, "unassigned") == 0) { + partition = RD_KAFKA_PARTITION_UA; + break; + } + + if (sscanf(content->ptr_string, "%" SCNi32 "%c", &value, &aux) != 1 || value < 0) { + throw std::invalid_argument("Invalid partition number of a output!"); + } + partition = value; + break; + default: + throw std::invalid_argument("Unexpected element within !"); + } + } + + if (regex.empty() || topic.empty()) { + throw std::invalid_argument("pattern key of a output cannot be empty!"); + } + + kafka.pattern_topics.emplace_back(cfg_pattern_topic{ regex, topic, partition }); +} + /** * \brief Parse "kafka" output parameters * @@ 
-215,7 +269,6 @@ Config::parse_kafka(fds_xml_ctx_t *kafka) { // Prepare default values struct cfg_kafka output; - output.partition = RD_KAFKA_PARTITION_UA; output.blocking = false; output.perf_tuning = true; @@ -234,22 +287,6 @@ Config::parse_kafka(fds_xml_ctx_t *kafka) assert(content->type == FDS_OPTS_T_STRING); output.brokers = content->ptr_string; break; - case KAFKA_TOPIC: - assert(content->type == FDS_OPTS_T_STRING); - output.topic = content->ptr_string; - break; - case KAFKA_PARTION: - assert(content->type == FDS_OPTS_T_STRING); - if (strcasecmp(content->ptr_string, "unassigned") == 0) { - output.partition = RD_KAFKA_PARTITION_UA; - break; - } - - if (sscanf(content->ptr_string, "%" SCNi32 "%c", &value, &aux) != 1 || value < 0) { - throw std::invalid_argument("Invalid partition number of a output!"); - } - output.partition = value; - break; case KAFKA_BVERSION: assert(content->type == FDS_OPTS_T_STRING); output.broker_fallback = content->ptr_string; @@ -266,6 +303,10 @@ Config::parse_kafka(fds_xml_ctx_t *kafka) assert(content->type == FDS_OPTS_T_CONTEXT); parse_kafka_property(output, content->ptr_ctx); break; + case KAFKA_PATTERN: + assert(content->type == FDS_OPTS_T_CONTEXT); + parse_kafka_pattern(output, content->ptr_ctx); + break; default: throw std::invalid_argument("Unexpected element within !"); } @@ -275,8 +316,8 @@ Config::parse_kafka(fds_xml_ctx_t *kafka) if (output.brokers.empty()) { throw std::invalid_argument("List of brokers must be specified!"); } - if (output.topic.empty()) { - throw std::invalid_argument("Topic of output must be specified!"); + if (output.pattern_topics.empty()) { + throw std::invalid_argument("Pattern Topic of output must be specified!"); } if (!output.broker_fallback.empty()) { // Try to check if version string is valid version (at least expect major + minor version) diff --git a/src/plugins/output/json-kafka/src/Config.hpp b/src/plugins/output/json-kafka/src/Config.hpp index c1448631..a30d2666 100644 --- 
a/src/plugins/output/json-kafka/src/Config.hpp +++ b/src/plugins/output/json-kafka/src/Config.hpp @@ -73,6 +73,15 @@ struct cfg_format { bool template_info; }; +struct cfg_pattern_topic +{ + std::string regex; + std::string topic; + int32_t partition; + + cfg_pattern_topic() = default; +}; + /** Output configuration base structure */ struct cfg_output { /** Plugin identification */ @@ -83,10 +92,8 @@ struct cfg_output { struct cfg_kafka : cfg_output { /// Comma separated list of IP[:Port] std::string brokers; - /// Produced topic - std::string topic; - /// Partition to which data should be send - int32_t partition; + /// Message regex pattern to determine output topic + std::vector pattern_topics; /// Broker version fallback (empty or X.X.X.X) std::string broker_fallback; /// Block conversion if sender buffer is full @@ -108,6 +115,7 @@ class Config { void default_set(); void parse_kafka(fds_xml_ctx_t *kafka); void parse_kafka_property(struct cfg_kafka &kafka, fds_xml_ctx_t *property); + void parse_kafka_pattern(struct cfg_kafka &kafka, fds_xml_ctx_t *pattern); void parse_outputs(fds_xml_ctx_t *outputs); void parse_params(fds_xml_ctx_t *params); diff --git a/src/plugins/output/json-kafka/src/Kafka.cpp b/src/plugins/output/json-kafka/src/Kafka.cpp index 965e90e6..f157e6af 100644 --- a/src/plugins/output/json-kafka/src/Kafka.cpp +++ b/src/plugins/output/json-kafka/src/Kafka.cpp @@ -41,6 +41,8 @@ #include "Config.hpp" #include "Kafka.hpp" +#include "pattern-matching/PatternMatching.hh" + #include #include @@ -55,7 +57,7 @@ * \param[in] ctx Instance context */ Kafka::Kafka(const struct cfg_kafka &cfg, ipx_ctx_t *ctx) - : Output(cfg.name, ctx), m_partition(cfg.partition) + : Output(cfg.name, ctx) { IPX_CTX_DEBUG(_ctx, "Initialization of Kafka connector in progress...", '\0'); IPX_CTX_INFO(_ctx, "The plugin was built against librdkafka %X, now using %X", @@ -107,14 +109,7 @@ Kafka::Kafka(const struct cfg_kafka &cfg, ipx_ctx_t *ctx) } kafka_cfg.release(); // Ownership has 
been successfully passed to the kafka - // Create the topic - m_topic.reset(rd_kafka_topic_new(m_kafka.get(), cfg.topic.c_str(), nullptr)); - if (!m_topic) { - rd_kafka_resp_err_t err_code = rd_kafka_last_error(); - const char *err_msg = rd_kafka_err2str(err_code); - throw std::runtime_error("rd_kafka_topic_new() failed: " + std::string(err_msg)); - } - + handle_pattern_topics(cfg.pattern_topics); // Start poller thread m_thread->stop = false; m_thread->ctx = ctx; @@ -126,6 +121,35 @@ Kafka::Kafka(const struct cfg_kafka &cfg, ipx_ctx_t *ctx) IPX_CTX_DEBUG(_ctx, "Kafka connector successfully initialized!", '\0') } +/** + * \brief Handles input pattern topics. At first create Kafka topic respectively and then register + * the regex pattern to the pattern matcher with topic id as a pattern id. + * \param[in] pattern_topics Pattern topics configuration. + */ +void +Kafka::handle_pattern_topics(const std::vector &pattern_topics) +{ + const auto &callback_id = pattern_matcher.register_callback(pattern_matching_callback); + + for (const auto &pattern: pattern_topics) + { + m_pattern_topics.emplace_back(nullptr, &rd_kafka_topic_destroy); + m_pattern_topics.back().reset(rd_kafka_topic_new(m_kafka.get(), pattern.topic.c_str(), nullptr)); + + m_topics_partition.emplace_back(pattern.partition); + + if (!m_pattern_topics.back()) { + rd_kafka_resp_err_t err_code = rd_kafka_last_error(); + const char *err_msg = rd_kafka_err2str(err_code); + throw std::runtime_error("rd_kafka_topic_new() failed: " + std::string(err_msg)); + } + + pattern_matcher.register_pattern(R"(/)" + pattern.regex + R"(/sa)", m_pattern_topics.size() - 1, callback_id); + } + + pattern_matcher.update_database(); +} + /** Destructor */ Kafka::~Kafka() { @@ -146,7 +170,9 @@ Kafka::~Kafka() } // Destruction must be called in this order! 
- m_topic.reset(nullptr); + for (auto &topic: m_pattern_topics) + topic.reset(nullptr); + m_kafka.reset(nullptr); IPX_CTX_DEBUG(_ctx, "Destruction of Kafka connector completed!", '\0'); @@ -161,7 +187,11 @@ Kafka::~Kafka() int Kafka::process(const char *str, size_t len) { - int rc = rd_kafka_produce(m_topic.get(), m_partition, m_produce_flags, + PatternMatchingUserData user_data{}; + + pattern_matcher.match_pattern(str, len, user_data); + + int rc = rd_kafka_produce(m_pattern_topics[user_data.pattern_id].get(), m_topics_partition[user_data.pattern_id], m_produce_flags, // Payload and length (without tailing new-line character) reinterpret_cast(const_cast(str)), len - 1, NULL, 0, // Optional key and its length diff --git a/src/plugins/output/json-kafka/src/Kafka.hpp b/src/plugins/output/json-kafka/src/Kafka.hpp index fb1ee4d3..dcf61f3d 100644 --- a/src/plugins/output/json-kafka/src/Kafka.hpp +++ b/src/plugins/output/json-kafka/src/Kafka.hpp @@ -43,10 +43,12 @@ #define JSON_KAFKA_H #include "Storage.hpp" +#include "pattern-matching/PatternMatching.hh" #include #include #include +#include #include /** JSON kafka connector */ @@ -66,6 +68,29 @@ class Kafka : public Output { using uniq_config = std::unique_ptr; using map_params = std::map; + struct PatternMatchingUserData + { + std::size_t pattern_id; + + PatternMatchingUserData() : pattern_id(0) + {} + }; + + PatternMatching::PatternMatching pattern_matcher; + + static std::size_t pattern_matching_callback(const std::size_t pattern_id, + const std::size_t, + const std::size_t, + PatternMatchingUserData &user_data) + { + user_data.pattern_id = pattern_id; + + static constexpr std::size_t DoStopPatternMatching = 1; + return DoStopPatternMatching; + } + + void handle_pattern_topics(const std::vector &); + /// Poller timeout for events (milliseconds) static constexpr int POLLER_TIMEOUT = 100; /// Flush timeout before shutdown of the connector (milliseconds) @@ -88,10 +113,10 @@ class Kafka : public Output { map_params 
m_params; /// Kafka object uniq_kafka m_kafka = {nullptr, &rd_kafka_destroy}; - /// Topic object - uniq_topic m_topic = {nullptr, &rd_kafka_topic_destroy}; - /// Producer partition - int32_t m_partition; + /// Topics of patterns + std::vector m_pattern_topics; + /// Topics partition to produce + std::vector m_topics_partition; + /// Producer flags int m_produce_flags; /// Polling thread diff --git a/src/plugins/output/json-kafka/src/pattern-matching/HyperscanCppInterface.hh b/src/plugins/output/json-kafka/src/pattern-matching/HyperscanCppInterface.hh new file mode 100644 index 00000000..f01b5e5f --- /dev/null +++ b/src/plugins/output/json-kafka/src/pattern-matching/HyperscanCppInterface.hh @@ -0,0 +1,264 @@ +/** + * \file src/plugins/output/json-kafka/src/pattern-matching/HyperscanCppInterface.hh + * \author Rasa Oskuei + * \brief This file is a CPP wrapper of the hyperscan C interface (header file) + * \date 2023 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. 
+ * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#ifndef HYPERSCAN_CPP_INTERFACE_H +#define HYPERSCAN_CPP_INTERFACE_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace PatternMatching { + +template +class HyperscanCppInterface +{ + template + struct MatchingContext + { + CallbackRefT callback; + UserData &user_data; + }; + + struct Pattern + { + std::string m_pattern; + std::size_t m_pattern_id; + unsigned m_flag; + + Pattern(const std::string &pattern, + const std::size_t pattern_id, + const unsigned flag) : m_pattern(pattern), + m_pattern_id(pattern_id), + m_flag(flag) + {} + }; + + std::unique_ptr m_database; + std::unique_ptr m_scratch; + std::vector m_patterns; + + bool m_is_database_up_to_date; + + std::unique_ptr compile_new_database() + { + std::vector patterns_cstr; + patterns_cstr.reserve(m_patterns.size()); + + for (const auto &pattern : m_patterns) + patterns_cstr.push_back(pattern.m_pattern.c_str()); + + std::vector flags; + flags.reserve(m_patterns.size()); + for (const auto &pattern : m_patterns) + flags.push_back(pattern.m_flag); + + std::vector ids; + flags.reserve(m_patterns.size()); + for (const auto &pattern : m_patterns) + 
ids.push_back(pattern.m_pattern_id); + + return std::unique_ptr(compile_database(HS_MODE_BLOCK, + patterns_cstr, ids, flags), + &hs_free_database); + } + + void add_pettern_and_flags(const std::string &pattern_str, const std::size_t pattern_id) + { + if (pattern_str.empty()) + return; + + if (pattern_str[0] != '/') + { + static constexpr auto error_message = "no leading '/' char"; + throw std::runtime_error(error_message); + } + + const std::size_t flags_start = pattern_str.find_last_of('/'); + + if (flags_start == std::string::npos) + { + static constexpr auto error_message = "no trailing '/' char"; + throw std::runtime_error(error_message); + } + + const unsigned flag = parse_flags(pattern_str.substr(flags_start + 1, pattern_str.size() - flags_start)); + + m_patterns.emplace_back(pattern_str.substr(1, flags_start - 1), pattern_id, flag); + } + + static unsigned parse_flags(const std::string &flags_str) + { + unsigned flags = 0; + + for (const auto &flag : flags_str) + switch (flag) + { + case 'i': + flags |= HS_FLAG_CASELESS; + break; + case 'm': + flags |= HS_FLAG_MULTILINE; + break; + case 's': + flags |= HS_FLAG_DOTALL; + break; + case 'a': + flags |= HS_FLAG_ALLOWEMPTY; + break; + default: { + const std::string error_message = std::string("Unsupported flag \'") + flag + "\'"; + throw std::runtime_error(error_message); + break; + } + } + + return flags; + } + + static hs_scratch_t *allocate_scratch(const hs_database_t *m_database) + { + hs_scratch_t *m_scratch = nullptr; + const hs_error_t err = hs_alloc_scratch(m_database, &m_scratch); + + if (err != HS_SUCCESS) + { + const std::string error_message("Could not allocate m_scratch space. 
Exiting."); + throw std::runtime_error(error_message); + } + return m_scratch; + } + + static hs_database_t *compile_database(const std::uint32_t hs_mode, + const std::vector &cstr_patterns, + const std::vector &ids, + const std::vector &flags) + { + if (cstr_patterns.empty()) + return nullptr; + + hs_database_t *m_database; + hs_compile_error_t *compile_err; + + const hs_error_t err = hs_compile_multi(cstr_patterns.data(), flags.data(), ids.data(), + cstr_patterns.size(), hs_mode, nullptr, &m_database, &compile_err); + + std::unique_ptr compile_err_ptr(compile_err, &hs_free_compile_error); + + if (err != HS_SUCCESS) + { + if (compile_err->expression < 0) + throw std::runtime_error(compile_err->message); + else + { + const std::string error_message = std::string("Pattern ") + + std::string(cstr_patterns[compile_err->expression]) + + std::string(" failed compilation with error: ") + compile_err->message; + throw std::runtime_error(error_message); + } + } + + return m_database; + } + + template + static int matching_callback(const unsigned int id, + const unsigned long long from, + const unsigned long long to, + const unsigned int, + void *const type_erased_context) + { + const auto &context = *reinterpret_cast *>(type_erased_context); + return static_cast(context.callback(id, from, to, context.user_data)); + } + +public: + HyperscanCppInterface() : m_database(nullptr, &hs_free_database), + m_scratch(nullptr, &hs_free_scratch), + m_is_database_up_to_date(true) + {} + + void register_pattern(const std::string &pattern, const std::size_t pattern_id) + { + m_is_database_up_to_date = false; + add_pettern_and_flags(pattern, pattern_id); + } + + void update_database() + { + if (m_is_database_up_to_date) + return; + + m_database = compile_new_database(); + m_scratch.reset(allocate_scratch(m_database.get())); + + m_is_database_up_to_date = true; + } + + template + void block_scan(const char *const sequence, const std::size_t sequence_len, CallbackRefT &&callback, UserData 
&user_data) + { + if (!this->m_database.get()) + return; + + MatchingContext context{ std::forward(callback), user_data }; + + const auto scan_error = hs_scan(m_database.get(), sequence, sequence_len, 0, m_scratch.get(), + matching_callback, &context); + + if (scan_error != HS_SUCCESS && scan_error != HS_SCAN_TERMINATED) + std::cerr << "hs_scan error, ERROR: Unable to block scan packet. Exiting. (" << scan_error << ")" << std::endl; + } +}; + +} // namespace PatternMatching + +#endif \ No newline at end of file diff --git a/src/plugins/output/json-kafka/src/pattern-matching/PatternMatching.hh b/src/plugins/output/json-kafka/src/pattern-matching/PatternMatching.hh new file mode 100644 index 00000000..064dc171 --- /dev/null +++ b/src/plugins/output/json-kafka/src/pattern-matching/PatternMatching.hh @@ -0,0 +1,158 @@ +/** + * \file src/plugins/output/json-kafka/src/pattern-matching/PatternMatching.hh + * \author Rasa Oskuei + * \brief Pattern matching abstraction (header file) + * \date 2023 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. 
+ * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. 
+ * + */ + +#ifndef PATTERN_MATCHING_H +#define PATTERN_MATCHING_H + +#include +#include +#include +#include +#include +#include + +#include "HyperscanCppInterface.hh" + +namespace PatternMatching { + +template +class PatternMatching +{ + struct CallbackBase + { + virtual std::size_t operator()(const std::size_t, + const std::size_t, + const std::size_t, + UserData &) = 0; + virtual ~CallbackBase() = default; + }; + + template + class Callback : public CallbackBase + { + using NonReferenceFunctorType = typename std::remove_reference::type; + typename std::conditional::value, + FunctorType, NonReferenceFunctorType>::type functor; + + public: + template + Callback(Arg &&arg) : functor{ std::forward(arg) } + {} + + virtual std::size_t operator()(const std::size_t pattern_id, + const std::size_t from, + const std::size_t to, + UserData &user_data) override final + { + return functor(pattern_id, from, to, user_data); + } + }; + + std::vector> m_callbacks; + + static constexpr std::size_t get_pattern_id_max_bits() + { + return 12; + } + + static constexpr std::size_t get_pattern_id(const std::size_t id) + { + return id & ((1 << get_pattern_id_max_bits()) - 1); + } + + static constexpr std::size_t get_callback_id(const std::size_t id) + { + return (id >> get_pattern_id_max_bits()); + } + +protected: + HyperscanCppInterface hyperscan; + + bool call_callbacks(const std::size_t id, + const std::size_t from, + const std::size_t to, + UserData &user_data) const + { + const std::size_t callback_id = get_callback_id(id); + const std::size_t module_pattern_id = get_pattern_id(id); + return static_cast((*m_callbacks[callback_id].get())(module_pattern_id, from, to, user_data)); + } + +public: + template + std::size_t register_callback(CallbackT &&functor) + { + m_callbacks.emplace_back(new Callback(std::forward(functor))); + + return m_callbacks.size() - 1; + } + + void register_pattern(const std::string &pattern, const std::size_t pattern_id, const std::size_t callback_id) + { + 
if (pattern_id >> get_pattern_id_max_bits()) + throw std::runtime_error("pattern_id exceeds the maximum possible value."); + + if (((std::numeric_limits::max() - pattern_id) >> get_pattern_id_max_bits()) < callback_id) + throw std::runtime_error("Hyperscan pattern_id exceeds the maximum possible value."); + + hyperscan.register_pattern(pattern, (callback_id << get_pattern_id_max_bits()) | pattern_id); + } + + void update_database() + { + hyperscan.update_database(); + } + + void match_pattern(const char *const sequence, const std::size_t sequence_len, UserData &user_data) + { + hyperscan.block_scan( + sequence, sequence_len, + [this](const std::size_t id, + const std::size_t from, + const std::size_t to, + UserData &user_data) { return this->call_callbacks(id, from, to, user_data); }, + user_data); + } +}; + +} // namespace PatternMatching + +#endif \ No newline at end of file