@@ -765,7 +765,7 @@ void event_handler(evutil_socket_t fd, short which, void *arg) {
765765
766766 if (memcached_shutdown) {
767767 // Someone requested memcached to shut down.
768- if (signal_idle_clients (thr, - 1 , false ) == 0 ) {
768+ if (signal_idle_clients (* thr) == 0 ) {
769769 cb_assert (thr != nullptr );
770770 LOG_INFO (" Stopping worker thread {}" , thr->index );
771771 c->eventBaseLoopbreak ();
@@ -781,7 +781,7 @@ void event_handler(evutil_socket_t fd, short which, void *arg) {
781781 if (memcached_shutdown) {
782782 // Someone requested memcached to shut down. If we don't have
783783 // any connections bound to this thread we can just shut down
784- int connected = signal_idle_clients (thr, - 1 , true );
784+ int connected = signal_idle_clients (* thr);
785785 if (connected == 0 ) {
786786 LOG_INFO (" Stopping worker thread {}" , thr->index );
787787 event_base_loopbreak (thr->base );
@@ -1789,22 +1789,6 @@ void CreateBucketThread::run()
17891789 task->makeRunnable ();
17901790}
17911791
1792- void notify_thread_bucket_deletion (FrontEndThread& me) {
1793- for (size_t ii = 0 ; ii < all_buckets.size (); ++ii) {
1794- bool destroy = false ;
1795- {
1796- std::lock_guard<std::mutex> guard (all_buckets[ii].mutex );
1797- if (all_buckets[ii].state == BucketState::Destroying) {
1798- destroy = true ;
1799- }
1800- }
1801-
1802- if (destroy) {
1803- signal_idle_clients (&me, gsl::narrow<int >(ii), false );
1804- }
1805- }
1806- }
1807-
18081792void DestroyBucketThread::destroy () {
18091793 ENGINE_ERROR_CODE ret = ENGINE_KEY_ENOENT;
18101794 std::unique_lock<std::mutex> all_bucket_lock (buckets_lock);
@@ -1861,58 +1845,114 @@ void DestroyBucketThread::destroy() {
18611845
18621846 perform_callbacks (ON_DELETE_BUCKET, nullptr , &all_buckets[idx]);
18631847
1864- LOG_INFO (" {} Delete bucket [{}]. Wait for clients to disconnect" ,
1865- connection_id,
1866- name);
1867-
18681848 /* If this thread is connected to the requested bucket... release it */
18691849 if (connection != nullptr && idx == size_t (connection->getBucketIndex ())) {
18701850 disassociate_bucket (*connection);
18711851 }
18721852
1873- /* Let all of the worker threads start invalidating connections */
1874- threads_initiate_bucket_deletion ();
1875-
1853+ // Wait until all users disconnected...
18761854 auto & bucket = all_buckets[idx];
1877-
1878- /* Wait until all users disconnected... */
18791855 {
18801856 std::unique_lock<std::mutex> guard (bucket.mutex );
1857+ if (bucket.clients > 0 ) {
1858+ LOG_INFO (" {} Delete bucket [{}]. Wait for {} clients to disconnect" ,
1859+ connection_id,
1860+ name,
1861+ bucket.clients );
18811862
1882- while (bucket.clients > 0 ) {
1883- LOG_INFO (
1884- " {} Delete bucket [{}]. Still waiting: {} clients "
1885- " connected" ,
1886- connection_id.c_str (),
1887- name.c_str (),
1888- bucket.clients );
1889- /* drop the lock and notify the worker threads */
1863+ // Signal clients bound to the bucket before waiting
18901864 guard.unlock ();
1891- threads_notify_bucket_deletion ();
1865+ iterate_all_connections ([&bucket](Connection& connection) {
1866+ if (&connection.getBucket () == &bucket) {
1867+ connection.signalIfIdle ();
1868+ }
1869+ });
18921870 guard.lock ();
1893-
1894- bucket.cond .wait_for (guard,
1895- std::chrono::milliseconds (1000 ),
1896- [&bucket] { return bucket.clients == 0 ; });
18971871 }
1898- }
18991872
1900- /* Tell the worker threads to stop trying to invalidating connections */
1901- threads_complete_bucket_deletion () ;
1873+ using std::chrono::seconds;
1874+ using std::chrono::steady_clock ;
19021875
1903- /*
1904- * We cannot call assert_no_assocations(idx) because it iterates
1905- * over all connections and calls c->getBucketIndex(). The problem
1906- * is that a worker thread can call associate_initial_bucket() or
1907- * associate_bucket() at the same time. This could lead to a call
1908- * to c->setBucketIndex(0) (the "no bucket"), which although safe,
1909- * raises a threadsanitizer warning.
1910-
1911- * Note, if associate_bucket() attempts to associate a connection
1912- * with a bucket that has been destroyed, or is in the process of
1913- * being destroyed, the association will fail because
1914- * BucketState != Ready. See associate_bucket() for more details.
1915- */
1876+ auto nextLog = steady_clock::now () + seconds (30 );
1877+ nlohmann::json prevDump;
1878+
1879+ // We need to disconnect all of the clients before we can delete the
1880+ // bucket. We try to log stuff while we're waiting in order to
1881+ // know why stuff isn't complete. We'll do it in the following way:
1882+ //
1883+ // 1. Don't log anything the first 30 seconds. Then
1884+ // dump the state of all stuck connections.
1885+ // 2. Don't log anything for the next 30 seconds. Then
1886+ // dump the state of all stuck connections which
1887+ // changed since the previous dump.
1888+ // 3. goto 2.
1889+ //
1890+ while (bucket.clients > 0 ) {
1891+ bucket.cond .wait_for (guard, seconds (1 ), [&bucket] {
1892+ return bucket.clients == 0 ;
1893+ });
1894+
1895+ if (bucket.clients == 0 ) {
1896+ break ;
1897+ }
1898+
1899+ if (steady_clock::now () < nextLog) {
1900+ guard.unlock ();
1901+ iterate_all_connections ([&bucket](Connection& connection) {
1902+ if (&connection.getBucket () == &bucket) {
1903+ connection.signalIfIdle ();
1904+ }
1905+ });
1906+ guard.lock ();
1907+ continue ;
1908+ }
1909+
1910+ nextLog = steady_clock::now () + seconds (30 );
1911+
1912+ // drop the lock and notify the worker threads
1913+ guard.unlock ();
1914+
1915+ nlohmann::json json;
1916+ iterate_all_connections ([&bucket, &json](Connection& connection) {
1917+ if (&connection.getBucket () == &bucket) {
1918+ if (!connection.signalIfIdle ()) {
1919+ json[std::to_string (connection.getId ())] =
1920+ connection.toJSON ();
1921+ }
1922+ }
1923+ });
1924+
1925+ // remove all connections which didn't change
1926+ for (auto it = prevDump.begin (); it != prevDump.end (); ++it) {
1927+ auto entry = json.find (it.key ());
1928+ if (entry != json.end ()) {
1929+ auto old = it.value ().dump ();
1930+ auto current = entry->dump ();
1931+ if (old == current) {
1932+ json.erase (entry);
1933+ }
1934+ }
1935+ }
1936+
1937+ prevDump = std::move (json);
1938+ if (prevDump.empty ()) {
1939+ LOG_INFO (
1940+ R"( {} Delete bucket [{}]. Still waiting: {} clients connected (state is unchanged).)" ,
1941+ connection_id,
1942+ name,
1943+ bucket.clients );
1944+ } else {
1945+ LOG_INFO (
1946+ R"( {} Delete bucket [{}]. Still waiting: {} clients connected: {})" ,
1947+ connection_id,
1948+ name,
1949+ bucket.clients ,
1950+ prevDump.dump ());
1951+ }
1952+
1953+ guard.lock ();
1954+ }
1955+ }
19161956
19171957 LOG_INFO (
19181958 " {} Delete bucket [{}]. Shut down the bucket" , connection_id, name);
@@ -1923,7 +1963,7 @@ void DestroyBucketThread::destroy() {
19231963 connection_id,
19241964 name);
19251965
1926- /* Clean up the stats... */
1966+ // Clean up the stats...
19271967 threadlocal_stats_reset (bucket.stats );
19281968
19291969 // Clear any registered event handlers
0 commit comments