Skip to content

Commit 7a12b3a

Browse files
committed
make the zk structure a bit more flat
1 parent 803a91a commit 7a12b3a

File tree

5 files changed

+232
-98
lines changed

5 files changed

+232
-98
lines changed

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,84 @@
99
namespace DB
1010
{
1111

12+
struct ExportReplicatedMergeTreePartitionProcessingPartEntry
13+
{
14+
String part_name;
15+
String status;
16+
size_t retry_count;
17+
String finished_by;
18+
19+
std::string toJsonString() const
20+
{
21+
Poco::JSON::Object json;
22+
23+
json.set("part_name", part_name);
24+
json.set("status", status);
25+
json.set("retry_count", retry_count);
26+
json.set("finished_by", finished_by);
27+
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
28+
oss.exceptions(std::ios::failbit);
29+
Poco::JSON::Stringifier::stringify(json, oss);
30+
31+
return oss.str();
32+
}
33+
34+
static ExportReplicatedMergeTreePartitionProcessingPartEntry fromJsonString(const std::string & json_string)
35+
{
36+
Poco::JSON::Parser parser;
37+
auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();
38+
chassert(json);
39+
40+
ExportReplicatedMergeTreePartitionProcessingPartEntry entry;
41+
42+
entry.part_name = json->getValue<String>("part_name");
43+
entry.status = json->getValue<String>("status");
44+
entry.retry_count = json->getValue<size_t>("retry_count");
45+
if (json->has("finished_by"))
46+
{
47+
entry.finished_by = json->getValue<String>("finished_by");
48+
}
49+
return entry;
50+
}
51+
};
52+
53+
struct ExportReplicatedMergeTreePartitionProcessedPartEntry
54+
{
55+
String part_name;
56+
String path_in_destination;
57+
String status;
58+
String finished_by;
59+
60+
std::string toJsonString() const
61+
{
62+
Poco::JSON::Object json;
63+
json.set("part_name", part_name);
64+
json.set("path_in_destination", path_in_destination);
65+
json.set("status", status);
66+
json.set("finished_by", finished_by);
67+
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
68+
oss.exceptions(std::ios::failbit);
69+
Poco::JSON::Stringifier::stringify(json, oss);
70+
return oss.str();
71+
}
72+
73+
static ExportReplicatedMergeTreePartitionProcessedPartEntry fromJsonString(const std::string & json_string)
74+
{
75+
Poco::JSON::Parser parser;
76+
auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();
77+
chassert(json);
78+
79+
ExportReplicatedMergeTreePartitionProcessedPartEntry entry;
80+
81+
entry.part_name = json->getValue<String>("part_name");
82+
entry.path_in_destination = json->getValue<String>("path_in_destination");
83+
entry.status = json->getValue<String>("status");
84+
entry.finished_by = json->getValue<String>("finished_by");
85+
86+
return entry;
87+
}
88+
};
89+
1290
struct ExportReplicatedMergeTreePartitionManifest
1391
{
1492
String transaction_id;

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 91 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -169,64 +169,16 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
169169
{
170170
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} exported successfully", relative_path_in_destination_storage);
171171

172-
Coordination::Stat locked_by_stat;
173-
std::string locked_by;
174-
175-
if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat))
176-
{
177-
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not commit or set it as completed", part_name);
178-
return;
179-
}
180-
181-
/// Is this a good idea? what if the file we just pushed to s3 ends up triggering an exception in the replica that actually locks the part and it does not commit?
182-
/// I guess we should not throw if file already exists for export partition, hard coded.
183-
if (locked_by != storage.replica_name)
184-
{
185-
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked by another replica, will not commit or set it as completed", part_name);
186-
return;
187-
}
188-
189-
Coordination::Requests requests;
190-
191-
if (zk->isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
192-
{
193-
requests.emplace_back(zkutil::makeRemoveRecursiveRequest(*zk, processing_parts_path / part_name, -1));
194-
}
195-
else
196-
{
197-
// Remove children before parent (order matters for multi operations)
198-
// Maybe a ls + multi rm..
199-
requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name / "retry_count", -1));
200-
requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name / "status", -1));
201-
requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name, -1));
202-
}
203-
204-
requests.emplace_back(zkutil::makeCreateRequest(processed_part_path, "", zkutil::CreateMode::Persistent));
205-
requests.emplace_back(zkutil::makeCreateRequest(processed_part_path / "path", relative_path_in_destination_storage, zkutil::CreateMode::Persistent));
206-
requests.emplace_back(zkutil::makeCreateRequest(processed_part_path / "status", "COMPLETED", zkutil::CreateMode::Persistent));
207-
requests.emplace_back(zkutil::makeCreateRequest(processed_part_path / "finished_by", storage.replica_name, zkutil::CreateMode::Persistent));
208-
requests.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version));
209-
210-
Coordination::Responses responses;
211-
if (Coordination::Error::ZOK != zk->tryMulti(requests, responses))
172+
if (!tryToMovePartToProcessed(export_path, processing_parts_path, processed_part_path, part_name, relative_path_in_destination_storage, zk))
212173
{
213-
/// todo arthur remember what to do here
214-
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update export path, skipping");
174+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to move part to processed, will not commit export partition");
215175
return;
216176
}
217177

