@@ -181,8 +181,9 @@ struct poll_iocb {
181
181
struct file * file ;
182
182
struct wait_queue_head * head ;
183
183
__poll_t events ;
184
- bool done ;
185
184
bool cancelled ;
185
+ bool work_scheduled ;
186
+ bool work_need_resched ;
186
187
struct wait_queue_entry wait ;
187
188
struct work_struct work ;
188
189
};
@@ -1619,6 +1620,51 @@ static void aio_poll_put_work(struct work_struct *work)
1619
1620
iocb_put (iocb );
1620
1621
}
1621
1622
1623
+ /*
1624
+ * Safely lock the waitqueue which the request is on, synchronizing with the
1625
+ * case where the ->poll() provider decides to free its waitqueue early.
1626
+ *
1627
+ * Returns true on success, meaning that req->head->lock was locked, req->wait
1628
+ * is on req->head, and an RCU read lock was taken. Returns false if the
1629
+ * request was already removed from its waitqueue (which might no longer exist).
1630
+ */
1631
+ static bool poll_iocb_lock_wq (struct poll_iocb * req )
1632
+ {
1633
+ wait_queue_head_t * head ;
1634
+
1635
+ /*
1636
+ * While we hold the waitqueue lock and the waitqueue is nonempty,
1637
+ * wake_up_pollfree() will wait for us. However, taking the waitqueue
1638
+ * lock in the first place can race with the waitqueue being freed.
1639
+ *
1640
+ * We solve this as eventpoll does: by taking advantage of the fact that
1641
+ * all users of wake_up_pollfree() will RCU-delay the actual free. If
1642
+ * we enter rcu_read_lock() and see that the pointer to the queue is
1643
+ * non-NULL, we can then lock it without the memory being freed out from
1644
+ * under us, then check whether the request is still on the queue.
1645
+ *
1646
+ * Keep holding rcu_read_lock() as long as we hold the queue lock, in
1647
+ * case the caller deletes the entry from the queue, leaving it empty.
1648
+ * In that case, only RCU prevents the queue memory from being freed.
1649
+ */
1650
+ rcu_read_lock ();
1651
+ head = smp_load_acquire (& req -> head );
1652
+ if (head ) {
1653
+ spin_lock (& head -> lock );
1654
+ if (!list_empty (& req -> wait .entry ))
1655
+ return true;
1656
+ spin_unlock (& head -> lock );
1657
+ }
1658
+ rcu_read_unlock ();
1659
+ return false;
1660
+ }
1661
+
1662
+ static void poll_iocb_unlock_wq (struct poll_iocb * req )
1663
+ {
1664
+ spin_unlock (& req -> head -> lock );
1665
+ rcu_read_unlock ();
1666
+ }
1667
+
1622
1668
static void aio_poll_complete_work (struct work_struct * work )
1623
1669
{
1624
1670
struct poll_iocb * req = container_of (work , struct poll_iocb , work );
@@ -1638,14 +1684,27 @@ static void aio_poll_complete_work(struct work_struct *work)
1638
1684
* avoid further branches in the fast path.
1639
1685
*/
1640
1686
spin_lock_irq (& ctx -> ctx_lock );
1641
- if (!mask && !READ_ONCE (req -> cancelled )) {
1642
- add_wait_queue (req -> head , & req -> wait );
1643
- spin_unlock_irq (& ctx -> ctx_lock );
1644
- return ;
1645
- }
1687
+ if (poll_iocb_lock_wq (req )) {
1688
+ if (!mask && !READ_ONCE (req -> cancelled )) {
1689
+ /*
1690
+ * The request isn't actually ready to be completed yet.
1691
+ * Reschedule completion if another wakeup came in.
1692
+ */
1693
+ if (req -> work_need_resched ) {
1694
+ schedule_work (& req -> work );
1695
+ req -> work_need_resched = false;
1696
+ } else {
1697
+ req -> work_scheduled = false;
1698
+ }
1699
+ poll_iocb_unlock_wq (req );
1700
+ spin_unlock_irq (& ctx -> ctx_lock );
1701
+ return ;
1702
+ }
1703
+ list_del_init (& req -> wait .entry );
1704
+ poll_iocb_unlock_wq (req );
1705
+ } /* else, POLLFREE has freed the waitqueue, so we must complete */
1646
1706
list_del_init (& iocb -> ki_list );
1647
1707
iocb -> ki_res .res = mangle_poll (mask );
1648
- req -> done = true;
1649
1708
spin_unlock_irq (& ctx -> ctx_lock );
1650
1709
1651
1710
iocb_put (iocb );
@@ -1657,13 +1716,14 @@ static int aio_poll_cancel(struct kiocb *iocb)
1657
1716
struct aio_kiocb * aiocb = container_of (iocb , struct aio_kiocb , rw );
1658
1717
struct poll_iocb * req = & aiocb -> poll ;
1659
1718
1660
- spin_lock (& req -> head -> lock );
1661
- WRITE_ONCE (req -> cancelled , true);
1662
- if (!list_empty (& req -> wait .entry )) {
1663
- list_del_init (& req -> wait .entry );
1664
- schedule_work (& aiocb -> poll .work );
1665
- }
1666
- spin_unlock (& req -> head -> lock );
1719
+ if (poll_iocb_lock_wq (req )) {
1720
+ WRITE_ONCE (req -> cancelled , true);
1721
+ if (!req -> work_scheduled ) {
1722
+ schedule_work (& aiocb -> poll .work );
1723
+ req -> work_scheduled = true;
1724
+ }
1725
+ poll_iocb_unlock_wq (req );
1726
+ } /* else, the request was force-cancelled by POLLFREE already */
1667
1727
1668
1728
return 0 ;
1669
1729
}
@@ -1680,21 +1740,27 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1680
1740
if (mask && !(mask & req -> events ))
1681
1741
return 0 ;
1682
1742
1683
- list_del_init (& req -> wait .entry );
1684
-
1685
- if (mask && spin_trylock_irqsave (& iocb -> ki_ctx -> ctx_lock , flags )) {
1743
+ /*
1744
+ * Complete the request inline if possible. This requires that three
1745
+ * conditions be met:
1746
+ * 1. An event mask must have been passed. If a plain wakeup was done
1747
+ * instead, then mask == 0 and we have to call vfs_poll() to get
1748
+ * the events, so inline completion isn't possible.
1749
+ * 2. The completion work must not have already been scheduled.
1750
+ * 3. ctx_lock must not be busy. We have to use trylock because we
1751
+ * already hold the waitqueue lock, so this inverts the normal
1752
+ * locking order. Use irqsave/irqrestore because not all
1753
+ * filesystems (e.g. fuse) call this function with IRQs disabled,
1754
+ * yet IRQs have to be disabled before ctx_lock is obtained.
1755
+ */
1756
+ if (mask && !req -> work_scheduled &&
1757
+ spin_trylock_irqsave (& iocb -> ki_ctx -> ctx_lock , flags )) {
1686
1758
struct kioctx * ctx = iocb -> ki_ctx ;
1687
1759
1688
- /*
1689
- * Try to complete the iocb inline if we can. Use
1690
- * irqsave/irqrestore because not all filesystems (e.g. fuse)
1691
- * call this function with IRQs disabled and because IRQs
1692
- * have to be disabled before ctx_lock is obtained.
1693
- */
1760
+ list_del_init (& req -> wait .entry );
1694
1761
list_del (& iocb -> ki_list );
1695
1762
iocb -> ki_res .res = mangle_poll (mask );
1696
- req -> done = true;
1697
- if (iocb -> ki_eventfd && eventfd_signal_allowed ()) {
1763
+ if (iocb -> ki_eventfd && !eventfd_signal_allowed ()) {
1698
1764
iocb = NULL ;
1699
1765
INIT_WORK (& req -> work , aio_poll_put_work );
1700
1766
schedule_work (& req -> work );
@@ -1703,14 +1769,51 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1703
1769
if (iocb )
1704
1770
iocb_put (iocb );
1705
1771
} else {
1706
- schedule_work (& req -> work );
1772
+ /*
1773
+ * Schedule the completion work if needed. If it was already
1774
+ * scheduled, record that another wakeup came in.
1775
+ *
1776
+ * Don't remove the request from the waitqueue here, as it might
1777
+ * not actually be complete yet (we won't know until vfs_poll()
1778
+ * is called), and we must not miss any wakeups. POLLFREE is an
1779
+ * exception to this; see below.
1780
+ */
1781
+ if (req -> work_scheduled ) {
1782
+ req -> work_need_resched = true;
1783
+ } else {
1784
+ schedule_work (& req -> work );
1785
+ req -> work_scheduled = true;
1786
+ }
1787
+
1788
+ /*
1789
+ * If the waitqueue is being freed early but we can't complete
1790
+ * the request inline, we have to tear down the request as best
1791
+ * we can. That means immediately removing the request from its
1792
+ * waitqueue and preventing all further accesses to the
1793
+ * waitqueue via the request. We also need to schedule the
1794
+ * completion work (done above). Also mark the request as
1795
+ * cancelled, to potentially skip an unneeded call to ->poll().
1796
+ */
1797
+ if (mask & POLLFREE ) {
1798
+ WRITE_ONCE (req -> cancelled , true);
1799
+ list_del_init (& req -> wait .entry );
1800
+
1801
+ /*
1802
+ * Careful: this *must* be the last step, since as soon
1803
+ * as req->head is NULL'ed out, the request can be
1804
+ * completed and freed, since aio_poll_complete_work()
1805
+ * will no longer need to take the waitqueue lock.
1806
+ */
1807
+ smp_store_release (& req -> head , NULL );
1808
+ }
1707
1809
}
1708
1810
return 1 ;
1709
1811
}
1710
1812
1711
1813
/*
 * Poll table wrapper used by aio_poll(): it fills one in, passes the embedded
 * pt to vfs_poll(), and aio_poll_queue_proc() gets back to the wrapper with
 * container_of() on that pt.
 */
struct aio_poll_table {
1712
1814
/* embedded poll table; aio_poll() sets its _qproc and _key before polling */
struct poll_table_struct pt ;
1713
1815
/* the AIO request whose poll.head / poll.wait the queue proc fills in */
struct aio_kiocb * iocb ;
1816
/*
 * set by aio_poll_queue_proc() the first time it queues the request; a
 * later call sees it set and fails with -EINVAL (one waitqueue only)
 */
+ bool queued ;
1714
1817
/*
 * starts as -EINVAL in aio_poll(), becomes 0 once the request is queued,
 * and is set back to -EINVAL if a second waitqueue is attempted
 */
int error ;
1715
1818
};
1716
1819
@@ -1721,11 +1824,12 @@ aio_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1721
1824
struct aio_poll_table * pt = container_of (p , struct aio_poll_table , pt );
1722
1825
1723
1826
/* multiple wait queues per file are not supported */
1724
- if (unlikely (pt -> iocb -> poll . head )) {
1827
+ if (unlikely (pt -> queued )) {
1725
1828
pt -> error = - EINVAL ;
1726
1829
return ;
1727
1830
}
1728
1831
1832
+ pt -> queued = true;
1729
1833
pt -> error = 0 ;
1730
1834
pt -> iocb -> poll .head = head ;
1731
1835
add_wait_queue (head , & pt -> iocb -> poll .wait );
@@ -1750,12 +1854,14 @@ static int aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
1750
1854
req -> events = demangle_poll (iocb -> aio_buf ) | EPOLLERR | EPOLLHUP ;
1751
1855
1752
1856
req -> head = NULL ;
1753
- req -> done = false;
1754
1857
req -> cancelled = false;
1858
+ req -> work_scheduled = false;
1859
+ req -> work_need_resched = false;
1755
1860
1756
1861
apt .pt ._qproc = aio_poll_queue_proc ;
1757
1862
apt .pt ._key = req -> events ;
1758
1863
apt .iocb = aiocb ;
1864
+ apt .queued = false;
1759
1865
apt .error = - EINVAL ; /* same as no support for IOCB_CMD_POLL */
1760
1866
1761
1867
/* initialize the list so that we can do list_empty checks */
@@ -1764,23 +1870,35 @@ static int aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
1764
1870
1765
1871
mask = vfs_poll (req -> file , & apt .pt ) & req -> events ;
1766
1872
spin_lock_irq (& ctx -> ctx_lock );
1767
- if (likely (req -> head )) {
1768
- spin_lock (& req -> head -> lock );
1769
- if (unlikely (list_empty (& req -> wait .entry ))) {
1770
- if (apt .error )
1873
+ if (likely (apt .queued )) {
1874
+ bool on_queue = poll_iocb_lock_wq (req );
1875
+
1876
+ if (!on_queue || req -> work_scheduled ) {
1877
+ /*
1878
+ * aio_poll_wake() already either scheduled the async
1879
+ * completion work, or completed the request inline.
1880
+ */
1881
+ if (apt .error ) /* unsupported case: multiple queues */
1771
1882
cancel = true;
1772
1883
apt .error = 0 ;
1773
1884
mask = 0 ;
1774
1885
}
1775
1886
if (mask || apt .error ) {
1887
+ /* Steal to complete synchronously. */
1776
1888
list_del_init (& req -> wait .entry );
1777
1889
} else if (cancel ) {
1890
+ /* Cancel if possible (may be too late though). */
1778
1891
WRITE_ONCE (req -> cancelled , true);
1779
- } else if (!req -> done ) { /* actually waiting for an event */
1892
+ } else if (on_queue ) {
1893
+ /*
1894
+ * Actually waiting for an event, so add the request to
1895
+ * active_reqs so that it can be cancelled if needed.
1896
+ */
1780
1897
list_add_tail (& aiocb -> ki_list , & ctx -> active_reqs );
1781
1898
aiocb -> ki_cancel = aio_poll_cancel ;
1782
1899
}
1783
- spin_unlock (& req -> head -> lock );
1900
+ if (on_queue )
1901
+ poll_iocb_unlock_wq (req );
1784
1902
}
1785
1903
if (mask ) { /* no async, we'd stolen it */
1786
1904
aiocb -> ki_res .res = mangle_poll (mask );
0 commit comments