@@ -65,12 +65,10 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
   discard_callback(d_cb),
   discard_callback_priv(d_cbpriv),
   aio_stop(false),
-  discard_started(false),
-  discard_stop(false),
   aio_thread(this),
-  discard_thread(this),
   injecting_crash(0)
 {
+  cct->_conf.add_observer(this);
   fd_directs.resize(WRITE_LIFE_MAX, -1);
   fd_buffereds.resize(WRITE_LIFE_MAX, -1);
 
@@ -92,6 +90,11 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
   }
 }
 
+KernelDevice::~KernelDevice()
+{
+  cct->_conf.remove_observer(this);
+}
+
 int KernelDevice::_lock()
 {
   // When the block changes, systemd-udevd will open the block,
@@ -131,6 +134,7 @@ int KernelDevice::open(const string& p)
 {
   path = p;
   int r = 0, i = 0;
+  uint64_t num_discard_threads = 0;
   dout(1) << __func__ << " path " << path << dendl;
 
   struct stat statbuf;
@@ -281,7 +285,9 @@ int KernelDevice::open(const string& p)
   if (r < 0) {
     goto out_fail;
   }
-  if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+
+  num_discard_threads = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+  if (support_discard && cct->_conf->bdev_enable_discard && num_discard_threads > 0) {
     _discard_start();
   }
 
@@ -330,7 +336,7 @@ void KernelDevice::close()
 {
   dout(1) << __func__ << dendl;
   _aio_stop();
-  if (discard_thread.is_started()) {
+  if (_discard_started()) {
     _discard_stop();
   }
   _pre_close();
@@ -532,28 +538,55 @@ void KernelDevice::_aio_stop()
 
 void KernelDevice::_discard_start()
 {
-  discard_thread.create("bstore_discard");
+  uint64_t num = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+  dout(10) << __func__ << " starting " << num << " threads" << dendl;
+
+  std::unique_lock l(discard_lock);
+
+  target_discard_threads = num;
+  discard_threads.reserve(num);
+  for (uint64_t i = 0; i < num; i++)
+  {
+    // All threads created with the same name
+    discard_threads.emplace_back(new DiscardThread(this, i));
+    discard_threads.back()->create("bstore_discard");
+  }
+
+  dout(10) << __func__ << " started " << num << " threads" << dendl;
 }
 
 void KernelDevice::_discard_stop()
 {
   dout(10) << __func__ << dendl;
+
+  // Signal threads to stop, then wait for them to join
   {
     std::unique_lock l(discard_lock);
-    while (!discard_started) {
+    while (discard_threads.empty()) {
       discard_cond.wait(l);
     }
-    discard_stop = true;
+
+    for (auto &t : discard_threads) {
+      t->stop = true;
+    }
+
     discard_cond.notify_all();
   }
-  discard_thread.join();
-  {
-    std::lock_guard l(discard_lock);
-    discard_stop = false;
-  }
+
+  // Threads are shared pointers and are cleaned up for us
+  for (auto &t : discard_threads)
+    t->join();
+  discard_threads.clear();
+
   dout(10) << __func__ << " stopped" << dendl;
 }
 
+bool KernelDevice::_discard_started()
+{
+  std::unique_lock l(discard_lock);
+  return !discard_threads.empty();
+}
+
 void KernelDevice::discard_drain()
 {
   dout(10) << __func__ << dendl;
@@ -567,7 +600,7 @@ static bool is_expected_ioerr(const int r)
 {
   // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
   return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
-          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
+          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
           r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
 #if defined(__linux__)
           r == -EREMCHG || r == -EBADE
@@ -698,52 +731,65 @@ void KernelDevice::_aio_thread()
   dout(10) << __func__ << " end" << dendl;
 }
 
-void KernelDevice::_discard_thread()
+void KernelDevice::_discard_thread(uint64_t tid)
 {
+  dout(10) << __func__ << " thread " << tid << " start" << dendl;
+
+  // Thread-local list of processing discards
+  interval_set<uint64_t> discard_processing;
+
   std::unique_lock l(discard_lock);
-  ceph_assert(!discard_started);
-  discard_started = true;
   discard_cond.notify_all();
+
+  // Keeps the shared pointer around until erased from the vector
+  // and until we leave this function
+  auto thr = discard_threads[tid];
+
   while (true) {
-    ceph_assert(discard_finishing.empty());
+    ceph_assert(discard_processing.empty());
     if (discard_queued.empty()) {
-      if (discard_stop)
+      if (thr->stop)
         break;
       dout(20) << __func__ << " sleep" << dendl;
       discard_cond.notify_all(); // for the thread trying to drain...
       discard_cond.wait(l);
       dout(20) << __func__ << " wake" << dendl;
     } else {
-      discard_finishing.swap(discard_queued);
+      // Swap the queued discards for a local list we'll process here
+      // without caring about thread fairness. This allows the current
+      // thread to wait on the discard running while other threads pick
+      // up the next-in-queue, and do the same, ultimately issuing more
+      // discards in parallel, which is the goal.
+      discard_processing.swap(discard_queued);
       discard_running = true;
       l.unlock();
       dout(20) << __func__ << " finishing" << dendl;
-      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
-        _discard(p.get_start(), p.get_len());
+      for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
+        _discard(p.get_start(), p.get_len());
       }
 
-      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
-      discard_finishing.clear();
+      discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
+      discard_processing.clear();
       l.lock();
       discard_running = false;
     }
   }
-  dout(10) << __func__ << " finish" << dendl;
-  discard_started = false;
+
+  dout(10) << __func__ << " thread " << tid << " finish" << dendl;
 }
 
 int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
 {
   // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard
-  if (!discard_thread.is_started())
+  if (!_discard_started())
     return -1;
 
   if (to_release.empty())
     return 0;
 
   std::lock_guard l(discard_lock);
   discard_queued.insert(to_release);
-  discard_cond.notify_all();
+  discard_cond.notify_one();
   return 0;
 }
 
@@ -754,7 +800,7 @@ bool KernelDevice::try_discard(interval_set<uint64_t> &to_release, bool async)
   if (!support_discard || !cct->_conf->bdev_enable_discard)
     return false;
 
-  if (async && discard_thread.is_started()) {
+  if (async) {
     return 0 == _queue_discard(to_release);
   } else {
     for (auto p = to_release.begin(); p != to_release.end(); ++p) {
@@ -1447,3 +1493,51 @@ int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
   }
   return r;
 }
+
+const char **KernelDevice::get_tracked_conf_keys() const
+{
+  static const char *KEYS[] = {
+    "bdev_async_discard_threads",
+    NULL
+  };
+  return KEYS;
+}
+
+void KernelDevice::handle_conf_change(const ConfigProxy& conf,
+                                      const std::set<std::string> &changed)
+{
+  if (changed.count("bdev_async_discard_threads")) {
+    std::unique_lock l(discard_lock);
+
+    uint64_t oldval = target_discard_threads;
+    uint64_t newval = cct->_conf.get_val<uint64_t>("bdev_async_discard_threads");
+
+    target_discard_threads = newval;
+
+    // Increase? Spawn now, it's quick
+    if (newval > oldval) {
+      dout(10) << __func__ << " starting " << (newval - oldval) << " additional discard threads" << dendl;
+      discard_threads.reserve(target_discard_threads);
+      for (uint64_t i = oldval; i < newval; i++)
+      {
+        // All threads created with the same name
+        discard_threads.emplace_back(new DiscardThread(this, i));
+        discard_threads.back()->create("bstore_discard");
+      }
+    } else {
+      // Decrease? Signal threads after telling them to stop
+      dout(10) << __func__ << " stopping " << (oldval - newval) << " existing discard threads" << dendl;
+
+      // Signal the last threads to quit, and stop tracking them
+      for (uint64_t i = oldval - 1; i >= newval; i--)
+      {
+        // Also detach the thread so we no longer need to join
+        discard_threads[i]->stop = true;
+        discard_threads[i]->detach();
+        discard_threads.erase(discard_threads.begin() + i);
+      }
+
+      discard_cond.notify_all();
+    }
+  }
+}
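
Aside: the core of the change is the rewritten worker loop. Instead of one discard thread swapping discard_queued into the member discard_finishing, each DiscardThread now swaps the queue into its own thread-local interval_set, drops discard_lock, and issues the discards while sibling threads pick up whatever gets queued next. The standalone sketch below shows that swap-under-lock pattern with plain standard-library types; the names (Worker, worker_loop) are invented for illustration and this is not the Ceph code itself.

// Standalone illustration (not Ceph code): multiple workers draining a shared
// queue by swapping it into a local container while holding the lock, then
// processing it with the lock released so other workers can run in parallel.
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

struct Worker {
  std::thread thr;
  bool stop = false;   // guarded by the shared mutex, like DiscardThread::stop
};

std::mutex lock;
std::condition_variable cond;
std::set<uint64_t> queued;                    // stands in for discard_queued
std::vector<std::shared_ptr<Worker>> workers; // stands in for discard_threads

void worker_loop(std::shared_ptr<Worker> me, uint64_t tid)
{
  std::unique_lock l(lock);
  while (true) {
    if (queued.empty()) {
      if (me->stop)
        break;
      cond.wait(l);
    } else {
      std::set<uint64_t> mine;
      mine.swap(queued);   // take everything; newcomers go to other workers
      l.unlock();
      for (auto off : mine)
        std::cout << "thread " << tid << " discarding offset " << off << "\n";
      l.lock();
    }
  }
}

int main()
{
  for (uint64_t i = 0; i < 2; i++) {
    auto w = std::make_shared<Worker>();
    w->thr = std::thread(worker_loop, w, i);
    workers.push_back(w);
  }
  {
    std::lock_guard g(lock);
    for (uint64_t off = 0; off < 8; off++)
      queued.insert(off * 4096);
    cond.notify_one();   // like _queue_discard(): one waiter is enough
  }
  {
    std::lock_guard g(lock);
    for (auto &w : workers)
      w->stop = true;    // workers only exit once the queue is drained
    cond.notify_all();
  }
  for (auto &w : workers)
    w->thr.join();
}

Note that switching _queue_discard() from notify_all() to notify_one() fits this pattern: a newly queued batch is consumed whole by whichever worker wakes first, so waking a single waiter is enough.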
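The handle_conf_change() hunk lets bdev_async_discard_threads be changed at runtime: extra threads are spawned on the spot, while surplus threads are flagged to stop, detached, and erased from the vector, relying on the shared pointer each thread copies in _discard_thread() to keep its own state alive until it exits. A minimal sketch of that grow/shrink-with-detach idea (again with invented names and a deliberately simplified shutdown, not the Ceph implementation) could look like this:

// Sketch of runtime pool resizing: grow by spawning, shrink by flagging the
// trailing workers, detaching them and dropping them from the vector.
// Pool, spawn and resize are invented names; this is not the Ceph code.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct Worker {
  std::thread thr;
  bool stop = false;
};

struct Pool {
  std::mutex lock;
  std::condition_variable cond;
  std::vector<std::shared_ptr<Worker>> workers;

  // Caller must hold `lock`.
  void spawn(uint64_t i) {
    auto w = std::make_shared<Worker>();
    // The thread copies the shared_ptr, so the Worker (and its stop flag)
    // outlives its removal from the vector below.
    w->thr = std::thread([this, w, i] {
      std::unique_lock l(lock);
      while (!w->stop)
        cond.wait(l);
      std::cout << "worker " << i << " exiting\n";
    });
    workers.push_back(w);
  }

  void resize(uint64_t newval) {
    std::unique_lock l(lock);
    uint64_t oldval = workers.size();
    if (newval > oldval) {
      for (uint64_t i = oldval; i < newval; i++)
        spawn(i);
    } else {
      for (uint64_t i = oldval; i > newval; i--) {
        workers[i - 1]->stop = true;
        workers[i - 1]->thr.detach();  // no join needed later
        workers.erase(workers.begin() + (i - 1));
      }
      cond.notify_all();
    }
  }
};

int main()
{
  Pool pool;
  pool.resize(4);  // grow: four workers
  pool.resize(1);  // shrink: three are detached and told to stop
  pool.resize(0);  // shrink to zero
  // Give the detached workers a moment to leave pool.lock/pool.cond before
  // they are destroyed; the real code avoids this by joining the remaining
  // threads in _discard_stop() at shutdown.
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}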