@@ -65,12 +65,10 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
6565 discard_callback(d_cb),
6666 discard_callback_priv(d_cbpriv),
6767 aio_stop(false ),
68- discard_started(false ),
69- discard_stop(false ),
7068 aio_thread(this ),
71- discard_thread(this ),
7269 injecting_crash(0 )
7370{
71+ cct->_conf .add_observer (this );
7472 fd_directs.resize (WRITE_LIFE_MAX, -1 );
7573 fd_buffereds.resize (WRITE_LIFE_MAX, -1 );
7674
@@ -92,6 +90,11 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
9290 }
9391}
9492
KernelDevice::~KernelDevice()
{
  // Deregister from config-change notifications; the matching add_observer()
  // is done in the constructor.  Must happen before members are destroyed so
  // no handle_conf_change() callback can land on a half-dead object.
  cct->_conf.remove_observer(this);
}
97+
9598int KernelDevice::_lock ()
9699{
97100 // When the block changes, systemd-udevd will open the block,
@@ -131,6 +134,7 @@ int KernelDevice::open(const string& p)
131134{
132135 path = p;
133136 int r = 0 , i = 0 ;
137+ uint64_t num_discard_threads = 0 ;
134138 dout (1 ) << __func__ << " path " << path << dendl;
135139
136140 struct stat statbuf;
@@ -281,7 +285,9 @@ int KernelDevice::open(const string& p)
281285 if (r < 0 ) {
282286 goto out_fail;
283287 }
284- if (support_discard && cct->_conf ->bdev_enable_discard && cct->_conf ->bdev_async_discard ) {
288+
289+ num_discard_threads = cct->_conf .get_val <uint64_t >(" bdev_async_discard_threads" );
290+ if (support_discard && cct->_conf ->bdev_enable_discard && num_discard_threads > 0 ) {
285291 _discard_start ();
286292 }
287293
@@ -330,7 +336,7 @@ void KernelDevice::close()
330336{
331337 dout (1 ) << __func__ << dendl;
332338 _aio_stop ();
333- if (discard_thread. is_started ()) {
339+ if (_discard_started ()) {
334340 _discard_stop ();
335341 }
336342 _pre_close ();
@@ -532,28 +538,55 @@ void KernelDevice::_aio_stop()
532538
533539void KernelDevice::_discard_start ()
534540{
535- discard_thread.create (" bstore_discard" );
541+ uint64_t num = cct->_conf .get_val <uint64_t >(" bdev_async_discard_threads" );
542+ dout (10 ) << __func__ << " starting " << num << " threads" << dendl;
543+
544+ std::unique_lock l (discard_lock);
545+
546+ target_discard_threads = num;
547+ discard_threads.reserve (num);
548+ for (uint64_t i = 0 ; i < num; i++)
549+ {
550+ // All threads created with the same name
551+ discard_threads.emplace_back (new DiscardThread (this , i));
552+ discard_threads.back ()->create (" bstore_discard" );
553+ }
554+
555+ dout (10 ) << __func__ << " started " << num << " threads" << dendl;
536556}
537557
538558void KernelDevice::_discard_stop ()
539559{
540560 dout (10 ) << __func__ << dendl;
561+
562+ // Signal threads to stop, then wait for them to join
541563 {
542564 std::unique_lock l (discard_lock);
543- while (!discard_started ) {
565+ while (discard_threads. empty () ) {
544566 discard_cond.wait (l);
545567 }
546- discard_stop = true ;
568+
569+ for (auto &t : discard_threads) {
570+ t->stop = true ;
571+ }
572+
547573 discard_cond.notify_all ();
548574 }
549- discard_thread.join ();
550- {
551- std::lock_guard l (discard_lock);
552- discard_stop = false ;
553- }
575+
576+ // Threads are shared pointers and are cleaned up for us
577+ for (auto &t : discard_threads)
578+ t->join ();
579+ discard_threads.clear ();
580+
554581 dout (10 ) << __func__ << " stopped" << dendl;
555582}
556583
584+ bool KernelDevice::_discard_started ()
585+ {
586+ std::unique_lock l (discard_lock);
587+ return !discard_threads.empty ();
588+ }
589+
557590void KernelDevice::discard_drain ()
558591{
559592 dout (10 ) << __func__ << dendl;
@@ -567,7 +600,7 @@ static bool is_expected_ioerr(const int r)
567600{
568601 // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
569602 return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
570- r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
603+ r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
571604 r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
572605#if defined(__linux__)
573606 r == -EREMCHG || r == -EBADE
@@ -698,64 +731,75 @@ void KernelDevice::_aio_thread()
698731 dout (10 ) << __func__ << " end" << dendl;
699732}
700733
701- void KernelDevice::_discard_thread ()
734+ void KernelDevice::_discard_thread (uint64_t tid )
702735{
736+ dout (10 ) << __func__ << " thread " << tid << " start" << dendl;
737+
738+ // Thread-local list of processing discards
739+ interval_set<uint64_t > discard_processing;
740+
703741 std::unique_lock l (discard_lock);
704- ceph_assert (!discard_started);
705- discard_started = true ;
706742 discard_cond.notify_all ();
743+
744+ // Keeps the shared pointer around until erased from the vector
745+ // and until we leave this function
746+ auto thr = discard_threads[tid];
747+
707748 while (true ) {
708- ceph_assert (discard_finishing .empty ());
749+ ceph_assert (discard_processing .empty ());
709750 if (discard_queued.empty ()) {
710- if (discard_stop )
751+ if (thr-> stop )
711752 break ;
712753 dout (20 ) << __func__ << " sleep" << dendl;
713754 discard_cond.notify_all (); // for the thread trying to drain...
714755 discard_cond.wait (l);
715756 dout (20 ) << __func__ << " wake" << dendl;
716757 } else {
717- discard_finishing.swap (discard_queued);
758+ // Swap the queued discards for a local list we'll process here
759+ // without caring about thread fairness. This allows the current
760+ // thread to wait on the discard running while other threads pick
761+ // up the next-in-queue, and do the same, ultimately issuing more
762+ // discards in parallel, which is the goal.
763+ discard_processing.swap (discard_queued);
718764 discard_running = true ;
719765 l.unlock ();
720766 dout (20 ) << __func__ << " finishing" << dendl;
721- for (auto p = discard_finishing .begin ();p != discard_finishing .end (); ++p) {
722- _discard (p.get_start (), p.get_len ());
767+ for (auto p = discard_processing .begin (); p != discard_processing .end (); ++p) {
768+ _discard (p.get_start (), p.get_len ());
723769 }
724770
725- discard_callback (discard_callback_priv, static_cast <void *>(&discard_finishing ));
726- discard_finishing .clear ();
771+ discard_callback (discard_callback_priv, static_cast <void *>(&discard_processing ));
772+ discard_processing .clear ();
727773 l.lock ();
728774 discard_running = false ;
729775 }
730776 }
731- dout ( 10 ) << __func__ << " finish " << dendl;
732- discard_started = false ;
777+
778+ dout ( 10 ) << __func__ << " thread " << tid << " finish " << dendl ;
733779}
734780
735- int KernelDevice::_queue_discard (interval_set<uint64_t > &to_release)
781+ // this is private and is expected that the caller checks that discard
782+ // threads are running via _discard_started()
783+ void KernelDevice::_queue_discard (interval_set<uint64_t > &to_release)
736784{
737- // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard
738- if (!discard_thread.is_started ())
739- return -1 ;
740-
741785 if (to_release.empty ())
742- return 0 ;
786+ return ;
743787
744788 std::lock_guard l (discard_lock);
745789 discard_queued.insert (to_release);
746- discard_cond.notify_all ();
747- return 0 ;
790+ discard_cond.notify_one ();
748791}
749792
750- // return true only if _queue_discard succeeded , so caller won't have to do alloc->release
751- // otherwise false
793+ // return true only if discard was queued , so caller won't have to do
794+ // alloc->release, otherwise return false
752795bool KernelDevice::try_discard (interval_set<uint64_t > &to_release, bool async)
753796{
754797 if (!support_discard || !cct->_conf ->bdev_enable_discard )
755798 return false ;
756799
757- if (async && discard_thread.is_started ()) {
758- return 0 == _queue_discard (to_release);
800+ if (async && _discard_started ()) {
801+ _queue_discard (to_release);
802+ return true ;
759803 } else {
760804 for (auto p = to_release.begin (); p != to_release.end (); ++p) {
761805 _discard (p.get_start (), p.get_len ());
@@ -1447,3 +1491,57 @@ int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
14471491 }
14481492 return r;
14491493}
1494+
1495+ const char ** KernelDevice::get_tracked_conf_keys () const
1496+ {
1497+ static const char * KEYS[] = {
1498+ " bdev_async_discard_threads" ,
1499+ NULL
1500+ };
1501+ return KEYS;
1502+ }
1503+
1504+ void KernelDevice::handle_conf_change (const ConfigProxy& conf,
1505+ const std::set <std::string> &changed)
1506+ {
1507+ if (changed.count (" bdev_async_discard_threads" )) {
1508+ std::unique_lock l (discard_lock);
1509+
1510+ uint64_t oldval = target_discard_threads;
1511+ uint64_t newval = cct->_conf .get_val <uint64_t >(" bdev_async_discard_threads" );
1512+
1513+ target_discard_threads = newval;
1514+
1515+ // Increase? Spawn now, it's quick
1516+ if (newval > oldval) {
1517+ dout (10 ) << __func__ << " starting " << (newval - oldval) << " additional discard threads" << dendl;
1518+ discard_threads.reserve (target_discard_threads);
1519+ for (uint64_t i = oldval; i < newval; i++)
1520+ {
1521+ // All threads created with the same name
1522+ discard_threads.emplace_back (new DiscardThread (this , i));
1523+ discard_threads.back ()->create (" bstore_discard" );
1524+ }
1525+ } else {
1526+ // Decrease? Signal threads after telling them to stop
1527+ dout (10 ) << __func__ << " stopping " << (oldval - newval) << " existing discard threads" << dendl;
1528+
1529+ // Decreasing to zero is exactly the same as disabling async discard.
1530+ // Signal all threads to stop
1531+ if (newval == 0 ) {
1532+ _discard_stop ();
1533+ } else {
1534+ // Signal the last threads to quit, and stop tracking them
1535+ for (uint64_t i = oldval - 1 ; i >= newval; i--)
1536+ {
1537+ // Also detach the thread so we no longer need to join
1538+ discard_threads[i]->stop = true ;
1539+ discard_threads[i]->detach ();
1540+ discard_threads.erase (discard_threads.begin () + i);
1541+ }
1542+ }
1543+
1544+ discard_cond.notify_all ();
1545+ }
1546+ }
1547+ }
0 commit comments