|
| 1 | +#include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h" |
| 2 | +#include "FWCore/MessageLogger/interface/MessageLogger.h" |
| 3 | +#include "FWCore/ParameterSet/interface/ParameterSet.h" |
| 4 | +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" |
| 5 | +#include "FWCore/Utilities/interface/EDMException.h" |
| 6 | +#include "Utilities/StorageFactory/interface/Storage.h" |
| 7 | +#include "Utilities/StorageFactory/interface/StorageProxyMaker.h" |
| 8 | + |
| 9 | +#include <atomic> |
| 10 | +#include <chrono> |
| 11 | +#include <string> |
| 12 | +#include <string_view> |
| 13 | +#include <tuple> |
| 14 | +#include <type_traits> |
| 15 | + |
| 16 | +#include <boost/algorithm/string.hpp> |
| 17 | +#include <fmt/format.h> |
| 18 | + |
| 19 | +namespace edm::storage { |
| 20 | + class StorageTracerProxy : public Storage { |
| 21 | + static constexpr std::string_view kOpen = "o"; |
| 22 | + static constexpr std::string_view kRead = "r"; |
| 23 | + static constexpr std::string_view kReadv = "rv"; |
| 24 | + static constexpr std::string_view kReadvElement = "rve"; |
| 25 | + static constexpr std::string_view kWrite = "w"; |
| 26 | + static constexpr std::string_view kWritev = "wv"; |
| 27 | + static constexpr std::string_view kWritevElement = "wve"; |
| 28 | + static constexpr std::string_view kPosition = "s"; |
| 29 | + static constexpr std::string_view kPrefetch = "p"; |
| 30 | + static constexpr std::string_view kPrefetchElement = "pe"; |
| 31 | + static constexpr std::string_view kResize = "rsz"; |
| 32 | + static constexpr std::string_view kFlush = "f"; |
| 33 | + static constexpr std::string_view kClose = "c"; |
| 34 | + |
| 35 | + public: |
| 36 | + StorageTracerProxy(unsigned id, |
| 37 | + std::string const& tracefile, |
| 38 | + std::string const& storageUrl, |
| 39 | + std::unique_ptr<Storage> storage) |
| 40 | + : file_(tracefile), baseStorage_(std::move(storage)), traceId_(id) { |
| 41 | + using namespace std::literals::string_literals; |
| 42 | + file_.write( |
| 43 | + "# Format\n"s + "# --------\n"s + "# prefixes\n"s + "# #: comment\n"s + |
| 44 | + fmt::format("# {}: file open\n", kOpen) + fmt::format("# {}: singular read\n", kRead) + |
| 45 | + fmt::format("# {}: vector read\n", kReadv) + |
| 46 | + fmt::format("# {}: vector read element of the preceding '{}' line\n", kReadvElement, kReadv) + |
| 47 | + fmt::format("# {}: singular write\n", kWrite) + fmt::format("# {}: vector write\n", kWritev) + |
| 48 | + fmt::format("# {}: vector write element of the preceding '{}' line\n", kWritevElement, kWritev) + |
| 49 | + fmt::format("# {}: position (seek)\n", kPosition) + fmt::format("# {}: prefetch\n", kPrefetch) + |
| 50 | + fmt::format("# {}: prefetch element of the preceding '{}' line\n", kPrefetch, kPrefetchElement) + |
| 51 | + fmt::format("# {}: resize\n", kResize) + fmt::format("# {}: flush\n", kFlush) + |
| 52 | + fmt::format("# {}: close\n", kClose) + "# --------\n"s + "# line formats\n"s + |
| 53 | + fmt::format("# {} <id> <timestamp ms> <file name>\n", kOpen) + |
| 54 | + fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <requested B> <actual B>\n", kRead) + |
| 55 | + fmt::format( |
| 56 | + "# {} <id> <timestamp ms> <duration us> <requested total B> <actual total B> <number of elements>\n", |
| 57 | + kReadv) + |
| 58 | + fmt::format("# {} <index> <offset B> <requested B>\n", kReadvElement) + |
| 59 | + fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <requested B> <actual B>\n", kWrite) + |
| 60 | + fmt::format( |
| 61 | + "# {} <id> <timestamp ms> <duration us> <requested total B> <actual total B> <number of elements>\n", |
| 62 | + kWritev) + |
| 63 | + fmt::format("# {} <index> <offset B> <requested B>\n", kWritevElement) + |
| 64 | + fmt::format("# {} <id> <timestamp ms> <duration us> <offset B> <whence>\n", kPosition) + |
| 65 | + fmt::format("# {} <id> <timestamp ms> <duration us> <requested total B> <number of elements> <supported?>\n", |
| 66 | + kPrefetch) + |
| 67 | + fmt::format("# {} <index> <offset B> <requested B>\n", kPrefetchElement) + |
| 68 | + fmt::format("# {} <id> <timestamp ms> <duration us> <size B>\n", kResize) + |
| 69 | + fmt::format("# {} <id> <timestamp ms> <duration us>\n", kFlush) + |
| 70 | + fmt::format("# {} <id> <timestamp ms> <duration us>\n", kClose) + "# --------\n"s); |
| 71 | + auto const entryId = idCounter_.fetch_add(1); |
| 72 | + file_.write(fmt::format("{} {} {} {}\n", |
| 73 | + kOpen, |
| 74 | + entryId, |
| 75 | + std::chrono::round<std::chrono::milliseconds>(now().time_since_epoch()).count(), |
| 76 | + storageUrl)); |
| 77 | + LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, entryId); |
| 78 | + } |
| 79 | + |
| 80 | + IOSize read(void* into, IOSize n) override { |
| 81 | + auto const offset = baseStorage_->position(); |
| 82 | + auto const [result, message] = operate([this, into, n]() { return baseStorage_->read(into, n); }); |
| 83 | + file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, offset, n, result)); |
| 84 | + return result; |
| 85 | + } |
| 86 | + |
| 87 | + IOSize read(void* into, IOSize n, IOOffset pos) override { |
| 88 | + auto const [result, message] = operate([this, into, n, pos]() { return baseStorage_->read(into, n, pos); }); |
| 89 | + file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, pos, n, result)); |
| 90 | + return result; |
| 91 | + } |
| 92 | + |
| 93 | + IOSize readv(IOBuffer* into, IOSize n) override { |
| 94 | + auto offset = baseStorage_->position(); |
| 95 | + auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); }); |
| 96 | + std::string elements; |
| 97 | + IOSize total = 0; |
| 98 | + for (IOSize i = 0; i < n; ++i) { |
| 99 | + elements += fmt::format("{} {} {} {}\n", kReadvElement, i, offset, into[i].size()); |
| 100 | + total += into[i].size(); |
| 101 | + offset += into[i].size(); |
| 102 | + } |
| 103 | + file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements); |
| 104 | + return result; |
| 105 | + } |
| 106 | + |
| 107 | + IOSize readv(IOPosBuffer* into, IOSize n) override { |
| 108 | + auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); }); |
| 109 | + std::string elements; |
| 110 | + IOSize total = 0; |
| 111 | + for (IOSize i = 0; i < n; ++i) { |
| 112 | + elements += fmt::format("{} {} {} {}\n", kReadvElement, i, into[i].offset(), into[i].size()); |
| 113 | + total += into[i].size(); |
| 114 | + } |
| 115 | + file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements); |
| 116 | + return result; |
| 117 | + } |
| 118 | + |
| 119 | + IOSize write(const void* from, IOSize n) override { |
| 120 | + auto const offset = baseStorage_->position(); |
| 121 | + auto const [result, message] = operate([this, from, n]() { return baseStorage_->write(from, n); }); |
| 122 | + file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, offset, n, result)); |
| 123 | + return result; |
| 124 | + } |
| 125 | + |
| 126 | + IOSize write(const void* from, IOSize n, IOOffset pos) override { |
| 127 | + auto const [result, message] = operate([this, from, n, pos]() { return baseStorage_->write(from, n, pos); }); |
| 128 | + file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, pos, n, result)); |
| 129 | + return result; |
| 130 | + } |
| 131 | + |
| 132 | + IOSize writev(const IOBuffer* from, IOSize n) override { |
| 133 | + auto offset = baseStorage_->position(); |
| 134 | + auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); }); |
| 135 | + std::string elements; |
| 136 | + IOSize total = 0; |
| 137 | + for (IOSize i = 0; i < n; ++i) { |
| 138 | + elements += fmt::format("{} {} {} {}\n", kWritevElement, i, offset, from[i].size()); |
| 139 | + total += from[i].size(); |
| 140 | + offset += from[i].size(); |
| 141 | + } |
| 142 | + file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements); |
| 143 | + return result; |
| 144 | + } |
| 145 | + |
| 146 | + IOSize writev(const IOPosBuffer* from, IOSize n) override { |
| 147 | + auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); }); |
| 148 | + std::string elements; |
| 149 | + IOSize total = 0; |
| 150 | + for (IOSize i = 0; i < n; ++i) { |
| 151 | + elements += fmt::format("{} {} {} {}\n", kWritevElement, i, from[i].offset(), from[i].size()); |
| 152 | + total += from[i].size(); |
| 153 | + } |
| 154 | + file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements); |
| 155 | + return result; |
| 156 | + } |
| 157 | + |
| 158 | + IOOffset position(IOOffset offset, Relative whence) override { |
| 159 | + auto const [result, message] = |
| 160 | + operate([this, offset, whence]() { return baseStorage_->position(offset, whence); }); |
| 161 | + file_.write(fmt::format("{} {} {} {}\n", kPosition, message, offset, static_cast<int>(whence))); |
| 162 | + return result; |
| 163 | + } |
| 164 | + |
| 165 | + void resize(IOOffset size) override { |
| 166 | + auto const message = operate([this, size]() { return baseStorage_->resize(size); }); |
| 167 | + file_.write(fmt::format("{} {} {}\n", kResize, message, size)); |
| 168 | + } |
| 169 | + |
| 170 | + void flush() override { |
| 171 | + auto const message = operate([this]() { return baseStorage_->flush(); }); |
| 172 | + file_.write(fmt::format("{} {}\n", kFlush, message)); |
| 173 | + } |
| 174 | + |
| 175 | + void close() override { |
| 176 | + auto const message = operate([this]() { return baseStorage_->close(); }); |
| 177 | + file_.write(fmt::format("{} {}\n", kClose, message)); |
| 178 | + } |
| 179 | + |
| 180 | + bool prefetch(const IOPosBuffer* what, IOSize n) override { |
| 181 | + auto const [value, message] = operate([this, what, n]() { return baseStorage_->prefetch(what, n); }); |
| 182 | + std::string elements; |
| 183 | + IOSize total = 0; |
| 184 | + for (IOSize i = 0; i < n; ++i) { |
| 185 | + elements += fmt::format("{} {} {} {}\n", kPrefetchElement, i, what[i].offset(), what[i].size()); |
| 186 | + total += what[i].size(); |
| 187 | + } |
| 188 | + file_.write(fmt::format("{} {} {} {} {}\n", kPrefetch, message, total, n, value) + elements); |
| 189 | + return value; |
| 190 | + } |
| 191 | + |
| 192 | + private: |
| 193 | + template <typename F> |
| 194 | + auto operate(F&& func) -> std::tuple<decltype(func()), std::string> { |
| 195 | + auto const id = idCounter_.fetch_add(1); |
| 196 | + auto const begin = now(); |
| 197 | + auto const result = func(); |
| 198 | + auto const end = now(); |
| 199 | + LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id); |
| 200 | + return std::tuple(result, |
| 201 | + fmt::format("{} {} {}", |
| 202 | + id, |
| 203 | + std::chrono::round<std::chrono::milliseconds>(begin.time_since_epoch()).count(), |
| 204 | + std::chrono::round<std::chrono::microseconds>(end - begin).count())); |
| 205 | + } |
| 206 | + |
| 207 | + template <typename F> |
| 208 | + requires std::is_same_v<std::invoke_result_t<F>, void> |
| 209 | + auto operate(F&& func) -> std::string { |
| 210 | + auto const id = idCounter_.fetch_add(1); |
| 211 | + auto const begin = now(); |
| 212 | + func(); |
| 213 | + auto const end = now(); |
| 214 | + LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id); |
| 215 | + return fmt::format("{} {} {}", |
| 216 | + id, |
| 217 | + std::chrono::round<std::chrono::milliseconds>(begin.time_since_epoch()).count(), |
| 218 | + std::chrono::round<std::chrono::microseconds>(end - begin).count()); |
| 219 | + } |
| 220 | + |
| 221 | + static std::chrono::time_point<std::chrono::steady_clock> now() { return std::chrono::steady_clock::now(); } |
| 222 | + |
| 223 | + ThreadSafeOutputFileStream file_; |
| 224 | + std::unique_ptr<Storage> baseStorage_; |
| 225 | + std::atomic<unsigned int> idCounter_{0}; |
| 226 | + unsigned int const traceId_; |
| 227 | + }; |
| 228 | + |
| 229 | + class StorageTracerProxyMaker : public StorageProxyMaker { |
| 230 | + public: |
| 231 | + StorageTracerProxyMaker(edm::ParameterSet const& pset) |
| 232 | + : filenamePattern_(pset.getUntrackedParameter<std::string>("traceFilePattern")) { |
| 233 | + if (filenamePattern_.find("%I") == std::string::npos) { |
| 234 | + throw edm::Exception(edm::errors::Configuration) << "traceFilePattern did not contain '%I'"; |
| 235 | + } |
| 236 | + } |
| 237 | + |
| 238 | + static void fillPSetDescription(edm::ParameterSetDescription& iDesc) { |
| 239 | + iDesc.addUntracked<std::string>("traceFilePattern", "trace_%I.txt") |
| 240 | + ->setComment( |
| 241 | + "Pattern for the output trace file names. Must contain '%I' for the counter of different files."); |
| 242 | + } |
| 243 | + |
| 244 | + std::unique_ptr<Storage> wrap(std::string const& url, std::unique_ptr<Storage> storage) const override { |
| 245 | + auto value = fileCounter_.fetch_add(1); |
| 246 | + std::string fname = filenamePattern_; |
| 247 | + boost::replace_all(fname, "%I", std::to_string(value)); |
| 248 | + return std::make_unique<StorageTracerProxy>(value, fname, url, std::move(storage)); |
| 249 | + } |
| 250 | + |
| 251 | + private: |
| 252 | + mutable std::atomic<unsigned int> fileCounter_{0}; |
| 253 | + std::string const filenamePattern_; |
| 254 | + }; |
| 255 | +} // namespace edm::storage |
| 256 | + |
| 257 | +#include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h" |
| 258 | +#include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h" |
| 259 | +DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory, |
| 260 | + edm::storage::StorageTracerProxyMaker, |
| 261 | + "StorageTracerProxy"); |
0 commit comments