|
| 1 | +/// \file |
| 2 | +/// \ingroup tutorial_ntuple |
| 3 | +/// \notebook |
| 4 | +/// Example of efficient multi-threaded reading when multiple threads share a single reader. |
| 5 | +/// In this example, two threads share the work as follows: the first thread processes all the even entries numbers |
| 6 | +/// and the second thread all the odd entry numbers. The second thread works twice as slow as the first one. |
| 7 | +/// |
| 8 | +/// As a result, the threads need the same clusters and pages but at different points in time. |
| 9 | +/// With the naive way of using the reader, this read pattern will result in cache thrashing. |
| 10 | +/// |
| 11 | +/// Using the "active entry token" API, on the other hand, the reader will be informed about which entries |
| 12 | +/// need to be kept in the caches and cache thrashing is prevented. |
| 13 | +/// |
| 14 | +/// \macro_image |
| 15 | +/// \macro_code |
| 16 | +/// |
| 17 | +/// \date October 2025 |
| 18 | +/// \author The ROOT Team |
| 19 | + |
| 20 | +#include <ROOT/RNTupleModel.hxx> |
| 21 | +#include <ROOT/RNTupleReader.hxx> |
| 22 | +#include <ROOT/RNTupleWriter.hxx> |
| 23 | + |
| 24 | +#include <TAxis.h> |
| 25 | +#include <TCanvas.h> |
| 26 | +#include <TGraph.h> |
| 27 | +#include <TLatex.h> |
| 28 | +#include <TLegend.h> |
| 29 | +#include <TRandom3.h> |
| 30 | +#include <TROOT.h> |
| 31 | +#include <TStyle.h> |
| 32 | + |
| 33 | +#include <array> |
| 34 | +#include <atomic> |
| 35 | +#include <chrono> |
| 36 | +#include <exception> |
| 37 | +#include <functional> |
| 38 | +#include <iostream> |
| 39 | +#include <mutex> |
| 40 | +#include <thread> |
| 41 | +#include <vector> |
| 42 | + |
| 43 | +using namespace std::chrono_literals; |
| 44 | + |
| 45 | +// Where to store the ntuple of this example |
| 46 | +constexpr char const *kNTupleFileName = "ntpl017_shared_reader.root"; |
| 47 | + |
| 48 | +/// The sample class that is stored in the RNTuple |
| 49 | +struct Point2D { |
| 50 | + float fX; |
| 51 | + float fY; |
| 52 | +}; |
| 53 | + |
| 54 | +/// Whether we read with setting active entry tokens (informed) or not (naive) |
| 55 | +enum class EReadMode { |
| 56 | + kNaive, |
| 57 | + kInformed |
| 58 | +}; |
| 59 | + |
| 60 | +/// Nicify output of EReadMode values |
| 61 | +std::ostream &operator<<(std::ostream &os, const EReadMode &e) |
| 62 | +{ |
| 63 | + switch (e) { |
| 64 | + case EReadMode::kNaive: os << "naive"; break; |
| 65 | + case EReadMode::kInformed: os << "informed"; break; |
| 66 | + default: os << "???"; |
| 67 | + } |
| 68 | + return os; |
| 69 | +} |
| 70 | + |
| 71 | +// To be reset between ProcessEntries calls to Read() |
| 72 | +static std::atomic<int> gNEntriesDone; |
| 73 | +static std::atomic<int> gThreadId; |
| 74 | + |
| 75 | +/// The read thread's main function |
| 76 | +void ProcessEntries(ROOT::RNTupleReader &reader, EReadMode readMode, const std::chrono::microseconds &usPerEvent, |
| 77 | + std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages, |
| 78 | + std::vector<int> &nClusters, std::vector<int> &nPages) |
| 79 | +{ |
| 80 | + static std::mutex gLock; |
| 81 | + |
| 82 | + const int threadId = gThreadId++; |
| 83 | + |
| 84 | + auto token = reader.CreateActiveEntryToken(); |
| 85 | + for (unsigned int i = threadId; i < reader.GetNEntries(); i += 2) { |
| 86 | + { |
| 87 | + std::lock_guard<std::mutex> guard(gLock); |
| 88 | + |
| 89 | + // The only difference between naive and informed reading: in informed reading, we indicate which |
| 90 | + // entry we are going to use before loading it. |
| 91 | + switch (readMode) { |
| 92 | + case EReadMode::kInformed: token.SetEntryNumber(i); break; |
| 93 | + case EReadMode::kNaive: break; |
| 94 | + default: std::terminate(); // never here |
| 95 | + } |
| 96 | + |
| 97 | + reader.LoadEntry(i); |
| 98 | + } |
| 99 | + |
| 100 | + std::this_thread::sleep_for(usPerEvent); |
| 101 | + |
| 102 | + const int entriesDone = gNEntriesDone++; |
| 103 | + sumLoadedClusters.at(entriesDone) = |
| 104 | + reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nClusterLoaded")->GetValueAsInt(); |
| 105 | + sumUnsealedPages[entriesDone] = |
| 106 | + reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nPageUnsealed")->GetValueAsInt(); |
| 107 | + nClusters[entriesDone] = |
| 108 | + reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.RClusterPool.nCluster")->GetValueAsInt(); |
| 109 | + nPages[entriesDone] = |
| 110 | + reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.RPagePool.nPage")->GetValueAsInt(); |
| 111 | + } |
| 112 | +} |
| 113 | + |
| 114 | +void Read(EReadMode readMode, std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages, |
| 115 | + std::vector<int> &nClusters, std::vector<int> &nPages) |
| 116 | +{ |
| 117 | + auto reader = ROOT::RNTupleReader::Open("ntpl", kNTupleFileName); |
| 118 | + reader->EnableMetrics(); |
| 119 | + |
| 120 | + gNEntriesDone = 0; |
| 121 | + gThreadId = 0; |
| 122 | + |
| 123 | + const auto N = reader->GetNEntries(); |
| 124 | + sumLoadedClusters.resize(N); |
| 125 | + sumUnsealedPages.resize(N); |
| 126 | + nClusters.resize(N); |
| 127 | + nPages.resize(N); |
| 128 | + |
| 129 | + std::array<std::thread, 2> threads; |
| 130 | + threads[0] = std::thread(ProcessEntries, std::ref(*reader), readMode, 100us, std::ref(sumLoadedClusters), |
| 131 | + std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages)); |
| 132 | + threads[1] = std::thread(ProcessEntries, std::ref(*reader), readMode, 200us, std::ref(sumLoadedClusters), |
| 133 | + std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages)); |
| 134 | + for (auto &t : threads) { |
| 135 | + t.join(); |
| 136 | + } |
| 137 | + |
| 138 | + std::cout << "Reading in mode '" << readMode << "':" << std::endl; |
| 139 | + std::cout << "===========================" << std::endl; |
| 140 | + reader->GetMetrics().Print(std::cout); |
| 141 | + std::cout << "===========================" << std::endl << std::endl; |
| 142 | +} |
| 143 | + |
| 144 | +void Write() |
| 145 | +{ |
| 146 | + auto model = ROOT::RNTupleModel::Create(); |
| 147 | + auto ptrPoint = model->MakeField<Point2D>("point"); |
| 148 | + |
| 149 | + auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "ntpl", kNTupleFileName); |
| 150 | + |
| 151 | + for (int i = 0; i < 1000; ++i) { |
| 152 | + if (i % 100 == 0) |
| 153 | + writer->CommitCluster(); |
| 154 | + |
| 155 | + auto prng = std::make_unique<TRandom3>(); |
| 156 | + prng->SetSeed(); |
| 157 | + |
| 158 | + ptrPoint->fX = prng->Rndm(1); |
| 159 | + ptrPoint->fY = prng->Rndm(1); |
| 160 | + writer->Fill(); |
| 161 | + } |
| 162 | +} |
| 163 | + |
| 164 | +TGraph *GetGraph(const std::vector<int> &counts) |
| 165 | +{ |
| 166 | + auto graph = new TGraph(); |
| 167 | + for (unsigned int i = 0; i < counts.size(); ++i) { |
| 168 | + graph->SetPoint(i, i, counts[i]); |
| 169 | + } |
| 170 | + graph->GetXaxis()->SetTitle("Number of processed entries"); |
| 171 | + return graph; |
| 172 | +} |
| 173 | + |
| 174 | +void ntpl017_shared_reader() |
| 175 | +{ |
| 176 | + ROOT::EnableImplicitMT(); |
| 177 | + |
| 178 | + Write(); |
| 179 | + ROOT::RNTupleReader::Open("ntpl", kNTupleFileName)->PrintInfo(ROOT::ENTupleInfo::kStorageDetails); |
| 180 | + |
| 181 | + std::vector<int> sumLoadedClusters; |
| 182 | + std::vector<int> sumUnsealedPages; |
| 183 | + std::vector<int> nClusters; |
| 184 | + std::vector<int> nPages; |
| 185 | + TLatex latex; |
| 186 | + |
| 187 | + gStyle->SetOptStat(0); |
| 188 | + gStyle->SetLineWidth(2); |
| 189 | + gStyle->SetMarkerStyle(8); |
| 190 | + TCanvas *canvas = new TCanvas("", "Shared Reader Example", 200, 10, 1500, 1000); |
| 191 | + |
| 192 | + canvas->Divide(2, 2); |
| 193 | + |
| 194 | + Read(EReadMode::kNaive, sumLoadedClusters, sumUnsealedPages, nClusters, nPages); |
| 195 | + |
| 196 | + canvas->cd(1); |
| 197 | + auto graph1 = GetGraph(sumUnsealedPages); |
| 198 | + graph1->SetLineColor(kRed); |
| 199 | + graph1->Draw("AL"); |
| 200 | + auto graph2 = GetGraph(sumLoadedClusters); |
| 201 | + graph2->SetLineColor(kBlue); |
| 202 | + graph2->Draw("SAME L"); |
| 203 | + |
| 204 | + auto legend = new TLegend(0.125, 0.725, 0.625, 0.875); |
| 205 | + legend->AddEntry(graph1, "Number of decompressed pages", "l"); |
| 206 | + legend->AddEntry(graph2, "Number of loaded clusters", "l"); |
| 207 | + legend->Draw(); |
| 208 | + |
| 209 | + latex.SetTextAlign(22); |
| 210 | + latex.DrawLatexNDC(0.5, 0.95, "Naive Reading"); |
| 211 | + |
| 212 | + canvas->cd(3); |
| 213 | + |
| 214 | + auto graph3 = GetGraph(nPages); |
| 215 | + graph3->SetMarkerColor(kRed); |
| 216 | + graph3->GetYaxis()->SetNdivisions(8); |
| 217 | + graph3->GetYaxis()->SetRangeUser(-0.5, 14); |
| 218 | + graph3->Draw("AP"); |
| 219 | + |
| 220 | + auto graph4 = GetGraph(nClusters); |
| 221 | + graph4->SetMarkerColor(kBlue); |
| 222 | + graph4->Draw("SAME P"); |
| 223 | + |
| 224 | + legend = new TLegend(0.35, 0.725, 0.85, 0.875); |
| 225 | + legend->AddEntry(graph3, "Number of currently cached pages", "p"); |
| 226 | + legend->AddEntry(graph4, "Number of currently cached clusters", "p"); |
| 227 | + legend->Draw(); |
| 228 | + |
| 229 | + // =============================== |
| 230 | + |
| 231 | + Read(EReadMode::kInformed, sumLoadedClusters, sumUnsealedPages, nClusters, nPages); |
| 232 | + |
| 233 | + canvas->cd(2); |
| 234 | + auto graph5 = GetGraph(sumUnsealedPages); |
| 235 | + graph5->SetLineColor(kRed); |
| 236 | + graph5->Draw("AL"); |
| 237 | + |
| 238 | + auto graph6 = GetGraph(sumLoadedClusters); |
| 239 | + graph6->SetLineColor(kBlue); |
| 240 | + graph6->Draw("SAME L"); |
| 241 | + |
| 242 | + latex.SetTextAlign(22); |
| 243 | + latex.DrawLatexNDC(0.5, 0.95, "Informed Reading"); |
| 244 | + |
| 245 | + canvas->cd(4); |
| 246 | + |
| 247 | + auto graph7 = GetGraph(nPages); |
| 248 | + graph7->SetMarkerColor(kRed); |
| 249 | + graph7->GetYaxis()->SetNdivisions(8); |
| 250 | + graph7->GetYaxis()->SetRangeUser(-0.5, 14); |
| 251 | + graph7->Draw("AP"); |
| 252 | + |
| 253 | + auto graph8 = GetGraph(nClusters); |
| 254 | + graph8->SetMarkerColor(kBlue); |
| 255 | + graph8->Draw("SAME P"); |
| 256 | +} |
0 commit comments