22#include < filesystem>
33#include < mutex>
44#include < ranges>
5+ #include < variant>
56#include < Coordination/Changelog.h>
67#include < Coordination/Keeper4LWInfo.h>
78#include < Coordination/KeeperContext.h>
@@ -47,6 +48,7 @@ namespace ErrorCodes
4748 extern const int NOT_IMPLEMENTED;
4849 extern const int BAD_ARGUMENTS;
4950 extern const int LOGICAL_ERROR;
51+ extern const int SYSTEM_ERROR;
5052}
5153
5254namespace
@@ -125,6 +127,41 @@ Checksum computeRecordChecksum(const ChangelogRecord & record)
125127 return hash.get64 ();
126128}
127129
/// Tag type for the "remove this changelog file" alternative of
/// ChangelogFileOperationVariant. It carries no data: the file to remove is
/// taken from the ChangelogFileOperation that holds this alternative.
struct RemoveChangelog {};
133+
134+ struct MoveChangelog
135+ {
136+ std::string new_path;
137+ DiskPtr new_disk;
138+ };
139+
140+ }
141+
142+ using ChangelogFileOperationVariant = std::variant<RemoveChangelog, MoveChangelog>;
143+
144+ struct ChangelogFileOperation
145+ {
146+ explicit ChangelogFileOperation (ChangelogFileDescriptionPtr changelog_, ChangelogFileOperationVariant operation_)
147+ : changelog(std::move(changelog_))
148+ , operation(std::move(operation_))
149+ {}
150+
151+ ChangelogFileDescriptionPtr changelog;
152+ ChangelogFileOperationVariant operation;
153+ std::atomic<bool > done = false ;
154+ };
155+
156+ void ChangelogFileDescription::waitAllAsyncOperations ()
157+ {
158+ for (const auto & op : file_operations)
159+ {
160+ if (auto op_locked = op.lock ())
161+ op_locked->done .wait (false );
162+ }
163+
164+ file_operations.clear ();
128165}
129166
130167// / Appendable log writer
@@ -134,17 +171,20 @@ Checksum computeRecordChecksum(const ChangelogRecord & record)
134171// / At least 1 log record should be contained in each log
135172class ChangelogWriter
136173{
174+ using MoveChangelogCallback = std::function<void (ChangelogFileDescriptionPtr, std::string, DiskPtr)>;
137175public:
138176 ChangelogWriter (
139177 std::map<uint64_t , ChangelogFileDescriptionPtr> & existing_changelogs_,
140178 LogEntryStorage & entry_storage_,
141179 KeeperContextPtr keeper_context_,
142- LogFileSettings log_file_settings_)
180+ LogFileSettings log_file_settings_,
181+ MoveChangelogCallback move_changelog_cb_)
143182 : existing_changelogs(existing_changelogs_)
144183 , entry_storage(entry_storage_)
145184 , log_file_settings(log_file_settings_)
146185 , keeper_context(std::move(keeper_context_))
147186 , log(getLogger(" Changelog" ))
187+ , move_changelog_cb(std::move(move_changelog_cb_))
148188 {
149189 }
150190
@@ -190,25 +230,8 @@ class ChangelogWriter
190230 current_file_description->extension );
191231 }
192232
193- if (disk == log_disk)
194- {
195- if (path != new_path)
196- {
197- try
198- {
199- disk->moveFile (path, new_path);
200- }
201- catch (...)
202- {
203- tryLogCurrentException (log, fmt::format (" File rename failed on disk {}" , disk->getName ()));
204- }
205- current_file_description->path = std::move (new_path);
206- }
207- }
208- else
209- {
210- moveChangelogBetweenDisks (log_disk, current_file_description, disk, new_path, keeper_context);
211- }
233+ if (move_changelog_cb)
234+ move_changelog_cb (current_file_description, std::move (new_path), disk);
212235 }
213236 }
214237 else
@@ -516,6 +539,8 @@ class ChangelogWriter
516539 KeeperContextPtr keeper_context;
517540
518541 LoggerPtr const log;
542+
543+ MoveChangelogCallback move_changelog_cb;
519544};
520545
521546namespace
@@ -1452,11 +1477,11 @@ LogEntriesPtr LogEntryStorage::getLogEntriesBetween(uint64_t start, uint64_t end
14521477 if (!read_info)
14531478 return ;
14541479
1455- LOG_TRACE (log, " Reading from path {} {} entries" , read_info->file_description ->path , read_info->count );
14561480 read_info->file_description ->withLock (
14571481 [&]
14581482 {
14591483 const auto & [file_description, start_position, count] = *read_info;
1484+ LOG_TRACE (log, " Reading from path {} {} entries" , file_description->path , read_info->count );
14601485 auto file = file_description->disk ->readFile (file_description->path , getReadSettings ());
14611486 file->seek (start_position, SEEK_SET);
14621487
@@ -1694,13 +1719,19 @@ Changelog::Changelog(
16941719 if (existing_changelogs.empty ())
16951720 LOG_WARNING (log, " No logs exists in {}. It's Ok if it's the first run of clickhouse-keeper." , disk->getPath ());
16961721
1697- clean_log_thread = std::make_unique<ThreadFromGlobalPool>([this ] { cleanLogThread (); });
1722+ background_changelog_operations_thread = std::make_unique<ThreadFromGlobalPool>([this ] { backgroundChangelogOperationsThread (); });
16981723
16991724 write_thread = std::make_unique<ThreadFromGlobalPool>([this ] { writeThread (); });
17001725
17011726 append_completion_thread = std::make_unique<ThreadFromGlobalPool>([this ] { appendCompletionThread (); });
17021727
1703- current_writer = std::make_unique<ChangelogWriter>(existing_changelogs, entry_storage, keeper_context, log_file_settings);
1728+ current_writer = std::make_unique<ChangelogWriter>(
1729+ existing_changelogs,
1730+ entry_storage,
1731+ keeper_context,
1732+ log_file_settings,
1733+ /* move_changelog_cb=*/ [&](ChangelogFileDescriptionPtr changelog, std::string new_path, DiskPtr new_disk)
1734+ { moveChangelogAsync (std::move (changelog), std::move (new_path), std::move (new_disk)); });
17041735 }
17051736 catch (...)
17061737 {
@@ -1844,7 +1875,7 @@ try
18441875 removeAllLogsAfter (last_log_read_result->log_start_index );
18451876
18461877 // / This log, even if it finished with error shouldn't be removed
1847- chassert (existing_changelogs.find (last_log_read_result->log_start_index ) != existing_changelogs. end ( ));
1878+ chassert (existing_changelogs.contains (last_log_read_result->log_start_index ));
18481879 chassert (existing_changelogs.find (last_log_read_result->log_start_index )->first == existing_changelogs.rbegin ()->first );
18491880 };
18501881
@@ -2190,6 +2221,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
21902221 else
21912222 description = std::prev (index_changelog)->second ;
21922223
2224+ description->waitAllAsyncOperations ();
21932225 // / if the changelog is broken at end, we cannot append it with new logs
21942226 // / we create a new file starting with the required index
21952227 if (description->broken_at_end )
@@ -2212,7 +2244,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
22122244 auto to_remove_itr = existing_changelogs.upper_bound (index);
22132245 for (auto itr = to_remove_itr; itr != existing_changelogs.end ();)
22142246 {
2215- itr-> second -> disk -> removeFile (itr->second -> path );
2247+ removeChangelogAsync (itr->second );
22162248 itr = existing_changelogs.erase (itr);
22172249 }
22182250 }
@@ -2247,6 +2279,7 @@ void Changelog::compact(uint64_t up_to_log_index)
22472279 for (auto itr = existing_changelogs.begin (); itr != existing_changelogs.end ();)
22482280 {
22492281 auto & changelog_description = *itr->second ;
2282+ auto path = changelog_description.getPathSafe ();
22502283 // / Remove all completely outdated changelog files
22512284 if (remove_all_logs || changelog_description.to_log_index <= up_to_log_index)
22522285 {
@@ -2255,31 +2288,12 @@ void Changelog::compact(uint64_t up_to_log_index)
22552288 LOG_INFO (
22562289 log,
22572290 " Trying to remove log {} which is current active log for write. Possibly this node recovers from snapshot" ,
2258- changelog_description. path );
2291+ path);
22592292 need_rotate = true ;
22602293 }
22612294
2262- LOG_INFO (log, " Removing changelog {} because of compaction" , changelog_description.path );
2263-
2264- // / If failed to push to queue for background removing, then we will remove it now
2265- if (!log_files_to_delete_queue.tryPush ({changelog_description.path , changelog_description.disk }, 1 ))
2266- {
2267- try
2268- {
2269- changelog_description.disk ->removeFile (changelog_description.path );
2270- LOG_INFO (log, " Removed changelog {} because of compaction." , changelog_description.path );
2271- }
2272- catch (Exception & e)
2273- {
2274- LOG_WARNING (
2275- log, " Failed to remove changelog {} in compaction, error message: {}" , changelog_description.path , e.message ());
2276- }
2277- catch (...)
2278- {
2279- tryLogCurrentException (log);
2280- }
2281- }
2282-
2295+ LOG_INFO (log, " Removing changelog {} because of compaction" , path);
2296+ removeChangelogAsync (itr->second );
22832297 changelog_description.deleted = true ;
22842298
22852299 itr = existing_changelogs.erase (itr);
@@ -2434,11 +2448,11 @@ uint64_t Changelog::size() const
24342448void Changelog::shutdown ()
24352449{
24362450 LOG_DEBUG (log, " Shutting down Changelog" );
2437- if (!log_files_to_delete_queue .isFinished ())
2438- log_files_to_delete_queue .finish ();
2451+ if (!changelog_operation_queue .isFinished ())
2452+ changelog_operation_queue .finish ();
24392453
2440- if (clean_log_thread ->joinable ())
2441- clean_log_thread ->join ();
2454+ if (background_changelog_operations_thread ->joinable ())
2455+ background_changelog_operations_thread ->join ();
24422456
24432457 if (!write_operations.isFinished ())
24442458 write_operations.finish ();
@@ -2482,26 +2496,83 @@ Changelog::~Changelog()
24822496 }
24832497}
24842498
2485- void Changelog::cleanLogThread ()
2499+ void Changelog::backgroundChangelogOperationsThread ()
24862500{
2487- std::pair<std::string, DiskPtr> path_with_disk ;
2488- while (log_files_to_delete_queue .pop (path_with_disk ))
2501+ ChangelogFileOperationPtr changelog_operation ;
2502+ while (changelog_operation_queue .pop (changelog_operation ))
24892503 {
2490- const auto & [path, disk] = path_with_disk;
2491- try
2504+ if (std::holds_alternative<RemoveChangelog>(changelog_operation->operation ))
24922505 {
2493- disk->removeFile (path);
2494- LOG_INFO (log, " Removed changelog {} because of compaction." , path);
2506+ chassert (changelog_operation->changelog );
2507+ const auto & changelog = *changelog_operation->changelog ;
2508+ try
2509+ {
2510+ changelog.disk ->removeFile (changelog.path );
2511+ LOG_INFO (log, " Removed changelog {} because of compaction." , changelog.path );
2512+ }
2513+ catch (Exception & e)
2514+ {
2515+ LOG_WARNING (log, " Failed to remove changelog {} in compaction, error message: {}" , changelog.path , e.message ());
2516+ }
2517+ catch (...)
2518+ {
2519+ tryLogCurrentException (log);
2520+ }
24952521 }
2496- catch (Exception & e )
2522+ else if ( auto * move_operation = std::get_if<MoveChangelog>(&changelog_operation-> operation ) )
24972523 {
2498- LOG_WARNING (log, " Failed to remove changelog {} in compaction, error message: {}" , path, e.message ());
2524+ const auto & changelog = changelog_operation->changelog ;
2525+
2526+ if (move_operation->new_disk == changelog->disk )
2527+ {
2528+ if (move_operation->new_path != changelog->path )
2529+ {
2530+ try
2531+ {
2532+ changelog->disk ->moveFile (changelog->path , move_operation->new_path );
2533+ }
2534+ catch (...)
2535+ {
2536+ tryLogCurrentException (log, fmt::format (" File rename failed on disk {}" , changelog->disk ->getName ()));
2537+ }
2538+ changelog->path = std::move (move_operation->new_path );
2539+ }
2540+ }
2541+ else
2542+ {
2543+ moveChangelogBetweenDisks (changelog->disk , changelog, move_operation->new_disk , move_operation->new_path , keeper_context);
2544+ }
24992545 }
2500- catch (...)
2546+ else
25012547 {
2502- tryLogCurrentException (log);
2548+ LOG_ERROR (log, " Unsupported operation detected for changelog {}" , changelog_operation->changelog ->path );
2549+ chassert (false );
25032550 }
2551+ changelog_operation->done = true ;
2552+ }
2553+ }
2554+
2555+ void Changelog::modifyChangelogAsync (ChangelogFileOperationPtr changelog_operation)
2556+ {
2557+ if (!changelog_operation_queue.tryPush (changelog_operation, 60 * 1000 ))
2558+ {
2559+ throw DB::Exception (
2560+ ErrorCodes::SYSTEM_ERROR, " Background thread for changelog operations is stuck or not keeping up with operations" );
25042561 }
2562+
2563+ changelog_operation->changelog ->file_operations .push_back (changelog_operation);
2564+ }
2565+
2566+ void Changelog::removeChangelogAsync (ChangelogFileDescriptionPtr changelog)
2567+ {
2568+ modifyChangelogAsync (std::make_shared<ChangelogFileOperation>(std::move (changelog), RemoveChangelog{}));
2569+ }
2570+
2571+ void Changelog::moveChangelogAsync (ChangelogFileDescriptionPtr changelog, std::string new_path, DiskPtr new_disk)
2572+ {
2573+ modifyChangelogAsync (
2574+ std::make_shared<ChangelogFileOperation>(
2575+ std::move (changelog), MoveChangelog{.new_path = std::move (new_path), .new_disk = std::move (new_disk)}));
25052576}
25062577
25072578void Changelog::setRaftServer (const nuraft::ptr<nuraft::raft_server> & raft_server_)
0 commit comments