forked from apache/nifi-minifi-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumeWindowsEventLog.h
More file actions
180 lines (157 loc) · 6.15 KB
/
ConsumeWindowsEventLog.h
File metadata and controls
180 lines (157 loc) · 6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/**
* @file ConsumeWindowsEventLog.h
* ConsumeWindowsEventLog class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "core/Core.h"
#include "wel/WindowsEventLog.h"
#include "FlowFileRecord.h"
#include "concurrentqueue.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "pugixml.hpp"
#include <winevt.h>
#include <sstream>
#include <regex>
#include <codecvt>
#include "utils/OsUtils.h"
#include <Objbase.h>
#include <mutex>
#include <unordered_map>
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
/**
 * Aggregate bundling the textual renderings produced for one event.
 *
 * Every member starts out empty. Which of them get filled in is decided by
 * the code that builds the render.
 * NOTE(review): presumably that is createEventRender(), driven by the
 * configured output format -- confirm in the implementation file.
 */
struct EventRender {
  //! String key -> string value pairs extracted for the event
  //! (presumably emitted as flow file attributes -- TODO confirm).
  std::map<std::string, std::string> matched_fields{};
  //! XML rendering of the event.
  std::string xml{};
  //! Plain-text rendering of the event.
  std::string plaintext{};
  //! JSON rendering of the event.
  std::string json{};
};
class Bookmark;
//! ConsumeWindowsEventLog Class
class ConsumeWindowsEventLog : public core::Processor {
public:
//! Constructor
/*!
* Create a new processor
*/
ConsumeWindowsEventLog(const std::string& name, utils::Identifier uuid = utils::Identifier());
//! Destructor
virtual ~ConsumeWindowsEventLog();
//! Processor Name
static const std::string ProcessorName;
//! Supported Properties
static core::Property Channel;
static core::Property Query;
static core::Property RenderFormatXML;
static core::Property MaxBufferSize;
static core::Property InactiveDurationToReconnect;
static core::Property IdentifierMatcher;
static core::Property IdentifierFunction;
static core::Property ResolveAsAttributes;
static core::Property EventHeaderDelimiter;
static core::Property EventHeader;
static core::Property OutputFormat;
static core::Property JSONFormat;
static core::Property BatchCommitSize;
static core::Property BookmarkRootDirectory;
static core::Property ProcessOldEvents;
//! Supported Relationships
static core::Relationship Success;
public:
/**
* Function that's executed when the processor is scheduled.
* @param context process context.
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
//! OnTrigger method, implemented by NiFi ConsumeWindowsEventLog
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
//! Initialize, overwrite by NiFi ConsumeWindowsEventLog
virtual void initialize(void) override;
void notifyStop() override;
protected:
void refreshTimeZoneData();
void putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session) const;
wel::WindowsEventLogHandler getEventLogHandler(const std::string & name);
bool insertHeaderName(wel::METADATA_NAMES &header, const std::string &key, const std::string &value) const;
void LogWindowsError(std::string error = "Error") const;
bool createEventRender(EVT_HANDLE eventHandle, EventRender& eventRender);
void substituteXMLPercentageItems(pugi::xml_document& doc);
static constexpr const char* XML = "XML";
static constexpr const char* Both = "Both";
static constexpr const char* Plaintext = "Plaintext";
static constexpr const char* JSON = "JSON";
static constexpr const char* JSONRaw = "Raw";
static constexpr const char* JSONSimple = "Simple";
static constexpr const char* JSONFlattened = "Flattened";
private:
struct TimeDiff {
auto operator()() const {
return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
}
const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
};
bool commitAndSaveBookmark(const std::wstring &bookmarkXml, const std::shared_ptr<core::ProcessSession> &session);
std::tuple<size_t, std::wstring> processEventLogs(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession> &session, const EVT_HANDLE& event_query_results);
// Logger
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<core::CoreComponentStateManager> state_manager_;
wel::METADATA_NAMES header_names_;
std::string header_delimiter_;
std::string channel_;
std::wstring wstrChannel_;
std::wstring wstrQuery_;
std::string regex_;
bool resolve_as_attributes_{false};
bool apply_identifier_function_{false};
std::string provenanceUri_;
std::string computerName_;
uint64_t maxBufferSize_{};
DWORD lastActivityTimestamp_{};
std::mutex cache_mutex_;
std::map<std::string, wel::WindowsEventLogHandler > providers_;
uint64_t batch_commit_size_{};
enum class JSONType {None, Raw, Simple, Flattened};
struct OutputFormat {
bool xml{false};
bool plaintext{false};
struct JSON {
JSONType type{JSONType::None};
explicit operator bool() const noexcept {
return type != JSONType::None;
}
} json;
} output_;
std::unique_ptr<Bookmark> bookmark_;
std::mutex on_trigger_mutex_;
std::unordered_map<std::string, std::string> xmlPercentageItemsResolutions_;
HMODULE hMsobjsDll_{};
std::string timezone_name_;
std::string timezone_offset_; // Represented as UTC offset in (+|-)HH:MM format, like +02:00
};
// Hands the ConsumeWindowsEventLog class and its human-readable description
// string to the REGISTER_RESOURCE macro.
// NOTE(review): the macro is defined outside this header (presumably reachable
// through core/Core.h); confirm which registry it populates and how the
// description string is used.
REGISTER_RESOURCE(ConsumeWindowsEventLog, "Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows.");
} // namespace processors
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org