218178
LOG_INFO(storage.log, "ExportPartition scheduler task: Marked part export {} as completed", part_name);
219-
220-
Strings parts_in_processing_or_pending;
221-
if (Coordination::Error::ZOK != zk->tryGetChildren(export_path / "processing", parts_in_processing_or_pending))
222-
{
223-
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get parts in processing or pending, will not try to commit export partition");
224-
return;
225-
}
226179

227-
if (!parts_in_processing_or_pending.empty())
180+
if (!areAllPartsProcessed(export_path, zk))
228181
{
229-
LOG_INFO(storage.log, "ExportPartition scheduler task: There are still parts in processing or pending, will not try to commit export partition");
230182
return;
231183
}
232184

@@ -262,21 +214,32 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
262214
Coordination::Requests ops;
263215

264216
const auto processing_part_path = processing_parts_path / part_name;
265-
std::string retry_count_string;
266-
if (zk->tryGet(processing_part_path / "retry_count", retry_count_string))
217+
218+
std::string processing_part_string;
219+
220+
if (!zk->tryGet(processing_part_path, processing_part_string))
267221
{
268-
std::size_t retry_count = std::stoull(retry_count_string.c_str()) + 1;
222+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get processing part, will not increment error counts");
223+
return;
224+
}
269225

270-
ops.emplace_back(zkutil::makeSetRequest(processing_part_path / "retry_count", std::to_string(retry_count), -1));
226+
/// todo arthur could this have been cached?
227+
auto processing_part_entry = ExportReplicatedMergeTreePartitionProcessingPartEntry::fromJsonString(processing_part_string);
228+
229+
processing_part_entry.retry_count++;
230+
231+
if (processing_part_entry.retry_count)
232+
{
271233
ops.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version));
234+
ops.emplace_back(zkutil::makeSetRequest(processing_part_path, processing_part_entry.toJsonString(), -1));
272235

273-
if (retry_count >= max_retries)
236+
if (processing_part_entry.retry_count >= max_retries)
274237
{
275238
/// just set status in processing_part_path and finished_by
276-
ops.emplace_back(zkutil::makeSetRequest(processing_part_path / "status", "FAILED", -1));
277-
ops.emplace_back(zkutil::makeCreateRequest(processing_part_path / "finished_by", storage.replica_name, zkutil::CreateMode::Persistent));
278-
ops.emplace_back(zkutil::makeSetRequest(export_path / "status", "FAILED", -1));
239+
processing_part_entry.status = "FAILED";
240+
processing_part_entry.finished_by = storage.replica_name;
279241

242+
ops.emplace_back(zkutil::makeSetRequest(export_path / "status", "FAILED", -1));
280243
LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit exceeded for part {}, will try to fail the entire task", part_name);
281244
}
282245

