@@ -22,7 +22,6 @@ void ExportPartitionTaskScheduler::run()
2222
2323 for (const auto & [key, entry] : storage.export_merge_tree_partition_task_entries )
2424 {
25-
2625 const auto & manifest = entry.manifest ;
2726 const auto & database = storage.getContext ()->resolveDatabase (manifest.destination_database );
2827 const auto & table = manifest.destination_table ;
@@ -112,11 +111,13 @@ void ExportPartitionTaskScheduler::run()
112111 {
113112 tryLogCurrentException (__PRETTY_FUNCTION__);
114113 zk->tryRemove (fs::path (storage.zookeeper_path ) / " exports" / key / " locks" / zk_part_name);
115- // / todo arthur re-schedule this so we can try later
116114 // / we should not increment retry_count because the node might just be full
117115 }
118116 }
119117 }
118+
119+ // / Some parts may have failed to be scheduled or exported; re-schedule the select task so they are retried later.
120+ storage.export_merge_tree_partition_select_task ->scheduleAfter (1000 * 5 );
120121}
121122
122123void ExportPartitionTaskScheduler::handlePartExportCompletion (
@@ -137,7 +138,7 @@ void ExportPartitionTaskScheduler::handlePartExportCompletion(
137138 }
138139 else
139140 {
140- handlePartExportFailure (processing_parts_path, processed_part_path, part_name, export_path, zk, result.exception );
141+ handlePartExportFailure (processing_parts_path, part_name, export_path, zk, result.exception , manifest. max_retries );
141142 }
142143}
143144
@@ -182,7 +183,6 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
182183 // Remove children before parent (order matters for multi operations)
183184 // Maybe this could instead list the children and remove them all in a single multi request.
184185 requests.emplace_back (zkutil::makeRemoveRequest (processing_parts_path / part_name / " retry_count" , -1 ));
185- requests.emplace_back (zkutil::makeRemoveRequest (processing_parts_path / part_name / " max_retry" , -1 ));
186186 requests.emplace_back (zkutil::makeRemoveRequest (processing_parts_path / part_name / " status" , -1 ));
187187 requests.emplace_back (zkutil::makeRemoveRequest (processing_parts_path / part_name, -1 ));
188188 }
@@ -223,15 +223,30 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
223223
224224void ExportPartitionTaskScheduler::handlePartExportFailure (
225225 const std::filesystem::path & processing_parts_path,
226- const std::filesystem::path & processed_part_path,
227226 const std::string & part_name,
228227 const std::filesystem::path & export_path,
229228 const zkutil::ZooKeeperPtr & zk,
230- const String & exception
229+ const String & exception,
230+ size_t max_retries
231231)
232232{
233233 tryLogCurrentException (__PRETTY_FUNCTION__);
234234
235+ Coordination::Stat locked_by_stat;
236+ std::string locked_by;
237+
238+ if (!zk->tryGet (export_path / " locks" / part_name, locked_by, &locked_by_stat))
239+ {
240+ LOG_INFO (storage.log , " ExportPartition: Part {} is not locked by any replica, will not increment error counts" , part_name);
241+ return ;
242+ }
243+
244+ if (locked_by != storage.replica_name )
245+ {
246+ LOG_INFO (storage.log , " ExportPartition: Part {} is locked by another replica, will not increment error counts" , part_name);
247+ return ;
248+ }
249+
235250 Coordination::Requests ops;
236251
237252 const auto processing_part_path = processing_parts_path / part_name;
@@ -240,21 +255,17 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
240255 {
241256 std::size_t retry_count = std::stoull (retry_count_string.c_str ()) + 1 ;
242257
243-
244258 ops.emplace_back (zkutil::makeSetRequest (processing_part_path / " retry_count" , std::to_string (retry_count), -1 ));
245- ops.emplace_back (zkutil::makeRemoveRequest (export_path / " locks" / part_name, - 1 ));
259+ ops.emplace_back (zkutil::makeRemoveRequest (export_path / " locks" / part_name, locked_by_stat. version ));
246260
247- if (retry_count >= 3 )
261+ if (retry_count >= max_retries )
248262 {
249- // / remove from processing and create in processed with status FAILED
250-
251- ops.emplace_back (zkutil::makeRemoveRequest (processing_parts_path / part_name, -1 ));
252- ops.emplace_back (zkutil::makeCreateRequest (processed_part_path, " " , zkutil::CreateMode::Persistent));
253- ops.emplace_back (zkutil::makeCreateRequest (processed_part_path / " status" , " FAILED" , zkutil::CreateMode::Persistent));
254- ops.emplace_back (zkutil::makeCreateRequest (processed_part_path / " finished_by" , storage.replica_name , zkutil::CreateMode::Persistent));
263+ // / Keep the part under processing_part_path: just set its status to FAILED and record finished_by.
264+ ops.emplace_back (zkutil::makeSetRequest (processing_part_path / " status" , " FAILED" , -1 ));
265+ ops.emplace_back (zkutil::makeCreateRequest (processing_part_path / " finished_by" , storage.replica_name , zkutil::CreateMode::Persistent));
255266 ops.emplace_back (zkutil::makeSetRequest (export_path / " status" , " FAILED" , -1 ));
256267
257- LOG_INFO (storage.log , " ExportPartition: Marked part export {} as failed " , part_name);
268+ LOG_INFO (storage.log , " ExportPartition: Retry count limit exceeded for part {}, will try to fail the entire task " , part_name);
258269 }
259270
260271 std::size_t num_exceptions = 0 ;
0 commit comments