Skip to content

Commit e2840d0

Browse files
committed
[net] Add TParallelMergingFile test with RNTuple
1 parent 91af9d2 commit e2840d0

File tree

2 files changed

+158
-0
lines changed

2 files changed

+158
-0
lines changed

net/net/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
# For the list of contributors see $ROOTSYS/README/CREDITS.
66

77
ROOT_ADD_GTEST(nettests nettests.cxx LIBRARIES Net)
8+
ROOT_ADD_GTEST(testParallelMergingFile testParallelMergingFile.cxx LIBRARIES Net Tree Hist ROOTNTuple ROOTNTupleUtil)
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
#include <gtest/gtest.h>
2+
3+
#include <thread>
4+
#include <memory>
5+
6+
#include <ROOT/RNTuple.hxx>
7+
#include <ROOT/RNTupleWriter.hxx>
8+
#include <ROOT/RNTupleReader.hxx>
9+
#include <ROOT/RNTupleModel.hxx>
10+
#include <ROOT/TestSupport.hxx>
11+
12+
#include "TParallelMergingFile.h"
13+
#include "TServerSocket.h"
14+
#include "TFileMerger.h"
15+
16+
enum EStatusKind {
17+
// These values are dictated by TParallelMergingFile::UploadAndReset().
18+
kStartConnection = 0,
19+
kProtocolVersion = 1,
20+
kProtocol = 1,
21+
};
22+
23+
static void Server(std::unique_ptr<TServerSocket> ss, const std::string &outFile)
24+
{
25+
// NOTE: only accepts a single client.
26+
auto client = std::unique_ptr<TSocket>(ss->Accept());
27+
ASSERT_NE(client, nullptr);
28+
29+
// initial handshake
30+
UInt_t clientIdx = 0;
31+
client->Send(clientIdx, kStartConnection);
32+
client->Send(kProtocolVersion, kProtocol);
33+
34+
TFileMerger merger{/* isLocal = */ false};
35+
merger.SetMergeOptions(TString("rntuple.MergingMode=Union"));
36+
bool ok = merger.OutputFile(outFile.c_str(), "RECREATE");
37+
ASSERT_TRUE(ok);
38+
39+
// Client loop
40+
bool alive = true;
41+
while (alive) {
42+
TMessage *msg = nullptr;
43+
client->Recv(msg);
44+
ASSERT_NE(msg, nullptr);
45+
46+
switch (msg->What()) {
47+
case kMESS_STRING: {
48+
// This is how the TParallelMergingFile terminates its connection (it sends a message containing
49+
// the string "Finished").
50+
char str[64];
51+
msg->ReadString(str, sizeof(str));
52+
alive = false;
53+
} break;
54+
55+
case kMESS_ANY: {
56+
// Receive and merge data
57+
Int_t clientId;
58+
TString filename;
59+
Long64_t length;
60+
msg->ReadInt(clientId);
61+
msg->ReadTString(filename);
62+
msg->ReadLong64(length);
63+
64+
auto input = std::make_unique<TMemFile>((std::string("server_") + filename), msg->Buffer() + msg->Length(),
65+
length, "READ");
66+
67+
msg->SetBufferOffset(msg->Length() + length);
68+
69+
EXPECT_NE(input->Get<ROOT::RNTuple>("ntpl"), nullptr);
70+
71+
merger.AddFile(input.get());
72+
bool ok = merger.PartialMerge(TFileMerger::kAllIncremental | TFileMerger::kKeepCompression);
73+
EXPECT_TRUE(ok);
74+
75+
} break;
76+
77+
default: std::cout << "ignoring message of kind " << msg->What() << "\n";
78+
}
79+
80+
delete msg;
81+
}
82+
}
83+
84+
TEST(TParallelMergingFile, UploadAndResetNonTObject)
85+
{
86+
constexpr auto sockPath = "/tmp/parallelMergeTest.sock";
87+
88+
// Start server
89+
gSystem->Unlink(sockPath);
90+
auto ss = std::make_unique<TServerSocket>(sockPath);
91+
ASSERT_TRUE(ss->IsValid());
92+
93+
struct Cleanup {
94+
const char *fSockPath;
95+
Cleanup(const char *sockPath) : fSockPath(sockPath) {}
96+
~Cleanup() { gSystem->Unlink(fSockPath); }
97+
} cleanup(sockPath);
98+
99+
ROOT::TestSupport::FileRaii fileGuardMerged("parallelFileMerged.root");
100+
101+
std::thread serverThread(Server, std::move(ss), fileGuardMerged.GetPath());
102+
103+
// Create "client-side" file
104+
auto *file = dynamic_cast<TParallelMergingFile *>(
105+
TFile::Open((std::string("parallelMergeTest.root?pmerge=") + sockPath).c_str(), "RECREATE"));
106+
ASSERT_NE(file, nullptr);
107+
file->SetCacheWrite(nullptr);
108+
109+
ROOT::TestSupport::CheckDiagsRAII diags;
110+
diags.optionalDiag(kWarning, "TParallelMergingFile::ResetObjects", "can not be ResetAfterMerge", false);
111+
diags.requiredDiag(kWarning, "TFileMerger::MergeRecursive", "Merging RNTuples is experimental");
112+
113+
constexpr auto kNEntries = 10000;
114+
constexpr auto kNUploads = 4;
115+
{
116+
auto model = ROOT::RNTupleModel::CreateBare();
117+
model->MakeField<float>("px");
118+
model->MakeField<float>("py");
119+
auto writer = ROOT::RNTupleWriter::Append(model->Clone(), "ntpl", *file);
120+
auto entry = writer->CreateEntry();
121+
auto px = entry->GetPtr<float>("px");
122+
auto py = entry->GetPtr<float>("py");
123+
for (int i = 0; i < kNEntries; ++i) {
124+
*px = i;
125+
*py = 2 * i;
126+
writer->Fill(*entry);
127+
if (i > 0 && (i % (kNEntries / kNUploads)) == 0) {
128+
// Force UploadAndReset() by destroying the RNTupleWriter, which in turn will call CommitDataset()
129+
// which ultimately calls file->Write().
130+
// IMPORTANT: writer.reset() must be called explicitly, since we must not create the new RNTuple
131+
// before destroying the old one (otherwise the header will get a stale address and end up corrupted).
132+
writer.reset();
133+
writer = ROOT::RNTupleWriter::Append(model->Clone(), "ntpl", *file);
134+
entry = writer->CreateEntry();
135+
px = entry->GetPtr<float>("px");
136+
py = entry->GetPtr<float>("py");
137+
}
138+
}
139+
}
140+
141+
delete file;
142+
143+
serverThread.join();
144+
145+
{
146+
auto reader = ROOT::RNTupleReader::Open("ntpl", fileGuardMerged.GetPath());
147+
EXPECT_EQ(reader->GetNEntries(), kNEntries);
148+
149+
auto px = reader->GetModel().GetDefaultEntry().GetPtr<float>("px");
150+
auto py = reader->GetModel().GetDefaultEntry().GetPtr<float>("py");
151+
for (auto idx : reader->GetEntryRange()) {
152+
reader->LoadEntry(idx);
153+
EXPECT_FLOAT_EQ(*px, idx);
154+
EXPECT_FLOAT_EQ(*py, 2 * idx);
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)