@@ -316,4 +279,73 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
316279
}
317280
}
318281

282+
bool ExportPartitionTaskScheduler::tryToMovePartToProcessed(
283+
const std::filesystem::path & export_path,
284+
const std::filesystem::path & processing_parts_path,
285+
const std::filesystem::path & processed_part_path,
286+
const std::string & part_name,
287+
const String & relative_path_in_destination_storage,
288+
const zkutil::ZooKeeperPtr & zk
289+
)
290+
{
291+
Coordination::Stat locked_by_stat;
292+
std::string locked_by;
293+
294+
if (!zk->tryGet(export_path / "locks" / part_name, locked_by, &locked_by_stat))
295+
{
296+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is not locked by any replica, will not commit or set it as completed", part_name);
297+
return false;
298+
}
299+
300+
/// Is this a good idea? what if the file we just pushed to s3 ends up triggering an exception in the replica that actually locks the part and it does not commit?
301+
/// I guess we should not throw if file already exists for export partition, hard coded.
302+
if (locked_by != storage.replica_name)
303+
{
304+
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked by another replica, will not commit or set it as completed", part_name);
305+
return false;
306+
}
307+
308+
Coordination::Requests requests;
309+
310+
ExportReplicatedMergeTreePartitionProcessedPartEntry processed_part_entry;
311+
processed_part_entry.part_name = part_name;
312+
processed_part_entry.path_in_destination = relative_path_in_destination_storage;
313+
processed_part_entry.status = "SUCCESS";
314+
processed_part_entry.finished_by = storage.replica_name;
315+
316+
requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name, -1));
317+
requests.emplace_back(zkutil::makeCreateRequest(processed_part_path, processed_part_entry.toJsonString(), zkutil::CreateMode::Persistent));
318+
requests.emplace_back(zkutil::makeRemoveRequest(export_path / "locks" / part_name, locked_by_stat.version));
319+
320+
Coordination::Responses responses;
321+
if (Coordination::Error::ZOK != zk->tryMulti(requests, responses))
322+
{
323+
/// todo arthur remember what to do here
324+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update export path, skipping");
325+
return false;
326+
}
327+
328+
return true;
329+
}
330+
331+
bool ExportPartitionTaskScheduler::areAllPartsProcessed(
332+
const std::filesystem::path & export_path,
333+
const zkutil::ZooKeeperPtr & zk)
334+
{
335+
Strings parts_in_processing_or_pending;
336+
if (Coordination::Error::ZOK != zk->tryGetChildren(export_path / "processing", parts_in_processing_or_pending))
337+
{
338+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get parts in processing or pending, will not try to commit export partition");
339+
return false;
340+
}
341+
342+
if (!parts_in_processing_or_pending.empty())
343+
{
344+
LOG_INFO(storage.log, "ExportPartition scheduler task: There are still parts in processing or pending, will not try to commit export partition");
345+
return false;
346+
}
347+
348+
return true;
349+
}
350+
319351
}

src/Storages/MergeTree/ExportPartitionTaskScheduler.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,20 @@ class ExportPartitionTaskScheduler
4646
const zkutil::ZooKeeperPtr & zk,
4747
const String & exception,
4848
size_t max_retries);
49+
50+
bool tryToMovePartToProcessed(
51+
const std::filesystem::path & export_path,
52+
const std::filesystem::path & processing_parts_path,
53+
const std::filesystem::path & processed_part_path,
54+
const std::string & part_name,
55+
const String & relative_path_in_destination_storage,
56+
const zkutil::ZooKeeperPtr & zk
57+
);
58+
59+
bool areAllPartsProcessed(
60+
const std::filesystem::path & export_path,
61+
const zkutil::ZooKeeperPtr & zk
62+
);
4963
};
5064

5165
}

0 commit comments

Comments
 (0)