|
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | 17 |
|
| 18 | +#undef NDEBUG |
| 19 | + |
18 | 20 | #include "ConsumeWindowsEventLog.h" |
19 | 21 |
|
20 | 22 | #include "core/ConfigurableComponent.h" |
|
24 | 26 | #include "utils/TestUtils.h" |
25 | 27 | #include "utils/file/FileUtils.h" |
26 | 28 | #include "rapidjson/document.h" |
| 29 | +#include "wel/UniqueEvtHandle.h" |
| 30 | +#include "IntegrationTestUtils.h" |
27 | 31 |
|
28 | 32 | #include "CWELTestUtils.h" |
29 | 33 |
|
@@ -58,25 +62,67 @@ bool dispatchCustomEvent(const CustomEventData& event) { |
58 | 62 | return result == ERROR_SUCCESS; |
59 | 63 | } |
60 | 64 |
|
| 65 | +using org::apache::nifi::minifi::wel::unique_evt_handle; |
| 66 | + |
| 67 | +bool advanceBookmark(const unique_evt_handle& hBookmark, const std::string& channel, const std::string& query, bool advance_to_last = false) { |
| 68 | + const auto hEventResults = unique_evt_handle{ EvtQuery(0, std::wstring{channel.begin(), channel.end()}.c_str(), std::wstring{query.begin(), query.end()}.c_str(), EvtQueryChannelPath) }; |
| 69 | + if (!hEventResults) { |
| 70 | + return false; |
| 71 | + } |
| 72 | + |
| 73 | + if (advance_to_last) { |
| 74 | + if (!EvtSeek(hEventResults.get(), 0, 0, 0, EvtSeekRelativeToLast)) { |
| 75 | + return false; |
| 76 | + } |
| 77 | + } else { |
| 78 | + if (!EvtSeek(hEventResults.get(), 1, hBookmark.get(), 0, EvtSeekRelativeToBookmark)) { |
| 79 | + return false; |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + const unique_evt_handle hEvent = [&hEventResults] { |
| 84 | + DWORD dwReturned{}; |
| 85 | + EVT_HANDLE hEvent{ nullptr }; |
| 86 | + EvtNext(hEventResults.get(), 1, &hEvent, INFINITE, 0, &dwReturned); |
| 87 | + return unique_evt_handle{ hEvent }; |
| 88 | + }(); |
| 89 | + |
| 90 | + if (!hEvent) { |
| 91 | + return false; |
| 92 | + } |
| 93 | + |
| 94 | + REQUIRE(EvtUpdateBookmark(hBookmark.get(), hEvent.get())); |
| 95 | + |
| 96 | + return true; |
| 97 | +} |
| 98 | + |
61 | 99 | class CustomProviderController : public OutputFormatTestController { |
62 | 100 | public: |
63 | | - CustomProviderController(std::string format, std::string json_format) : OutputFormatTestController(CUSTOM_CHANNEL, "*", std::move(format), std::move(json_format)) {} |
| 101 | + CustomProviderController(std::string format, std::string json_format) : OutputFormatTestController(CUSTOM_CHANNEL, "*", std::move(format), std::move(json_format)) { |
| 102 | + bookmark_.reset(EvtCreateBookmark(0)); |
| 103 | + advanceBookmark(bookmark_, channel_, query_, true); |
| 104 | + REQUIRE(bookmark_); |
| 105 | + } |
64 | 106 |
|
65 | 107 | protected: |
66 | 108 | void dispatchBookmarkEvent() override { |
67 | 109 | auto binary = reinterpret_cast<const unsigned char*>("\x0c\x10"); |
68 | 110 | REQUIRE(dispatchCustomEvent({L"Bookmark", L"Second", L"Third", 2, binary})); |
69 | | - // even though we are using the API, we still have to wait for the event to appear |
70 | | - // for CWEL processor |
71 | | - std::this_thread::sleep_for(std::chrono::seconds{2}); |
| 111 | + REQUIRE(checkNewEventAvailable()); |
72 | 112 | } |
73 | 113 | void dispatchCollectedEvent() override { |
74 | 114 | auto binary = reinterpret_cast<const unsigned char*>("\x09\x01"); |
75 | 115 | REQUIRE(dispatchCustomEvent({L"Actual event", L"Second", L"Third", 2, binary})); |
76 | | - // even though we are using the API, we still have to wait for the event to appear |
77 | | - // for CWEL processor |
78 | | - std::this_thread::sleep_for(std::chrono::seconds{2}); |
| 116 | + REQUIRE(checkNewEventAvailable()); |
| 117 | + } |
| 118 | + |
| 119 | + private: |
| 120 | + bool checkNewEventAvailable() { |
| 121 | + return org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime(std::chrono::seconds{5}, [&] { |
| 122 | + return advanceBookmark(bookmark_, channel_, query_); |
| 123 | + }); |
79 | 124 | } |
| 125 | + unique_evt_handle bookmark_; |
80 | 126 | }; |
81 | 127 |
|
82 | 128 | const std::string EVENT_DATA_JSON = R"( |
|
0 commit comments